`

Netty + WebSocket 简例

    博客分类:
  • Java
阅读更多

原文:《Netty入门之WebSocket初体验

Netty简述

Netty是一个事件驱动型、异步非阻塞的高性能 Java IO 框架。
它对 Java NIO 框架作了封装与改进,提供了良好的稳定性和伸缩性,可简化网络应用的开发。很多项目将其作为底层Socket通信的基础框架(如,广为人知的Dubbo)。

 

原生 Java NIO 框架的使用门槛比较高,需要对Java多线程编程、网络编程等相关知识非常熟悉。
异步、非阻塞、情况多变的网络状况等特殊性引出的半包读写、失败缓存、客户端断连重连、网络拥塞、异常状态/内容的处理非常繁琐困难,非业务性的编码工作量大
此外,Java NIO 框架各种类、API繁杂,学习成本也不小。

 

相对而言,Netty的使用比较容易。API简单,性能、可靠性、处理能力、可扩展性等各方法都提供了良好的支持,还避免了JDK NIO的bug。
Netty支持多种主流协议,预置了多种编解码功能,使用门槛较低

 

WebSocket简述

WebSocket是H5提出的一个协议规范。它是一种为解决客户端与服务端实时通信而产生的技术。其本质是一种基于TCP的协议。客户端先以 HTTP/HTTPS 协议发出一条特殊的HTTP请求,与服务端握手建立连接后,双方可进行通信。
在WebSocket出现之前,Web交互多是基于HTTP协议(有短连接,也有长连接)。
WebSocket的优势:

  • 节省通信开销
    以前实现“消息推送”/“即时通讯”都采用轮询机制。客户端会定期(如,每隔1秒)主动发起请求,从服务端拉取消息。可能实际需要的业务数据很少,而HTTP Header数据量占比较高,很耗服务器资源。
    在WebSocket模式下,连接建立后,双方通信的非业务数据占比较少。
  • 服务端主动推送数据给客户端
  • 实时通信。WebSocket连接建立后,双方可以相互推送信息,及时性比传统HTTP方式更高。

 

示例:一个简单的应答程序

网页客户界面示例

 

主要代码

引入对Netty的依赖(Maven)

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>5.0.0.Alpha2</version>
</dependency>

 

配置类

public class Config {
  public static final String Service_Host = "localhost";
  public static final int Service_Port = 6789;
  public static final String Service_Url = String.format(
      "ws://%s:%s/echo", Service_Host, Service_Port);
}

 

客户端连接(Channel)处理类

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class EchoChannelHandler extends ChannelInitializer {
  /**
   * 初始化客户端连接中的各组件
   */
  @Override
  protected void initChannel(SocketChannel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast("http-codec", new HttpServerCodec());
    pipeline.addLast("aggregator", new HttpObjectAggregator(4 * 1024));// 请求内容的最大长度:4KB
    pipeline.addLast("http-chunked", new ChunkedWriteHandler());
    pipeline.addLast("handler", new EchoChannelInboundHandler());
  }
}

 

消息内容处理类

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class EchoChannelInboundHandler extends SimpleChannelInboundHandler<Object> {

  private WebSocketServerHandshaker handshaker = null;
  /**
   * 用于格式化返回消息中的时间戳
   */
  private DateTimeFormatter dateTimeFormatter =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssZ").withZone(ZoneId.of("UTC"));

  /**
   * 处理客户端请求的核心方法
   */
  @Override
  protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
      // 处理握手请求
      handleHandshakeRequest(ctx, (FullHttpRequest) msg);
    } else if (msg instanceof WebSocketFrame) {
      // 处理WebSocket消息(在握手请求之后)
      handleWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    System.out.println(String.format("Client connected. [%s]", ctx.channel().remoteAddress()));
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    System.out.println(String.format("Client disconnected. [%s]", ctx.channel().remoteAddress()));
  }

  /**
   * 处理握手请求
   */
  private void handleHandshakeRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
    Channel channel = ctx.channel();

    if (req.decoderResult().isSuccess()
        && "websocket".equals(req.headers().get("Upgrade"))) {
      // 解码成功,且请求的协议是WebSocket
      WebSocketServerHandshakerFactory handshakerFactory
          = new WebSocketServerHandshakerFactory(Config.Service_URL, null, false);
      handshaker = handshakerFactory.newHandshaker(req);
    }

    if (null == handshaker) {
      // 握手器创建设失败
      WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
    } else {
      // 握手
      handshaker.handshake(channel, req);
    }
  }

  /**
   * 处理WebSocket消息
   */
  private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    Channel channel = ctx.channel();

    if (frame instanceof CloseWebSocketFrame) {
      // 处理关闭WebSocket的指令
      frame.retain();
      handshaker.close(channel, (CloseWebSocketFrame) frame);
    } else if (frame instanceof TextWebSocketFrame) {
      // 此服务只处理文本消息,所以排除非文本消息
      // 返回应答消息
      String requestContent = ((TextWebSocketFrame) frame).text();
      System.out.println(
          String.format("Received [%s]: %s", channel.remoteAddress(), requestContent));
      String responseContent = String.format("%s %s: %s",
          dateTimeFormatter.format(ZonedDateTime.now()), channel.id(), requestContent);
      channel.writeAndFlush(new TextWebSocketFrame(responseContent));
    }
  }
}

 

开启服务的主方法

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public static void main(String[] args) throws Exception {
  EventLoopGroup acceptorGroup = new NioEventLoopGroup();
  EventLoopGroup clientGroup = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(acceptorGroup, clientGroup);
  bootstrap.channel(NioServerSocketChannel.class);
  bootstrap.childHandler(new EchoChannelHandler());

  try {
    ChannelFuture channelFuture = bootstrap.bind(Config.Service_Host, Config.Service_Port);
    Channel channel = channelFuture.sync().channel();
    System.out.println("Echo Service Started...");
    channel.closeFuture().sync();
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    // 优雅地退出程序
    acceptorGroup.shutdownGracefully();
    clientGroup.shutdownGracefully();
    System.out.println("Echo Service Stopped.");
  }
}

 

客户端网页

<html>
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
    <title>[Echo] Client</title>
    <script type="text/javascript">
    var echoSocket;

    function initEchoSocket() {
      if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
      }

      if (!window.WebSocket) {
        var spStatus = document.getElementById("status");
        spStatus.innerText = "Your browser does not support WebSocket.";
      } else {
        echoSocket = new WebSocket("ws://localhost:6789/echo");
  
        echoSocket.onopen = function(event) {
          var spStatus = document.getElementById("status");
	      spStatus.innerText = "WebSocket is open. You can communicate with server now.\r\n";
        };
  
        echoSocket.onmessage = function(event) {
          var txtResponse = document.getElementById('responseContent');
	      txtResponse.value += event.data + "\r\n";
        };
  
        echoSocket.onclose = function(event) {
          var spStatus = document.getElementById("status");
	      spStatus.innerText = "WebSocket closed.";
        };
      }
    }

    function sendToServer() {
      if (echoSocket 
        && WebSocket.OPEN == echoSocket.readyState) {
    	message = document.getElementById("message").value;
    	echoSocket.send(message);
      }
    }

    function closeEchoSocket() {
      if (echoSocket
        && WebSocket.OPEN == echoSocket.readyState) {
	    echoSocket.close();
      }
    }

    initEchoSocket();
    </script>
  </head>
  <body>
    <h1>Echo Service Sample</h1>
    <h2>WebSocket Status</h2>
    <p id="status">uninitialized</p>
    <input type="button" value="Open WebSocket" onclick="initEchoSocket()"/>
    <input type="button" value="Close WebSocket" onclick="closeEchoSocket()"/>
	
    <h2>Send to Server</h2>
    <input type="text" id="message" value=""/>
    <input type="button" value="Send" onclick="sendToServer()"/>
	
    <h2>Echo from Server:</h2>
    <textarea id="responseContent" style="width:400px; height:300px;"></textarea>
  </body>
</html>
  • 大小: 17.4 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics