Netty源码的环境搭建还是很简单的。首先我们从GitHub下载Netty的源码包,或者直接下载附件中的源码包(附件已经修改了设置项并添加了测试demo,直接用Maven安装即可)。下载之后解压并进入源码目录,打开根目录下的pom.xml,修改以下几个地方:
首先,修改编译的JDK版本(最低是8,按照自己本地的JDK版本进行修改)(下载附件的跳过此步骤):
禁用maven-checkstyle-plugin插件的检查 (下载附件的跳过此步骤) :
禁用xml-maven-plugin插件的检查 (下载附件的跳过此步骤) :
保存之后,执行以下命令编译打包:
mvn clean install -Dmaven.test.skip=true
编译时间会比较长,编译完成之后用IDEA导入项目。在netty-example模块(源码根目录下的example目录)下创建自己的测试包,并编写以下测试代码(下载附件的跳过此步骤):
服务器端:
/**
 * Minimal Netty TCP server used to smoke-test the source build.
 *
 * <p>Every accepted connection gets its own {@link ServerChannelHandler} appended to the
 * channel pipeline.
 */
public class NettyServer {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    private final NioEventLoopGroup bossGroup;
    private final NioEventLoopGroup workerGroup;
    private final Integer port;

    /**
     * Creates a server with a 1-thread boss group and a 20-thread worker group.
     *
     * @param port TCP port to listen on; it is unboxed when binding, so it must not be
     *             {@code null}
     */
    public NettyServer(Integer port) {
        this.port = port;
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(20);
    }

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     *
     * <p>Both event-loop groups are shut down before this method returns, whether it returns
     * normally, because of an interrupt, or because binding failed.
     */
    public void start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ServerChannelHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 256)
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            log.info("The netty server startup success, listened on port : {}", this.port);
            // Block until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.warn("The netty server was interrupted, shutting down", e);
        } finally {
            // Without this the event-loop threads stay alive after a failed bind or a closed
            // server channel and keep the JVM from exiting.
            close();
        }
    }

    /**
     * Shuts down the boss and worker event-loop groups gracefully.
     */
    public void close() {
        this.bossGroup.shutdownGracefully();
        this.workerGroup.shutdownGracefully();
    }

    /**
     * Starts a server on port 11000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer(11000);
        nettyServer.start();
    }
}
/**
 * Inbound handler for the demo server: logs connection events and echoes every received
 * message back prefixed with {@code "From server:"}; the message {@code quit}
 * (case-insensitive) is answered with {@code "From server:Bye Bye"}.
 */
public class ServerChannelHandler implements ChannelInboundHandler {
    private static final Logger log = LoggerFactory.getLogger(ServerChannelHandler.class);

    /** Last message received per channel that still has to be answered. */
    private final Map<Channel, String> channels = new ConcurrentHashMap<Channel, String>(256);

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /** Logs the remote address of a newly connected client. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        // substring(1) drops the first character of the address string (presumably the
        // leading '/' of InetSocketAddress#toString).
        log.info("There is a new connection from {}", socketAddress.toString().substring(1));
    }

    /** Logs the disconnect and drops any message still pending for the channel. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        log.info("The Client {} is offline", socketAddress.toString().substring(1));
        channels.remove(channelHandlerContext.channel());
    }

    /**
     * Decodes the inbound buffer as UTF-8 and remembers the text so that
     * {@link #channelReadComplete(ChannelHandlerContext)} can answer it.
     *
     * <p>If several buffers arrive before the read completes, only the last one is kept.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        ByteBuf byteBuf = (ByteBuf) o;
        try {
            String receiveMsg = byteBuf.toString(CharsetUtil.UTF_8);
            log.info("Receive msg from {}, the msg content is {}", socketAddress.toString().substring(1), receiveMsg);
            channels.put(channelHandlerContext.channel(), receiveMsg);
        } finally {
            // The message is not passed further down the pipeline, so this handler owns the
            // buffer and has to release it to avoid leaking it.
            byteBuf.release();
        }
    }

    /**
     * Sends the response for the message stored by the preceding read, if there is one.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        // remove() instead of get(): each message is answered exactly once, and a read-complete
        // that delivered no data does not re-send a stale message or "From server:null".
        String receiveMsg = channels.remove(channelHandlerContext.channel());
        if (receiveMsg == null) {
            return;
        }
        String responseMsg = "From server:";
        if ("quit".equalsIgnoreCase(receiveMsg)) {
            responseMsg += "Bye Bye";
        } else {
            responseMsg += receiveMsg;
        }
        channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer(responseMsg, CharsetUtil.UTF_8));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /** Logs the failure and closes the connection it occurred on. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
        log.error("Unexpected error on channel {}, closing it", channelHandlerContext.channel(), throwable);
        channels.remove(channelHandlerContext.channel());
        channelHandlerContext.close();
    }
}
客户端:
/**
 * Minimal Netty TCP client that connects to {@code 127.0.0.1} on the configured port and
 * delegates all channel events to a {@link ClientChannelHandler}.
 */
public class NettyClient {
    private final NioEventLoopGroup workerGroup;
    private final Integer port;

    /**
     * Returns the port this client connects to.
     *
     * @return the port passed to the constructor
     */
    public Integer getPort() {
        return port;
    }

    /**
     * Creates a client with its own event-loop group.
     *
     * @param port TCP port of the server on {@code 127.0.0.1}; it is unboxed when connecting,
     *             so it must not be {@code null}
     */
    public NettyClient(Integer port) {
        this.workerGroup = new NioEventLoopGroup();
        this.port = port;
    }

    /**
     * Connects to the server and blocks the calling thread until the channel is closed.
     *
     * <p>The event-loop group is shut down before this method returns, whether it returns
     * normally, because of an interrupt, or because the connection attempt failed.
     */
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientChannelHandler(NettyClient.this));
                    }
                });
        try {
            final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", this.port)).sync();
            // Block until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Without this the event-loop threads stay alive after a failed connect or a closed
            // channel and keep the JVM from exiting.
            close();
        }
    }

    /**
     * Shuts down the event-loop group gracefully.
     */
    public void close() {
        this.workerGroup.shutdownGracefully();
    }

    /**
     * Starts a client that connects to port 11000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient(11000);
        nettyClient.start();
    }
}
/**
 * Inbound handler for the demo client: sends one line read from standard input after
 * connecting and after every response, and shuts the client down when the server answers
 * {@code "From Server:Bye Bye"} (compared case-insensitively).
 *
 * <p>NOTE(review): {@code reader.readLine()} is called directly from the handler callbacks,
 * so the thread invoking them blocks while waiting for console input. That is acceptable for
 * this single-connection demo only.
 */
public class ClientChannelHandler implements ChannelInboundHandler {
    private static final Logger log = LoggerFactory.getLogger(ClientChannelHandler.class);

    private final NettyClient nettyClient;
    private final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

    /**
     * @param nettyClient the client that owns this handler; used for its port and to shut it down
     */
    public ClientChannelHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /** Logs the established connection and sends the first line typed by the user. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        log.info("Connect to 127.0.0.1:{} with {}", nettyClient.getPort(), socketAddress.toString().substring(1));
        sendNextLine(channelHandlerContext);
    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /**
     * Logs the server response, then either shuts the client down (on the goodbye message) or
     * sends the next line typed by the user.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf = (ByteBuf) o;
        String responseMsg;
        try {
            responseMsg = byteBuf.toString(CharsetUtil.UTF_8);
        } finally {
            // The message is not passed further down the pipeline, so this handler owns the
            // buffer and has to release it to avoid leaking it.
            byteBuf.release();
        }
        log.info("{}", responseMsg);
        if ("From Server:Bye Bye".equalsIgnoreCase(responseMsg)) {
            nettyClient.close();
            return;
        }
        sendNextLine(channelHandlerContext);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /** Logs the failure and shuts the client down instead of silently ignoring it. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
        log.error("Unexpected error on the client channel, shutting down", throwable);
        channelHandlerContext.close();
        nettyClient.close();
    }

    /**
     * Reads one line from standard input and writes it to the server as UTF-8; shuts the
     * client down when standard input has reached end-of-stream.
     */
    private void sendNextLine(ChannelHandlerContext channelHandlerContext) throws Exception {
        String inputMsg = reader.readLine();
        if (inputMsg == null) {
            // readLine() returns null at end-of-stream; passing that on would throw a
            // NullPointerException when the buffer is created.
            log.info("Standard input is closed, shutting down the client");
            nettyClient.close();
            return;
        }
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(inputMsg, CharsetUtil.UTF_8));
    }
}
编写完成后执行服务器的main方法启动,会发现源码中的一些测试类编译错误,类似于下图:
把爆红的部分注释掉即可。注释完之后重新运行服务器和客户端,都启动之后在客户端的控制台输入一些信息并回车,查看响应情况。输入quit,查看客户端是否可以自动退出并断开连接。如果一切正常的话,Netty的源码环境就搭建完成了。
文章评论