卡卷网
当前位置:卡卷网 / 每日看点 / 正文

Netty到底是个啥?

作者:卡卷网发布时间:2024-12-26 02:08浏览数量:82次评论数量:0次

一、简介

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端,例如协议服务器和客户端。它极大地简化了网络编程,例如TCP和UDP套接字服务器,将原本NIO繁琐的实现流程进行了封装,保留了NIO模型同步非阻塞的特点,提高了开发人员的效率。

二、代码实现

服务端:

//启动器,负责组装netty组件,启动服务器。 new ServerBootstrap() //包含BossEventLoop和WorkerEventLoop,一个selector加一个thread就是一个EventLoop,循环处理事件, //不同EventLoop所代表的事件类型也不同 .group(new NioEventLoopGroup()) //选择实现哪种类型的Channel,这里实现的是NIO模型 .channel(NioServerSocketChannel.class) //根据分类去执行不同的处理逻辑,boss代表连接事件,worker(child)代表处理读写事件,下面这个childHandler就决定了 //下面的workerHandler可执行哪些操作 .childHandler( //代表和客户端数据读写的通道 Initializer 初始化,负责添加别的handler。 new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { //添加具体的handler,这里一种解码器,将传过来的字节形式 ByteBuf 转换为字符串 ch.pipeline().addLast(new StringDecoder()); //自定义handler,实现自己定义的业务逻辑 ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override //此处的msg参数就代表上面传过来的ByteBuf解码之后的字符串 protected void channelRead(ChannelHandlerContext ctx, String msg) { //执行打印msg的业务 System.out.println(msg); } }); } }) //绑定监听端口 .bind(8080);

客户端:

//启动类 new Bootstrap() //添加EventLoop .group(new NioEventLoopGroup()) //将通道注册为客户端通道 .channel(NioSocketChannel.class) //添加处理器 .handler(new ChannelInitializer<Channel>() { @Override //在连接建立后被调用 protected void initChannel(Channel ch) { //添加编码器 ch.pipeline().addLast(new StringEncoder()); } }) //建立与服务端的连接 .connect("127.0.0.1", 8080) //Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 阻塞方法等待 connect 建立连接完毕 .sync() //获取 channel 对象,它即为通道抽象,可以进行数据读写操作 .channel() //写入消息并清空缓冲区 .writeAndFlush(new Date() + ": hello world!");

流程梳理:

  • Channel理解为数据的通道
  • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline(流水线) 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • 把 handler 理解为数据的处理工序:
    • 工序有多道,合在一起就是 pipelinepipeline 负责发布事件(读、读取完成...)传播给每个 handlerhandler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handlerInbound (入站)Outbound (出站)两类
  • eventLoop 理解为处理数据的工人:
    • 工人内部有selector多路复用器,可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

并且处理器方法(childHandler)是在建立连接之后才执行的,因为如果没有连接,整个业务流程就会等待在group,直到有连接请求才会继续向下执行。

childHandler与handler:

  • childHandler是子通道的处理器,用于处理每个客户端连接的事件。当一个新的连接被接受时,Netty 会为这个连接创建一个新的 Channel 实例,这个新 Channel 就是父 Channel 的子通道。childHandler 就是为这些子通道设置的处理器。
  • handler是主服务器通道的处理器,用于处理服务器端的事件,比如接收新的连接请求

三、核心组件

1.EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor(顺序事件执行器),
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

2.EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
  • 可执行io事件,普通任务,定时任务。

新建一个EventLoopGroup时,如果未指定参数,则默认含有当前电脑cpu核心数 * 2的EventLoop

代码示例:

// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程 EventLoopGroup group = new NioEventLoopGroup(2); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());

输出:

io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6 io.netty.channel.DefaultEventLoop@60f82f98

可以看到调用了三次next() ,第三次输出的与第一次输出的一样。

3.Channel

channel是netty中数据流动的通道,其中包含多个处理器,用来对传来的数据做处理。每个channel都与一个EventLoop做绑定,以后这个channel传来的数据都由它绑定的EventLoop来处理。每个channel只能绑定一个EventLoop,但一个EventLoop可以绑定多个channel

channel的主要作用:

  • close()可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

调用write()方法并不是立刻将消息发出,而是先存进缓冲区,满足特定条件后才能发出,例如调用flush()方法。

调用writeAndFlush()方法则是写入缓冲区并立刻从缓冲区发出。

4.ChannelFuture

客户端代码:

new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .sync() .channel() .writeAndFlush(new Date() + ": hello world!");

connect()方法的返回结果是一个ChannelFuture对象,它是个异步非阻塞方法,异步意味着调用connect()方法的线程与执行连接操作的线程不是同一个,非阻塞代表主线程调用connect()方法后继续向下执行,不会去等待连接结果。

因此,如果不去调用sync()方法去阻塞等待连接完成,很有可能会出现连接还未建立就执行发送消息的情况。也就是说sync()方法起到了一个同步的效果。

因此,自然也有方法可以异步去处理结果,调用addListener()方法,主线程将只负责接收连接,进行连接操作,连接操作完成后事件的处理等都交给子线程。

ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080); System.out.println(channelFuture.channel()); channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); });

addListener()方法参数传一个回调函数,子线程建立连接处理完成后,回调函数将对事件的处理逻辑告诉子线程,子线程接着去执行。

在关闭资源时,也就是调用close()方法时,因为close()方法与connect()方法类似,都是异步处理,所以会出现资源还未关闭,主线程就开始执行资源关闭之后的业务了,解决这个问题也有同步与异步两种方法。

channel调用closeFuture()方法获取closeFuture对象

同步:

  • closeFuture对象调用sync()方法,阻塞等待资源彻底关闭。

异步:

  • closeFuture对象调用addListener()方法,哪个线程关闭了资源,哪个线程就执行关闭之后的业务。

关闭连接后,如果想让整个java程序停止,还需要关闭EventLoopGroup,group调用shutdownGracefully()方法关闭资源。

5.Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline。

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

每个 Channel 是类似一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

在channel建立后,netty会自动添加两个handler:headtail,后续自己添加的handler都是在这两个handler之间。

ChannelInboundHandlerAdapter()ChannelOutboundHandlerAdapter() 执行顺序的区别:

服务端:

new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(1); ctx.fireChannelRead(msg); // 1 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(2); ctx.fireChannelRead(msg); // 2 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(3); ctx.channel().write(msg); // 3 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4); ctx.write(msg, promise); // 4 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5); ctx.write(msg, promise); // 5 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6); ctx.write(msg, promise); // 6 } }); } }) .bind(8080);

客户端:

new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .addListener((ChannelFutureListener) future -> { future.channel().writeAndFlush("hello,world"); });

服务端打印:

1 2 3 6 5 4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast顺序执行的,而ChannelOutboundHandlerAdapter是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

想要进行handler间的传递,需要调用super.channelRead(ctx, msg)或者ctx.fireChannelRead(msg)用来将msg传到下一个handler。且ChannelInboundHandlerAdapter只能传递到最后一个ChannelInboundHandlerAdapter,到第一个ChannelOutboundHandlerAdapter就会断掉。

四、处理事件

1.执行普通任务

public class Demo03 { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); group.next().submit(() -> { //执行子线程业务逻辑 System.out.println("yes"); }); //执行主线程业务逻辑 System.out.println("no"); } }

EventLoopGroup调用submit()方法,将事件提交给group中的一个EventLoop,开启一个子线程去处理,相当于异步处理业务逻辑,主线程继续向下执行。想更直观的观察结果,可以让子线程休眠一秒。

2.执行定时任务

public class Demo04 { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); group.next().scheduleAtFixedRate(() -> { System.out.println("ok"); }, 0, 1, TimeUnit.SECONDS); } }

参数含义:"0" 代表延时0秒之后执行,也就是立刻执行;"1" 代表每隔一秒执行一次。

3.执行io事件

服务端

new ServerBootstrap() //注册两个循环事件组,一个只含一个EventLoop,负责主线程;另一个含两个EventLoop,负责子线程;目的是为了将建立连接和处理io事件分离 //主线程负责监听连接事件,子线程负责处理io事件,实现业务上的异步处理 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) //通道注册为NIO通道,这个通道为父通道,用来监听连接事件,与BossEventLoop中的一个EventLoop绑定 .channel(NioServerSocketChannel.class) //配置和处理每个客户端连接的处理器 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override //当父通道监听到连接事件,连接成功建立后,都会为这个连接新建立一个子通道, //并且与workerEventLoopGroup中的一个EventLoop绑定 protected void initChannel(NioSocketChannel ch) { //为通道添加相关处理器 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; if (byteBuf != null) { byte[] buf = new byte[16]; ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); log.debug(new String(buf)); } } }); } }).bind(8080).sync();

执行逻辑:

  • group()注册两个NioEventLoopGroup,一个是BossEventLoopGroup,只含一个EventLoop,负责主线程;另一个为WorkerEventLoopGroup,含两个EventLoop,负责子线程。目的是为了将建立连接和处理io事件分离,主线程负责监听连接事件,子线程负责处理io事件,实现业务上的异步处理
  • 调用channel(), 注册NIO通道,这个通道为父通道,用来监听连接事件,与BossEventLoop中的一个EventLoop绑定。
  • 配置和处理每个客户端连接的处理器
    • childHandler是子通道的处理器,用于处理客户端的事件。当一个新的连接被接受时,Netty 会为这个连接创建一个新的 Channel 实例,这个新 Channel 就是父 Channel 的子通道。childHandler 就是为这些子通道设置的处理器。
    • handler是主服务器通道的处理器,用于处理服务器端的事件,比如接收新的连接请求
  • 重写initChannel()方法, 当父通道监听到连接事件,连接成功建立后,都会为这个连接新建立一个子通道,并且与workerEventLoopGroup中的一个EventLoop绑定。
  • 之后就在子通道中添加处理器,对传进来的io事件进行处理

ctx.fireChannelRead(msg) 将信息传给下一个处理器。

客户端:

public static void main(String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup(1)) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { System.out.println("init..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }) .channel(NioSocketChannel.class).connect("localhost", 8080) .sync() .channel(); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes())); Thread.sleep(2000); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

输出:

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu 22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定

问题:

这样写还是有一定问题的,一个EventLoop虽然可以绑定多个channel,但如果其中一个channel执行耗时过长,占用EventLoop太长时间,就会导致其他channel一直等待,降低整体代码效率。

解决:

另外实现一个DefaultEventLoopGroup,专门处理耗时较长的handler,addLast()方法里可以额外指定两个参数,一个是指定绑定的EventLootGroup名字,一个是这个handler的名字。这样就可以将这个处理器交给指定的EventLoopGroup去分配EventLoop资源。

4.handler执行中如何切换线程

如果不同handler使用的是不同的EventLoop,那事件在经过每个handler时线程是如何切换的,需要参考一个底层的关键代码。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //next.pipeline.touch()会返回消息对象本身 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); //返回下一个handler的EventLoop EventExecutor executor = next.executor(); // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程 if (executor.inEventLoop()) { // 是,直接调用 next.invokeChannelRead(m); } else { // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人) executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }

所以,如果两个 handler 绑定的是同一个线程,那么就直接调用,否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用。

END

免责声明:本文由卡卷网编辑并发布,但不代表本站的观点和立场,只提供分享给大家。

卡卷网

卡卷网 主页 联系他吧

请记住:卡卷网 Www.Kajuan.Net

欢迎 发表评论:

请填写验证码