`
zb_86
  • 浏览: 43308 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

high performance http server writen by akka

    博客分类:
  • java
 
阅读更多

采用akka2.0 IO ByteString相关技术,代码改自http://doc.akka.io/docs/akka/2.0/scala/io.html,目前代码比较粗糙,但性能已经体现出来了。

 

话不多说,贴代码

 

 

/**
 * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.docs.io.v2

//#imports
import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress
//#imports

//#actor
class HttpServer2(port: Int) extends Actor {

  val state = new scala.collection.mutable.HashMap[IO.Handle, ActorRef]()

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {

    case IO.NewClient(server) ⇒
      val socket = server.accept()
      val worker = context.actorOf(Props(new Worker(socket)))
      state(socket) = worker
      state(socket) ! socket

    case IO.Read(socket, bytes) ⇒
      state(socket) ! IO.Read(socket, bytes)

    case IO.Closed(socket, cause) ⇒
      state(socket) ! IO.Closed(socket, cause)
      state -= socket

  }

}

class Worker(socket: IO.SocketHandle) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    // state(socket) flatMap (_ ⇒ HttpServer2.processRequest(socket))
  }

  def receive = {

    case socket:IO.SocketHandle ⇒
      state(socket) flatMap (_ ⇒ HttpServer2.processRequest(socket))

    case IO.Read(socket, bytes) ⇒
      state(socket)(IO Chunk bytes)

    case IO.Closed(socket, cause) ⇒
      state(socket)(IO EOF None)
      state -= socket

  }

}

//#actor

//#actor-companion
object HttpServer2 {
  import HttpIteratees._

  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
    IO repeat {
      for {
        request ← readRequest
      } yield {
        val rsp = request match {
          case Request("GET", "ping" :: Nil, _, _, headers, _) ⇒
            OKResponse(ByteString("<p>pong</p>"),
              request.headers.exists { case Header(n, v) ⇒ n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" })
          case req ⇒
            OKResponse(ByteString("<p>" + req.toString + "</p>"),
              request.headers.exists { case Header(n, v) ⇒ n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" })
        }
        socket write OKResponse.bytes(rsp).compact
        if (!rsp.keepAlive) socket.close()
      }
    }

}
//#actor-companion

//#request-class
case class Request(meth: String, path: List[String], query: Option[String], httpver: String, headers: List[Header], body: Option[ByteString])
case class Header(name: String, value: String)
//#request-class

//#constants
object HttpConstants {
  val SP = ByteString(" ")
  val HT = ByteString("\t")
  val CRLF = ByteString("\r\n")
  val COLON = ByteString(":")
  val PERCENT = ByteString("%")
  val PATH = ByteString("/")
  val QUERY = ByteString("?")
}
//#constants

//#read-request
object HttpIteratees {
  import HttpConstants._

  def readRequest =
    for {
      requestLine ← readRequestLine
      (meth, (path, query), httpver) = requestLine
      headers ← readHeaders
      body ← readBody(headers)
    } yield Request(meth, path, query, httpver, headers, body)
  //#read-request

  //#read-request-line
  def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

  def readRequestLine =
    for {
      meth ← IO takeUntil SP
      uri ← readRequestURI
      _ ← IO takeUntil SP // ignore the rest
      httpver ← IO takeUntil CRLF
    } yield (ascii(meth), uri, ascii(httpver))
  //#read-request-line

  //#read-request-uri
  def readRequestURI = IO peek 1 flatMap {
    case PATH ⇒
      for {
        path ← readPath
        query ← readQuery
      } yield (path, query)
    case _ ⇒ sys.error("Not Implemented")
  }
  //#read-request-uri

  //#read-path
  def readPath = {
    def step(segments: List[String]): IO.Iteratee[List[String]] = IO peek 1 flatMap {
      case PATH ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(pathchar) flatMap (segment ⇒ step(segment :: segments)))
      case _ ⇒ segments match {
        case "" :: rest ⇒ IO Done rest.reverse
        case _          ⇒ IO Done segments.reverse
      }
    }
    step(Nil)
  }
  //#read-path

  //#read-query
  def readQuery: IO.Iteratee[Option[String]] = IO peek 1 flatMap {
    case QUERY ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(querychar) map (Some(_)))
    case _     ⇒ IO Done None
  }
  //#read-query

  //#read-uri-part
  val alpha = Set.empty ++ ('a' to 'z') ++ ('A' to 'Z') map (_.toByte)
  val digit = Set.empty ++ ('0' to '9') map (_.toByte)
  val hexdigit = digit ++ (Set.empty ++ ('a' to 'f') ++ ('A' to 'F') map (_.toByte))
  val subdelim = Set('!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=') map (_.toByte)
  val pathchar = alpha ++ digit ++ subdelim ++ (Set(':', '@') map (_.toByte))
  val querychar = pathchar ++ (Set('/', '?') map (_.toByte))

  def readUriPart(allowed: Set[Byte]): IO.Iteratee[String] = for {
    str ← IO takeWhile allowed map ascii
    pchar ← IO peek 1 map (_ == PERCENT)
    all ← if (pchar) readPChar flatMap (ch ⇒ readUriPart(allowed) map (str + ch + _)) else IO Done str
  } yield all

  def readPChar = IO take 3 map {
    case Seq('%', rest @ _*) if rest forall hexdigit ⇒
      java.lang.Integer.parseInt(rest map (_.toChar) mkString, 16).toChar
  }
  //#read-uri-part

  //#read-headers
  def readHeaders = {
    def step(found: List[Header]): IO.Iteratee[List[Header]] = {
      IO peek 2 flatMap {
        case CRLF ⇒ IO takeUntil CRLF flatMap (_ ⇒ IO Done found)
        case _    ⇒ readHeader flatMap (header ⇒ step(header :: found))
      }
    }
    step(Nil)
  }

  def readHeader =
    for {
      name ← IO takeUntil COLON
      value ← IO takeUntil CRLF flatMap readMultiLineValue
    } yield Header(ascii(name), ascii(value))

  def readMultiLineValue(initial: ByteString): IO.Iteratee[ByteString] = IO peek 1 flatMap {
    case SP ⇒ IO takeUntil CRLF flatMap (bytes ⇒ readMultiLineValue(initial ++ bytes))
    case _  ⇒ IO Done initial
  }
  //#read-headers

  //#read-body
  def readBody(headers: List[Header]) =
    if (headers.exists(header ⇒ header.name == "Content-Length" || header.name == "Transfer-Encoding"))
      IO.takeAll map (Some(_))
    else
      IO Done None
  //#read-body
}

//#ok-response
object OKResponse {
  import HttpConstants.CRLF

  val okStatus = ByteString("HTTP/1.1 200 OK")
  val contentType = ByteString("Content-Type: text/html; charset=utf-8")
  val cacheControl = ByteString("Cache-Control: no-cache")
  val date = ByteString("Date: ")
  val server = ByteString("Server: Akka")
  val contentLength = ByteString("Content-Length: ")
  val connection = ByteString("Connection: ")
  val keepAlive = ByteString("Keep-Alive")
  val close = ByteString("Close")

  def bytes(rsp: OKResponse) = {
    new ByteStringBuilder ++=
      okStatus ++= CRLF ++=
      contentType ++= CRLF ++=
      cacheControl ++= CRLF ++=
      date ++= ByteString(new java.util.Date().toString) ++= CRLF ++=
      server ++= CRLF ++=
      contentLength ++= ByteString(rsp.body.length.toString) ++= CRLF ++=
      connection ++= (if (rsp.keepAlive) keepAlive else close) ++= CRLF ++= CRLF ++= rsp.body result
  }

}
case class OKResponse(body: ByteString, keepAlive: Boolean)
//#ok-response

//#main
object Main extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080
  val system = ActorSystem()
  val server = system.actorOf(Props(new HttpServer2(port)))
}
//#main
分享到:
评论
1 楼 jilen 2014-04-14  
能告诉我性能是怎么体现出来的么?

相关推荐

    echoServer(按行读取版本)

    在`echoServer`中,`readn`可能会用于读取客户端发送的每一行,而`writen`则用于将读取到的行原样返回给客户端。通过这两个函数,服务器可以确保准确地回显接收到的每一条消息,不论其长度如何。 总结来说,`echo...

    duilian_writen.py

    duilian_writen.py

    echoServer定长包与添加报头版本

    "echoServer定长包与添加报头版本"是一个这样的实践案例,它涉及到如何发送和接收具有固定长度前缀的报文,以及如何在报文头部添加额外的信息来增强协议的效率和可扩展性。 首先,我们要理解报文的基本结构。在...

    修复内存不能read和不能written的错误

    引用:Windows系统有时之所以会频繁受到损伤,主要是许多应用程序常常共享调用一些DLL文件,一旦有的应用程序在使用完毕被自动卸载掉后,这些应用程序所调用的DLL文件往往也会跟着被删除掉了,这么一来Windows系统或...

    自己做的基于单片机的密码锁设计(c语言设计)

    void writen_dat(uchar dat) //1602写数据 { lcdrs=1; P0=dat; delay(1); lcden=1; delay(1); lcden=0; } void main(); void cheak_mima() //检查第二次确认密码是否与第一次相同 { if(q1==w1) { if(q2==w2...

    嵌入式实时操作系统μCOS分析与实践PPT1-7

    嵌入式实时操作系统μCOS,全称Micro-Controller Operating System,是一款专为微控制器设计的、源码公开的实时操作系统。它具有小巧、高效、可移植性好、可配置性强等特点,广泛应用于各种嵌入式设备中。...

    SVMs with Various Feature Selection

    Combining SVMs with Various Feature Selection Strategies writen by 林智人

    Expert C Programming

    Deep C Secrets. Writen by Peter van der Linden.

    linux_socket_select_tcp_server_client基础知识.pdf

    本示例中,提供了两个源代码文件,`server.c` 和 `client.c`,分别代表TCP服务器和客户端,它们展示了如何使用socket API进行基本的通信。下面将详细解释这两个程序中的关键知识点。 1. **套接字创建**: - 服务器...

    基于qt5.8写的一个贝塞尔曲线例子非转存Bezier curve demo

    bezier curve writen by qt5.8 qt5.8 project of bezier, you can open the pro file and compile the project directly it is fully test and reliable. this bezier curve's t factor is 0.5f C++ coede of bezier...

    《编程的那些事儿》 文字版

    编程的那些事儿(090620) ———— Approach Programing in a abstract view using python Writen By Minlearn

    计算机毕业设计:python+爬虫

    豆瓣电影、书籍、小组、相册、东西等爬虫集 writen by Python. PS: 哎, 八个月后自己尝试设计了下爬虫框架, 感觉doubanspiders代码简直糟蹋了Scrapy, 阿弥陀佛! ###依赖服务 1. MongoDB ###依赖包 1. pip install...

    计算机网络(第四版)

    computer network (fouth edition) writen by Andrew S. Tanenbaum 这本书的封面比较有意思!

    Complex analysis(上册)

    This is a book writen by serger lang, you can find more interesting things in learning this course with this course.

    UNIX Network Programming Volume 1, Third Edition (Unix网络编程卷1第3版英文版)

    Protocol Usage by Common Internet Applications Section 2.14. Summary Exercises Part 2: Elementary Sockets Chapter 3. Sockets Introduction Section 3.1. Introduction Section 3.2. Socket...

    interupt-timer.rar_lpc timer_lpc2103 timer_timer lpc_vietnamese

    this document use for new student to study program LPC 2103 , it s writen by vietnamese languge . it show that how to understand about Timer and Interupt in chip LPC2103

    kafka-net.dll

    The wire protocol portion is based on the kafka-python library writen by David Arthur and the general class layout attempts to follow a similar pattern as his project. To that end, this project ...

    librtmp长时间直播socket连接断开的原因

    本文将深入探讨“librtmp长时间直播socket连接断开”的原因,并针对“扩展时间戳”和FMS(Flash Media Server)等相关标签提供详细解析。 首先,librtmp是开源的RTMP协议实现库,它允许开发者通过编程方式与服务器...

    Socket-类封装

     2)TCPClient类的send/receive方法使用了著名的writen/readn(来源UNP)实现, 解决了TCP的粘包问题.  3)TCPServer端添加了地址复用, 可以方便TCP服务器重启;  4)添加了异常类,让我们在编写易出错的代码时,可以解放...

Global site tag (gtag.js) - Google Analytics