一起学netty(16)自定义协议
什么是网络协议?
网络协议就是为计算机网络中进行数据交换而建立的规则、标准或约定的集合。有了网络协议才能实现不同设备、不同操作系统、不同软件之间的数据交换。
我们常说的tcp协议和udp协议其实是操作系统级别的底层协议(传输层)。很多的应用层的协议都是基于tcp协议或udp协议来实现的,比如FTP(文件传送协议)、Telnet(远程登录协议)、DNS(域名解析协议)、SMTP(邮件传送协议),POP3协议(邮局协议),HTTP协议(Hyper Text Transfer Protocol)。以及java开发中经常用到的协议比如dubbo协议,redis协议,websocket协议,mqtt协议等。
注意,本专题所说的netty可不是一个协议,netty是一个高性能的网络框架,再此框架的基础上能够帮助我们快速开发支持各种协议的网络应用。本节我们就讨论如何在netty框架上来开发一个自定义的协议。
netty中定义协议
刚才说了网络协议的概念,到了应用层其实就是对数据进行编码和解码的约定或规则。那么要字节定义协议,肯定要自己去定义这些规则。举个简单的例子:
/**
* 消息头的长度共9个字节
*/
public static class MessageBody{
// 消息类型 1字节
public byte type;
// 命令 4字节
public int command;
// 消息体长度 4字节
public int messageLength;
// 消息体(可以是其他自定义类型)
public String message;
@Override
public String toString() {
return "MessageBody{" +
"type=" + type +
", command=" + command +
", messageLength=" + messageLength +
", message='" + message + '\'' +
'}';
}
}
上述代码定义了一个完整消息的规范。可以把type、command、messageLength这三个字段看作消息头,message是消息体。其中messageLength代表消息体的长度,占4个字节,type消息类型1个字节,command四个字节,那么完整的消息头就是9个字节。
服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetSocketAddress;
import java.util.List;
public class NettyServer5 {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boot = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boot,work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 自定义解码器,将byte字节数据转成 MessageBody 类型,然后传入下一个处理器
pipeline.addLast(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 可读数据长度必须大于消息头的长度
int length = byteBuf.readableBytes();
if(length>9){
// 标记读指针
byteBuf.markReaderIndex();
byte type = byteBuf.readByte();
int commond = byteBuf.readInt();
int messageLength = byteBuf.readInt();
int len = byteBuf.readableBytes();
// 判断消息体是否完整
if(len>=messageLength){
byte[] bytes = new byte[messageLength];
byteBuf.readBytes(bytes);
String mesage = new String(bytes,"utf-8");
MessageBody body = new MessageBody();
body.message=mesage;
body.command=commond;
body.type=type;
body.messageLength=messageLength;
list.add(body);
// 读到一条完整的消息则返回,将消息交给下一个处理器处理
return;
}
// 没有读到一条完整的消息,则重置读指针
byteBuf.resetReaderIndex();
}
}
});
// 处理器 处理MessageBody类型的消息
pipeline.addLast(new SimpleChannelInboundHandler<MessageBody>(){
@Override
public void channelRead0(ChannelHandlerContext ctx, MessageBody body) throws Exception {
// 打印消息
System.out.println(body);
ctx.fireChannelActive();
}
});
}
});
ChannelFuture bind = bootstrap.bind(new InetSocketAddress(8099));
bind.sync();
}
/**
* 消息头的长度共9个字节
*/
public static class MessageBody{
// 消息类型 1字节
public byte type;
// 命令 4字节
public int command;
// 消息体长度 4字节
public int messageLength;
// 消息体(可以是其他自定义类型)
public String message;
@Override
public String toString() {
return "MessageBody{" +
"type=" + type +
", command=" + command +
", messageLength=" + messageLength +
", message='" + message + '\'' +
'}';
}
}
}
对于消息的解析,主要看解码器部分:
// 自定义解码器,将byte字节数据转成 MessageBody 类型,然后传入下一个处理器
pipeline.addLast(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 可读数据长度必须大于消息头的长度
int length = byteBuf.readableBytes();
if(length>9){
// 标记读指针
byteBuf.markReaderIndex();
byte type = byteBuf.readByte();
int commond = byteBuf.readInt();
int messageLength = byteBuf.readInt();
int len = byteBuf.readableBytes();
// 判断消息体是否完整
if(len>=messageLength){
byte[] bytes = new byte[messageLength];
byteBuf.readBytes(bytes);
String mesage = new String(bytes,"utf-8");
MessageBody body = new MessageBody();
body.message=mesage;
body.command=commond;
body.type=type;
body.messageLength=messageLength;
list.add(body);
// 读到一条完整的消息则返回,将消息交给下一个处理器处理
return;
}
// 没有读到一条完整的消息,则重置读指针
byteBuf.resetReaderIndex();
}
}
});
首先要解析消息头,因为只有解析完消息头才能确定消息体的长度。消息头一共9个字节的大小,所以先判断消息的长度是否大于9个字节,大于9个字节的时候再按照字节读取:
byte type = byteBuf.readByte();
int commond = byteBuf.readInt();
int messageLength = byteBuf.readInt();
int len = byteBuf.readableBytes();
接下来判断消息体的长度,判断byteBuf缓存中数据的长度,是否大于等于消息头中标记的数据长度即:len>=messageLength,返回true代表byteBuf中已包含完整的消息体数据,可以进行读取。否则说明消息体的数据还不完整,并重置读指针byteBuf.resetReaderIndex();,如果读取到一个完整的消息时,将该消息加入到list集合。然后会被传入到下一个处理器,进行对数据的处理。
d代码中其实主要是在处理拆包粘包问题。
客户端代码:
import com.weblog.netty.basic.server.NettyServer5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 自定义编码器 将 MessageBody 类型的数据转成byte字节数据
pipeline.addLast(new MessageToByteEncoder<NettyServer5.MessageBody>(){
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, NettyServer5.MessageBody messageBody, ByteBuf byteBuf) throws Exception {
byteBuf.writeByte(messageBody.type);
byteBuf.writeInt(messageBody.command);
byteBuf.writeInt(messageBody.messageLength);
byteBuf.writeBytes(messageBody.message.getBytes());
}
});
}
});
//服务器ip端口
ChannelFuture future = bootstrap.connect("localhost", 8099).sync();
Channel channel = future.channel();
System.out.println("发送消息");
String message = "这是消息体";
NettyServer5.MessageBody messageBody = new NettyServer5.MessageBody();
messageBody.type=1;
messageBody.command=1;
messageBody.messageLength=message.getBytes().length;
messageBody.message = message;
channel.writeAndFlush(messageBody);
ChannelFuture channelFuture = channel.closeFuture();
channelFuture.sync();
}
}
客户端的代码比较简单,主要就是定义一个编码器,将数据对象编码成byte字节数据,进行数据传输。
最后
以上代码以及相应的解释都很简单,可以说完全没有涉及到重点。如果你想用netty写出优秀的代码,你必须要对netty的api比较了解,特别是在定义协议这一块,你需要非常了解ButeByfd类的api,ByteToMessageDecoder类对channelRead的方法的实现,以及对粘包、拆包问题的理解等,如果没有深入的理解,那么decode方法就不好实现,甚至会写出bug。
ButeByfd和ByteToMessageDecoder这两个类比较重要,而且内容比较多。所以后面我们再去探讨。