Netty介绍
原生NIO存在的问题
- NIO的类库和API繁杂,使用麻烦,需要很熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
- 需要具备其他的额外技能:要熟悉Java多线程编程。因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序
- 开发工作量和工作难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等
- JDK NIO的Bug:臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU100%。该问题至今未被完全解决
概念
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
![](https://blog.rubinchu.com/wp-content/uploads/2021/09/1-9.png)
从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
Netty具备如下优点:
- 设计优雅,提供了阻塞和非阻塞的Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型
- 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存开销
- 提供安全传输特性
- 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议
线程模型
线程模型基本介绍
不同的线程模型,对程序的性能有很大的影响。在学习Netty线程模型之前,我们需要了解一下各个线程模型的理念。最后通过比较来体会Netty线程模型的优越性。目前主流的线程模型如下:
- 传统阻塞I/O模型
- Reactor模型:分为单Reactor单线程、单Reactor多线程和主从Reactor多线程
传统阻塞I/O服务模型
采用阻塞I/O模式获取输入的数据,每个连接都需要独立的线程完成数据的输入、业务处理以及结果返回的工作:
该模式存在以下问题:
- 并发量很大时会创建大量线程,占用大量系统资源
- 连接创建之后,会阻塞在read操作处,如果长时间无数据交互会造成资源浪费
Reactor模型
Reactor模式就是通过一个或多个输入同时传递给服务处理器的模式,服务器端将多个请求分派到对应的处理线程去处理,因此Reactor模式也叫做Dispatcher模式。Reactor模式使用IO复用监听事件,收到事件后,份发给某个线程(或进程)去处理,从而提高并发量。
单Reactor单线程
该模式如图所示,所有的连接、读取、处理、响应的事件都在一个线程中去处理。
该模式的优点就是模型简单,没有多线程,就不涉及线程通信、数据竞争等等问题。
而缺点也和明显:只有一个线程在处理所有事情,事件只能一件一件顺序处理,容易导致性能瓶颈,也不能发挥多核CPU的性能优势;如果线程意外终止或者进入死循环,将导致整个系统变得不可用。
单Reactor多线程
该线程模型的事件处理步骤如下:
- Reactor对象通过Selector监控客户端的请求事件,收到事件之后通过Dispatcher进行分发
- 如果建立连接请求,则由Acceptor通过accept处理连接请求
- 如果非连接请求,则由Reactor分发调用连接对应的Handler来处理
- Handler只负责响应事件,不做具体的业务处理。读取数据后,分发给对应的worker线程池的某个线程去处理业务
- worker线程池会分配独立的线程完成真正的业务,并将结果返回给Handler
- Handler接到相应后回写给客户端
改模型的优点就是可以充分利用撮合CPU的并发处理能力。缺点是多线程数据共享和访问比较复杂,Reactor处理所有的时间的监听和响应,在单线程运行,在高并发场景下容易出现性能瓶颈。
主从Reactor多线程
该线程模型的事件处理流程如下:
- Reactor主线程MainReactor对象通过select监听客户端连接事件,收到事件后,通过Acceptor处理客户端连接事件
- 当Acceptor处理完客户端连接事件后,MainReactor将连接分配给SubReactor。
- SubReactor将连接加入到自己的连接队列进行监听,并创建Handler对各种事件进行处理
- 当连接上有新的事件发生时,SubReactor就会调用对应的Handler处理
- Handler只负责响应事件,不做具体的业务处理。读取数据后,分发给对应的worker线程池的某个线程去处理业务
- worker线程池会分配独立的线程完成真正的业务,并将结果返回给Handler
- Handler接到相应后回写给客户端
该模式的优点如下:
- MainReactor和SubReactor之间职责明确
- 多个SubReactor线程可以支持更大的并发量
该模式的缺点就是编程较为复杂。但是由于其优点明显,在许多项目中被广泛应用。包括Nginx、Memcached、Netty等。该模式也被叫做服务器的1+M+N线程模式,即使用该模式开发的服务器包含一个(或多个,1只是表示相对较少)连接建立的线程+M个IO线程+N个业务处理线程。该模式是业界成熟的服务程序设计模式。
Netty线程模型
该模型的事件处理流程如下:
- Netty抽象出两组线程池:BossGroup和WorkerGroup。每个线程池中都有NioEventLoop线程。BossGroup中的线程专门负责和客户端建立连接。WorkerGroup中的线程专门负责处理读写事件。BossGroup和WorkerGroup的类型都是NioEventGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
- NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
- NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
每个 BossNioEventLoop 中循环执行以下三个步骤:
- select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
- runAllTasks:再去以此循环处理任务队列中的其他任务
每个 WorkerNioEventLoop 中循环执行以下三个步骤:
- select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
- runAllTasks:再去以此循环处理任务队列中的其他任务
在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
核心API介绍
ChannelHandler及其实现类
ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。API 关系如下图所示:
Netty开发中需要自定义一个Handler类去实现ChannelHandler接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑。一般需要重写的方法如下:
- public void channelActive(ChannelHandlerContext ctx),通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
ChannelPipeline
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链。
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler。 InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行。
ChannelHandlerContext
该对象是事件处理器的上下文对象,Pipeline链中的实际处理节点。每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler。同时,ChannelHandlerContext中也绑定了对应的ChannelPipeline和Channel的信息,方便对ChannelHandler进行调用。常用方法如下:
- ChannelFuture close(),关闭通道
- ChannelOutboundInvoker flush(),刷新
- ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline中当前ChannelHandler的下一个ChannelHandler开始处理出站
ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:
- ChannelOption.SO_BACKLOG:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
- ChannelOption.SO_KEEPALIVE:一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试连接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
ChannelFuture
表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。常用方法如下所示:
- Channel channel():返回当前正在进行 IO 操作的通道
- ChannelFuture sync():等待异步操作执行完毕,将异步改为同步
EventLoopGroup和实现类NioEventLoopGroup
EventLoopGroup 是一组 EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供next
接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例。BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。一般情况下我们都是用实现类NioEventLoopGroup。
常用方法如下:
- public NioEventLoopGroup():构造方法,创建线程组
- public Future shutdownGracefully():断开连接,关闭线程
ServerBootstrap和Bootstrap
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置。Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下所示:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):该方法用于服务器端,用来设置两个 EventLoop
- public B group(EventLoopGroup group) :该方法用于客户端,用来设置一个 EventLoop
- public B channel(Class channelClass):该方法用来设置一个通道实现。一般服务器端是NioServerSocketChannel;客户端是NioSocketChannel
- public B option(ChannelOption option, T value):用来给 ServerChannel 添加配置,即设置parentGroup中Channel
- public ServerBootstrap childOption(ChannelOption childOption, T value):用来给接收到的读写事件通道添加配置,即设置childGroup中Channel
- public ServerBootstrap childHandler(ChannelHandler childHandler):该方法用来设置业务处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) :该方法用于服务器端,用来设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) :该方法用于客户端,用来连接服务器端
Unpooled类
这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
- public static ByteBuf copiedBuffer(CharSequence string, Charset charset):通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
Netty异步模型
基本介绍
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果.。Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)。
Future和Future-Listener
Future表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口。ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。常用方法如下:
- sync 方法, 阻塞等待程序结果反回
- isDone 方法来判断当前操作是否完成
- isSuccess 方法来判断已完成的当前操作是否成功
- getCause 方法来获取已完成的当前操作失败的原因
- isCancelled 方法来判断已完成的当前操作是否被取消
- addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
Future-Listener 机制就是给Future添加监听器,监听操作结果。下面是一个代码示例:
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好
呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
Netty入门案例
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.67.Final</version>
</dependency>
Netty服务端编写
服务端实现步骤:
- 创建bossGroup线程组: 处理网络事件--连接事件
- 创建workerGroup线程组: 处理网络事件--读写事件
- 创建服务端启动助手
- 设置bossGroup线程组和workerGroup线程组
- 设置服务端通道实现为NIO
- 参数设置
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动服务端并绑定端口,同时将异步改为同步
代码实现如下:
/**
* Netty服务器
*/
@Slf4j
public class NettyServer {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
private Integer port;
public NettyServer(Integer port) {
this.port = port;
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(20);
}
/**
* 启动服务区
*/
public void start() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerChannelHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 256)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
try {
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
log.info("The netty server startup success, listened on port : {}", this.port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭服务器
*/
public void close() {
if (this.bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(11000);
nettyServer.start();
}
}
自定义服务器端Handler:
@Slf4j
public class ServerChannelHandler implements ChannelInboundHandler {
private Map<Channel, String> channels = new ConcurrentHashMap<>(256);
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("There is a new connection from {}", socketAddress.toString().substring(1));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("The Client {} is offline", socketAddress.toString().substring(1));
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
ByteBuf byteBuf = (ByteBuf) o;
String receiveMsg = byteBuf.toString(CharsetUtil.UTF_8);
log.info("Receive msg from {}, the msg content is {}", socketAddress.toString().substring(1), receiveMsg);
channels.put(channelHandlerContext.channel(), receiveMsg);
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
String receiveMsg = channels.get(channelHandlerContext.channel());
String responseMsg = "From server:";
if ("quit".equalsIgnoreCase(receiveMsg)) {
responseMsg += "Bye Bye";
} else {
responseMsg += receiveMsg;
}
channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer(responseMsg, CharsetUtil.UTF_8));
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
channelHandlerContext.close();
}
}
Netty客户端编写
客户端实现步骤:
- 创建线程组
- 创建客户端启动助手
- 设置线程组
- 设置客户端通道实现为NIO
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动客户端,等待连接服务端,同时将异步改为同步
代码实现如下:
@Data
public class NettyClient {
private NioEventLoopGroup workerGroup;
private Integer port;
public NettyClient(Integer port) {
this.workerGroup = new NioEventLoopGroup();
this.port = port;
}
/**
* 启动客户端
*/
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(this.workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientChannelHandler(NettyClient.this));
}
});
try {
final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", this.port)).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭客户端
*/
public void close() {
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient(11000);
nettyClient.start();
}
}
自定义客户端Handler如下:
@Slf4j
public class ClientChannelHandler implements ChannelInboundHandler {
private NettyClient nettyClient;
public ClientChannelHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
private BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("Connect to 127.0.0.1:{} with {}", nettyClient.getPort(), socketAddress.toString().substring(1));
String inputMsg = reader.readLine();
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(inputMsg, CharsetUtil.UTF_8));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
String responseMsg = byteBuf.toString(CharsetUtil.UTF_8);
log.info("{}", responseMsg);
if ("From Server:Bye Bye".equalsIgnoreCase(responseMsg)) {
nettyClient.close();
return;
}
String inputMsg = reader.readLine();
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(inputMsg, CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
}
}
关于Netty的基础知识就到这里,欢迎留言交流~~
文章评论