开发学院

您的位置:首页>技术文章>正文

技术文章

spring+netty实现一个最小可运行的im server

开发学院2025-08-11 20:57:40
近期想做一个基于springboot+netty的im服务器,各方收集资料后,汇总如下

  近期想做一个基于springboot+netty的im服务器,各方收集资料后,汇总如下。

  需要支持浏览器的WebSocket和来自app端的Socket连接,结构如下:

┌──────────┐         ┌──────────┐
│  Browser │         │  App     │  ← WebSocket / Socket.IO
└────┬─────┘         └────┬─────┘
     │ 1.长连接            │
     │                     │
┌────┴─────────────────────┴────┐
│  Netty WebSocket Gateway      │  ← 2.多协议编解码、心跳、黑白名单
│  (独立进程,无业务逻辑)        │
└────┬─────────────────────┬────┘
     │3. MQ/Redis 事件      │3. MQ/Redis 事件
┌────┴────┐           ┌────┴────┐
│Chat-Svc│           │User-Svc │
│(Spring)│           │(Spring) │
└────┬────┘           └────┬────┘
     │4. MyBatis-Plus      │4. MyBatis-Plus
┌────┴─────────────────────┴────┐
│        MySQL 8.0 / Redis      │
└───────────────────────────────┘


数据库设计(MySQL 8.0)

1 用户表

CREATE TABLE t_user (
  id          BIGINT PRIMARY KEY,
  username    VARCHAR(32) UNIQUE NOT NULL,
  avatar      VARCHAR(255),
  create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);

2 单聊消息表

CREATE TABLE t_msg_single (
  msg_id      BIGINT PRIMARY KEY,
  from_uid    BIGINT NOT NULL,
  to_uid      BIGINT NOT NULL,
  content     TEXT NOT NULL,
  msg_type    TINYINT DEFAULT 1 COMMENT '1=文本 2=图片 3=文件 …',
  send_time   DATETIME DEFAULT CURRENT_TIMESTAMP,
  deliver_tag TINYINT DEFAULT 0 COMMENT '0未送达 1已送达 2已读',
  INDEX idx_from_to (from_uid, to_uid),
  INDEX idx_time (send_time)
);

3 群聊基础表

CREATE TABLE t_group (
  group_id    BIGINT PRIMARY KEY,
  group_name  VARCHAR(64) NOT NULL,
  owner_uid   BIGINT NOT NULL,
  create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE t_group_member (
  id       BIGINT PRIMARY KEY AUTO_INCREMENT,
  group_id BIGINT NOT NULL,
  uid      BIGINT NOT NULL,
  UNIQUE KEY uk_group_user (group_id, uid)
);

4 群聊消息表(只存一份)

CREATE TABLE t_msg_group (
  msg_id      BIGINT PRIMARY KEY,
  group_id    BIGINT NOT NULL,
  from_uid    BIGINT NOT NULL,
  content     TEXT NOT NULL,
  msg_type    TINYINT DEFAULT 1,
  send_time   DATETIME DEFAULT CURRENT_TIMESTAMP,
  INDEX idx_group_time (group_id, send_time)
);

5 离线/未读模型(Kafka 思路)

-- 用户-群离线位点,替代“每人存一份”
CREATE TABLE t_group_offset (
  id       BIGINT PRIMARY KEY AUTO_INCREMENT,
  group_id BIGINT NOT NULL,
  uid      BIGINT NOT NULL,
  last_ack_msg_id BIGINT NOT NULL COMMENT '已读最大msg_id',
  UNIQUE KEY uk_group_user (group_id, uid)
);


Netty + Spring Boot 关键代码

1 Channel 管理与路由

@Component
public class WsChannelManager {
    // uid -> channel
    private static final MapONLINE = new ConcurrentHashMap<>();

    public static void online(Long uid, Channel ch) {
        ONLINE.put(uid, ch);
    }
    public static void offline(Long uid) {
        ONLINE.remove(uid);
    }
    public static Channel get(Long uid) {
        return ONLINE.get(uid);
    }
}


2 消息实体(JSON)

{
  "type": 1,          // 1=单聊 2=群聊 3=心跳…
  "fromUid": 1001,
  "toUid": 1002,      // 群聊时代表 groupId
  "content": "hello",
  "msgId": 9223372036854775807
}


3 Netty 解码器 & 业务 Handler

public class ChatServerHandler extends SimpleChannelInboundHandler{

    @Autowired
    private MsgService msgService;   // Spring 注入

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        MsgDTO dto = JSON.parseObject(frame.text(), MsgDTO.class);
        switch (dto.getType()) {
            case 1 -> handleSingle(ctx, dto);
            case 2 -> handleGroup(ctx, dto);
            default -> ctx.writeAndFlush(new TextWebSocketFrame("bad type"));
        }
    }

    private void handleSingle(ChannelHandlerContext ctx, MsgDTO dto) {
        // 1. 落库
        msgService.saveSingle(dto);
        // 2. 实时推送
        Channel target = WsChannelManager.get(dto.getToUid());
        if (target != null && target.isActive()) {
            target.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dto)));
        } else {
            // 离线:可写入延迟队列/离线表,后续推送
        }
    }

    private void handleGroup(ChannelHandlerContext ctx, MsgDTO dto) {
        // 1. 落库
        msgService.saveGroup(dto);
        // 2. 广播给所有成员
        Listmembers = groupService.listMemberUid(dto.getToUid());
        members.forEach(uid -> {
            Channel ch = WsChannelManager.get(uid);
            if (ch != null && ch.isActive()) {
                ch.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dto)));
            }
        });
    }
}


4 Service 层(事务 + 异步)

@Service
public class MsgService {

    @Transactional
    public void saveSingle(MsgDTO dto) {
        MsgSingleEntity e = new MsgSingleEntity();
        e.setMsgId(dto.getMsgId());
        e.setFromUid(dto.getFromUid());
        e.setToUid(dto.getToUid());
        e.setContent(dto.getContent());
        singleMapper.insert(e);
    }

    @Async("msgExecutor")   // Spring异步线程池,避免阻塞IO线程
    public void saveGroup(MsgDTO dto) {
        MsgGroupEntity e = new MsgGroupEntity();
        e.setMsgId(dto.getMsgId());
        e.setGroupId(dto.getToUid());
        e.setFromUid(dto.getFromUid());
        e.setContent(dto.getContent());
        groupMapper.insert(e);
    }
}


4. 离线/未读消息拉取

用户上线时,后端根据 last_ack_msg_id 查询 > 该值的所有群消息,再合并单聊 t_msg_single 中未读,一次性推送给客户端。

public ListpullOffline(Long uid) {
    Listlist = new ArrayList<>();
    // 1. 单聊离线
    list.addAll(singleMapper.selectUnread(uid));
    // 2. 群聊离线
    Listgroups = groupService.listGroupIdsByUid(uid);
    groups.forEach(gid -> {
        long ack = offsetMapper.selectAck(gid, uid);
        list.addAll(groupMapper.selectAfter(gid, ack));
    });
    return list;
}


部署 & 扩展要点

  Netty 网关 单独部署(多实例 + 无状态),通过 Redis 广播跨节点消息。

  业务服务 可按模块拆分(单聊、群聊、文件、好友关系)。

  MySQL 分库分表可按 from_uid / group_id 做 Sharding。

  消息顺序 由客户端根据 msgId(雪花算法)排序,服务端不保证全局顺序。

  消息幂等 客户端发送时携带 msgId,服务端表唯一索引保证不重复落库。