一起学netty(16)自定义协议

weblog 1206 0 0

什么是网络协议?

网络协议就是为计算机网络中进行数据交换而建立的规则、标准或约定的集合。有了网络协议才能实现不同设备、不同操作系统、不同软件之间的数据交换。

我们常说的tcp协议和udp协议其实是操作系统级别的底层协议(传输层)。很多的应用层的协议都是基于tcp协议或udp协议来实现的,比如FTP(文件传送协议)、Telnet(远程登录协议)、DNS(域名解析协议)、SMTP(邮件传送协议),POP3协议(邮局协议),HTTP协议(Hyper Text Transfer Protocol)。以及java开发中经常用到的协议比如dubbo协议,redis协议,websocket协议,mqtt协议等。

注意,本专题所说的netty可不是一个协议,netty是一个高性能的网络框架,再此框架的基础上能够帮助我们快速开发支持各种协议的网络应用。本节我们就讨论如何在netty框架上来开发一个自定义的协议。

netty中定义协议

刚才说了网络协议的概念,到了应用层其实就是对数据进行编码和解码的约定或规则。那么要字节定义协议,肯定要自己去定义这些规则。举个简单的例子:

/**
     * 消息头的长度共9个字节
     */
    public static class MessageBody{
        // 消息类型 1字节
        public byte type;
        // 命令 4字节
        public int command;
        // 消息体长度 4字节
        public int messageLength;
        // 消息体(可以是其他自定义类型)
        public String message;

        @Override
        public String toString() {
            return "MessageBody{" +
                    "type=" + type +
                    ", command=" + command +
                    ", messageLength=" + messageLength +
                    ", message='" + message + '\'' +
                    '}';
        }
}

上述代码定义了一个完整消息的规范。可以把type、command、messageLength这三个字段看作消息头,message是消息体。其中messageLength代表消息体的长度,占4个字节,type消息类型1个字节,command四个字节,那么完整的消息头就是9个字节。

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetSocketAddress;
import java.util.List;

public class NettyServer5 {
    public static void main(String[] args) throws InterruptedException {

        NioEventLoopGroup boot = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup(8);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boot,work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 自定义解码器,将byte字节数据转成 MessageBody 类型,然后传入下一个处理器
                        pipeline.addLast(new ByteToMessageDecoder() {
                            @Override
                            protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
                                // 可读数据长度必须大于消息头的长度
                                int length = byteBuf.readableBytes();
                                if(length>9){
                                    // 标记读指针
                                    byteBuf.markReaderIndex();
                                    byte type = byteBuf.readByte();
                                    int commond = byteBuf.readInt();
                                    int messageLength = byteBuf.readInt();
                                    int len  = byteBuf.readableBytes();
                                    // 判断消息体是否完整
                                    if(len>=messageLength){
                                        byte[] bytes = new byte[messageLength];
                                        byteBuf.readBytes(bytes);
                                        String mesage = new String(bytes,"utf-8");
                                        MessageBody body = new MessageBody();
                                        body.message=mesage;
                                        body.command=commond;
                                        body.type=type;
                                        body.messageLength=messageLength;
                                        list.add(body);
                                        // 读到一条完整的消息则返回,将消息交给下一个处理器处理
                                        return;
                                    }
                                    // 没有读到一条完整的消息,则重置读指针
                                    byteBuf.resetReaderIndex();
                                }
                            }
                        });
                        // 处理器 处理MessageBody类型的消息
                        pipeline.addLast(new SimpleChannelInboundHandler<MessageBody>(){
                            @Override
                            public void channelRead0(ChannelHandlerContext ctx, MessageBody body) throws Exception {
                                // 打印消息
                                System.out.println(body);
                                ctx.fireChannelActive();
                            }
                        });
                    }
                });
        ChannelFuture bind = bootstrap.bind(new InetSocketAddress(8099));
        bind.sync();
    }

    /**
     * 消息头的长度共9个字节
     */
    public static class MessageBody{
        // 消息类型 1字节
        public byte type;
        // 命令 4字节
        public int command;
        // 消息体长度 4字节
        public int messageLength;
        // 消息体(可以是其他自定义类型)
        public String message;

        @Override
        public String toString() {
            return "MessageBody{" +
                    "type=" + type +
                    ", command=" + command +
                    ", messageLength=" + messageLength +
                    ", message='" + message + '\'' +
                    '}';
        }
    }

}

对于消息的解析,主要看解码器部分:

// 自定义解码器,将byte字节数据转成 MessageBody 类型,然后传入下一个处理器
pipeline.addLast(new ByteToMessageDecoder() {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 可读数据长度必须大于消息头的长度
        int length = byteBuf.readableBytes();
        if(length>9){
            // 标记读指针
            byteBuf.markReaderIndex();
            byte type = byteBuf.readByte();
            int commond = byteBuf.readInt();
            int messageLength = byteBuf.readInt();
            int len  = byteBuf.readableBytes();
            // 判断消息体是否完整
            if(len>=messageLength){
                byte[] bytes = new byte[messageLength];
                byteBuf.readBytes(bytes);
                String mesage = new String(bytes,"utf-8");
                MessageBody body = new MessageBody();
                body.message=mesage;
                body.command=commond;
                body.type=type;
                body.messageLength=messageLength;
                list.add(body);
                // 读到一条完整的消息则返回,将消息交给下一个处理器处理
                return;
            }
            // 没有读到一条完整的消息,则重置读指针
            byteBuf.resetReaderIndex();
        }
    }
});

首先要解析消息头,因为只有解析完消息头才能确定消息体的长度。消息头一共9个字节的大小,所以先判断消息的长度是否大于9个字节,大于9个字节的时候再按照字节读取:

byte type = byteBuf.readByte();
int commond = byteBuf.readInt();
int messageLength = byteBuf.readInt();
int len  = byteBuf.readableBytes();

接下来判断消息体的长度,判断byteBuf缓存中数据的长度,是否大于等于消息头中标记的数据长度即:len>=messageLength,返回true代表byteBuf中已包含完整的消息体数据,可以进行读取。否则说明消息体的数据还不完整,并重置读指针byteBuf.resetReaderIndex();,如果读取到一个完整的消息时,将该消息加入到list集合。然后会被传入到下一个处理器,进行对数据的处理。

d代码中其实主要是在处理拆包粘包问题。

客户端代码:

import com.weblog.netty.basic.server.NettyServer5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;

public class NettyClient {
    public static void main(String[] args) throws Exception{
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 自定义编码器  将 MessageBody 类型的数据转成byte字节数据
                        pipeline.addLast(new MessageToByteEncoder<NettyServer5.MessageBody>(){
                            @Override
                            protected void encode(ChannelHandlerContext channelHandlerContext, NettyServer5.MessageBody messageBody, ByteBuf byteBuf) throws Exception {
                                byteBuf.writeByte(messageBody.type);
                                byteBuf.writeInt(messageBody.command);
                                byteBuf.writeInt(messageBody.messageLength);
                                byteBuf.writeBytes(messageBody.message.getBytes());
                            }
                        });
                    }
                });
        //服务器ip端口
        ChannelFuture future = bootstrap.connect("localhost", 8099).sync();
        Channel channel = future.channel();
        System.out.println("发送消息");

        String message = "这是消息体";

        NettyServer5.MessageBody messageBody = new NettyServer5.MessageBody();
        messageBody.type=1;
        messageBody.command=1;
        messageBody.messageLength=message.getBytes().length;
        messageBody.message = message;

        channel.writeAndFlush(messageBody);

        ChannelFuture channelFuture = channel.closeFuture();
        channelFuture.sync();
    }
}

客户端的代码比较简单,主要就是定义一个编码器,将数据对象编码成byte字节数据,进行数据传输。

最后

以上代码以及相应的解释都很简单,可以说完全没有涉及到重点。如果你想用netty写出优秀的代码,你必须要对netty的api比较了解,特别是在定义协议这一块,你需要非常了解ButeByfd类的api,ByteToMessageDecoder类对channelRead的方法的实现,以及对粘包、拆包问题的理解等,如果没有深入的理解,那么decode方法就不好实现,甚至会写出bug。

ButeByfdByteToMessageDecoder这两个类比较重要,而且内容比较多。所以后面我们再去探讨。

 

 


猜你喜欢
official 1158 UpdaterequestHTTP包建立连接,之后的通信全部使用websocket己的,就和http没啥关系了。有兴趣的同可以多了解下websocket报文的详细信息。Netty实现websoc
weblog 2829 代码:namespaceConsoleApplication3{//声明委托(委托方法的返回值为int,有两个int类型的形参)publicdelegateintdelegateTest
official 1453 之前的文章中提到过,单线程的nio模型任然有缺点。在上节《netty(7)netty的线程模型》中也提到,netty的出现,封装了nio复杂的代码,并且介入多线程来处理事件,最大限度的提
java基础 4105 ,是对数据传输的总称或抽象。它的特性是进行数据传输。所以io流不仅仅用于文件内容的传输。接下来演示输入输出流并利用流的概念复制个字符串,将字符串s中的内容流向s2。importjava.io.
weblog 2524 个插件.js(function(){ //需要向外暴露的插件对象 constMyPlugin={}; MyPlugin.install=function(Vue,options){ //1
mqtt协议 1590 、概述二、设计规范三、主要特性四、原理mqtt客户端mqtt服务器、概述  MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输),是种基于发布
算法基础,linux 4642 问题描述springboot项目在跨域名调用接口,并且需要传的请求头时报错:AccesstoXMLHttpRequestat'http://ydatestapi.libawall.com
mqtt协议 1975 (Fixedheader)和有效载荷(Payload)之间。每个的可变报头都不样。其中大多数都会有的字段报文标识符。  具体可变报文头的内容以及有效负载和控制报文类型的关系,下面会有详细描述。四、有效载荷
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。