【十 三】Netty 私有协议栈开发

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

Netty 私有协议栈开发

私有协议介绍

通信协议从广义上区分可以分为公有协议和私有协议。由于私有协议的灵活性它往往会在某个公司或组织内部使用按需定制升级起来毕竟方便灵活性较好。绝大多数的私有协议传输层都是基于TCP/IP。所以利用Netty 的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。
私有协议本质上是厂商内部发展和采用的标准除非授权其他厂商一般无权使用该协议。
由于现代软件系统的复杂性一个大型软件往往被拆分为多个模块随着移动互联网的兴起网址的规模也越来越大业务的功能越来越多为了能够支撑业务的发展往往需要集群和分布式部署。这样各个模块之前就需要跨节点通信

跨节点通信

传统的跨节点通信
1通过RMI进行远程服务调用
2通过Java 的Socket+Java 序列化的方式进行跨节点调用
3利用一些开源的RPC 框架进行远程服务调用例如FaceBook的Thrift,Apache 的Avro, 阿里巴巴的 Dubbo 等
4利用标准的共有协议进行跨节点服务调用。例如 HTTP+XMLRESTFUL+JSON 或者 WebService。

跨节点的远程服务调用除了链路层的物理连接外还需要对请求和响应消息进行编解码。在请求和应答消息本身以外也需要携带一些其他控制和管理类指令例如链路建立的握手请求和响应消息链路检查的心跳消息等。这些功能组合到一起就会形成私有协议。

私有协议栈设计

该协议用于内部模块之间的通信基于TCP/IP 协议栈是一个类HTTP协议的应用层协议栈相比于传统的标准协议栈它更加轻巧灵活和实用。

私有协议栈的网络拓扑图

在这里插入图片描述
如上图所示每个Netty节点(Netty进程)之间建立长连接使用Netty协议进行通信Netty节点并没有服务端和客户端的区分谁首先发起连接谁就是客户端另一方自然就成为服务端。一个Netty节点既可以作为客户端连接其他的Netty节点也可以作为Netty服务端被其他Netty节点连接完全取决于使用者的业务场景。

协议栈功能描述

该私有协议承载了业务内部各模块之间的消息较好和服务调用主要功能如下:
1基于Netty的NIO通信框架提供高性能POJO的序列化和反序列化
2提供消息的编码解码框架可以实现POJO的序列化和反序列化
3提供基于IP地址的白名单计入认知机制
4链路的有效性校验机制
5链路的断连重连机制

协议的通信模型

在这里插入图片描述
该私有协议 双方链路建立成功之后双方可以进行全双工通信无论客户端还是服务端都可以主动发送请求消息给对方通信方式可以是TWO WAY 或者 ONE WAY。双方之间的心跳采用Ping-Pong机制当链路处于空闲状态时客户端主动发送Ping消息给服务端服务端接收到消息后发送应答消息Pong给客户端如果客户端连续发送N条消息都没有接收到服务端返回的Pong消息说明链路已经挂死或者对方处于异常状态客户端主动关闭连接间隔周期T后发起重连操作。

消息体定义

私有协议栈消息体定义分为两部分
1消息头
2消息体

消息定义表

在这里插入图片描述

私有协议消息头定义

在这里插入图片描述

私有协议支持的字段类型

在这里插入图片描述

私有协议的编解码规范

私有协议的编码

私有协议 NettyMessage 的编码规范
1crcCodejava.nio.ByteBuffer.putInt(int value)。如果采用其他缓存必须于其等价。
2lengthjava.nio.ByteBuffer.putInt(int value)。如果采用其他缓存必须于其等价。
3sessionIDjava.nio.ByteBuffer.putInt(int value)。如果采用其他缓存必须于其等价。
4typejava.nio.ByteBuffer.putLong(long value)。 如果采用其他缓存必须于其等价。
5priorityjava.nio.ByteBuffer.put(byte b)。如果采用其他缓存必须于其等价。
6attachment它的编码规则为如果attachment 长度为0表示没有可选附件则将长度编码设置为0
java.nio.ByteBuffer.putInt(0)如果大于0说明有附件需要编码具体的编码规则如下
一:首先对附件的个数进行编码java.nio.ByteBuffer.putInt(attachment.size());
二:然后对Key进行编码先编码长度再将它转换成数组之后编码内容。
7body的编码通过JBoss Marshallng 将其序列化为byte数组然后调用java.nio.ByteBuffer.put(byte[]src)将其写入ByteBuffer缓冲区中。
由于整个消息的长度必须等全部字段都编码完成之后才能确认所以最后需要更新消息头的length字段将其重新写入ByteBuffer中。

私有协议的解码

相对于NettyMessage的编码仍旧以java.nio.ByteBuffer为例给出Netty协议的解码规范
1crcCode通过java.nio.ByteBuffer.getInt()获取校验码字段其他缓冲区需要于其等价
2length:通过java.nio.ByteBuffer.getInt()获取Netty消息的长度其他缓冲区需要于其等价
3sessionID:通过java.nio.ByteBuffer.getLong()获取会话ID,其他缓冲区需要于其等价
4type通过java.nio.ByteBuffer.get()获取消息类型其他缓冲区需要于其等价
5priority通过java.nio.ByteBuffer.get获取消息优先级其他缓冲区需要于其等价
6attachment它的解码规则为首先创建一个新的attachment对象调用java.nio.ByteBuffer.getInt()获取附件长度如果为0说明附件为空。解码结束继续解码消息体如果非空则根据长度通过for循环进行解码。
7body通过JBoss 的marshaller进行解码

链路的建立

该私有协议栈支持服务端和客户端对于使用该私有协议栈的应用程序而言不需要刻意区分到底是客户端还是服务端在分布式组网环境中一个节点可能既是服务端也是客户端。
考虑到安全链路建立需要通过基于IP地址或者号段的黑白名单 安全认证机制本协议使用基于IP地址的安全认证如果有多个IP通过逗号进行分割。

客户端握手请求

客户端与服务端链路建立成功之后由客户端发送握手请求消息。

握手请求消息定义

1消息头的type 字段值为3
2可选附件个数为0
3消息体为空
4握手消息的长度为22个字节

服务端握手请求

服务端收到客户端的握手请求之后如果IP校验通过返回握手成功应答消息给客户端。应用层链路建立成功。

握手应答消息定义

1消息头的type字段值为4
2可选附件个数为0
3消息体为byte类型的结果0 认证成功1认证失败
链路建立成功之后客户端和服务端就可以互相发送业务消息了。

链路的关闭

由于采用长连接通信在正常的业务运行期间双方通过心跳和业务消息维持链路任何一方都不需要主动关闭连接。但是下面清空服务端和客户端需要关闭连接
1当对方宕机或者重启会主动关闭链路另一方读取到操作系统的通知信号得知对方REST链路需要关闭链路释放自身的句柄等资源。由于采用TCP全双工通信通信双方都需要关闭连接释放资源
2消息读写过程中发送了I/O异常需要主动关闭连接
3心跳消息读写过程中发生了I/O异常需要主动关闭连接
4心跳超时需要主动关闭连接
5发送编码异常等不可恢复错误时需要主动关闭连接

可靠性设计

该私有协议栈可能会运行在非常恶劣的网络环境中网络超时闪断对方进程僵死或者处理缓慢等情况都有可能发送。为了包装在这些极端异常场景下 私有协议栈能够正常工作或者自动恢复需要对可靠性进行规划和设计。

心跳机制

在凌晨等业务低谷期时段如果发生网络问题连接被Hang住时由于没有业务消息很难发现问题。到了白天业务高峰期间会发生大量的网络通信失败严重的会导致一段时间进程内无法处理业务消息。为了解决该问题在网络空闲时采用心跳机制来检测链路的互通性一旦发生网络故障立即关闭链路主动重连。以此来保证 连接可用

设计思路

1当网络处于空闲状态持续时间达到T(连续周期T 没有读写消息)时客户端主动发起Ping 心跳消息给服务端
2如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息则心跳失败计数器加1.
3每当客户端接收到服务端的业务消息或者Pong应答消息将心跳失败计数器清0。当连续N次没有接收到服务端的Pong消息或者业务消息则关闭链路。间隔 INTERVAL时间发起重连操作。
4服务端网络空闲状态持续时间达到T后服务端将心跳失败计数器加1只要接收到客户端发送的Ping消息或者其他业务消息计数器清零。
5服务端濑尿虾N次没有接收到客户端的Ping消息或者其他业务消息则关闭链路释放资源等待客户端重连。

通过Ping-Pong双向心跳机制可用保证无论通信哪一方出现网络故障都能被及时地检测出来为了防止由于短时间内繁忙没有及时返回应答造成的误判设计 只有连续N次心跳检测都失败才认定链路已经损害需要关闭链路并重建链路。

当读或者写心跳消息发送I/O异常时说明链路已经中断此时需要立即关闭链路如果时客户端需要重新发起连接如果时服务端需要清空缓冲的半包信息等待客户端重连。

重连机制

如果链路中断等待INTERVAL周期时间后由客户端发起重连操作如果重连失败间隔周期INTERVAL后发起重连直到重连成功。

为了保证服务端能够有充足的时间释放句柄资源在首次断连时客户端需要 等待INTERVAL 时间后再发起重连而不是失败后就立即重连。

为了保证句柄资源能够及时释放无论什么场景下的重连失败客户端都必须保证自身的资源被及时释放包括不仅限于SocketChannel,Sokcet。 否则大量连接失败 可能导致内存泄漏

重连失败后需要大于异常堆栈信息方便定位跟踪问题。

重复登录保护

当客户端握手成功之后在链路处于正常状态下不允许客户端重复登录以防止客户端在异常状态下反复重连导致句柄资源被耗尽。
服务端接收到客户端的握手请求消息之后首先对IP地址进行合法性校验如果校验成功在缓存的地址表中查看客户端是否已经登录如果已经登录则拒绝重复登录返回错误码-1.同时关闭TCP链路并在服务端的日志打印握手失败的原因。

客户端接收到握手失败的应答消息之后关闭客户端的TCP连接等待INTERVAL时间之后再次发起TCP连接直到认证成功。
为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功的问题当服务端连续N次心跳超时之后需要主动关闭链路清空该客户端的地址缓存信息以保证后续该客户端可用重连成功防止被重复登录保护机制拒绝掉。

消息缓存重发

无论客户端还是服务端当发送链路中断之后在链路恢复之前缓存在消息队列中待发送的消息不能丢失等链路恢复之后重新发送这些消息保证链路中断期间消息不丢失。
考虑到内存溢出的风险可用设置消息缓存队列的上限当达到上限后拒绝继续向该队列添加新的消息。

安全设计

为了保证整个集群环境的安全内部长连接采用基于IP地址的安全认证机制服务端对握手请求消息的IP地址进行合法性校验如果在白名单之内则校验通过否则拒绝对方连接。
如果将该私有协议放入到公网使用需要采用更加严格的安全认知机制例如基于密码和AES加密的用户+密码认证机制也可以采用SSL/TSL安全传输。

可扩展性设计

该私有协议需要具备一定的扩展能力业务可以在消息头中自定义业务域字段通过Netty 消息头中的可选附件attachment字段业务可以方便地进行自定义扩展。

私有协议栈开发

maven 依赖

        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.4.11.Final</version>
        </dependency>

        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.11.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jibx</groupId>
            <artifactId>jibx-run</artifactId>
            <version>1.4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.jibx/jibx-extras -->
        <dependency>
            <groupId>org.jibx</groupId>
            <artifactId>jibx-extras</artifactId>
            <version>1.4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.jibx/jibx-bind -->
        <dependency>
            <groupId>org.jibx</groupId>
            <artifactId>jibx-bind</artifactId>
            <version>1.4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.jibx/jibx-tools -->
        <dependency>
            <groupId>org.jibx</groupId>
            <artifactId>jibx-tools</artifactId>
            <version>1.4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.jibx/jibx-schema -->
        <dependency>
            <groupId>org.jibx</groupId>
            <artifactId>jibx-schema</artifactId>
            <version>1.4.2</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.bcel/bcel -->
        <dependency>
            <groupId>org.apache.bcel</groupId>
            <artifactId>bcel</artifactId>
            <version>6.7.0</version>
        </dependency>

       <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId> <!-- Use 'netty5-all' for 5.0-->
            <version>5.0.0.Alpha1</version>
            <scope>compile</scope>
        </dependency>

数据结构定义

NettyMessage 类

public class NettyMessage {

    //消息头
    private Header header;
    //消息体
    private Object body;

    public Header getHeader() {
        return header;
    }

    public void setHeader(Header header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "NettyMessage{" +
                "header=" + header +
                ", body=" + body +
                '}';
    }
}

消息头 Header

public class Header {
    private int crcCode = 0xabef0101;
    //消息长度
    private int length;
    //会话ID
    private long sessionID;
    //消息类型
    private byte type;
    //消息优先级
    private byte priority;
    //附件
    private Map<String, Object> attachment = new HashMap<String, Object>();


    public int getCrcCode() {
        return crcCode;
    }

    public void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public long getSessionID() {
        return sessionID;
    }

    public void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    public Map<String, Object> getAttachment() {
        return attachment;
    }

    public void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    @Override
    public String toString() {
        return "Header{" +
                "crcCode=" + crcCode +
                ", length=" + length +
                ", sessionID=" + sessionID +
                ", type=" + type +
                ", priority=" + priority +
                ", attachment=" + attachment +
                '}';
    }
}

消息类型 MessageType

public enum MessageType {
    //业务请求消息
    SERVICE_REQ((byte)0),
    //业务响应消息
    SERVICE_RESP((byte)1),
    //业务ONE WAY 消息
    ONE_WAY((byte)2),
    //握手请求消息
    LOGIN_REQ((byte)3),
    //握手响应消息
    LOGIN_RESP((byte)4),
    //心跳请求消息
    HEARTBEAT_REQ((byte)5),
    //心跳响应消息
    HEARTBEAT_RESP((byte)6);
    private byte value;
    MessageType (byte value){
        this.value=value;
    }

    public byte value(){
        return value;
    }
}

消息工厂类 NettyMessageFactory

抽取公共的方法

public class NettyMessageFactory {


    public static NettyMessage buildNettyMessage(byte value){
        NettyMessage message=new NettyMessage();
        Header header=new Header();
        header.setType(value);
        message.setHeader(header);
        return message;
    }
}

消息编解码

分别定义了NettyMessageDecoder和NettyMessageEncoder。用于NettyMessage消息的编解码。

Netty消息编码 NettyMessageEncoder

public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        System.out.println("NettyMessageEncoder 构造");
        this.marshallingEncoder=new MarshallingEncoder();
    }


    /**
     * 编码
     * @param context
     * @param nettyMessage
     * @param sendBuf
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext context,
                          NettyMessage nettyMessage, ByteBuf sendBuf) throws Exception {
        System.out.println("NettyMessageEncoder.encode");
        if (nettyMessage ==null || nettyMessage.getHeader()==null){
            throw new Exception("The encode message is null");
        }
       // ByteBuf sendBuf= Unpooled.buffer();

        sendBuf.writeInt(nettyMessage.getHeader().getCrcCode());//校验码
        sendBuf.writeInt(nettyMessage.getHeader().getLength());//总长度
        sendBuf.writeLong(nettyMessage.getHeader().getSessionID());//回话id
        sendBuf.writeByte(nettyMessage.getHeader().getType());//消息类型
        sendBuf.writeByte(nettyMessage.getHeader().getPriority());//优先级
        //附件长度编码
        //编码规则为如果attachment的长度为0表示没有可选附件则将长度	编码设置为0
        //如果attachment长度大于0则需要编码规则
        //首先对附件的个数进行编码
        sendBuf.writeInt(nettyMessage.getHeader().getAttachment().size());


        String key=null;

        byte[] keyArray=null;
        Object value= null;
        //然后循环对每个附件进行编码
        for (Map.Entry<String,Object> param:nettyMessage.getHeader().getAttachment().entrySet()){
            key=param.getKey();
            keyArray=key.getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            //采用JBoss  的Marshaling 进行编码
            marshallingEncoder.encode(value,sendBuf);
        }

        key =null;
        keyArray=null;
        value=null;
        if (nettyMessage.getBody()!=null){
            //body 也是采用JBoss 的Marshalling进行编码
            marshallingEncoder.encode(nettyMessage.getBody(),sendBuf);
        }else {
            //如果没有数据则进行补位方便后续的decoder操作
            sendBuf.writeInt(0);

        }
        //最后我们要获取整个数据包的总长度也就是header + body
        //这里需要减掉 8个字节是因为 要把CRC 和长度本身占的减掉
        //总长度是在header 协议的第二个标记字段中
        //第一个参数是长度属性的索引位置
        sendBuf.setInt(4,sendBuf.readableBytes()-8);
    }
}

JBoss Marshalling 编码类 MarshallingEncoder

public class MarshallingEncoder {
    //空白占位用于预留设置 body 的数据长度包长度
    private static final byte[] LENGTH_PLACEHOLDER=new byte[4];

    Marshaller marshaller;

    public MarshallingEncoder () throws IOException {
       marshaller=MarshallingCodeCFactory.buildMarshalling();
    }

    public void encode(Object msg, ByteBuf out) throws IOException {
        try {
            //必须要知道当前的数据位置在哪起始位置
            int lengthPos=out.writerIndex();
            //占位写操作先写一个4个字节的空的内容记录起始数据位置用于设置内容长度
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output=new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            //总长度(结束位置)-初始化长度(起始位置)-预留的长度 = body 数据长度
            out.setInt(lengthPos,out.writerIndex()-lengthPos-4);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            marshaller.close();
        }
    }

}


MarshallingEncoder 依赖类ChannelBufferByteOutput

public class ChannelBufferByteOutput implements ByteOutput {

    private final ByteBuf byteBuf;

    public ChannelBufferByteOutput(ByteBuf byteBuf){
        this.byteBuf=byteBuf;
    }


    @Override
    public void write(int i) throws IOException {
        byteBuf.writeByte(i);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        byteBuf.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        byteBuf.writeBytes(bytes,srcIndex,length);
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public void flush() throws IOException {

    }

    public ByteBuf getByteBuf() {
        return byteBuf;
    }
}

消息解码类 NettyMessageDecoder

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoderPrivate marshallingDecoder;

    /**
     *
     * @param maxFrameLength 代表最大的序列化长度
     * @param lengthFieldoffset 长度属性的偏移量简单来说就是message中
     *                          总长度的起始位置(Header中的Length属性的起始位置)本例中是 4
     * @param lengthFieLength 代表长度属性的长度整个属性占多长 4
     * @throws IOException
     */
    public NettyMessageDecoder(int maxFrameLength,int lengthFieldoffset,int lengthFieLength) throws IOException {
        super(maxFrameLength,lengthFieldoffset,lengthFieLength);
        System.out.println("NettyMessageDecoder 构造方法");
        marshallingDecoder=new MarshallingDecoderPrivate();
    }
    protected Object decode(ChannelHandlerContext context, ByteBuf in) throws IOException {
        System.out.println("NettyMessageDecoder decode.....");
        ByteBuf frame= null;
        try {
            frame = (ByteBuf) super.decode(context,in);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (frame==null){
            return null;
        }

        NettyMessage message=new NettyMessage();
        Header header=new Header();
        //代码写错了将frame 写成了in .排查问题 2小时。
        //此时 in.readInt 报错。校验的时候 报数组下标越界.
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());
        int size=frame.readInt();
        //附件个数大于0则需要解码操作
        if (size>0){
            Map<String,Object> attachment=new HashMap<>(size);
            int keySize=0;
            byte[]keyArray=null;
            String key =null;
            for (int i=0;i<size;i++){
                keySize=in.readInt();
                keyArray=new byte[keySize];
                in.readBytes(keyArray);
                key=new String(keyArray,"UTF-8");
                attachment.put(key,marshallingDecoder.decode(frame));
            }
            keyArray=null;
            key=null;
            header.setAttachment(attachment);
        }
        if (in.readableBytes()>4){
            message.setBody(marshallingDecoder.decode(in));
        }
        //对于ByteBuf来说对一个数据就少一个所以读完header生效就是body了。
        message.setHeader(header);
        return message;

    }


}

JBoss Marshalling 解码类 MarshallingDecoderPrivate

public  class MarshallingDecoderPrivate {
    
        private final Unmarshaller unmarshaller;
    
        public MarshallingDecoderPrivate() throws IOException {
            unmarshaller = MarshallingCodeCFactory.buildMUnMarshaller();
        }
    
        public Object decode(ByteBuf in) throws IOException {
            //首先读取4个长度
            int objectSize = in.readInt();
            //获取实际body 的缓冲内容
            ByteBuf buf = in.slice(in.readerIndex(), objectSize);
            //转换
            ByteInput input = new ChannelBufferByteInput(buf);
            try {
                //读取
                unmarshaller.start(input);
                Object obj = unmarshaller.readObject();
                unmarshaller.finish();
                //读取完成后更新当前读取起始位置
                in.readerIndex(in.readerIndex() + objectSize);
                return obj;
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } finally {
                unmarshaller.close();
            }
            return null;
        }
    }

MarshallingDecoderPrivate 依赖类 ChannelBufferByteInput

public class ChannelBufferByteInput implements ByteInput {
    private final ByteBuf byteBuf;

    public ChannelBufferByteInput(ByteBuf byteBuf) {
        this.byteBuf = byteBuf;
    }


    @Override
    public int read() throws IOException {
        if (byteBuf.isReadable()) {
            return byteBuf.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] bytes) throws IOException {

        return read(bytes,0,bytes.length);
    }

    @Override
    public int read(byte[] bytes, int dstIndex, int length) throws IOException {
        int available=available();
        if (available==0){
            return -1;
        }
        length=Math.min(available,length);
        byteBuf.readBytes(bytes,dstIndex,length);
        return length;
    }

    @Override
    public int available() throws IOException {
        return byteBuf.readableBytes();
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readAble=byteBuf.readableBytes();
        if (readAble<bytes){
            bytes=readAble;
        }
        byteBuf.readerIndex((int)(byteBuf.readerIndex()+bytes));
        return bytes;
    }

    @Override
    public void close() throws IOException {

    }
}

握手和安全认证

客户端握手业务处理类 LoginAuthRespHandler

public class LoginAuthReqHandler extends ChannelHandlerAdapter {

    public void channelActive(ChannelHandlerContext context){
        System.out.println("LoginAuthReqHandler .channelActive");
        //握手成功后客户端给服务端发送消息类型为3
        NettyMessage message=NettyMessageFactory.buildNettyMessage(MessageType.LOGIN_REQ.value());
        System.out.println("握手成功后,客户端给服务端发送心跳消息 --->"+message);
        context.writeAndFlush(message);
    }


    public void channelRead(ChannelHandlerContext context,Object msg){
        System.out.println("LoginAuthReqHandler.channelRead");
        NettyMessage message=(NettyMessage)msg;
        //如果是握手应答消息需要判断是否认证成功
        if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.LOGIN_REQ.value()){
            byte loginResult=(byte) message.getBody();
            //非0 表示认证失败
            if (loginResult!= (byte) 0){
                //握手失败关闭连接关闭链路。等会发现连接
                context.close();
            }else {
                System.out.println("Login is ok : "+message);
                context.fireChannelRead(msg);
            }
        }
        else {
            //如果不是握手应答消息则直接传给后面的ChannelHandler进行错了
            context.fireChannelRead(msg);
        }
    }

    public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
        context.fireExceptionCaught(cause);
    }

}


服务端握手业务处理类 LoginAuthRespHandler

public class LoginAuthRespHandler extends ChannelHandlerAdapter {
    private Map<String,Boolean> nodeCheck=new ConcurrentHashMap<>();

    //定义白名单
    private String[] whiteList={"127.0.0.1","172.21.27.85"};

    public void channelRead(ChannelHandlerContext context,Object msg){
        System.out.println("LoginAuthRespHandler.channelRead");
        NettyMessage message =(NettyMessage) msg;
        //如果是握手请求消息处理
        if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.LOGIN_REQ.value()){
            String nodeIndex=context.channel().remoteAddress().toString();
            NettyMessage loginResp=null;
            //重复登录拒绝防止由于客户端重复登录导致的句柄泄露(即这个对象 无法被回收导致内存无效使用)
            if (nodeCheck.containsKey(nodeIndex)){
                loginResp=buildResponse((byte)-1);
            }else {
                //校验地址是否在白名单中如果在则通过握手成功
                InetSocketAddress address=(InetSocketAddress) context.channel().remoteAddress();
                String ip=address.getAddress().getHostAddress();
                boolean isOk=false;
                for (String whiteIp:whiteList){
                    if (whiteIp.equals(ip)){
                        isOk=true;
                        break;
                    }

                }
                //握手成功后需要返回握手应答消息
                loginResp= isOk?buildResponse((byte)0):buildResponse((byte)-1);
                if (isOk){
                    nodeCheck.put(nodeIndex,true);
                }

            }
            System.out.println("The login response is : "+loginResp+" body ["+loginResp.getBody()+" ]");
            context.writeAndFlush(loginResp);

        }else {
            context.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result){
        NettyMessage message = NettyMessageFactory.buildNettyMessage(MessageType.LOGIN_RESP.value());
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
        //发生异常的时候删除缓存中 的地址信息保证后续客户端能重连成功
        nodeCheck.remove(context.channel().remoteAddress().toString());
        context.close();
        context.fireExceptionCaught(cause);
    }

}

心跳检测逻辑

客户端心跳业务处理类 HeartBeatReqHandler

public class HeartBeatReqHandler extends ChannelHandlerAdapter {
    private volatile ScheduledFuture<?> heartBeat;

    public void channelRead(ChannelHandlerContext context,Object msg){
        System.out.println("HeartBeatReqHandler.channelRead");
        NettyMessage message=(NettyMessage) msg;
        //握手成功主动发送心跳消息
        if (message.getHeader()!=null&& message.getHeader().getType()== MessageType.LOGIN_RESP.value()){
            System.out.println("客户端发送消息给服务端.... ");
            //如果握手成功循环给服务端发送心跳
            heartBeat =context.executor().scheduleAtFixedRate(
                    new HeartBeatTask(context),0,5000, TimeUnit.MILLISECONDS);

            //如果握手消息则 打印服务端的消息
        }else if (message.getHeader()!=null&& MessageType.HEARTBEAT_RESP.value()==message.getHeader().getType()){
            System.out.println("Client receive server heart beat message : --->"+message);
        }else {
            context.fireChannelRead(msg);
        }
    }

    private class HeartBeatTask implements Runnable{
        private final ChannelHandlerContext context;

        public HeartBeatTask(ChannelHandlerContext context){
            this.context=context;
        }


        @Override
        public void run() {
            //构造NettyMessage 实体然后通过ChannelHandlerContext 发送心跳
            NettyMessage beatMessage= NettyMessageFactory.buildNettyMessage(MessageType.HEARTBEAT_REQ.value());
            System.out.println("Client send start beat message to server : -->"+beatMessage);
            context.writeAndFlush(beatMessage);
        }

    }
    public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
        if (heartBeat!=null){
            heartBeat.cancel(true);
            heartBeat=null;
        }
        context.fireExceptionCaught(cause);
    }
}

服务端心跳业务处理类 HeartBeatRespHandler

public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    public void channelActive(ChannelHandlerContext context){
        System.out.println("channel is connected");
    }
    public void channelRead(ChannelHandlerContext context,Object msg){
        System.out.println("HeartBeatRespHandler.channelRead");
        NettyMessage message =(NettyMessage) msg;
        //返回心跳应答消息构造NettyMessage 直接返回即可
        //然后打印 接收和发送的心跳消息
        if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.HEARTBEAT_REQ.value()){
            System.out.println("receive client heart beat message : -->"+message);
            NettyMessage heartBeat= NettyMessageFactory.buildNettyMessage(MessageType.HEARTBEAT_RESP.value());
            System.out.println("Send heart beat response message to client : --> "+heartBeat);
            context.writeAndFlush(heartBeat);
        }else {
            context.fireChannelRead(msg);
        }
    }


}

启动类

客户端启动类 NettyClient

public class NettyClient {
    private ScheduledExecutorService executorService= Executors.newScheduledThreadPool(1);

    EventLoopGroup group=new NioEventLoopGroup();


    public void connect(String host,int port){
        try {
            //配置客户端NIO线程组
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //NettyMessageDecoder 用于消息解码为了防止由于单条消息过大导致的内存
                            //溢出或者畸形码流导致解码错位引起内存分配失败对单条消息的最大长度做了上限限制
                            socketChannel.pipeline()
                                    .addLast(new NettyMessageDecoder(1024*1024,4,4));
                            //NettyMessageEncoder 用于协议消息的自动编码
                            socketChannel.pipeline()
                                    .addLast(new NettyMessageEncoder());
                            //ReadTimeoutHandler用于处理超时
                            socketChannel.pipeline()
                                    .addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
                            //LoginAuthReqHandler 握手处理
                            socketChannel.pipeline()
                                    .addLast("LoginAuthHandler",new LoginAuthReqHandler());
                            //HeartBeatReqHandler 心跳处理
                            socketChannel.pipeline()
                                    .addLast("HeartBeatHandler",new HeartBeatReqHandler());
                        }
                    });
            ChannelFuture future= bootstrap.connect(host,port).sync();
            System.out.println("The client is connect to server with port : "+port);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("链接失败....."+e.getMessage());
        }finally {
           //所有资源释放完成之后清空资源再次发起重连操作
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        //重连操作
                        System.out.println("发起重连操作");
                        connect("127.0.0.1",8080);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1",8080);
    }
}

服务端启动类 NettyServer

public class NettyServer {
    public void bind(){
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new NettyMessageDecoder(1024*1024,4,4));

                            socketChannel.pipeline().addLast(new NettyMessageEncoder());
                            socketChannel.pipeline()
                                    .addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
                            socketChannel.pipeline()
                                    .addLast(new LoginAuthRespHandler());
                            socketChannel.pipeline()
                                    .addLast("heartBeatHandler",new HeartBeatRespHandler());
                           /* socketChannel.pipeline()
                                    .addLast(new ServerHandler());*/
                        }
                    });
            ChannelFuture future=bootstrap.bind("127.0.0.1",8080).sync();
            System.out.println("the server is started ...");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyServer().bind();
    }
}

测试

正常场景

依次启动服务端和客户端。

服务端打印截图:

在这里插入图片描述

客户端打印截图

在这里插入图片描述
客户端和服务端都能正常打印数据发送心跳请求和握手请求。没有发送丢包和粘包现象。

模拟服务端宕机

在服务端和客户端正常连接发送消息的过程中。重启服务端。
观察如下功能是否正常
1客户端能否正常发起重连
2重连成功后不再发起重连
3短链期间心跳定时器停止工作不再发送心跳请求消息
4服务端重启成功后允许客户端重新登录
5服务端重启成功过之后客户端端能重连并握手成功
6重连成功之后双方的心跳能够正常互发
7性能指标重连期间客户端资源得到正确回收不会导致句柄资源泄漏

服务端关闭后截图

在这里插入图片描述

机器内存使用情况截图:

在这里插入图片描述
截图可以看出内存没有随着发起 重连请求的次数增加而增加说明没有发送句柄泄漏。
但是本机模拟的数据较小可能不太准确小伙伴们可以多弄几个客户端测试下。

服务端启动后截图

在这里插入图片描述

通过截图能看出上述功能均正常。

模拟客户端宕机

客户端宕机后服务端需要能够清除缓存信息。允许客户端重新登录。

关闭客户端之后的服务端截图

在这里插入图片描述

重启客户端之后的服务端截图

在这里插入图片描述

常见bug

我本人遇到的问题最多 的还是编码解码那块的代码。错了之后系统把异常吃掉了只能是去debugger跟踪。
很是麻烦。在此讲下自己遇到的一些问题。

问题现象 服务端接收不到数据

问题简单说明如截图。问题的现象是服务端接收不到数据。捋清楚的思路想了下 应该是服务端解码出现了问题。然后就一直debugg 看了还好找到了。

NettyMessageDecoder

在这里插入图片描述

问题现象 客户端无法发送数据

也是分析了很久发现是客户端无法发送数据。大概率是发送数据的时候编码出现了问题。

NettyMessageEncoder

错误的版本 错误的版本中NettyMessageEncoder 是继承的MessageToMessageEncoder
还是重写的encode方法。但是 入参不一样。错误的版本中有个Listout 参数。需要我们 把编码后的ByteBuf 对象加入到 out 属性中继续传递下。但是没有加一行代码。也是找了半天问题。关键是要定位是什么问题
比如客户端无法发出数据还是客户端无法接收数据还是服务端无法发出数据服务端无法接收数据。
我刚好遇到了客户端无法发出数据和服务端无法接收数据 两个问题。

public final class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {

    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        System.out.println("NettyMessageEncoder 构造");
        this.marshallingEncoder=new MarshallingEncoder();
    }


    /**
     * 编码
     * @param context
     * @param nettyMessage
     * @param list
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext context,
                          NettyMessage nettyMessage, List<Object> list) throws Exception {
        //System.out.println("NettyMessageEncoder.encode");
        if (nettyMessage ==null || nettyMessage.getHeader()==null){
            throw new Exception("The encode message is null");
        }
       ByteBuf sendBuf= Unpooled.buffer();

        sendBuf.writeInt(nettyMessage.getHeader().getCrcCode());//校验码
        sendBuf.writeInt(nettyMessage.getHeader().getLength());//总长度
        sendBuf.writeLong(nettyMessage.getHeader().getSessionID());//回话id
        sendBuf.writeByte(nettyMessage.getHeader().getType());//消息类型
        sendBuf.writeByte(nettyMessage.getHeader().getPriority());//优先级
        //附件长度编码
        //编码规则为如果attachment的长度为0表示没有可选附件则将长度	编码设置为0
        //如果attachment长度大于0则需要编码规则
        //首先对附件的个数进行编码
        sendBuf.writeInt(nettyMessage.getHeader().getAttachment().size());


        String key=null;

        byte[] keyArray=null;
        Object value= null;
        //然后循环对每个附件进行编码
        for (Map.Entry<String,Object> param:nettyMessage.getHeader().getAttachment().entrySet()){
            key=param.getKey();
            keyArray=key.getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            //采用JBoss  的Marshaling 进行编码
            marshallingEncoder.encode(value,sendBuf);
        }

        key =null;
        keyArray=null;
        value=null;
        if (nettyMessage.getBody()!=null){
            //body 也是采用JBoss 的Marshalling进行编码
            marshallingEncoder.encode(nettyMessage.getBody(),sendBuf);
        }else {
            //如果没有数据则进行补位方便后续的decoder操作
            sendBuf.writeInt(0);

        }
        //最后我们要获取整个数据包的总长度也就是header + body
        //这里需要减掉 8个字节是因为 要把CRC 和长度本身占的减掉
        //总长度是在header 协议的第二个标记字段中

        sendBuf.setInt(4,sendBuf.readableBytes()-8);
    }
}

bug分析指导

当出现了问题较难排查时可以将客户端和服务端分开。用自己的客户端 连接 正确的服务端来判断客户端是否有问题。用自己的服务端来连接正确的客户端。判断服务端是否有问题。
当然了本博客的代码基本流程时正确的大家可以模拟去验证下自己写的代码。

总结

本章的内容比较多从内容介绍协议栈的设计各种场景的考虑以及实现细节的分析到代码的落地。都有详细的说明。
如果我们需要自己在工作中开发协议栈这个博客是个很好的例子供我们学习然后逐步深入。

参考内容:

<<netty权威指南>> 需要可以评论区留下邮箱
博客:https://blog.csdn.net/qq_42651904/article/details/106484685

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6