Netty到底是个啥?
作者:卡卷网发布时间:2024-12-26 02:08浏览数量:82次评论数量:0次
一、简介
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端,例如协议服务器和客户端。它极大地简化了网络编程,例如TCP和UDP套接字服务器,将原本NIO繁琐的实现流程进行了封装,保留了NIO模型同步非阻塞的特点,提高了开发人员的效率。
二、代码实现
服务端:
//启动器,负责组装netty组件,启动服务器。
new ServerBootstrap()
//包含BossEventLoop和WorkerEventLoop,一个selector加一个thread就是一个EventLoop,循环处理事件,
//不同EventLoop所代表的事件类型也不同
.group(new NioEventLoopGroup())
//选择实现哪种类型的Channel,这里实现的是NIO模型
.channel(NioServerSocketChannel.class)
//根据分类去执行不同的处理逻辑,boss代表连接事件,worker(child)代表处理读写事件,下面这个childHandler就决定了
//下面的workerHandler可执行哪些操作
.childHandler(
//代表和客户端数据读写的通道 Initializer 初始化,负责添加别的handler。
new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
//添加具体的handler,这里一种解码器,将传过来的字节形式 ByteBuf 转换为字符串
ch.pipeline().addLast(new StringDecoder());
//自定义handler,实现自己定义的业务逻辑
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
//此处的msg参数就代表上面传过来的ByteBuf解码之后的字符串
protected void channelRead(ChannelHandlerContext ctx, String msg) {
//执行打印msg的业务
System.out.println(msg);
}
});
}
})
//绑定监听端口
.bind(8080);
客户端:
//启动类
new Bootstrap()
//添加EventLoop
.group(new NioEventLoopGroup())
//将通道注册为客户端通道
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer<Channel>() {
@Override
//在连接建立后被调用
protected void initChannel(Channel ch) {
//添加编码器
ch.pipeline().addLast(new StringEncoder());
}
})
//建立与服务端的连接
.connect("127.0.0.1", 8080)
//Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 阻塞方法等待 connect 建立连接完毕
.sync()
//获取 channel 对象,它即为通道抽象,可以进行数据读写操作
.channel()
//写入消息并清空缓冲区
.writeAndFlush(new Date() + ": hello world!");
流程梳理:
- 把
Channel
理解为数据的通道 - 把 msg 理解为流动的数据,最开始输入是
ByteBuf
,但经过 pipeline(流水线) 的加工,会变成其它类型对象,最后输出又变成ByteBuf
- 把 handler 理解为数据的处理工序:
- 工序有多道,合在一起就是
pipeline
,pipeline
负责发布事件(读、读取完成...)传播给每个handler
,handler
对自己感兴趣的事件进行处理(重写了相应事件处理方法) handler
分Inbound
(入站)和Outbound
(出站)两类- 把
eventLoop
理解为处理数据的工人: - 工人内部有
selector
多路复用器,可以管理多个channel
的 io 操作,并且一旦工人负责了某个channel
,就要负责到底(绑定) - 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照
pipeline
顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
并且处理器方法(childHandler
)是在建立连接之后才执行的,因为如果没有连接,整个业务流程就会等待在group
处,直到有连接请求才会继续向下执行。
childHandler与handler:
childHandler
是子通道的处理器,用于处理每个客户端连接的事件。当一个新的连接被接受时,Netty 会为这个连接创建一个新的Channel
实例,这个新Channel
就是父Channel
的子通道。childHandler
就是为这些子通道设置的处理器。handler
是主服务器通道的处理器,用于处理服务器端的事件,比如接收新的连接请求
三、核心组件
1.EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自
j.u.c.ScheduledExecutorService
因此包含了线程池中所有的方法 - 另一条线是继承自 netty 自己的
OrderedEventExecutor
(顺序事件执行器), - 提供了
boolean inEventLoop
(Thread thread) 方法判断一个线程是否属于此 EventLoop - 提供了
parent
方法来看看自己属于哪个 EventLoopGroup
2.EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
- 可执行io事件,普通任务,定时任务。
新建一个EventLoopGroup时,如果未指定参数,则默认含有当前电脑cpu核心数 * 2的EventLoop
代码示例:
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
EventLoopGroup group = new NioEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
输出:
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
可以看到调用了三次next()
,第三次输出的与第一次输出的一样。
3.Channel
channel
是netty中数据流动的通道,其中包含多个处理器,用来对传来的数据做处理。每个channel
都与一个EventLoop
做绑定,以后这个channel
传来的数据都由它绑定的EventLoop
来处理。每个channel
只能绑定一个EventLoop
,但一个EventLoop
可以绑定多个channel
。
channel的主要作用:
close()
可以用来关闭 channelcloseFuture()
用来处理 channel 的关闭sync
方法作用是同步等待 channel 关闭- 而
addListener
方法是异步等待 channel 关闭 pipeline()
方法添加处理器write()
方法将数据写入writeAndFlush()
方法将数据写入并刷出
调用write()
方法并不是立刻将消息发出,而是先存进缓冲区,满足特定条件后才能发出,例如调用flush()
方法。
调用writeAndFlush()
方法则是写入缓冲区并立刻从缓冲区发出。
4.ChannelFuture
客户端代码:
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");
connect()
方法的返回结果是一个ChannelFuture对象,它是个异步非阻塞方法,异步意味着调用connect()
方法的线程与执行连接操作的线程不是同一个,非阻塞代表主线程调用connect()
方法后继续向下执行,不会去等待连接结果。
因此,如果不去调用sync()
方法去阻塞等待连接完成,很有可能会出现连接还未建立就执行发送消息的情况。也就是说sync()方法起到了一个同步的效果。
因此,自然也有方法可以异步去处理结果,调用addListener()
方法,主线程将只负责接收连接,进行连接操作,连接操作完成后事件的处理等都交给子线程。
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel());
channelFuture.addListener((ChannelFutureListener) future -> {
System.out.println(future.channel());
});
addListener()方法参数传一个回调函数,子线程建立连接处理完成后,回调函数将对事件的处理逻辑告诉子线程,子线程接着去执行。
在关闭资源时,也就是调用close()方法时,因为close()方法与connect()方法类似,都是异步处理,所以会出现资源还未关闭,主线程就开始执行资源关闭之后的业务了,解决这个问题也有同步与异步两种方法。
channel调用closeFuture()
方法获取closeFuture对象
同步:
- closeFuture对象调用
sync()
方法,阻塞等待资源彻底关闭。
异步:
- closeFuture对象调用
addListener()
方法,哪个线程关闭了资源,哪个线程就执行关闭之后的业务。
关闭连接后,如果想让整个java程序停止,还需要关闭EventLoopGroup,group调用shutdownGracefully()
方法关闭资源。
5.Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline。
- 入站处理器通常是
ChannelInboundHandlerAdapter
的子类,主要用来读取客户端数据,写回结果 - 出站处理器通常是
ChannelOutboundHandlerAdapter
的子类,主要对写回结果进行加工
每个 Channel 是类似一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
在channel建立后,netty会自动添加两个handler:head和tail,后续自己添加的handler都是在这两个handler之间。
ChannelInboundHandlerAdapter()
与ChannelOutboundHandlerAdapter()
执行顺序的区别:
服务端:
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(3);
ctx.channel().write(msg); // 3
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(6);
ctx.write(msg, promise); // 6
}
});
}
})
.bind(8080);
客户端:
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.addListener((ChannelFutureListener) future -> {
future.channel().writeAndFlush("hello,world");
});
服务端打印:
1
2
3
6
5
4
可以看到,ChannelInboundHandlerAdapter
是按照 addLast
的顺序执行的,而ChannelOutboundHandlerAdapter
是按照 addLast 的逆序执行的。ChannelPipeline
的实现是一个 ChannelHandlerContext
(包装了 ChannelHandler
) 组成的双向链表。
想要进行handler间的传递,需要调用super.channelRead(ctx, msg)
或者ctx.fireChannelRead(msg)
用来将msg传到下一个handler。且ChannelInboundHandlerAdapter
只能传递到最后一个ChannelInboundHandlerAdapter
,到第一个ChannelOutboundHandlerAdapter
就会断掉。
四、处理事件
1.执行普通任务
public class Demo03 {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
group.next().submit(() -> {
//执行子线程业务逻辑
System.out.println("yes");
});
//执行主线程业务逻辑
System.out.println("no");
}
}
EventLoopGroup调用submit()
方法,将事件提交给group中的一个EventLoop,开启一个子线程去处理,相当于异步处理业务逻辑,主线程继续向下执行。想更直观的观察结果,可以让子线程休眠一秒。
2.执行定时任务
public class Demo04 {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
group.next().scheduleAtFixedRate(() -> {
System.out.println("ok");
}, 0, 1, TimeUnit.SECONDS);
}
}
参数含义:"0" 代表延时0秒之后执行,也就是立刻执行;"1" 代表每隔一秒执行一次。
3.执行io事件
服务端
new ServerBootstrap()
//注册两个循环事件组,一个只含一个EventLoop,负责主线程;另一个含两个EventLoop,负责子线程;目的是为了将建立连接和处理io事件分离
//主线程负责监听连接事件,子线程负责处理io事件,实现业务上的异步处理
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
//通道注册为NIO通道,这个通道为父通道,用来监听连接事件,与BossEventLoop中的一个EventLoop绑定
.channel(NioServerSocketChannel.class)
//配置和处理每个客户端连接的处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
//当父通道监听到连接事件,连接成功建立后,都会为这个连接新建立一个子通道,
//并且与workerEventLoopGroup中的一个EventLoop绑定
protected void initChannel(NioSocketChannel ch) {
//为通道添加相关处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
执行逻辑:
group()
注册两个NioEventLoopGroup,一个是BossEventLoopGroup,只含一个EventLoop,负责主线程;另一个为WorkerEventLoopGroup,含两个EventLoop,负责子线程。目的是为了将建立连接和处理io事件分离,主线程负责监听连接事件,子线程负责处理io事件,实现业务上的异步处理。- 调用
channel()
, 注册NIO通道,这个通道为父通道,用来监听连接事件,与BossEventLoop中的一个EventLoop绑定。 - 配置和处理每个客户端连接的处理器
childHandler
是子通道的处理器,用于处理客户端的事件。当一个新的连接被接受时,Netty 会为这个连接创建一个新的 Channel 实例,这个新 Channel 就是父 Channel 的子通道。childHandler 就是为这些子通道设置的处理器。handler
是主服务器通道的处理器,用于处理服务器端的事件,比如接收新的连接请求- 重写
initChannel()
方法, 当父通道监听到连接事件,连接成功建立后,都会为这个连接新建立一个子通道,并且与workerEventLoopGroup中的一个EventLoop绑定。 - 之后就在子通道中添加处理器,对传进来的io事件进行处理
ctx.fireChannelRead(msg)
将信息传给下一个处理器。
客户端:
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
输出:
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
问题:
这样写还是有一定问题的,一个EventLoop虽然可以绑定多个channel,但如果其中一个channel执行耗时过长,占用EventLoop太长时间,就会导致其他channel一直等待,降低整体代码效率。
解决:
另外实现一个DefaultEventLoopGroup
,专门处理耗时较长的handler,addLast()
方法里可以额外指定两个参数,一个是指定绑定的EventLootGroup名字,一个是这个handler的名字。这样就可以将这个处理器交给指定的EventLoopGroup去分配EventLoop资源。
4.handler执行中如何切换线程
如果不同handler使用的是不同的EventLoop,那事件在经过每个handler时线程是如何切换的,需要参考一个底层的关键代码。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
//next.pipeline.touch()会返回消息对象本身
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
//返回下一个handler的EventLoop
EventExecutor executor = next.executor();
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
if (executor.inEventLoop()) {
// 是,直接调用
next.invokeChannelRead(m);
}
else {
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
所以,如果两个 handler 绑定的是同一个线程,那么就直接调用,否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用。
免责声明:本文由卡卷网编辑并发布,但不代表本站的观点和立场,只提供分享给大家。
- 上一篇:程序员现在接私活儿越来越难了为什么?
- 下一篇:有没有好看的个人博客的设计?
相关推荐

你 发表评论:
欢迎