Netty介绍
原生NIO存在的问题
- NIO的类库和API繁杂,使用麻烦,需要很熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
- 需要具备其他的额外技能:要熟悉Java多线程编程。因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序
- 开发工作量和工作难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等
- JDK NIO的Bug:臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU100%。该问题至今未被完全解决
概念
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
Netty具备如下优点:
- 设计优雅,提供了阻塞和非阻塞的Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型
- 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存开销
- 提供安全传输特性
- 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议
线程模型
线程模型基本介绍
不同的线程模型,对程序的性能有很大的影响。在学习Netty线程模型之前,我们需要了解一下各个线程模型的理念。最后通过比较来体会Netty线程模型的优越性。目前主流的线程模型如下:
- 传统阻塞I/O模型
- Reactor模型:分为单Reactor单线程、单Reactor多线程和主从Reactor多线程
传统阻塞I/O服务模型
采用阻塞I/O模式获取输入的数据,每个连接都需要独立的线程完成数据的输入、业务处理以及结果返回的工作:
该模式存在以下问题:
- 并发量很大时会创建大量线程,占用大量系统资源
- 连接创建之后,会阻塞在read操作处,如果长时间无数据交互会造成资源浪费
Reactor模型
Reactor模式就是通过一个或多个输入同时传递给服务处理器的模式,服务器端将多个请求分派到对应的处理线程去处理,因此Reactor模式也叫做Dispatcher模式。Reactor模式使用IO复用监听事件,收到事件后,份发给某个线程(或进程)去处理,从而提高并发量。
单Reactor单线程
该模式如图所示,所有的连接、读取、处理、响应的事件都在一个线程中去处理。
该模式的优点就是模型简单,没有多线程,就不涉及线程通信、数据竞争等等问题。
而缺点也和明显:只有一个线程在处理所有事情,事件只能一件一件顺序处理,容易导致性能瓶颈,也不能发挥多核CPU的性能优势;如果线程意外终止或者进入死循环,将导致整个系统变得不可用。
单Reactor多线程
该线程模型的事件处理步骤如下:
- Reactor对象通过Selector监控客户端的请求事件,收到事件之后通过Dispatcher进行分发
- 如果建立连接请求,则由Acceptor通过accept处理连接请求
- 如果非连接请求,则由Reactor分发调用连接对应的Handler来处理
- Handler只负责响应事件,不做具体的业务处理。读取数据后,分发给对应的worker线程池的某个线程去处理业务
- worker线程池会分配独立的线程完成真正的业务,并将结果返回给Handler
- Handler接到相应后回写给客户端
改模型的优点就是可以充分利用撮合CPU的并发处理能力。缺点是多线程数据共享和访问比较复杂,Reactor处理所有的时间的监听和响应,在单线程运行,在高并发场景下容易出现性能瓶颈。
主从Reactor多线程
该线程模型的事件处理流程如下:
- Reactor主线程MainReactor对象通过select监听客户端连接事件,收到事件后,通过Acceptor处理客户端连接事件
- 当Acceptor处理完客户端连接事件后,MainReactor将连接分配给SubReactor。
- SubReactor将连接加入到自己的连接队列进行监听,并创建Handler对各种事件进行处理
- 当连接上有新的事件发生时,SubReactor就会调用对应的Handler处理
- Handler只负责响应事件,不做具体的业务处理。读取数据后,分发给对应的worker线程池的某个线程去处理业务
- worker线程池会分配独立的线程完成真正的业务,并将结果返回给Handler
- Handler接到相应后回写给客户端
该模式的优点如下:
- MainReactor和SubReactor之间职责明确
- 多个SubReactor线程可以支持更大的并发量
该模式的缺点就是编程较为复杂。但是由于其优点明显,在许多项目中被广泛应用。包括Nginx、Memcached、Netty等。该模式也被叫做服务器的1+M+N线程模式,即使用该模式开发的服务器包含一个(或多个,1只是表示相对较少)连接建立的线程+M个IO线程+N个业务处理线程。该模式是业界成熟的服务程序设计模式。
Netty线程模型
该模型的事件处理流程如下:
- Netty抽象出两组线程池:BossGroup和WorkerGroup。每个线程池中都有NioEventLoop线程。BossGroup中的线程专门负责和客户端建立连接。WorkerGroup中的线程专门负责处理读写事件。BossGroup和WorkerGroup的类型都是NioEventGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
- NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
- NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
每个 BossNioEventLoop 中循环执行以下三个步骤:
- select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
- runAllTasks:再去以此循环处理任务队列中的其他任务
每个 WorkerNioEventLoop 中循环执行以下三个步骤:
- select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
- runAllTasks:再去以此循环处理任务队列中的其他任务
在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
核心API介绍
ChannelHandler及其实现类
ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。API 关系如下图所示:
Netty开发中需要自定义一个Handler类去实现ChannelHandler接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑。一般需要重写的方法如下:
- public void channelActive(ChannelHandlerContext ctx),通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
ChannelPipeline
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链。
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler。 InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行。
ChannelHandlerContext
该对象是事件处理器的上下文对象,Pipeline链中的实际处理节点。每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler。同时,ChannelHandlerContext中也绑定了对应的ChannelPipeline和Channel的信息,方便对ChannelHandler进行调用。常用方法如下:
- ChannelFuture close(),关闭通道
- ChannelOutboundInvoker flush(),刷新
- ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline中当前ChannelHandler的下一个ChannelHandler开始处理出站
ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:
- ChannelOption.SO_BACKLOG:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
- ChannelOption.SO_KEEPALIVE:一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试连接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
ChannelFuture
表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。常用方法如下所示:
- Channel channel():返回当前正在进行 IO 操作的通道
- ChannelFuture sync():等待异步操作执行完毕,将异步改为同步
EventLoopGroup和实现类NioEventLoopGroup
EventLoopGroup 是一组 EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供next
接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例。BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。一般情况下我们都是用实现类NioEventLoopGroup。
常用方法如下:
- public NioEventLoopGroup():构造方法,创建线程组
- public Future shutdownGracefully():断开连接,关闭线程
ServerBootstrap和Bootstrap
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置。Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下所示:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):该方法用于服务器端,用来设置两个 EventLoop
- public B group(EventLoopGroup group) :该方法用于客户端,用来设置一个 EventLoop
- public B channel(Class channelClass):该方法用来设置一个通道实现。一般服务器端是NioServerSocketChannel;客户端是NioSocketChannel
- public B option(ChannelOption option, T value):用来给 ServerChannel 添加配置,即设置parentGroup中Channel
- public ServerBootstrap childOption(ChannelOption childOption, T value):用来给接收到的读写事件通道添加配置,即设置childGroup中Channel
- public ServerBootstrap childHandler(ChannelHandler childHandler):该方法用来设置业务处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) :该方法用于服务器端,用来设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) :该方法用于客户端,用来连接服务器端
Unpooled类
这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
- public static ByteBuf copiedBuffer(CharSequence string, Charset charset):通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
Netty异步模型
基本介绍
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果.。Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)。
Future和Future-Listener
Future表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口。ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。常用方法如下:
- sync 方法, 阻塞等待程序结果反回
- isDone 方法来判断当前操作是否完成
- isSuccess 方法来判断已完成的当前操作是否成功
- getCause 方法来获取已完成的当前操作失败的原因
- isCancelled 方法来判断已完成的当前操作是否被取消
- addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
Future-Listener 机制就是给Future添加监听器,监听操作结果。下面是一个代码示例:
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好
呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
Netty入门案例
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.67.Final</version>
</dependency>
Netty服务端编写
服务端实现步骤:
- 创建bossGroup线程组: 处理网络事件--连接事件
- 创建workerGroup线程组: 处理网络事件--读写事件
- 创建服务端启动助手
- 设置bossGroup线程组和workerGroup线程组
- 设置服务端通道实现为NIO
- 参数设置
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动服务端并绑定端口,同时将异步改为同步
代码实现如下:
/**
* Netty服务器
*/
@Slf4j
public class NettyServer {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
private Integer port;
public NettyServer(Integer port) {
this.port = port;
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(20);
}
/**
* 启动服务区
*/
public void start() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerChannelHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 256)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
try {
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
log.info("The netty server startup success, listened on port : {}", this.port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭服务器
*/
public void close() {
if (this.bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(11000);
nettyServer.start();
}
}
自定义服务器端Handler:
@Slf4j
public class ServerChannelHandler implements ChannelInboundHandler {
private Map<Channel, String> channels = new ConcurrentHashMap<>(256);
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("There is a new connection from {}", socketAddress.toString().substring(1));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("The Client {} is offline", socketAddress.toString().substring(1));
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
ByteBuf byteBuf = (ByteBuf) o;
String receiveMsg = byteBuf.toString(CharsetUtil.UTF_8);
log.info("Receive msg from {}, the msg content is {}", socketAddress.toString().substring(1), receiveMsg);
channels.put(channelHandlerContext.channel(), receiveMsg);
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
String receiveMsg = channels.get(channelHandlerContext.channel());
String responseMsg = "From server:";
if ("quit".equalsIgnoreCase(receiveMsg)) {
responseMsg += "Bye Bye";
} else {
responseMsg += receiveMsg;
}
channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer(responseMsg, CharsetUtil.UTF_8));
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
channelHandlerContext.close();
}
}
Netty客户端编写
客户端实现步骤:
- 创建线程组
- 创建客户端启动助手
- 设置线程组
- 设置客户端通道实现为NIO
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动客户端,等待连接服务端,同时将异步改为同步
代码实现如下:
@Data
public class NettyClient {
private NioEventLoopGroup workerGroup;
private Integer port;
public NettyClient(Integer port) {
this.workerGroup = new NioEventLoopGroup();
this.port = port;
}
/**
* 启动客户端
*/
public void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(this.workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientChannelHandler(NettyClient.this));
}
});
try {
final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", this.port)).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭客户端
*/
public void close() {
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient(11000);
nettyClient.start();
}
}
自定义客户端Handler如下:
@Slf4j
public class ClientChannelHandler implements ChannelInboundHandler {
private NettyClient nettyClient;
public ClientChannelHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
private BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
log.info("Connect to 127.0.0.1:{} with {}", nettyClient.getPort(), socketAddress.toString().substring(1));
String inputMsg = reader.readLine();
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(inputMsg, CharsetUtil.UTF_8));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
String responseMsg = byteBuf.toString(CharsetUtil.UTF_8);
log.info("{}", responseMsg);
if ("From Server:Bye Bye".equalsIgnoreCase(responseMsg)) {
nettyClient.close();
return;
}
String inputMsg = reader.readLine();
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(inputMsg, CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
}
}
关于Netty的基础知识就到这里,欢迎留言交流~~
文章评论