无可致疑,netty是java的网络通讯框架,支持高并发。本文扫描使用netty完成简单的http的能力,不涉及安全,业务过滤等内容。
片段1
/** * 启动http服务器 * @throws InterruptedException */ private void runHttpServer(final EventProducer evtProducer) throws InterruptedException { // 配置TCP服务器. EventLoopGroup bossGroup = new NioEventLoopGroup(ServerBootOption.Parent_EventLoopGroup_ThreadNum); EventLoopGroup workerGroup = new NioEventLoopGroup(ServerBootOption.Child_EventLoopGroup_ThreadNum); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 10240) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_LINGER, 0) //.childOption(ChannelOption.SO_TIMEOUT, ServerBootOption.SO_TIMEOUT) // .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("idleStateHandler", new IdleStateHandler(60, 60, 30));<span style="color:#FF0000;">//读信道空闲60s,写信道空闲60s,读,写信道空闲30s</span> p.addLast("http_server_codec", new HttpServerCodec());//<span style="color:#FF0000;">http消息转换</span> p.addLast("http_server_handler",new HttpProxyServerHandler(manager,evtProducer));//<span style="color:#FF0000;">消息处理器</span> } }); // Start the tcp server. ChannelFuture f = b.bind(new InetSocketAddress(ip, port)).sync();//<span style="color:#FF0000;">启动http服务进程</span> logger.info("start http server ok"); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. logger.info("Shut down all event loops to terminate all threads."); bossGroup.shutdownGracefully();//关闭服务进程 workerGroup.shutdownGracefully();//关闭服务进程 } }
片段2HttpProxyServerHandler
/** * * @author * http 请求事件处理器,负责分发http请求事件 */ public class HttpProxyServerHandler extends ChannelHandlerAdapter{ private static final Logger logger = Logger.getLogger(HttpProxyServerHandler.class); private SessionContextManager manager;<span style="color:#FF0000;">//会话管理</span> private final AttributeKey<Long> timestamp = AttributeKey.valueOf("timestamp"); private final StringBuilder buf = new StringBuilder(); public HttpProxyServerHandler(SessionContextManager manager){ this.manager=manager; } @Override public void handlerAdded(final ChannelHandlerContext ctx) throws Exception{ logger.info("["+ctx.channel().id().asLongText()+" ] is Added "); } @Override <span style="color:#FF0000;">会话关闭,失效事件</span> public void channelInactive(ChannelHandlerContext ctx) throws Exception { manager.getSession().remove(ctx.channel().id().asLongText()); super.channelInactive(ctx); } @Override <span style="color:#FF0000;">读消息</span><span style="color:#FF0000;">事件,业务处理的入口</span> public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("HttpProxyServerHandler[channelRead] "); Attribute<Long> attr_timestamp = ctx.channel().attr(timestamp); attr_timestamp.set(System.currentTimeMillis());//<span style="color:#FF0000;">测试信道中保存状态消息</span>,<span style="color:#FF0000;">如消息处理的开如时间</span> SessionContext sctx=new SessionContext(); sctx.setCtx(ctx); sctx.setChannelId(ctx.channel().id()); sctx.setSessionActiveTime(""+System.currentTimeMillis()); manager.getSession().put(sctx.getChannelId().asLongText(), sctx);//<span style="color:#FF0000;">manager.getSession() 为并发Map结构,用于会话保持</span> logger.info(sctx.getChannelId().asLongText()+" req time "+System.currentTimeMillis()); try { dispatchMeesage(ctx,manager ,msg); } finally { ReferenceCountUtil.release(msg); } } private QueryStringDecoder qdecoder = null; private String uri="";//http uri private String url="";//http url private Map<String,List<String>> httpReqParams=null;//http请求参数 /** * 消息分发 * @param ctx * @param manager * @param msg 消息 * @return * @throws Exception */ private String dispatchMeesage(final ChannelHandlerContext ctx, final SessionContextManager manager, final Object msg) throws Exception{ String decode_message = ""; HttpResponseUtils util = new HttpResponseUtils(); String result_code="1"; if (msg instanceof HttpRequest) {// http请求头 HttpRequest req = (HttpRequest) msg; url = req.getUri(); if (url.equals("/favicon.ico")) { ctx.close(); return "0"; } if(!url.equals("/billing")){ ctx.close(); return "0"; } //if (is100ContinueExpected(req)) { // ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); // } qdecoder = new QueryStringDecoder(url); httpReqParams = qdecoder.parameters(); uri = qdecoder.path(); /* TODO:身份认证 * if(qdecoder.parameters().containsKey("crendial")){ * crendial=(String * )qdecoder.parameters().get("crendial").get(0).toUpperCase(); } */ } else if (msg instanceof HttpContent) { // http请求体 HttpContent httpContent = (HttpContent) msg; ByteBuf content = httpContent.content(); if (content.isReadable()) { String chunk_message=content.toString(CharsetUtil.UTF_8); buf.append(chunk_message); } if (!(msg instanceof LastHttpContent)) {//<span style="color:#FF0000;">不是最后一个chunk包块,此项非常 重要,当http分多片传输时,需要将多块内容合并</span> return "0"; }else{ decode_message= buf.toString(); // logger.info(decode_message); } if (msg instanceof LastHttpContent) { //LastHttpContent trailer = (LastHttpContent) msg; String sessionId=ctx.channel().id().asLongText(); System.out.println("请求"+decode_message); System.out.println("请求参数"+httpReqParams); System.out.println("请求地址"+uri); System.out.println("会话Id"+sessionId); //<span style="color:#FF0000;">TODO:模拟发送请求消息给后端处理,可以放入消息队列,ctx对象进入会话保持(Map对象)中,等消息队列中处理完成后,恢复会话,并完成消息应答。</span> } } return result_code; } private static AtomicInteger counter = new AtomicInteger(0); @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { manager.getSession().remove(ctx.channel().id().asLongText()); cause.printStackTrace(); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { logger.error("HttpProxyServerHandler[userEventTriggered] "); if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.ALL_IDLE) { logger.error("ALL_IDLE"); } else if (e.state() == IdleState.READER_IDLE){ logger.error("READER_IDLE"); }else if (e.state() == IdleState.WRITER_IDLE){ logger.error("WRITER_IDLE"); } ctx.close(); }else if(evt instanceof ErrorEvent){ logger.error(((ErrorEvent) evt).getErrorCode()); ctx.close(); } } }
SessionContext
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; /** * http请求会话 * @author wanghao * */ public class SessionContext { private ChannelId channelId =null ;/**会话 channel ID*/ private ChannelHandlerContext ctx;/**会话*/ private String sessionActiveTime;/**会话创建时间*/ private String sessionUnActiveTime;/**会话失效时间*/ public String getSessionActiveTime() { return sessionActiveTime; } public void setSessionActiveTime(String sessionActiveTime) { this.sessionActiveTime = sessionActiveTime; } public String getSessionUnActiveTime() { return sessionUnActiveTime; } public void setSessionunActiveTime(String sessionUnActiveTime) { this.sessionUnActiveTime = sessionUnActiveTime; } public ChannelHandlerContext getCtx() { return ctx; } public void setCtx(ChannelHandlerContext ctx) { this.ctx = ctx; } public ChannelId getChannelId() { return channelId; } public void setChannelId(ChannelId channelId) { this.channelId = channelId; } }
http请求会话管理器
import java.util.Map; /** * http请求会话管理器 * @author * */ public interface SessionContextManager { abstract Map<String,SessionContext> getSession(); }
相关推荐
Netty-SocketIO实现了这个协议,使得开发者可以轻松地在Netty上实现SocketIO的功能。 3. **ServerBootstrap类**:这是Netty中启动服务器的主要入口点。在API接口文档中,你会看到如何配置ServerBootstrap以设置各种...
这个“Netty-API-文档中文版”提供了详细的Netty API 使用指南,帮助开发者更容易理解和应用Netty,避免了语言障碍,使得中文环境下的开发工作更加便捷。 Netty 的核心特性包括: 1. **异步模型**:Netty 采用非...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议...通过深入学习和实践,你可以充分利用 Netty 的强大功能,构建出高效、可靠的网络应用,尤其在处理大数据和高并发场景时。
这个"Netty 3.2 API 中文版"提供了Netty 3.2版本的中文文档,使得开发者在理解和使用Netty时能够更方便地查阅相关API和功能。 Netty基于Java NIO(Non-blocking I/O,非阻塞I/O)库构建,旨在简化网络编程,提供一...
Netty 4.0.3 的 API 文档提供了该版本的详细接口说明,包括类、接口、方法等,这对于理解和使用 Netty 的各种组件和功能至关重要。开发者可以通过查阅这份文档,了解如何创建 Channel、配置 Pipeline、处理事件、...
总的来说,“netty-api-4.1 中文版”文档是学习和掌握 Netty 的重要参考资料,它详细介绍了各个类和接口的功能,以及如何使用它们来构建高效、可靠的网络应用。通过阅读和实践,开发者可以深入理解 Netty 的设计理念...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,...综上所述,这个“netty-3.7.0官方API所有jar包”为开发者提供了全面的学习资源,从理论到实践,帮助他们充分利用Netty的强大功能,构建高效稳定的网络应用。
综上所述,"springmvc+netty实现聊天功能"项目结合了Spring MVC的Web开发能力与Netty的网络通信优势,创建了一个能够处理实时聊天的系统。网页端代码负责用户界面和与服务器的通信,而Netty服务器则作为后台,处理...
在本文中,我们将深入探讨如何使用Spring Boot和Netty实现一个简单的一对一聊天应用程序。Spring Boot是Java领域中广泛使用的微服务框架,它简化了配置并提供了快速启动的应用程序开发体验。Netty则是一个高性能、...
- **在项目中的角色**:Spring Boot作为后端服务框架,负责处理HTTP请求,与数据库交互,提供RESTful API,实现温湿度数据的存储和查询。 2. **Netty**: - **简介**:Netty是一个高性能、异步事件驱动的网络应用...
Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发(有群友实现了130万在线).zip 功能说明: 参考MQTT3.1.1规范实现 完整的QoS服务质量等级实现 遗嘱消息, 保留消息及消息分发重试 心跳机制 MQTT连接...
Netty 提供了一个事件驱动的网络应用框架,简化了网络编程,特别是 TCP、UDP 和 HTTP 等协议的实现。它使得开发者可以专注于业务逻辑,而无需深入底层网络编程的复杂性。 在 "netty-api-4.1中文.rar" 这个压缩包中...
标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...
Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发服务器和客户端的网络应用,如TCP、UDP和HTTP等协议的通信。内网穿透工具,又称为NAT穿透,是一种允许内网设备通过公网进行通信的技术,这对于测试、...
11. **HTTP 协议支持**: Netty 提供了全面的 HTTP/1.x 和 HTTP/2 的支持,包括 WebSocket 和 RESTful 服务的实现。 12. **TCP 和 UDP 支持**: Netty 可以处理 TCP 和 UDP 协议,适用于各种网络通信需求。 13. **...
### Netty实现原理浅析 #### 一、总体结构 Netty是一款由JBoss推出的高效且功能丰富的Java NIO框架,旨在简化网络编程并提高性能。为了更好地理解Netty的工作原理,我们首先需要了解它的整体架构。Netty的总体结构...
总的来说,这个“Netty实现前置系统”项目涉及了网络编程、并发处理、连接管理、协议转换、错误恢复等多个核心知识点。通过Netty的高效框架,我们可以构建出稳定、可扩展的前置系统,以满足系统A和系统C之间的复杂...
它提供了丰富的API,使得开发者能更方便地构建TCP、UDP、HTTP、FTP等协议的服务器和客户端。在本项目中,我们将利用Netty来构建TCP服务器,处理DTU发送过来的数据。 1. **Netty基础架构**: Netty的核心组件包括...
本节将介绍如何使用Netty实现微信的核心功能,包括单聊和群聊。 #### 四、单聊功能实现 ##### 4.1 单聊流程分析 单聊流程主要包括以下步骤: 1. **连接建立**:用户A和用户B分别与服务器建立连接,并进行登录...