浏览 6513 次
锁定老帖子 主题:python 土拿渡 socket
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-11-29
最后修改:2010-12-03
下面和大家分享一下tornado 如何实现异步处理
下面的程序只能支持多线程的异步处理方式,扩展性一般,如果,可以改进为进程的方式扩展性会好很多 :) 之所以使用tornado 是因为,tornado 性能比较乐观, 具体比较请看下面的文章:http://programmingzen.com/2009/09/13/benchmarking-tornado-vs-twisted-web-vs-tornado-on-twisted/ Client 端代码 >>> import socket >>> d = socket.socket(socket.AF_INET, socket.SOCK_STREAM) >>> d.connect((HOST,PORT)) Server 端代码,需要Linux 服务器 #encoding = utf-8 import errno import functools import socket import time import os import traceback import memcache from tornado import ioloop, iostream from threading import Thread from Queue import Queue from config import * streamDict = {} streamRequests = {} ################################# Utils ############################### def stream_add(stream,count): """ 添加 Stream """ streamkey = str(stream) streamDict[streamkey] = (stream,0) streamRequests[streamkey] = Queue() def stream_remove(streamkey,stream): """ 删除 Stream """ try: stream.close() except: pass del streamDict[streamkey] del streamRequests[streamkey] ################################# Response ############################### class ClockResponse(Thread): """ 定时处理请求接口 """ def __init__ (self): """ 初始化 """ Thread.__init__(self) self.flag = True self.count = 0 def run(self): """ 运行线程 """ while self.flag: #打印 LOG,N次/打印 ct = self.count % 10 if ct == 0: print 'now connections:%s'%(len(streamDict)) self.count = ct + 1 #循环处理请求 for streamkey,(stream,num) in streamDict.items(): try: #这里返回Response queue = streamRequests[streamkey] print "queue.get()",queue.get() print "q.qsize()",queue.qsize() stream.write('haha') except: streamDict[streamkey] = (stream,num+1) #去死吧....... if num>DEAD_VALUE: stream_remove(streamkey,stream) time.sleep(SLEEP_TIME) def stop(self): self.flag = False ################################# Request ############################### class SocketRequest(object): """ Socket 请求 """ def __init__(self, stream, address): self.stream = stream self.streamkey = str(stream) self.address = address #开始读取数据 self.stream.read_until("\r\n", self.on_request) def on_request(self,data): """ 接收处理请求 """ #将数据加载到 Queue print "on_request" , data queue = streamRequests[self.streamkey] queue.put(data) #继续读取请求 self.stream.read_until("\r\n", self.on_request) def connection_ready(sock, fd, events): """ 启动 Connection 监听,处理程序 """ while True: try: connection, address = sock.accept() except socket.error, e: if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): raise return #non-block connection.setblocking(0) stream = iostream.IOStream(connection) #保存stream stream_add(stream,0) #接收Socket 请求 SocketRequest(stream,address) if __name__ == '__main__': #pidfile = open(os.path.join(settings.PROJECT_PATH,'tornadoServer.pid'),'w') #pidfile.write(str(os.getpid())) #pidfile.close() #接收请求 clock = ClockResponse() clock.start() #定义Socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) #这里的0 dummy protocol for TCP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(("", SERVER_PORT)) sock.listen(9999) #获得ioloop io_loop = ioloop.IOLoop.instance() callback = functools.partial(connection_ready, sock) io_loop.add_handler(sock.fileno(), callback, io_loop.READ) #启动服务 try: io_loop.start() except KeyboardInterrupt: io_loop.stop() clock.stop() print "exited cleanly" TCP/IP module for linux 2.6 dummy版 1、概述 英文单词dummy是虚假的意思,dummy版的TCP/IP module就是实现一个虚假的传输层协议,该协议不能完成任何实际的网络通讯。它通过module自身提供的一个经过改造的环回网络设备驱动程序mylo,展示网络数据在TCP/IP协议栈中发送和接收的整个流程。其意义在于为后续的实际的STREAM, DGRAM, RAW的开发提供一个基本的程序框架,以及初步验证这个框架的可行性,同时也帮助更好的理解2.6内核中整个TCP/IP协议栈的实现原理。 2、编译与安装 dummy版的module与第一版相比较,在模块划分和Makefile上作了很多改进。当前初步按TCP/IP协议栈的层次结构分为五个模块,下面是整个源代码目录树的基本情况: ipv4 顶层目录。 | |--inet inet域,主要完成inet域最顶层的一些基本操作。 | |--application 应用层,现在主要为一个dummy协议的测试应用程序。 | |--transport 传输层,目前只有一个dummy协议。 | |--network 网络层。 | |--datalink 数据链路层,目前只实现了环回网络设备驱动程序mylo。 | |--core 核心模块,连接数据链路层和网络层,传递它们之间的数据。 | |--include 头文件 | |--load.sh unload.sh 装载和卸载脚本 | |--Makfile env.mk makefile文件。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |