Netty功能实现:实现心跳检测

x33g5p2x  于2021-11-30 转载在 其他  
字(4.8k)|赞(0)|评价(0)|浏览(441)

netty实现心跳检测

检测逻辑:

1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:

1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

服务端编写

  1. public class HreatBeatServer {
  2. public static void main(String[] args) {
  3. //创建两个Reactor 构建主从 Reactor 模型
  4. //用于处理 连接和读写事件 , 无限循环组(线程池)
  5. //管理 channel 监听事件
  6. EventLoopGroup bossGroup = new NioEventLoopGroup();
  7. EventLoopGroup workerGroup = new NioEventLoopGroup();
  8. // 我们需要一个服务端引导程序来开启服务端。
  9. ServerBootstrap serverBootstrap = new ServerBootstrap();
  10. //将主从 Reactor 入参,设置当前参数
  11. //这个方法返回的事对象本身,我们可以点出其他方法, 这种返回类型为对象自身 提供了 链式编程的方式
  12. serverBootstrap.group(bossGroup, workerGroup)
  13. //我们需要设置 channel 的 类型
  14. //对应的是 netty NIO BIO
  15. //NioServerSocketChannel <== ServerSocketChannel <== ServerSocket
  16. .channel(NioServerSocketChannel.class)
  17. //设置当前通道的处理器,使用Netty提供的日志打印处理器
  18. .handler(new LoggingHandler(LogLevel.INFO))
  19. //定义客户端连接处理的使用
  20. //此方法需要设置参数 ChannelInitializer 通道初始化器
  21. //初始化 要处理客户端 通道, 所以泛型设置为 SocketChannel
  22. //此类 为抽象类 需要实现其抽象方法 initchannel (alt+enter)快捷键
  23. .childHandler(new ChannelInitializer<SocketChannel>() {
  24. @Override
  25. protected void initChannel(SocketChannel socketChannel) throws Exception {
  26. //通过channel 获取管道 pipeline
  27. // 通道代表我们连接的角色, 管道代表处理业务得逻辑管理
  28. // 管道相当于 链表,可以将不同的处理器连接起来,管理处理器的顺序
  29. // 使用时 常常使用的事尾插法, addList 将加入到尾部
  30. ChannelPipeline pipeline = socketChannel.pipeline();
  31. pipeline.addLast(new StringDecoder());
  32. pipeline.addLast(new StringEncoder());
  33. /* * 使用心跳检测处理器 * 读空闲 写空闲 读写空闲 的超时时间 * 最后一个参数是 时间的单位 * IdleStateHandler发现有空闲的时候 会触发 IdleStateEvent时间 * 他会把事件推送给下一个 handler的指定方法 userEventTriggered 去处理 * */
  34. pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));
  35. socketChannel.pipeline().addLast(new HreatBeatServerHandler());
  36. }
  37. });
  38. System.out.println("服务端初始化完成");
  39. // 设置并启动端口号,但需要使用sync 异步启动
  40. try {
  41. ChannelFuture future = serverBootstrap.bind(2020).sync();
  42. // 将关闭通道的方式也设置为异步的
  43. // 阻塞Finally中的代码执行
  44. future.channel().closeFuture().sync();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. } finally {
  48. // 优雅方式都关闭
  49. bossGroup.shutdownGracefully();
  50. workerGroup.shutdownGracefully();
  51. }
  52. }
  53. }

IdleStateHandler , 是netty提供的处理器

1)超过多长时间没有读 readerIdleTime
2) 超过多长时间没有写 writerIdleTime
3) 超过多长时间没有读和写 allIdleTime

底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。

处理器编写

  1. public class HreatBeatServerHandler extends SimpleChannelInboundHandler<String> {
  2. private int times;
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  5. if ("I am alive".equals(msg)) {
  6. ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
  7. }
  8. }
  9. //处理心跳检测事件的方法
  10. @Override
  11. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  12. IdleStateEvent event = (IdleStateEvent) evt;
  13. String eventDesc = null;
  14. switch (event.state()) {
  15. case READER_IDLE:
  16. eventDesc = "读空闲";
  17. break;
  18. case WRITER_IDLE:
  19. eventDesc = "写空闲";
  20. break;
  21. case ALL_IDLE:
  22. eventDesc = "读写空闲";
  23. break;
  24. }
  25. System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + eventDesc);
  26. times++;
  27. if (times > 3) {
  28. System.out.println("空闲次数超过三次 关闭连接");
  29. ctx.writeAndFlush("you are out");
  30. ctx.channel().close();
  31. }
  32. //super.userEventTriggered(ctx, evt);
  33. }
  34. }

其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

客户端编写

客户端不断循环给服务端发消息确认存活的期间 线程睡眠 模拟失去心跳场景

  1. package com.hyc.netty.Hreatbeat;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import java.util.Random;
  10. public class HreatbeatClient {
  11. public static void main(String[] args) {
  12. //客户端只需要一个事件循环组
  13. EventLoopGroup group = new NioEventLoopGroup();
  14. // 客户端启动的对象
  15. Bootstrap bootstrap = new Bootstrap();
  16. bootstrap.group(group)
  17. .channel(NioSocketChannel.class)
  18. .handler(new ChannelInitializer<SocketChannel>() {
  19. @Override
  20. protected void initChannel(SocketChannel socketChannel) throws Exception {
  21. ChannelPipeline pipeline = socketChannel.pipeline();
  22. pipeline.addLast(new StringDecoder());
  23. pipeline.addLast(new StringEncoder());
  24. socketChannel.pipeline().addLast(new HreatbeatClientHandler());
  25. }
  26. });
  27. System.out.println("客户端初始化完成");
  28. try {
  29. ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
  30. String data = "I am alive";
  31. while (future.channel().isActive()) {
  32. //模拟空闲状态
  33. int num = new Random().nextInt(10);
  34. Thread.sleep(num * 1000);
  35. future.channel().writeAndFlush(data);
  36. }
  37. //future.channel().closeFuture().sync();
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. group.shutdownGracefully();
  42. }
  43. }
  44. static class HreatbeatClientHandler extends SimpleChannelInboundHandler<String> {
  45. @Override
  46. protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
  47. System.out.println("server data:" + s);
  48. if ("you are out".equals(s)) {
  49. System.out.println("关闭");
  50. channelHandlerContext.channel().close();
  51. }
  52. }
  53. }
  54. }

客户端随机线程睡眠 一旦接受到 服务端返回的you are out代表空闲次数超过了 3次 则关闭客户端连接

相关文章