一起学netty(18)netty整合protobuf实现高性能数据传输

weblog 1247 0 0

之前的文章介绍了protobuf的概念参考:http://www.jiajiajia.club/blog/artical/351psy9r6l0g/464

以及protobuf的编码方式参考:http://www.jiajiajia.club/blog/artical/5puo1goygd3o/463

proto文件准备

Animal.proto

syntax = "proto3";
import "Dog.proto";
import "Cat.proto";
option java_package = "com.weblog.netty.proto.file";
option java_outer_classname="Animal";
message AnimalInfo{
 enum AnimalType {
   HEARTBEAT = 0;
   DOG = 1;
   CAT = 2;
 }
 AnimalType animalType = 1;
 oneof dataBody{
   DogInfo dog = 2;
   CatInfo cat = 3;
 }

Cat.proto

syntax = "proto3";
option java_package = "com.weblog.netty.proto.file";
option java_outer_classname="Cat";
message CatInfo{
 string catName = 1;
 int32 age = 2;
}

Dog.proto

syntax = "proto3";
option java_package = "com.weblog.netty.proto.file";
option java_outer_classname="Dog";
message DogInfo{
  string dogName = 1;
  int32 age = 2;
}

生成java类的命令 protoc -I=./ --java_out=./ ./Animal.proto,或参考之前的文章。

整体目录:

生成的代码太长了,就不贴出了来了。

服务端代码:

import com.weblog.netty.websocket.server.entity.Animal;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServerProtobuf {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bootGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup(8);

        ServerBootstrap boot = new ServerBootstrap();
        boot.channel(NioServerSocketChannel.class)
                .group(bootGroup,work)
                .childHandler(new ChannelInitializer(){
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // String编码器
                        pipeline.addLast(new StringEncoder());
                        // protobuf解码器
                        pipeline.addLast(new ProtobufDecoder(Animal.AnimalInfo.getDefaultInstance()));
                        // 自定义处理器
                        pipeline.addLast(new SimpleChannelInboundHandler<Animal.AnimalInfo>(){
                            @Override
                            public void channelRead0(ChannelHandlerContext ctx, Animal.AnimalInfo msg) throws Exception {
                                // 打印客户端发送的数据
                                System.out.println(msg);
                                // 服务器端向客户端回写数据,使用的是字符串类型的编码器
                                ctx.writeAndFlush("收到");
                                ctx.fireChannelActive();
                            }
                        });
                    }
                });
        ChannelFuture sync = boot.bind(8100).sync();
        sync.channel().closeFuture().sync();
    }
}

客户端代码

import com.weblog.netty.websocket.server.entity.Animal;
import com.weblog.netty.websocket.server.entity.Dog;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bootGroup = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(bootGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // protobuf编码器
                        pipeline.addLast(new ProtobufEncoder());
                        // String解码器
                        pipeline.addLast(new StringDecoder());
                        // 自定义处理器
                        pipeline.addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                });

        ChannelFuture future = bootstrap.connect("localhost", 8100).sync();
        Channel channel = future.channel();

        // 构造需要发送的数据
        Animal.AnimalInfo.Builder animal = Animal.AnimalInfo.newBuilder();
        animal.setAnimalType(Animal.AnimalInfo.AnimalType.DOG);

        Dog.DogInfo.Builder dog = Dog.DogInfo.newBuilder();
        dog.setAge(1);
        dog.setDogName("小花");
        animal.setDog(dog);

        // 发送数据
        channel.writeAndFlush(animal);

        ChannelFuture future1 = channel.closeFuture().sync();

    }
}

客户端向服务端发送数据使用的是protobuf编码,服务端向客户端发送数据是字符串,所以客户端的编码器是ProtobufEncoder解码器是StringDecoder,服务端的解码器是ProtobufDecoder编码器是StringEncoder

服务端先启动,客户端启动后服务端打印:

animalType: DOG
dog {
  dogName: "\345\260\217\350\212\261"
  age: 1
}

客户端打印:

收到

猜你喜欢
official 1450   基于javanio+netty+websocket+protobuf+javascript等技术前后端的demo模型。  github地址:https
nginx,前端,java基础 1998   基于javanio+netty+websocket+protobuf+javascript等技术前后端的demo模型。  github地址:https
official 1157 UpdaterequestHTTP包建立连接,之后的通信全部使用websocket自己的协议,就和http没啥关系了。有兴趣的同可以多了解下websocket协议报文的详细信息。Nettywebsoc
official 1190 、什么是bio?bio即:同步阻塞式IO。就是统的javaio网络模型。javabio有两个阻塞的地方,第个地方是需要阻塞的监听端口,等待客户端链接。第二个需要阻塞Socket的read方法
official 1201 什么是网络协议?网络协议就是为计算机网络中进行交换而建立的规则、标准或约定的集。有了网络协议才不同设备、不同操作系统、不同软件之间的交换。我们常说的tcp协议和udp协议其是操作系
框架 2701 安装redis库参考:http://www.jiajiajia.club/blog/artical/166redis配置详解参考:http://www.jiajiajia.club/blog
official 1036 、epoll等各种多路复用器的出,我们通过原生的nio已经可以写出的程序。但是深入分析,我们还是可以从中找出很多问题:1.原生的nio服务端程序需要我们自己去解决客户端的连接、客户端收发,异常等问
official 929 ”越宽,其所的“最率”也越。 三、吞吐量  吞吐量(throughput)表示在单位时间内通过某个网络(或信道、接口)的际的量。吞吐量更经常地用于对世界中的网络的种测量,以便
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。