一起学netty(19)心跳机制的概念和 IdleStateHandler 原理分析
什么是心跳?
顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。
为什么需要心跳?
因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle
状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG
交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.
如何实现心跳?
我们可以通过两种方式实现心跳机制:
- 1.使用 TCP 协议层面的 keepalive 机制.
- 2.在应用层上实现自定义的心跳机制.
虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:
- 1.它不是 TCP 的标准协议, 并且是默认关闭的.
- 2.TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
- 3.TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.
虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳.
Netty应用层自定义心跳机制
服务端代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServerHeartBeat {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boot = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boot,work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("创建连接中...");
ChannelPipeline pipeline = socketChannel.pipeline();
// 5秒没有读写操作,则触发userEventTriggered方法
pipeline.addLast(new IdleStateHandler(0,0,5, TimeUnit.SECONDS));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<String>() {
/**
* 读数据时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("消息:"+msg);
}
/**
* 读、写、读写超时时触发
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
IdleState state = event.state();
switch (state){
case READER_IDLE:
System.out.println("读空闲");
break;
case WRITER_IDLE:
System.out.println("写空闲");
break;
case ALL_IDLE:
System.out.println("读写空闲");
break;
}
// 5秒没有数据收发,则认为连接中断,服务器主动关闭连接
ctx.close();
Channel channel = ctx.channel();
channel.close();
}
});
}
});
ChannelFuture sync = serverBootstrap.bind(8099).sync();
sync.channel().closeFuture().sync();
}
}
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyClientHeartBeat {
private static SocketChannel socketChannel;
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boot =new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(boot).
channel(NioSocketChannel.class).
handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 4秒没有写数据就发送一次心跳,告知服务器我还活着
pipeline.addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
IdleState state = event.state();
// 写超时
if(state==IdleState.WRITER_IDLE){
// 超时 发送心跳包
ctx.writeAndFlush("我还活着,别把我断开");
}
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 8099).sync();
socketChannel=(SocketChannel)future.channel();
future.channel().closeFuture().sync();
}
}
对于服务端的测试,可以直接在cmd
命令行窗口使用telnet
命令进行测试,例如:
telnet localhost 8099
回车
连接成功后窗口是没有内容的。
此时按 ctrl + ]
键进入telnet
的命令行窗口,可以输入send
命令向服务端发送字符数据。
注意:1.服务器设置了超过5秒不发送数据连接就断开了。2.输入telnet的时候ip和端口之间是没有冒号的。
上述代码只是简单的例子,具体使用还需要详细的设计。Protobuf可以是一种比较不错的解决方案。
IdleStateHandler实现原理
首先要知道的是,netty每次创建一个客户端通道,都会调用initChannel方法,该方法中为每个客户端设置处理器链,也就是ChannelPipeline。所以每个客户端都会有一个IdleStateHandler对象。
IdleStateHandler类的构造函数
构造函数:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
参数
- readerIdleTime 读超时时间
- writerIdleTime 写超时时间
- allIdleTime 读写超时时间
- unit 时间单位
channelActive方法
该在通道创建时调用,该方法中调用了initialize方法。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.initialize(ctx);
super.channelActive(ctx);
}
initialize方法
该方法在channelActive方法中被调用
如果设置的超时时间大于0,则内部会创建一个task任务。
304行是这样的,用当前时间减去最后一次channelRead
方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead
已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么316行则会触发userEventTriggered
方法。