论坛首页 编程语言技术论坛

在Python3.0中处理web请求-封装wsgi

浏览 3551 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2009-05-26   最后修改:2009-05-26

闲来无聊,写了一些python3000的wsgi封装的代码,包括封装url匹配,客户请求参数,跳转,响应流的封装等,少说废话。上代码:

# -*- coding: utf-8 -*-

import socketserver, re, cgi, io, urllib.parse
from wsgiref.simple_server import WSGIServer

class AppException(Exception):
    pass

class Request(object):
    """保存客户端请求信息"""
    
    def __init__(self, env):
        self.env = env
        self.winput = env["wsgi.input"]
        self.method = env["REQUEST_METHOD"] # 获取请求方法(GET or POST)
        self.__attrs = {}
        self.attributes = {}
        self.encoding = "UTF-8"

    def __getattr__(self, attr):
        if(attr == "params" and "params" not in self.__attrs):
            fp = None
            if(self.method == "POST"):
                content = self.winput.read(int(self.env.get("CONTENT_LENGTH","0")))
                #fp = io.StringIO(content.decode(self.encoding))
                fp = io.StringIO(urllib.parse.unquote(content.decode("ISO-8859-1"),encoding=self.encoding))
                
            self.fs = cgi.FieldStorage(fp = fp, environ=self.env, keep_blank_values=1)# 创建FieldStorage
            self.params = {}
            for key in self.fs.keys():
                self.params[key] = self.fs[key].value
            self.__attrs["params"] = self.params
        return self.__attrs[attr]

class Response(object):
    """对客户端进行响应"""

    def __init__(self, start_response, write = None):
        self.encoding = "UTF-8"
        self.start_response = start_response
        self._write = write

    def write(self, string):
        """向流中写数据
            @param string:要写到流中的字符串
        """
        if(self._write is None):
            self._write = self.start_response("200 OK", [("Content-type","text/html;charset="+self.encoding)])
        self._write(string.encode(self.encoding).decode("ISO-8859-1"))

    def redirect(self, url):
        """跳转"""
        if(self._write is not None):
            raise AppException("响应流已写入数据,无法进行跳转。")
        self.start_response("302 OK", [("Location",url)])
                
class ThreadingWSGIServer(WSGIServer, socketserver.ThreadingMixIn):
    """一个使用多线程处理请求的WSGI服务类"""
    pass

class WSGIApplication(object):
    """WSGI服务器程序"""
    def __init__(self, urls=None):
        self.urls = urls # URL映射

    def getHandlerByUrl(self, url):
        """根据URL获取处理程序,如果没有找到该处理程序则返回None"""
        url = url.replace("//","/") # 避免输入错误引起的url解释错误
        
        urlArr = url.split('/')
        for setUrl in self.urls.keys():
            setUrlArr = setUrl.split("/")
            #print(setUrl.replace("*",r'\w*'))
            if(len(setUrlArr) == len(urlArr)):
                for i in range(len(urlArr)):
                    if(i == len(urlArr) - 1 and
                       (setUrlArr[i] == '*' or setUrlArr[i] == urlArr[i] or
                        ('*' in setUrlArr[i] and re.search(setUrlArr[i].replace("*",r'\w*'),urlArr[i])))):
                        return self.urls[setUrl]
                    if(setUrlArr[i] == '*' or setUrlArr[i]==' '):
                        continue;
                    if(setUrlArr[i] != urlArr[i]):
                        break;

    def make_app(self):
        """建立WSGI响应程序"""
        def wsgi_app(env, start_response):
            #print(";\n".join([k+"="+str(v) for k, v in env.items()]))
            url = env["PATH_INFO"] # 获取当前请求URL
            handlerCls = self.getHandlerByUrl(url)
            if(handlerCls is None):
                # 未经定义的url处理
                start_response("500 OK", [("Content-type","text/html;charset=utf-8")])
                return "Error URL"
            if(not hasattr(handlerCls,"doGET") and not hasattr(handlerCls,"doPOST")):
                # 映射错误
                start_response("500 OK", [("Content-type","text/html;charset=utf-8")])
                return "Error Mapping"
            request = Request(env)
            response = Response(start_response)
            try:
                handler = handlerCls(request, response)
            except TypeError as e:
                handler = handlerCls()
            methodName = "do" + request.method
            returnValue = None
            try:
                returnValue = getattr(handler,methodName)(request, response)
            except TypeError as e:
                returnValue = getattr(handler,methodName)()
            if(returnValue is None):
                returnValue=[]
            return returnValue
        return wsgi_app

    def make_server(self, serverIp='', port=8080, test=False):
        """建立一个默认服务器
           @param test: 是否只是做一次测试
        """
        from wsgiref.simple_server import make_server # 加载模块
        httpd = make_server(serverIp, port, self.make_app(), server_class=ThreadingWSGIServer)
        if test: # 如果只是测试
            httpd.handle_request() # 处理单次请求
        else:
            httpd.serve_forever() # 处理多次请求
        return True

def main():
    app = WSGIApplication(urls={"/a/*":TestHandler, "/a/b/*.do":TestHandler})
    app.make_server(test=True)

class TestHandler(object):
    def __init__(self):
        pass
    def doGET(self, request=None, response=None):
        request.encoding='UTF-8'
        response.write("Hello")
    def doPOST(self, request=None, response=None):
        #request.encoding='UTF-8'
        #response.write(request.params["name"])
        response.redirect("/a/x")

if __name__=="__main__":
    main()
    #input()

 后面的是一些简单的测试。

 

发现可能是做JAVA做得太多的关系,越看越像servlet api。。呵呵

   发表时间:2009-05-27  
servlet 规范做的还是相当不错的.
0 请登录后投票
   发表时间:2009-05-28  
呵呵,恩,确实不错
0 请登录后投票
论坛首页 编程语言技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics