Rubin's Blog

  • 首页
  • 关于作者
  • 隐私政策
享受恬静与美好~~~
分享生活的点点滴滴~~~
  1. 首页
  2. Netty
  3. 正文

Netty进阶

2021年 9月 16日 874点热度 0人点赞 0条评论

Netty编解码器

在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据之后发送到客户端,客户端对数据进行解码。

对于Netty而言,编解码器由两部分组成:编码器和解码器:

  • 解码器:负责将消息从字节获取它序列形式转成指定的消息对象。负责处理入站的数据,本身就是一个InboundHandler
  • 编码器:将消息对象转成字节或其他序列形式在网络上传输。负责处理出站的数据,本身就是一个OutboundHandler

Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖于ChannelPipeline,可以将多个编解码器连接在一起,以实现复杂的转换逻辑。

解码器(Decoder)

解码器负责解码“入站”数据的格式转换。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder。

抽象解码器:

  • ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
  • ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同是不是所有的ByteBuf都支持。项目复杂度比较高推荐使用ReplayingDecoder,否则推荐使用ByteToMessageDecoder
  • MessageToMessageDecoder:用于从一种消息解码为另外一种消息(例如POJO到POJO)

核心方法是:

decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)

代码示例:

/* 消息解码-可以将字符串消息进行在进行解码. 只有消息入站时才会进行解码 */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in,
 List<Object> out) throws Exception {
    System.out.println("正在进行消息解码");
    out.add(in.toString(CharsetUtil.UTF_8));
 }
}

通道读取方法:

/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
 Exception {
  System.out.println("客户端发送过来的消息:" + msg);
}

启动类:

protected void initChannel(SocketChannel ch) throws Exception {
  //8. 向pipeline中添加自定义业务处理handler
  ch.pipeline().addLast(new MessageDecoder());//添加解码器
  ch.pipeline().addLast(new NettyServerHandler());
}

编码器(Encoder)

与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器MessageToByteEncoder和MessageToMessageEncoder,二者都实现了ChannelOutboundHandler接口。

抽象编码器如下:

  • MessageToByteEncoder: 将消息转化成字节
  • MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)

核心方法:

encode(ChannelHandlerContext ctx, String msg, List<Object> out)

代码实现如下:

/* 编码器 */
public class MessageEncoder extends MessageToMessageEncoder<String> {
  @Override
  protected void encode(ChannelHandlerContext ctx, String msg,
 List<Object> out) throws Exception {
    System.out.println("消息进行消息编码");
    out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
   }
}

消息发送:

/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端");
    future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception
 {
      if (future.isSuccess()) {
        System.out.println("数据发送成功!");
       } else {
        System.out.println("数据发送失败!");
       }
     }
   });
}

启动类:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
  //6. 向pipeline中添加自定义业务处理handler
  ch.pipeline().addLast(new MessageDecoder());//添加解码器
  ch.pipeline().addLast(new MessageEncoder());//添加编码器
  ch.pipeline().addLast(new NettyClientHandler());
}

编码解码器Codec

编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。

Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类。ByteToMessageCodec 、MessageToMessageCodec都继承与此类。

代码实现:

/**
* 编解码器
*/
public class MessageCoder extends MessageToMessageCodec {
  @Override
  protected void encode(ChannelHandlerContext ctx, Object msg, List out)
 throws Exception {
    System.out.println("正在进行消息编码");
    String str = (String) msg;
    out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
  @Override
  protected void decode(ChannelHandlerContext ctx, Object msg, List out)
 throws Exception {
    System.out.println("正在进行消息解码");
    ByteBuf byteBuf = (ByteBuf) msg;
    out.add(byteBuf.toString(CharsetUtil.UTF_8));
 }
}

启动类:

protected void initChannel(SocketChannel ch) throws Exception {
  //8. 向pipeline中添加自定义业务处理handler
  ch.pipeline().addLast(new MessageCoder());//添加编解码器
  ch.pipeline().addLast(new NettyServerHandler());
}

Netty中粘包和拆包的解决方案

粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况:

服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包:

服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包:

如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包:

如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包:

TCP粘包和拆包产生的原因是数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

粘包和拆包的解决方法

业内解决方案

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

  • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  • 将换行符作为消息结束符
  • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  • 通过在消息头中定义长度字段来标识消息的总长度

Netty中的粘包和拆包解决方案

Netty提供了4种解码器来解决,分别如下:

  • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

代码示例:

LineBasedFrameDecoder解码器:

ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n", CharsetUtil.UTF_8));

DelimiterBasedFrameDecoder解码器:

ByteBuf byteBuf =
 Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$",
 CharsetUtil.UTF_8));

Netty高级应用示例

群聊天室

首先定义服务端:

/**
 * 群聊服务器
 */
@Slf4j
public class ChatServer {

    private NioEventLoopGroup bossGroup;

    private NioEventLoopGroup workerGroup;

    private Integer port;

    public ChatServer(Integer port) {
        this.port = port;
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(20);
    }

    /**
     * 启动服务器
     */
    public void start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChatServerChannelHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 256)
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            log.info("The netty chat server startup success, listened on port : {}", this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭服务器
     */
    public void close() {
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(11001);
        chatServer.start();
    }

}
@Slf4j
public class ChatServerChannelHandler implements ChannelInboundHandler {

    // 每次启动都会创建新的实例 故定义为静态
    private static List<Channel> channels = new ArrayList<>(256);

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        log.info("客户端:{}上线!", channel.remoteAddress().toString().substring(1));
        this.channels.add(channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        log.info("客户端:{}下线!", channel.remoteAddress().toString().substring(1));
        this.channels.remove(channel);
    }

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        final Channel currentChannel = channelHandlerContext.channel();
        // 只负责转发消息 编解码以及拆包粘包问题 客户端解决
        if (!this.channels.isEmpty() && this.channels.size() > 1) {
            this.channels.stream().filter(channel -> !channel.equals(currentChannel)).forEach(channel -> {
                ByteBuf byteBuf = (ByteBuf) o;
                String message = byteBuf.toString(CharsetUtil.UTF_8);
                message = currentChannel.remoteAddress().toString().substring(1) + ":" + message;
                channel.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
            });
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {

    }

}

由上我们可知,我们服务端只负责转发消息,不负责编解码的处理以及粘包拆包的处理。这样做的考虑是笔者认为拆包粘包的处理,服务端或者客户端一端处理即可,两端都处理有代码冗余。

客户端定义如下:

public class ChatClient {

    private NioEventLoopGroup workerGroup;

    private Integer port;

    public ChatClient(Integer port) {
        this.workerGroup = new NioEventLoopGroup();
        this.port = port;
    }

    /**
     * 启动客户端
     */
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        final ChannelPipeline pipeline = socketChannel.pipeline();
                        // 添加编解码器 通过自定义分隔符 处理拆包粘包的问题
                        pipeline.addLast(new ChatMessageEncoder());
                        pipeline.addLast(new DelimiterBasedFrameDecoder(102400, Unpooled.copiedBuffer("__,__", CharsetUtil.UTF_8)));
                        pipeline.addLast(new StringDecoder());
                        // 添加业务处理器
                        pipeline.addLast(new ChatClientChannelHandler());
                    }
                });
        try {
            final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", this.port)).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭客户端
     */
    public void close() {
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        ChatClient nettyClient = new ChatClient(11001);
        nettyClient.start();
    }

}
@Slf4j
public class ChatClientChannelHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String message) throws Exception {
        log.info("{}", message);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        final Channel channel = ctx.channel();
        new InputTextHandler(channel).start();
    }
}

自定义编码器:

public class ChatMessageEncoder extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String message, List<Object> list) throws Exception {
        list.add(Unpooled.copiedBuffer(message + "__,__", CharsetUtil.UTF_8));
    }
}

有上我们可以看出,我们是用分隔符解码器来处理拆包粘包的问题,使用自定义编码器来给消息添加分隔符发送来处理发送消息编码问题。最后我们来定义一下文本输入的线程实现(这里读取控制台输入会阻塞线程,故开辟新线程去处理):

@AllArgsConstructor
public class InputTextHandler extends Thread {

    private Channel channel;

    @Override
    public void run() {
        super.run();
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (channel != null && channel.isOpen()) {
                String inputMsg = reader.readLine();
                channel.writeAndFlush(inputMsg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

HTTP服务器

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器(即不挂载外部项目,比如Nginx就是一个轻量级的HTTP非Web容器的服务器)的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

服务端代码实现如下:

/**
 * Http服务器
 */
@Slf4j
public class HttpServer {

    private NioEventLoopGroup bossGroup;

    private NioEventLoopGroup workerGroup;

    private Integer port;

    public HttpServer(Integer port) {
        this.port = port;
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(20);
    }

    /**
     * 启动服务器
     */
    public void start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new HttpServerCodec());
                        socketChannel.pipeline().addLast(new HttpServerChannelHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 256)
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            log.info("The netty http server startup success, listened on port : {}", this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭服务器
     */
    public void close() {
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HttpServer httpServer = new HttpServer(8080);
        httpServer.start();
    }

}
@Slf4j
public class HttpServerChannelHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        // 判断是不是HTTP请求
        if (httpObject instanceof HttpRequest) {
            DefaultHttpRequest defaultHttpRequest = (DefaultHttpRequest) httpObject;
            log.info("get the http request with uri:{}", defaultHttpRequest.uri());
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello from netty!", CharsetUtil.UTF_8);
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
            channelHandlerContext.writeAndFlush(response);
        }
    }
}

WebSocket服务器

WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

WebSocket和HTTP的区别

HTTP协议是用在应用层的协议,他是基于TCP协议的, HTTP 协议建立连接也必须要有三次握手才能发送信息。 HTTP 连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端。

WebSocket实现了多路复用,他是全双工通信。在WebSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head的部分信息。这种方式,不仅能降低服务器的压力,而且信息当中也减少了部分多余的信息。

服务器代码如下:

/**
 * WebSocket服务器
 */
@Slf4j
public class WebSocketServer {

    private NioEventLoopGroup bossGroup;

    private NioEventLoopGroup workerGroup;

    private Integer port;

    private String path;

    public WebSocketServer(Integer port, String path) {
        this.port = port;
        this.path = path;
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(20);
    }

    /**
     * 启动服务器
     */
    public void start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 支持HTTP协议
                        socketChannel.pipeline().addLast(new HttpServerCodec());
                        // 支持大数据流
                        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
                        // HttpObjectAggregator将多个信息(post请求分三部分. request line / request header / message body)转化成单一的request或者response对象
                        socketChannel.pipeline().addLast(new HttpObjectAggregator(102400));
                        // 将http协议升级为ws协议. websocket的支持
                        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler(path));
                        // 自定义Handler
                        socketChannel.pipeline().addLast(new WebSocketServerChannelHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 256)
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            log.info("The netty websocket server startup success, listened on port : {}", this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭服务器
     */
    public void close() {
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        WebSocketServer webSocketServer = new WebSocketServer(8090, "/chat");
        webSocketServer.start();
    }

}
@Slf4j
public class WebSocketServerChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 由于该类不是单例 设置共享变量
    public static List<Channel> channels = new ArrayList<>();


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Channel channel = ctx.channel();
        log.info("客户端:{}上线!", channel.remoteAddress().toString().substring(1));
        this.channels.add(channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Channel channel = ctx.channel();
        log.info("客户端:{}下线!", channel.remoteAddress().toString().substring(1));
        this.channels.remove(channel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        final Channel currentChannel = channelHandlerContext.channel();
        // 只负责转发消息 编解码以及拆包粘包问题 客户端解决
        if (!this.channels.isEmpty() && this.channels.size() > 1) {
            this.channels.stream().filter(channel -> !channel.equals(currentChannel)).forEach(channel -> {
                // 这一步很重要 一定要新建一个TextWebSocketFrame 否则消息推送无效
                channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
            });
        }
    }

}

我们可以按照以下代码定义一个简单的客户端连接我们的服务器去测试服务器是否可用:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>网页聊天室</title>
    <script src="http://code.jquery.com/jquery-2.1.1.min.js"></script>
</head>
<body>
<h1 id="chat-room-header"></h1>
<div>
    <ul id="message-list-content"></ul>
</div>
<div>
    <textarea name="input-text" id="input-text" cols="30" rows="10"></textarea>
    <button id="send-button">发送</button>
</div>
<script>
    $(function () {
        var page = {
            username: '',
            ws: undefined,
            init: function () {
                this.initHeader()
                this.initConnection()
                this.bindEvent()
                $('#input-text').focus()
            },
            initHeader: function () {
                while (true) {
                    this.username = prompt("请输入您的名字", "")
                    if (this.username.trim()) {
                        $('#chat-room-header').html(this.username + '的聊天室')
                        break
                    }
                }
            },
            initConnection: function () {
                var _this = this
                _this.ws = new WebSocket("ws://127.0.0.1:8090/chat");
                _this.ws.onopen = function () {
                    console.log("连接成功.")
                }
                _this.ws.onmessage = function (evt) {
                    _this.showMessage(evt.data);
                }
                _this.ws.onclose = function (){
                    console.log("连接关闭")
                }
                _this.ws.onerror = function (){
                    console.log("连接异常")
                }
            },
            bindEvent: function() {
                let _this = this
                $('#send-button').click(function () {
                    _this.sendMessage()
                })
                $('#input-text').keyup(function (e) {
                    e.stopPropagation()
                    if (e.keyCode === 13) {
                        _this.sendMessage()
                    }
                })
            },
            showMessage: function (message) {
                $('#message-list-content').append('<li>' + message + '</li>')
            },
            sendMessage: function () {
                let _this = this
                var text = $('#input-text').val()
                if (text) {
                    text = _this.username + ':' + text
                    _this.ws.send(text)
                    _this.showMessage(text)
                    $('#input-text').val('')
                    $('#input-text').focus()
                } else {
                    alert('不支持空内容发送')
                }
            }
        }
        page.init()
    })
</script>
</body>
</html>

本博文所有内容就到这里,欢迎留言交流~~

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: Netty
最后更新:2022年 6月 9日

RubinChu

一个快乐的小逗比~~~

打赏 点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
文章目录
  • Netty编解码器
    • 解码器(Decoder)
    • 编码器(Encoder)
    • 编码解码器Codec
  • Netty中粘包和拆包的解决方案
    • 粘包和拆包简介
    • 粘包和拆包的解决方法
      • 业内解决方案
      • Netty中的粘包和拆包解决方案
  • Netty高级应用示例
    • 群聊天室
    • HTTP服务器
    • WebSocket服务器
      • WebSocket简介
      • WebSocket和HTTP的区别
最新 热点 随机
最新 热点 随机
问题记录之Chrome设置屏蔽Https禁止调用Http行为 问题记录之Mac设置软链接 问题记录之JDK8连接MySQL数据库失败 面试系列之自我介绍 面试总结 算法思维
ZooKeeper之基本应用 JVM常用指令与可视化调优工具 Neo4j之CQL RabbitMQ之集群与运维 Kafka高级特性之主题 MyBatis之插件

COPYRIGHT © 2021 rubinchu.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

京ICP备19039146号-1