欢迎大家来到IT世界,在知识的湖畔探索吧!
Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息
零、 目录
- IM系统简介
- Netty 简介
- Netty 环境配置
- 服务端启动流程
- 客户端启动流程
- 实战: 客户端和服务端双向通信
- 数据传输载体ByteBuf介绍
- 客户端与服务端通信协议编解码
- 实现客户端登录
- 实现客户端与服务端收发消息
- pipeline与channelHandler
- 构建客户端与服务端pipeline
- 拆包粘包理论与解决方案
- channelHandler的生命周期
- 使用channelHandler的热插拔实现客户端身份校验
- 客户端互聊原理与实现
- 群聊的发起与通知
- 群聊的成员管理(加入与退出,获取成员列表)
- 群聊消息的收发及Netty性能优化
- 心跳与空闲检测
- 总结
- 扩展
一、 实现需求
- 这一小节 , 我们来实现客户端与服务端收发消息, 我们要实现的具体功能是: 在控制台输入一条消息后按回车 , 交验完客户端登录状态之后 , 把消息发送到服务端 , 服务端收到消息之后打印并向客户端回复一条消息 , 客户端收到消息后打印。
二、 收发消息对象
- 首先 ,Test_11_packet的指令集合中添加 发送消息指令为3
/** * 数据包抽象类 * * @author outman */ @Data abstract class Test_11_Packet { // 协议版本号 private byte version = 1; // 获取指定标识 public abstract byte getCommand(); // 指令集合 public interface Command { // 登录指令 public static final byte LOGIN_REQUEST = 1; // 登陆响应指令 public static final byte LOGIN_RESPONSE = 2; // 发送消息指令 public static final byte MESSAGE_REQUEST = 3; // 回复消息指令 public static final byte MESSAGE_RESPONSE = 4; } }
欢迎大家来到IT世界,在知识的湖畔探索吧!
- 我们来定义一下客户端与服务端收发消息对象 , 我们把客户端发送至服务端的消息对象定义为Test_11_MessageRequestPacket
欢迎大家来到IT世界,在知识的湖畔探索吧!/** * 2019年1月3日 * @author outman * * 发送消息对象 */ @Data class Test_11_MessageRequestPacket extends Test_11_Packet{ private String message; @Override public byte getCommand() { return Command.MESSAGE_REQUEST; } }
- 我们把服务端发送至客户端的消息对象定义为 Test_11_messageResponsePacket
/** * 2019年1月3日 * @author outman * 回复消息对象 */ @Data class Test_11_MessageResponsePacket extends Test_11_Packet{ private String message; @Override public byte getCommand() { return Command.MESSAGE_RESPONSE; } }
三、 判断登录是否成功
- 在前面一小节 , 我们在文末出了一道思考题: 如何判断客户端是否已经登录?
- 在客户端启动流程这一章节 , 我们有提到可以给客户端连接也就是channel 绑定属性 , 通过channel.attr(XXX).set(xxx)的方式 , 那么我们是否可以在登录成功之后 , 给channel绑定一个登录成功的标志 , 然后在判断是否登录成功的时候取出这个标志? 答案十肯定的。
- 我们来定义一下登录成功的标志
欢迎大家来到IT世界,在知识的湖畔探索吧!/** * 2019年1月21日 * @author outman * * 连接 属性 * */ interface Test_11_ChannelAttributes { // 连接登录标识属性 AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); }
- 然后我们在登录成功之后给连接绑定登录成功标识
- Test_11_clientHandler.java
/** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; // 数据包解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据不同的指令选择对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_RESPONSE: Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet; System.out.println("客户端:" + new Date() + "收到服务端响应【" + loginResponsePacket.getMsg() + "】"); // 给 连接绑定登录成功标识 if(Test_11_LoginUtil.isSuccess(loginResponsePacket)) { // 登录成功 Test_11_LoginUtil.markAsLogin(ctx.channel()); System.out.println(new Date() + "登录成功"); }else { // 登录失败 System.out.println(new Date() + "登录失败,原因-->"+loginResponsePacket.getMsg()); } break; default: break; } }
- 登录相关工具类
/** * 2019年1月21日 * @author outman * * 登录相关工具类 * */ class Test_11_LoginUtil{ /** * @desc 判断登录成功 * @param loginResponsePacket * @return 是否登录成功 */ public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) { boolean flag = false; if(loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) { flag = true; } return flag; } /** * @desc 标识连接登录成功 * @param channel * @return */ public static void markAsLogin(Channel channel) { channel.attr(Test_11_ChannelAttributes.LOGIN).set(true); } /** * @desc 判断是否登录 * @param channel * @return */ public static boolean hasLogin(Channel channel) { boolean flag = false; Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get(); if(attr == null) return flag; return attr; } }
- 如以上代码所示 , 我们出去出LoginUtils用于设置登录成功标志位已经判断是否有标志位
四、 控制台输入消息并发送
- 在客户端启动这小节中 , 我们已经学到了客户端启动流程 , 现在 , 我们在客户端连接上服务端之后启动控制台线程 , 从控制台读取消息然后发送到服务端。
Test_11_client.java /** * @desc 连接服务端 * @param bootstrap * @param ip * @param port * @param maxRetry * @param retryIndex * 重试计数 */ private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) { bootstrap.connect(ip, port).addListener(future -> { int[] finalRetryIndex; // 初始化 重连计数 if (retryIndex.length == 0) { finalRetryIndex = new int[] { 0 }; } else { finalRetryIndex = retryIndex; } // 判断连接状态 if (future.isSuccess()) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功"); // 启动控制台线程 Channel channel = ((ChannelFuture) future).channel(); startConsoleThread(channel); } else if (maxRetry <= 0) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连"); } else { // 重连使用退避算法 int delay = 1 << finalRetryIndex[0]; System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试"); bootstrap.config().group().schedule(() -> { connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1); }, delay, TimeUnit.SECONDS); } }); } /** * @desc 启动控制台线程 * @param channel */ private static void startConsoleThread(Channel channel) { System.out.println("客户端:启动控制台线程"); new Thread(() -> { while (Thread.interrupted()) { if (Test_11_LoginUtil.hasLogin(channel)) { System.out.println("输入消息发送至服务端"); Scanner sc = new Scanner(System.in); String msg = sc.nextLine(); Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket(); messageRequestPacket.setMessage(msg); ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(), messageRequestPacket); channel.writeAndFlush(byteBuf); } else { System.out.println("您还未登录,请登录..."); } } }).start(); }
- 这里我们省略了非关键代码 , 连接成功之后 , 我们调用startConsoleThread()开启控制台线程 , 然后在控制台线程中 判断只要是登陆状态 , 就允许输入消息
- 从控制台获取消息之后 , 将消息封装成消息对象 , 然后将消息编码成byteBuf ,最后通过writeAndFlush() 将消息写到服务端 , 接下来我们来实现 服务端收到消息之后是如何处理的。
五、 服务端收到消息处理
Test_11_serverHandler.java /** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { ByteBuf byteBuf = (ByteBuf) obj; // 解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据指令执行对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_REQUEST: Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet; // 模拟校验成功 System.out.println("服务端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陆成功"); // 给客户端响应 Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket(); loginResponsePacket.setCode(Code.SUCCESS); loginResponsePacket.setMsg("登陆成功!"); // 编码 byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket); // 写出数据 ctx.channel().writeAndFlush(byteBuf); break; case Test_11_Packet.Command.MESSAGE_REQUEST : // 处理消息 Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet; System.out.println("服务端:"+ new Date() + "收到客户端消息 --> "+ messageRequestPacket.getMessage()); Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket(); String msg = messageRequestPacket.getMessage(); msg = msg.replace("?", "!"); msg = msg.replace("?", "!"); messageResponsePacket.setMessage("服务端回复:【"+msg+"】"); Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket); ctx.channel().writeAndFlush(byteBuf); break; default: System.out.println("服务端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】"); break; } }
- 服务端在收到消息之后 , 仍然回调channelRead() 方法 , 解码之后 用一个case 分支进入消息处理流程
- 首先服务器将收到的消息打印到控制台 , 然后封装一个MessageResponsePacket , 接下来还是编码成byteBuf然后写出到客户端 , 最后我们来实现客户端收到消息的逻辑
六、 客户端消息处理
Test_11_clientHandler.java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; // 数据包解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据不同的指令选择对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_RESPONSE: Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet; System.out.println("客户端:" + new Date() + "收到服务端响应【" + loginResponsePacket.getMsg() + "】"); // 给 连接绑定登录成功标识 if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) { // 登录成功 Test_11_LoginUtil.markAsLogin(ctx.channel()); System.out.println("客户端:" + new Date() + "登录成功"); } else { // 登录失败 System.out.println("客户端:" + new Date() + "登录失败,原因-->" + loginResponsePacket.getMsg()); } break; case Test_11_Packet.Command.MESSAGE_RESPONSE : Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet; System.out.println("客户端:"+new Date()+ "收到服务端消息 --> "+ messageResponsePacket.getMessage()); break; default: break; } }
- 客户端收到消息之后 , 回调channelRead() ,仍然用一个case 分支进入到消息处理逻辑 , 这里我们仅仅是简单的打印消息
七、 执行结果
八、 完整代码
import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Scanner; import java.util.concurrent.TimeUnit; import com.alibaba.fastjson.JSONObject; import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import lombok.Data; /** * 2019年1月3日 * * @author outman * * 实现客户端和服务端收发消息 * */ public class Test_11_实现客户端与服务端收发消息 { public static void main(String[] args) { // 启动服务端 Test_11_server.start(8000); // 启动客户端 Test_11_client.start("127.0.0.1", 8000, 5); } } /** * 2019年1月3日 * * @author outman * * 服务端 */ class Test_11_server { /** * @desc 服务端启动 * @param port */ public static void start(int port) { NioEventLoopGroup bossgroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 添加服务端处理逻辑 ch.pipeline().addLast(new Test_11_serverHandler()); } }); bind(serverBootstrap, port); } /** * @desc 自动绑定递增并启动服务端 * @param serverBootstrap * @param port */ private static void bind(ServerBootstrap serverBootstrap, int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】成功"); } else { System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定"); bind(serverBootstrap, port + 1); } }); } } /** * 2019年1月3日 * * @author outman * * 客户端 */ class Test_11_client { /** * 客户端启动 * * @param ip * 连接ip * @param port * 服务端端口 * @param maxRetry * 最大重试次数 */ public static void start(String ip, int port, int maxRetry) { NioEventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 添加 客户端处理逻辑 ch.pipeline().addLast(new Test_11_clientHandler()); } }); // 连接服务端 connect(bootstrap, ip, port, maxRetry); } /** * @desc 连接服务端 * @param bootstrap * @param ip * @param port * @param maxRetry * @param retryIndex * 重试计数 */ private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) { bootstrap.connect(ip, port).addListener(future -> { int[] finalRetryIndex; // 初始化 重连计数 if (retryIndex.length == 0) { finalRetryIndex = new int[] { 0 }; } else { finalRetryIndex = retryIndex; } // 判断连接状态 if (future.isSuccess()) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功"); // 启动控制台线程 Channel channel = ((ChannelFuture) future).channel(); startConsoleThread(channel); } else if (maxRetry <= 0) { System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连"); } else { // 重连使用退避算法 int delay = 1 << finalRetryIndex[0]; System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试"); bootstrap.config().group().schedule(() -> { connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1); }, delay, TimeUnit.SECONDS); } }); } /** * @desc 启动控制台线程 * @param channel */ private static void startConsoleThread(Channel channel) { System.out.println("客户端:启动控制台线程"); new Thread(() -> { while (!Thread.interrupted()) { if (Test_11_LoginUtil.hasLogin(channel)) { System.out.println("输入消息发送至服务端"); Scanner sc = new Scanner(System.in); String msg = sc.nextLine(); Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket(); messageRequestPacket.setMessage(msg); ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(), messageRequestPacket); channel.writeAndFlush(byteBuf); } else { System.out.println("您还未登录,请登录..."); } } }).start(); } } /** * 客户端处理逻辑 * * @author outman */ class Test_11_clientHandler extends ChannelInboundHandlerAdapter { /** * 连接成功时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端:" + new Date() + "开始登陆"); // 创建登陆对象 Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket(); // 随机取ID 1~999 loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1); loginRequestPacket.setUserName("outman"); loginRequestPacket.setPassword("123456"); // 编码 ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket); // 写出数据 ctx.channel().writeAndFlush(byteBuf); } /** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; // 数据包解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据不同的指令选择对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_RESPONSE: Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet; System.out.println("客户端:" + new Date() + "收到服务端响应【" + loginResponsePacket.getMsg() + "】"); // 给 连接绑定登录成功标识 if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) { // 登录成功 Test_11_LoginUtil.markAsLogin(ctx.channel()); System.out.println("客户端:" + new Date() + "登录成功"); } else { // 登录失败 System.out.println("客户端:" + new Date() + "登录失败,原因-->" + loginResponsePacket.getMsg()); } break; case Test_11_Packet.Command.MESSAGE_RESPONSE : Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet; System.out.println("客户端:"+new Date()+ "收到服务端消息 --> "+ messageResponsePacket.getMessage()); break; default: break; } } } /** * 服务端处理逻辑 * * @author outman */ class Test_11_serverHandler extends ChannelInboundHandlerAdapter { /** * 连接成功时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } /** * 有数据可读时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { ByteBuf byteBuf = (ByteBuf) obj; // 解码 Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf); // 根据指令执行对应的处理逻辑 switch (packet.getCommand()) { case Test_11_Packet.Command.LOGIN_REQUEST: Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet; // 模拟校验成功 System.out.println("服务端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陆成功"); // 给客户端响应 Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket(); loginResponsePacket.setCode(Code.SUCCESS); loginResponsePacket.setMsg("登陆成功!"); // 编码 byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket); // 写出数据 ctx.channel().writeAndFlush(byteBuf); break; case Test_11_Packet.Command.MESSAGE_REQUEST : // 处理消息 Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet; System.out.println("服务端:"+ new Date() + "收到客户端消息 --> "+ messageRequestPacket.getMessage()); Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket(); String msg = messageRequestPacket.getMessage(); msg = msg.replace("?", "!"); msg = msg.replace("?", "!"); messageResponsePacket.setMessage("服务端回复:【"+msg+"】"); Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket); ctx.channel().writeAndFlush(byteBuf); break; default: System.out.println("服务端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】"); break; } } } /** * 数据包抽象类 * * @author outman */ @Data abstract class Test_11_Packet { // 协议版本号 private byte version = 1; // 获取指定标识 public abstract byte getCommand(); // 指令集合 public interface Command { // 登录指令 public static final byte LOGIN_REQUEST = 1; // 登陆响应指令 public static final byte LOGIN_RESPONSE = 2; // 发送消息指令 public static final byte MESSAGE_REQUEST = 3; // 回复消息指令 public static final byte MESSAGE_RESPONSE = 4; } } /** * 序列化抽象接口 * * @author outman */ interface Test_11_Serializer { // 获取序列化算法标识 byte getSerializerAlgorithm(); // 序列化算法标识集合 interface SerializerAlgorithm { // JSON 序列化算法标识 public static final byte JSONSerializerAlgrothm = 1; } // 默认的序列化算法 public Test_11_Serializer DEFAULT = new Test_11_JSONSerializer(); // 序列化 byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet); // 反序列化 <T> T deSerialize(byte[] bs, Class<T> clazz); } /** * 数据包编解码类 * * @author outman */ class Test_11_PacketCodec { // 魔数 private static final int MAGIC_NUMBER = 0x12345678; // 单例 public static Test_11_PacketCodec INSTANCE = new Test_11_PacketCodec(); // 注册 序列化类 private Class[] serializerArray = new Class[] { Test_11_JSONSerializer.class }; // 注册抽象数据包类 private Class[] packetArray = new Class[] { Test_11_LoginRequestPacket.class, Test_11_LoginResponsePacket.class ,Test_11_MessageRequestPacket.class ,Test_11_MessageResponsePacket.class}; // 序列化算法标识 和对应的序列化类映射 private static Map<Byte, Class<? super Test_11_Serializer>> serializerMap; // 指令标识和对应的数据包抽象类映射 private static Map<Byte, Class<? super Test_11_Packet>> packetMap; // 初始化 两个映射 private Test_11_PacketCodec() { serializerMap = new HashMap<>(); Arrays.asList(serializerArray).forEach(clazz -> { try { Method method = clazz.getMethod("getSerializerAlgorithm"); byte serializerAlgorthm = (byte) method.invoke((Test_11_Serializer) clazz.newInstance()); serializerMap.put(serializerAlgorthm, clazz); } catch (Exception e) { e.printStackTrace(); } }); packetMap = new HashMap<>(); Arrays.asList(packetArray).forEach(clazz -> { try { Method method = clazz.getMethod("getCommand"); method.setAccessible(true); byte command = (byte) method.invoke((Test_11_Packet) clazz.newInstance()); packetMap.put(command, clazz); } catch (Exception e) { e.printStackTrace(); } }); } // 编码 public ByteBuf enCode(ByteBuf byteBuf, Test_11_Packet packet) { // 序列化数据包 byte[] bs = Test_11_Serializer.DEFAULT.enSerialize(byteBuf, packet); // 写入魔数 byteBuf.writeInt(MAGIC_NUMBER); // 写入协议版本号 byteBuf.writeByte(packet.getVersion()); // 写入指令标识 byteBuf.writeByte(packet.getCommand()); // 写入序列化算法标识 byteBuf.writeByte(Test_11_Serializer.DEFAULT.getSerializerAlgorithm()); // 写入数据长度 byteBuf.writeInt(bs.length); // 写入数据 byteBuf.writeBytes(bs); return byteBuf; } // 解码 public Test_11_Packet deCode(ByteBuf byteBuf) throws Exception { // 跳过魔数校验 byteBuf.skipBytes(4); // 跳过版本号校验 byteBuf.skipBytes(1); // 获取指令标识 byte command = byteBuf.readByte(); // 获取序列化算法标识 byte serializerAlgorthm = byteBuf.readByte(); // 获取数据长度 int len = byteBuf.readInt(); // 获取数据 byte[] bs = new byte[len]; byteBuf.readBytes(bs); // 获取对应的序列化算法类 Test_11_Serializer serializer = getSerializer(serializerAlgorthm); // 获取对应的数据包类 Test_11_Packet packet = getPacket(command); if (serializer != null && packet != null) { // 反序列化数据包 return serializer.deSerialize(bs, packet.getClass()); } else { throw new RuntimeException("没有找到对应的序列化实现或数据包实现"); } } private static Test_11_Packet getPacket(byte command) throws Exception { if(packetMap.get(command) == null) { throw new RuntimeException("未注册的数据包类型-->"+ command); } return (Test_11_Packet) packetMap.get(command).newInstance(); } private static Test_11_Serializer getSerializer(byte serializerAlgorthm) throws Exception { return (Test_11_Serializer) serializerMap.get(serializerAlgorthm).newInstance(); } } /** * 登录请求数据包实体类 * * @author outman */ @Data class Test_11_LoginRequestPacket extends Test_11_Packet { private int userId; private String userName; private String password; @Override public byte getCommand() { return Command.LOGIN_REQUEST; } } /** * 登录响应数据包实体类 * * @author outman */ @Data class Test_11_LoginResponsePacket extends Test_11_Packet { private int code; private String msg; @Override public byte getCommand() { return Command.LOGIN_RESPONSE; } /** * 响应码集合 */ interface Code { // 成功的响应码 public static final int SUCCESS = 10000; // 失败的响应码 public static final int FAIL = 10001; } } /** * Json序列化实现类 * * @author outman */ class Test_11_JSONSerializer implements Test_11_Serializer { @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSONSerializerAlgrothm; } @Override public byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet) { return JSONObject.toJSONBytes(packet); } @Override public <T> T deSerialize(byte[] bs, Class<T> clazz) { return JSONObject.parseObject(bs, clazz); } } /** * 2019年1月3日 * * @author outman * * 发送消息对象 */ @Data class Test_11_MessageRequestPacket extends Test_11_Packet { private String message; @Override public byte getCommand() { return Command.MESSAGE_REQUEST; } } /** * 2019年1月3日 * * @author outman 回复消息对象 */ @Data class Test_11_MessageResponsePacket extends Test_11_Packet { private String message; @Override public byte getCommand() { return Command.MESSAGE_RESPONSE; } } /** * 2019年1月21日 * * @author outman * * 连接 属性 * */ interface Test_11_ChannelAttributes { // 连接登录标识属性 AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); } /** * 2019年1月21日 * * @author outman * * 登录相关工具类 * */ class Test_11_LoginUtil { /** * @desc 判断登录成功 * @param loginResponsePacket * @return 是否登录成功 */ public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) { boolean flag = false; if (loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) { flag = true; } return flag; } /** * @desc 标识连接登录成功 * @param channel * @return */ public static void markAsLogin(Channel channel) { channel.attr(Test_11_ChannelAttributes.LOGIN).set(true); } /** * @desc 判断是否登录 * @param channel * @return */ public static boolean hasLogin(Channel channel) { boolean flag = false; Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get(); if (attr != null)flag = attr; return flag; } }
十、 总结
- 在本小节中 , 我们定义了收发消息的java 对象进行消息收发
- 然后我们学到了channel的attr() 的实际用法 , 可用通过给channel 绑定属性来设置某些状态 , 获取某些状态 , 不需要额外的map 来维持
- 接着我们学习了如何在控制台获取消息并发送到服务端
- 最后我们实现了服务端回复消息 , 客户端响应的逻辑 。
十一 、 思考
- 随着我们的指令越来越多 , 如何避免channelRead() 中对指令处理的switch-case 泛滥? (方法有很多)
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/51662.html