`

Twitter Kestrel如何使用Netty以及Netty scala压测代码

阅读更多

Twitter的核心队列Kestrel使用Netty作为通信模块,从另一个角度证明了Netty的性能和健壮。

Netty是否比MINA强?从底层实现,两者几乎差不多,但Netty的优势是从架构上采用事件通知机制,真正的将异步模式引入来解决各种场景。响应时间可能会加长,但优势在于系统之间的依赖减弱,自身处理能力的决定因素自封闭(瓶颈可以直接根据自身业务处理资源消耗情况估计出来)

 

我们看看Twitter是怎么用Netty。Twitter很多项目都是用scala写的,scala是很简洁的语言,直接运行在jvm上。可以直接调用Java类。下边的代码都是来自Twitter的核心队列项目Kestrel。这个项目很有意思,可能以后还会讨论,这里先说说怎么用Netty。


NettyHandler.scala是处理Netty网络事件的基类,其他具体协议实现类,MemcacheHandler和TextHandler都继承NettyHandler。NettyHandler应用Netty的ChannelUpStreamHandler接口,这个接口处理上行请求。同时继承KestrelHandler。KestrelHandler处理Kestrel消息队列的行为,包括getItem、setItem等等。



 

NettyHandler主要方法是handleUpstream。处理上行请求:MessageEvent,ChannelStatEvent,等等。这些实现基本上参照Netty官网给的sample很容易实现。方法不长,才40多行,用scala写出来,有点小清新:)

def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {
    event match {
      case m: MessageEvent =>
        // 具体实现由协议实现类MemcacheHandler等实现
        handle(m.getMessage().asInstanceOf[M])
      case e: ExceptionEvent =>
        // 异常处理
        e.getCause() match {
          case _: ProtocolError =>
            handleProtocolError()
          case e: ClosedChannelException =>
            finish()
          case e: IOException =>
            log.debug("I/O Exception on session %d: %s", sessionId, e.toString)
          case e =>
            log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)
            handleException(e)
        }
        e.getChannel().close()
      case s: ChannelStateEvent =>
        // 目前状态为connected但statevent.getValue is null,中断连接
        if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {
          finish()
        } else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
          // 创建连接
          channel = s.getChannel()
          remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
          if (clientTimeout.isDefined) {
            channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))
          }
          channelGroup.add(channel)
          // don't use `remoteAddress.getHostName` because it may do a DNS lookup.
          log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
        }
      case i: IdleStateEvent =>
        // 增加idel监控
        log.debug("Idle timeout on session %s", channel)
        channel.close()
      case e =>
        // 其他消息继续发出upstream事件
        context.sendUpstream(e)
    }
  }

 

MemcacheHandler和TextHandler是协议具体的实现。继承NettyHandler。因为Memcached协议比较简单,所以协议实现类就不多说了。阅读这些代码主要的障碍还是在于Java程序员对于某些scala的语法不习惯。我这里介绍个简单但是常用的:Scala的泛型。Scala创始人Martin Odersky曾说过,泛型正是他想要创建Scala语言的最重要因素之一。当然Java1.5以后已经引入了泛型,我们对这个东东已经很熟悉了。看看Twitter怎么使用Scala泛型。比教科书上生动很多。和Java使用<>指定泛型类似,NettyHandler中Scala的泛型M,放在[]里。

 

abstract class NettyHandler[M](

  val channelGroup: ChannelGroup,

  queueCollection: QueueCollection,

  maxOpenTransactions: Int,

  clientTimeout: Option[Duration])

extends KestrelHandler(queueCollection, maxOpenTransactions) with 

 

ChannelUpstreamHandler {

...

  def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {

    event match {

      case m: MessageEvent =>

        handle(m.getMessage().asInstanceOf[M])

  }

...

}

 

在NettyHandler中,任何MessageEvent都被转换为泛型M,并交给子类处理。TextHandler和MemcacheHandler是这样给自己的泛型定义的。

class TextHandler( ...) extends NettyHandler[TextRequest](...) 

class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...) 

 

接下来我们自己写一个Scala程序。

Netty服务器压测代码网上有不少版本,基本思路就是实现一个简单的echo handler。还可以添加了一个server主动push的部分。代码用scala实现,可以作为朋友们学习scala的例子。

 

 

import org.jboss.netty.channel._
import org.jboss.netty.buffer._
import org.jboss.netty.bootstrap.ServerBootstrap
import java.util._
import java.util.concurrent._
import java.io._
import java.net._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
import scala.collection.mutable

object NettyLoadServer {
	def main(args: Array[String]): Unit = {
		val testServer = new NettyLoadServer();
		testServer.loadTest();
	}
}

class NettyLoadServer {
	var channel: Channel = null
	private var remoteAddress: InetSocketAddress = null
	val channels = new mutable.ListBuffer[Channel];
	var number = 0;
	
	class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {
        override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
        {
            e.getCause().printStackTrace();
            channels -= e.getChannel()
            e.getChannel().close();
        }

        override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
            e.getChannel().write(e.getMessage());
        }
        
        override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
            e match {
        		case s: ChannelStateEvent =>
                	if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
                    	channel = s.getChannel()
                        remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
                        channels += channel                                  
                        System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +
                        	":" + remoteAddress.getPort)
                        }
                case e =>
                	// ignore
            }

            super.handleUpstream(ctx, e);
        }
	}
  
	class ChannelManagerThread extends Thread { 
		override def run() { 
			while (true) { 
				try {
					System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));
					
					for(s <- channels) {
						var cb = new DynamicChannelBuffer(256); 
						cb.writeBytes("abcd1234".getBytes()); 
						s.write(cb); 
					}
				
					Thread.sleep(500); 
				}
				catch { 
					case e => e.printStackTrace();
				} 
			} 
		} 
	} 

	
	def loadTest() {
		try {
			val factory = new NioServerSocketChannelFactory(Executors 
    		  .newCachedThreadPool(), Executors.newCachedThreadPool()); 
			val bootstrap = new ServerBootstrap(factory); 
			val handler = new LoadTestHandler(); 
			val pipeline = bootstrap.getPipeline(); 
			pipeline.addLast("loadtest", handler); 
			bootstrap.setOption("child.tcpNoDelay", true); 
			bootstrap.setOption("child.keepAlive", true); 
			bootstrap.bind(new InetSocketAddress(8007)); 
			
			val cmt = new ChannelManagerThread(); 
			cmt.start(); 
		} 
		catch {
			case e => e.printStackTrace();
		}
	}
}

附件里是我的scala sbt工程。

 

压测client推荐使用Jboss自己的Benchmark:

http://anonsvn.jboss.org/repos/netty/subproject/benchmark/

 

用ab也可以:

ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/

 

补充:Twitter还有很多很有意思的项目,希望有兴趣的朋友一起来研究学习。

 

  • 大小: 10.5 KB
0
0
分享到:
评论

相关推荐

    在.NET 6.0上使用Kestrel配置和自定义HTTPS.doc

    1. 配置 Kestrel,我们可以在 CreateHostBuilder 方法中使用 UseKestrel 方法来配置 Kestrel。 2. 在 .NET Core 6.0 中,我们可以使用 var builder = WebApplication.CreateBuilder(args); builder.WebHost....

    ASP.NET Core Kestrel 中使用 HTTPS (SSL)

    在ASP.NET Core中,如果在Kestrel中想使用HTTPS对站点进行加密传输,可以按照如下方式  申请证书  这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。  添加NuGet包  nuget中...

    征服 Kestrel

    【文件名称列表】:在提供的"kestrel"压缩包中,可能包含了Kestrel的源代码、示例项目、配置文件等。这些文件可以用来进一步学习和实践Kestrel的使用: 1. `kestrel.csproj`:项目的解决方案文件,包含了项目依赖和...

    征服 Kestrel + XMemcached

    在这里,我们将会探讨这两个技术的基本概念、它们在IT领域的应用以及如何将它们结合使用。 **Kestrel** Kestrel是由Twitter开发的消息队列系统,其设计目标是提供低延迟、高吞吐量的特性,特别是在处理大规模并发...

    Kestrel框架的使用demo

    这个项目演示了如何创建一个简单的Web应用并使用Kestrel作为服务器。运行步骤如下: 1. 解压`KestrelApp-master`压缩包。 2. 使用Visual Studio或命令行工具(如`dotnet run`)打开并运行项目。 3. 浏览器访问`...

    Kestrel持久化队列服务器

    - **持久化机制**: 包括如何将内存中的消息写入磁盘,以及如何在启动时加载持久化数据,这部分代码位于`src/storage`目录。 - **复制逻辑**: 主备复制的实现,可能在`src/replication`目录中。 ### 开发与测试 在`...

    征服 Kestrel + XMemcached + Spring TaskExecutor

    通过深入学习Kestrel的配置、管理和监控,掌握XMemcached的使用技巧,以及理解Spring TaskExecutor的线程池管理策略,开发者可以有效地优化应用程序的性能,同时保证系统的可伸缩性和可靠性。在实际项目中,可能还...

    Python库 | kestrel-lang-1.0.5.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Storm @Twitter-Slides.pdf

    《Storm @Twitter》是大数据流处理领域的经典之作,它由Twitter公司的工程师们提出,为实时数据流分析提供了一个强大的平台。这篇论文的原作PPT是学习Storm和流处理技术的重要资源。以下是对Storm核心概念和内部机制...

    web服务器KestrelHttpServer.zip

    在`KestrelHttpServer-master`压缩包中,包含了项目的源代码,开发者可以通过阅读和学习这些代码来了解Kestrel的内部工作原理,或者进行二次开发。通常,部署一个Kestrel服务器的步骤包括: 1. 创建一个新的ASP.NET...

    addlog-kestrel

    addlog-kestrel

    Python库 | kestrel_lang-1.1.0-py3-none-any.whl

    安装完成后,你就可以在你的Python代码中导入并使用`kestrel_lang`库了,例如: ```python import kestrel_lang # 接下来,你可以调用库提供的函数或类,具体取决于库的功能 result = kestrel_lang.some_function...

    ASP.NET完整项目源代码

    本压缩包包含的是一个完整的ASP.NET项目源代码集合,提供了十个不同的项目示例,这对于学习和理解ASP.NET的工作原理、开发流程以及最佳实践来说,是一个宝贵的资源。 1. **ASP.NET基础**:ASP.NET是.NET框架的一...

    kestrel.node:Node.js 的 Kestrel 客户端

    红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...

    .NET-KestrelHttpServer一个用于ASPNETCore的跨平台Web服务器

    5. **可配置性**:Kestrel允许开发者通过配置文件或代码来调整服务器的行为,如设置监听端口、限制连接数、控制日志级别等。 ### 使用场景 1. **开发环境**:在开发阶段,Kestrel可以直接作为主服务器,快速搭建...

    Kestrel封装成WindowServer.zip

    在一些开发过程中,会在局域网内搭建webapi服务作为移动端的服务接口使用,但是每次实施人员要到客户现场安装iis等工具,还有一些web的配置,非常繁琐,所以想着把webapi封装到WindowService中,可以通过自定义的...

    ASP.net示例代码

    示例代码可能涉及如何创建DbContext、定义实体类、执行CRUD操作以及使用LINQ查询。 最后,Web部署也是ASP.NET开发中的重要环节。可能的示例会展示如何使用Web Deploy将应用程序发布到IIS服务器,或者配置Azure...

    ASP.NET Core 因为 Nginx 配置 Connection 为 Upgrade 导致 Kestrel 返回 400

    ASP.NET Core 是微软推出的开源、跨平台的 web 开发框架,它允许开发者使用 C# 或者 F# 来构建高效、可移植的 web 应用。Nginx 是一款高性能的 HTTP 和反向代理服务器,常被用于部署 ASP.NET Core 应用,以提供负载...

    Kestrel:Kestrel是在I386上运行的实验内核-开源

    Kestrel是不是Unix或Windows的内核。

Global site tag (gtag.js) - Google Analytics