`

模块简单设计——用Jetty实现一个client adapter

 
阅读更多

首先看看下面的应用场景

传统的服务器端为若干个客户端提供服务,一般需要开启多个服务器端进程。为了进一步提升服务器端的处理能力,可以如图所示将服务解耦为两部分(adapter与workers),它们之间通过消息队列传输数据,其中workers处理具体业务,adapter负责接入请求以及反馈结果,具体包含下面两个工作。

1,将所有客户端的请求发送到消息队列(进而传给后台处理)

2,后台处理完毕后将结果返回响应队列,client adapter获取到结果后返回给相应客户端。

我们选择用Jetty(8),redis以及php简单实现这个场景,主要用到jetty的continuation机制和redis的list数据结构

 

A,先配置一个服务器如下,同时开启一个守护线程阻塞监听response queue(用到json lib库以及jedis库)

package test;

import java.util.HashMap;
import java.util.List;

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;

import org.json.simple.*;

import redis.clients.jedis.Jedis;

public class PJetty{
	
	public static HashMap<String,Continuation>globalMap = new HashMap<String,Continuation>();
	
	// 用一个守护线程阻塞等待结果队列返回数据
	public static class DaemonThread extends Thread{
		
		private JSONObject obj = new JSONObject();

		private Jedis jedis = new Jedis("127.0.0.1",6379);
		private List<String> res;
		
		public void run(){
			
			while(true){
				// 阻塞等待响应队列
				res = jedis.brpop(0, "response_queue");
				
				// 获取结果信息
				String response = res.get(1);

				obj=(JSONObject) JSONValue.parse(response);
				String request_sid = obj.get("request_sid").toString();
				String result = obj.get("results").toString();
				
				if(request_sid == null){
					continue;
				}
				
				// 通过消息中的连接的sessonid获取到响应的continuation实例,然后设置结果信息再唤醒实例
				Continuation con = globalMap.get(request_sid);
				if(con == null){continue;}
				globalMap.remove(request_sid);
				
				//客户端异常断开这里会抛错
				try{
					con.setAttribute("results", result);
					con.resume();
				} catch(Exception e){
					continue;
				}
			}
			
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		//开启守护线程去阻塞等待响应结果队列,唤醒请求
		DaemonThread dt = new DaemonThread();
		dt.start();
		
		//设置connectors
		SelectChannelConnector connector1 = new SelectChannelConnector();
		connector1.setPort(1987);
		connector1.setThreadPool(new QueuedThreadPool(5));
		
		Server server = new Server();
		server.setConnectors(new Connector[]{connector1});

		//使用servlet处理请求
		ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
		context.setContextPath("/");
		context.addServlet(new ServletHolder(new NonBlockingServlet()), "/fetch");
		server.setHandler(context);
		
        server.start();
        server.join();
	}
}

B,实现自定义的servlets接受前端client连接,将请求信息传入队列request queue

package test;

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.json.simple.JSONObject;

import redis.clients.jedis.Jedis;

public class NonBlockingServlet extends HttpServlet {

	/**
	 * generated serialize number
	 */
	private static final long serialVersionUID = 3313258432391586994L;
	
	
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
	{
		// 用sleeptime来模拟后台工作量
		 String sleepTime = request.getParameter("st");
		 if(sleepTime == null){
			 sleepTime = "0";
		 }
		 
	     // 查看结果队列是否返回此连接的请求结果
	     Object results = request.getAttribute("results");
	     if (results==null) // 如果异步处理尚未返回结果
	      {
	       Continuation continuation = ContinuationSupport.getContinuation(request);
	 
	       if(continuation.isInitial()){
	    	   // 设置连接超时时间
		    	continuation.setTimeout(10000);
	    	   	response.setContentType("text/plain");
			    response.getWriter().flush();
			    
			    HttpSession session=request.getSession();
			    String sid = session.getId();
			    
				Jedis jedis = new Jedis("127.0.0.1",6379);
			    //将请求连接sessionid以及请求内容encode后传到处理队列中
			    JSONObject obj=new JSONObject();
			    obj.put("request_sid",sid);
			    obj.put("params",sleepTime);
			    
			    jedis.lpush("request_queue", obj.toJSONString());
			    
			    //将连接和continuation实例做个映射关系存到全局hashmap中,不确定这里是否应该加锁
			    PJetty.globalMap.put(sid, continuation);
	       }
	       
	       // 判断是否超时
	       if (continuation.isExpired())
	       {
	         // 返回超时Response
	    	 response.getWriter().println("timeout");
	   	     response.getWriter().flush();  
	         return;
	       }
	 
	       // 挂起HTTP连接
	       continuation.suspend(); 
	 
	       return; // or continuation.undispatch();
	     }
	 
	     // 连接恢复后返回结果
		 response.getWriter().println("Got Result:\t" + results);
		 response.getWriter().flush();  
	}
}

C,实现后端worker.php(可以自定义worker进程数,进程数多能获取更好的并发)(用到predis库)

#!/root/bin/php
<?php

require_once("lib/Predis/Autoloader.php");

function worker_thread()
{
        Predis\Autoloader::register();
        $redis = new Predis\Client('tcp://127.0.0.1:6379');

                while(true){
                        try{
                                $request = $redis->brpop("request_queue", 0);
                        } catch(Exception $e){
                                continue;
                        }
                        /** demo
                         array(2) {
                         [0]=>
                         string(13) "request_queue"
                         [1]=>
                         string(55) "{"request_sid":"q0muxazo8k1h1k3uw85wuayh","params":"4"}"
                         }
                         */
                        $request = json_decode($request[1], true);
                        // sleep represents work loads
                        sleep(intval($request["params"]));
                        $results = array("request_sid"=>$request["request_sid"], "results"=>$request["params"]);
                        $response = json_encode($results);
                        $redis->lpush("response_queue",$response);
                }
}

//开启多个worker进程提供服务
for ($worker_nbr = 0; $worker_nbr < 5; $worker_nbr++) {
        $pid = pcntl_fork();
        if ($pid == 0) {
                worker_thread();

                return;
        }
}

?>

 运行效果如下:

 

root # for((i=10;i>=1;i--)) ; do lynx -dump http://127.0.0.1:1987/fetch?st=$i & done
[1] 14112
[2] 14113
[3] 14114
[4] 14115
[5] 14116
[6] 14117
[7] 14118
[8] 14119
[9] 14120
[10] 14121
root # Got Result:     3
Got Result:     4
Got Result:     2
Got Result:     7
Got Result:     1
Got Result:     9
Got Result:     6
timeout
timeout
timeout

[1]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[2]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[3]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[4]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[5]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[6]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[7]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[8]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[9]-  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[10]+  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i

 这只是一个简单的demo,为了防止redis,workers进程挂掉或者客户端异常断开,还需要做些异常处理,比如设置请求超时,捕获一些空指针等,超时需要将continuation从globalMap中剔除。

redis数据库中存储的内容如下,可以看出虽然经后台处理后顺序变化了,但是对应关系正确:

 

redis 127.0.0.1:6379> lrange request_queue 0 15
 1) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"params\":\"3\"}"
 2) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"params\":\"2\"}"
 3) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"params\":\"6\"}"
 4) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"params\":\"5\"}"
 5) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"params\":\"8\"}"
 6) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"params\":\"4\"}"
 7) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"params\":\"9\"}"
 8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"params\":\"7\"}"
 9) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"params\":\"10\"}"
10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"params\":\"1\"}"

redis 127.0.0.1:6379> lrange response_queue 0 15
 1) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"results\":\"6\"}"
 2) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"results\":\"3\"}"
 3) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"results\":\"2\"}"
 4) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"results\":\"10\"}"
 5) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"results\":\"5\"}"
 6) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"results\":\"9\"}"
 7) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"results\":\"8\"}"
 8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"results\":\"7\"}"
 9) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"results\":\"4\"}"
10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"results\":\"1\"}"
  • 大小: 9.7 KB
分享到:
评论

相关推荐

    用jetty8.0写的websocket实现的简单聊天程序

    在使用Jetty实现WebSocket聊天程序时,我们需要创建一个继承自`org.eclipse.jetty.websocket.WebSocket.OnTextMessage`的类,重写`onOpen`、`onClose`、`onMessage`等方法。`onOpen`在连接建立时调用,`onClose`在...

    jetty-client-9.4.43.v20210629-API文档-中文版.zip

    赠送jar包:jetty-client-9.4.43.v20210629.jar; 赠送原API文档:jetty-client-9.4.43.v20210629-javadoc.jar; 赠送源代码:jetty-client-9.4.43.v20210629-sources.jar; 赠送Maven依赖信息文件:jetty-client-...

    jetty-client-9.4.43.v20210629-API文档-中英对照版.zip

    赠送jar包:jetty-client-9.4.43.v20210629.jar; 赠送原API文档:jetty-client-9.4.43.v20210629-javadoc.jar; 赠送源代码:jetty-client-9.4.43.v20210629-sources.jar; 赠送Maven依赖信息文件:jetty-client-...

    jetty9分拆的各个包下载

    jetty-alpn-client-9.2.26.v20180806.jar jetty-alpn-server-9.2.26.v20180806.jar jetty-annotations-9.2.26.v20180806.jar jetty-cdi-9.2.26.v20180806.jar jetty-client-9.2.26.v20180806.jar jetty-continuation...

    Android源码——i-jetty开源项目.zip

    i-jetty开源项目就是专为Android平台设计的一个轻量级HTTP服务器,它基于Java的Jetty服务器,实现了在Android设备上运行HTTP服务的功能。本文将详细探讨这个项目的核心概念、工作原理以及如何在实际开发中应用。 1....

    jetty-client-9.4.11.v20180605-API文档-中英对照版.zip

    赠送jar包:jetty-client-9.4.11.v20180605.jar; 赠送原API文档:jetty-client-9.4.11.v20180605-javadoc.jar; 赠送源代码:jetty-client-9.4.11.v20180605-sources.jar; 赠送Maven依赖信息文件:jetty-client-...

    jetty实现websocket功能

    Jetty是一个轻量级、高性能的Java Web服务器和Servlet容器,它也支持WebSocket协议,并且集成了一系列WebSocket的API,使得开发者可以轻松地在Java应用中实现WebSocket功能。 在Jetty中实现WebSocket功能,首先你...

    安卓Android源码——i-jetty开源项目.zip

    【描述】中的"安卓Android源码——i-jetty开源项目.zip"意味着这个压缩包包含了一个完整的源代码实现,开发者可以通过阅读和分析这些源代码来学习如何在Android环境中配置和运行Jetty服务器。 从【标签】"安卓 源码...

    JeeSite 代码生成器的应用

    JeeSite 代码生成器的应用

    jetty-client-9.4.11.v20180605-API文档-中文版.zip

    赠送jar包:jetty-client-9.4.11.v20180605.jar; 赠送原API文档:jetty-client-9.4.11.v20180605-javadoc.jar; 赠送源代码:jetty-client-9.4.11.v20180605-sources.jar; 赠送Maven依赖信息文件:jetty-client-...

    通过Jetty实现文件上传下载的小工具

    NULL 博文链接:https://vista-rui.iteye.com/blog/1386427

    jetty-client-6.1.6rc0.jar

    jetty-client-6.1.6rc0.jar

    课程设计——学生信息系统

    本项目以Java语言作为主要开发工具,利用数据库技术实现对学生信息的有效管理,是一个个人开发的系统,旨在提升开发者在实际问题解决上的能力。 一、Java语言基础 Java是一种广泛使用的面向对象的编程语言,以其...

    eclipse + maven多模块项目 + SpringMVC + jetty热部署实现验证码图片实例源码

    并且基于Spring MVC提供了一个完整功能:实现了生成验证码图片,以及验证输入是否匹配的两个接口,接口为Rest风格,符合内容协商原则(同一资源,多种展现:xml,json,html)。 另外,演示了注解(Annotation)的用法,实现...

    JSP毕业设计——JSP实现的简单旅游管理系统的设计(源代码+论文).zip

    **JSP毕业设计——JSP实现的简单旅游管理系统** 该毕业设计项目是基于JSP技术构建的一个简单旅游管理系统,旨在让学生掌握JSP编程基础以及如何将它应用于实际的Web应用程序开发中。通过这个项目,我们可以深入理解...

    JSP论文格式化系统_——后台模块的设计与实现(源代码+论文).zip

    《JSP论文格式化系统——后台模块的设计与实现》是一个基于Java技术和JSP(JavaServer Pages)的毕业设计项目,其主要目标是提供一个能够自动格式化学术论文的后台管理系统。该系统涵盖了论文的上传、格式检查、格式...

    jetty6.1.6-2

    Jetty 6.1.6 是一个开源的、轻量级的Java Web服务器和Servlet容器。这个版本的Jetty发布于较早时期,但它的设计理念和功能仍然对理解Web服务器和Servlet容器的工作原理有很大帮助。Jetty在设计时强调了性能、可嵌入...

    Java毕业设计——学校管理系统设计与实现(源码+数据库).zip

    《Java毕业设计——学校管理系统设计与实现》是一个典型的软件工程实践项目,主要涵盖了Java编程语言、数据库管理和Web应用开发等多个领域的知识。以下是该项目可能涉及的关键技术点和知识点的详细解析: 1. **Java...

    jetty相关的全部jar包

    jetty-security-9.4.8.v20171121.jar,jetty-io-9.4.8.v20171121.jar,jetty-continuation-9.4.8.v20171121.jar,jetty-client-9.4.8.v20171121.jar,jetty-jmx-9.4.8.v20171121.jar,jetty-plus-9.4.8.v20171121....

    jetty 适合jdk1.8用的服务器

    Jetty是一款开源、轻量级的Web服务器和Servlet容器,被广泛用于开发、测试和部署Java Web应用程序。相较于Apache Tomcat,Jetty以其简洁的架构、高性能和低内存占用而受到开发者青睐。在选择Jetty时,必须考虑到与...

Global site tag (gtag.js) - Google Analytics