千家信息网

netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章将为大家详细讲解有关netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关
千家信息网最后更新 2025年12月03日netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能

这篇文章将为大家详细讲解有关netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {        ch.pipeline().addLast(defLoopGroup,                //编码解码器                new HttpServerCodec(),                //将多个消息转换成单一的消息对象                new HttpObjectAggregator(65536),                //支持异步发送大的码流,一般用于发送文件流                new ChunkedWriteHandler(),                //检测链路是否读空闲,配合心跳handler检测channel是否正常                new IdleStateHandler(60, 0, 0),                //处理握手和认证                new UserAuthHandler(),                //处理消息的发送                new MessageHandler()        );}

对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel

public static void addChannel(Channel channel) {        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);        System.out.println("addChannel:" + remoteAddr);        if (!channel.isActive()) {            logger.error("channel is not active, address: {}", remoteAddr);        }        UserInfo userInfo = new UserInfo();        userInfo.setAddr(remoteAddr);        userInfo.setChannel(channel);        userInfo.setTime(System.currentTimeMillis());        userInfos.put(channel, userInfo);    }

登录后,channel就变成有效的channel,无效的channel之后将会丢弃

public static boolean saveUser(Channel channel, String nick, String password) {        UserInfo userInfo = userInfos.get(channel);        if (userInfo == null) {            return false;        }        if (!channel.isActive()) {            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);            return false;        }        // 验证用户名和密码        if (nick == null || password == null) {            return false;        }        LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);        if (account == null) {            return false;        }        // 增加一个认证用户        userCount.incrementAndGet();        userInfo.setNick(nick);        userInfo.setAuth(true);        userInfo.setId(account.getId());        userInfo.setUsername(account.getUsername());        userInfo.setGroupNumber(account.getGroupNumber());        userInfo.setTime(System.currentTimeMillis());        // 注册该用户推送消息的通道        offlineInfoTransmitStatic.registerPull(channel);        return true;    }

当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。

public static void removeChannel(Channel channel) {        try {            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));            //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误            rwLock.writeLock().lock();            channel.close();            UserInfo userInfo = userInfos.get(channel);            if (userInfo != null) {                if (userInfo.isAuth()) {                    offlineInfoTransmitStatic.unregisterPull(channel);                    // 减去一个认证用户                    userCount.decrementAndGet();                }                userInfos.remove(channel);            }        } finally {            rwLock.writeLock().unlock();        }    }

为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。

public interface OfflineInfoTransmit {    void pushP2P(Integer userId, String message);    void pushGroup(String groupNumber, String message);    void registerPull(Channel channel);    void unregisterPull(Channel channel);}

其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:

  1. 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能

  2. 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线

  3. 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。

  4. 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。

项目地址:https://github.com/shuangyueliao/netty-chat

关于netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

消息 中间件 用户 处理 客户 客户端 无缝 切换 在线 存储 认证 消费者 检测 消费 功能 聊天室 有效 内容 就是 文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 全市网络安全工作报告 网络安全的几个点 数据库主键和外键的作用 中国常用的经济类数据库有哪些 太原星软软件开发学校 先心病数据库怎么找 上海哪有软件开发培训班 网络安全法等级保护条例 徐州运营软件开发供应商 陕西电商软件开发报价 物联网软件开发是什么 将自己电脑作为远程服务器 吉林服务器机柜供应商 目前tbc哪个服务器人多 服务器的风扇叫什么 浦东网络安全总监 数据库设计图怎么看 数据库数据校验 笔记本电脑服务器连接情况异常 网络安全带来的便利 数据库主键和外键的作用 软件开发费用算劳务 中华医学期刊全文数据库如何引用文献 如何彻底卸载u8数据库 长宁区品牌软件开发代理价格 锐思咨询和锐思数据库 枣庄网络安全招聘 宜章学it软件开发哪个学校好 端游恐龙岛服务器多少钱开 计算机网络技术信号电缆分类
0