`
Everyday都不同
  • 浏览: 723712 次
  • 性别: Icon_minigender_1
  • 来自: 宇宙
社区版块
存档分类
最新评论

运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)

阅读更多

记得我之前写过  redis主动向页面push数据  的文章,但文中所描述的方法要应用到J2EE的项目中还是比较困难的(还需用到nodejs什么的)。于是本文来探究下比较适合web项目的主动推技术。

Comet是一种用于web的推送技术,能使服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式:长轮询(long-polling)和iframe流(streaming)。下面就用iframe流的实现方式来实现服务端主动向客户端(这里客户端指的是jsp页面)推送的效果,并且结合了redis的发布订阅,算是比较典型的例子了。

 

客户端(页面):

<script type="text/javascript">
	$(function() {
		setCometUrl();
		bindLinstener();
	});
	
	function bindLinstener() {
		if (window.addEventListener) {  
		    window.addEventListener("load", comet.initialize, false);  
		    window.addEventListener("unload", comet.onUnload, false);  
		} else if (window.attachEvent) {  
		    window.attachEvent("onload", comet.initialize);  
		    window.attachEvent("onunload", comet.onUnload);  
		} 
	}
	
	function setCometUrl(){
		comet.cometUrl = "pubsub/push.json";
	}
	
	//服务器推送代码
	var comet = {
		connection : false,
		iframediv : false,

		initialize : function() {
			if (navigator.appVersion.indexOf("MSIE") != -1) {
				// For IE browsers
				comet.connection = new ActiveXObject("htmlfile");
				comet.connection.open();
				comet.connection.write("<html>");
				comet.connection.write("<script>document.domain = '" + document.domain + "'");
				comet.connection.write("</html>");
				comet.connection.close();
				comet.iframediv = comet.connection.createElement("div");
				comet.connection.appendChild(comet.iframediv);
				comet.connection.parentWindow.comet = comet;
				comet.iframediv.innerHTML = "<iframe id='comet_iframe' src='"+comet.cometUrl+"'></iframe>";
			
			} else if (navigator.appVersion.indexOf("KHTML") != -1) {
				// for KHTML browsers
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.connection.setAttribute('src', comet.cometUrl);
				with (comet.connection.style) {
					position = "absolute";
					left = top = "-100px";
					height = width = "1px";
					visibility = "hidden";
				}
				document.body.appendChild(comet.connection);

			} else {
				// For other browser (Firefox...)
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.iframediv = document.createElement('iframe');
				comet.iframediv.setAttribute('src', comet.cometUrl);
				
				comet.connection.appendChild(comet.iframediv);
				document.body.appendChild(comet.connection);
			}
		},

		onUnload : function() {
			if (comet.connection) {
				comet.connection = false; // release the iframe to prevent problems with IE when reloading the page
				closePage();
			}
		},
		
		receiveMsg : function(msg) {
			$("#content").append(msg + "<br/>");
		}
		
	}
	
	function closePage() {
		$.ajax({
			async : true,
			cache : false,
			type : "POST",
			//data:{objId:objId},
			dataType:"json",
			url :"pubsub/close.json",
			success : function(data) {
			},
			error: function(){
			}
		});
	}
</script>

</head>
<body >
	<div id="content" class="show"></div>
</body>

 这个客户端页面是利用浏览器支持的Comet,仅发起一次ajax请求,打通后台后,后台就会源源不断主动往这个页面发送数据。

 

后台较为复杂,并且还结合了redis的发布订阅。数据来源则是订阅redis的一个channel而得到。

Action:

@Controller
public class PubSubAction {
	
	LinkedList<String> queue = new LinkedList<String>();
	PrintWriter out;
	
	//线程
	MsgSubHandler subT = null;
	CheckQueueHandler checkT = null;
	
	@RequestMapping("/pubsub/push.json")
	@ResponseBody
	public void pushMsg(HttpServletResponse response) {
		System.out.println("这儿进几次.........");
		//订阅
		subT = new MsgSubHandler("pubsub_channel", queue);
		subT.start();
		//检查
		checkT = new CheckQueueHandler(queue);
		checkT.start();
		//创建Comet Iframe
		sendHtmlScript(response, "<script type=\"text/javascript\"> var comet = window.parent.comet;</script>");
		
		while (true) {
			try {
				Thread.sleep(1000);//每隔1s从队列取数
				if(queue.size() > 0) {
					String msg = queue.pop();
					System.out.println("从队列里取到的信息:" + msg);
					sendHtmlScript(response, "<script type=\"text/javascript\"> comet.receiveMsg('"+msg+"');</script>");
				}
			}catch(InterruptedException e) {
				e.printStackTrace();
			}	
		}
	}
	
	@RequestMapping("/pubsub/close.json")
	@ResponseBody
	public void shutdownServer() throws InterruptedException {
		System.out.println("开始关闭操作..");
		//关闭流
		out.flush();
		out.close();
		//队列情空
		queue.clear();
		//消息的关闭处理
		subT.shut();
		checkT.shut();
		//线程停止
		if(checkT.isAlive()) {
			checkT.interrupt();
			checkT.join();
		}
		if(subT.isAlive()) {
			subT.interrupt();
			subT.join();
		}
	}
	
	private void sendHtmlScript(HttpServletResponse response,String script){
		response.setCharacterEncoding("UTF-8");
		response.setContentType("text/html");
		response.setDateHeader("Expires", 0);
		response.setHeader("Pragma", "No-cache");
		response.setHeader("Cache-Control", "no-cache,no-store,max-age=0");
		try {
			out = response.getWriter();
			out.write(script);
			out.flush();
		} catch (IOException e) {
			e.printStackTrace();
			log.error(e.getMessage(), e);
		}
   }
}

 

其中,订阅消息的线程类和检查消息队列大小的线程类分别如下:

 

1:定时检查队列大小的线程类,目的是避免消息队列大小过大

public class CheckQueueHandler extends Thread {
	
	private LinkedList<String> queue;
	private boolean runFlag = true;
	
	public CheckQueueHandler(LinkedList<String> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			while (runFlag && queue.size()>0) {
				Thread.sleep(60 * 1000);//每隔1分钟检查指定队列的大小
				if (queue.size() >= 500) {
					queue.clear();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();  
		}
	}
	
	public void shut() {
		runFlag = false;
	}
}

 2:订阅相应的channel的线程类:

public class MsgSubHandler extends Thread{
	
	private LinkedList<String> queue;
	private String channel;
	
	JedisPool pool;
	Jedis jedis;
	PubSubListener listener;
	
	public MsgSubHandler(String channel, LinkedList<String> queue) {
		this.channel = channel;
		this.queue = queue;
		
		//redis资源初始化
		pool = SysBeans.getBean("jedisPool");
		jedis = pool.getResource();
		
		//发布/订阅监听初始化
		listener = new PubSubListener(queue);
	}
	
	@Override
	public void run() {
		//订阅指定的渠道信息
		jedis.subscribe(listener, channel);
	}
	
	public void shut() {
		//归还redis资源
		if(pool !=null && jedis != null) {
			pool.returnResource(jedis);
		}
		//取消渠道订阅
		listener.unsubscribe();
	}
}

 3:redis的发布/订阅监听类

public class PubSubListener extends JedisPubSub {
	
	private LinkedList<String> queue =null;
	
	public PubSubListener(LinkedList<String> queue) {
		this.queue  =  queue;
	}
	
	//取得订阅后消息的处理  
    @Override  
    public void onMessage(String channel, String message) {  
        //System.out.print("onMessage:取得订阅后消息的处理  ");  
        queue.add(message);   
    }  
      
    //取得按表达式的方式订阅的消息后的处理  
    @Override  
    public void onPMessage(String pattern, String channel, String message) {  
        System.out.print("onPMessage:取得按表达式的方式订阅的消息后的处理    ");  
        System.out.println(pattern + "=" + channel + "=" + message);  
    }  
      
    //初始化按表达式的方式订阅时候的处理  
    @Override  
    public void onPSubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPSubscribe:初始化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);    
    }  
      
    //取消化按表达式的方式订阅时候的处理  
    @Override  
    public void onPUnsubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPUnsubscribe:取消化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);   
    }  
      
    //初始化订阅时候的处理  
    @Override  
    public void onSubscribe(String channel, int subscribedChannels) {  
        System.out.print("onSubscribe:初始化订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);   
    }  
      
    //取消订阅时候的处理  
    @Override  
    public void onUnsubscribe(String channel, int subscribedChannels) {  
        System.out.print("onUnsubscribe:取消订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);  
    }  

}

 

启动工程,打开客户端页面,最初始的div:



 同时控制台打印:

这儿进几次.........

onSubscribe:初始化订阅时候的处理   pubsub_channel=1

这说明:一打开客户端,就实现了订阅对应channel的目的。

接下来,为了让这个div中有数据,我们开始来对这个channel进行publish一些数据,模拟:

public static void main(String[] args) {
		Jedis jedis = new Jedis("localhost");
		while(true) {
			try {
				Thread.sleep(2000);
				jedis.publish("pubsub_channel", "I like " + Math.random()*100 );
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
	}

 然后你再观察这个div,会发现如下现象(某一时刻):


由此说明:我们达到了如题所想要的目的!——结合了redis的发布/订阅  并且客户端只请求服务端一次,服务端主动向客户端推送了数据。

 

最后,我们再试着关闭客户端页面,会发现控制台打印:

onUnsubscribe:取消订阅时候的处理   pubsub_channel=0

说明,客户端一关闭,就取消了对channel的订阅了。并且queue队列也会被清空。


 其实Comet并不是新兴的技术,关于【反ajax】技术,最新的有WebSocket,以后有机会再研究。

  • 大小: 3.2 KB
  • 大小: 15.3 KB
分享到:
评论

相关推荐

    comet demo 向客户端推送例子

    Comet技术是一种基于HTTP长连接的反向Ajax技术,它允许服务器向客户端浏览器主动推送数据,从而实现双向通信。在Web应用中,通常的HTTP请求是客户端发起的,而Comet打破了这种模式,使得服务器可以在适当的时候主动...

    comet4j 服务端向浏览器实时推送消息(支持指定用户推送)

    Comet4j 是一个实现这种技术的开源框架,它基于 Comet 技术,允许服务器端主动向客户端推送数据,而不是传统的请求-响应模式。 首先,Comet 技术是Web服务器向浏览器推送数据的一种策略,解决了HTTP协议本身无状态...

    java 使用 comet4j 主动向客户端推送信息 简单例子

    Java 使用 Comet4j 主动向客户端推送信息是一个常见的实时通信技术,主要应用于构建WebSocket或长轮询等实时交互的应用场景。Comet4j 是一个基于 HTTP 长连接的服务器端框架,它允许服务器端主动向客户端推送数据,...

    多种方式模拟服务器推送客户端

    在Web开发中,服务器向客户端实时推送数据是常见的需求,比如聊天应用、股票实时更新、在线游戏等场景。本文将详细探讨如何通过JAVA实现多种服务器推送客户端的方式,包括Ajax轮询、长连接、长轮询以及Iframe刷新。 ...

    DWR3实现服务器端向客户端精确推送消息

    在“DWR3实现服务器端向客户端精确推送消息”这一主题中,我们将深入探讨如何利用DWR3进行服务器到客户端的消息推送,以及这种技术的优势和应用。 首先,理解DWR3的工作原理是至关重要的。DWR3通过建立一个安全的...

    comet4j 自己写的消息推送 觉得实用

    Comet4j 是一个开源的 Java 框架,用于实现实时的、双向的、基于 HTTP 长连接的消息推送服务,它解决了传统 HTTP 请求响应模式下难以实现服务器主动向客户端推送信息的问题。 在 Web 应用中,消息推送技术是必不可...

    web推送 comet技术

    Comet技术是一种基于HTTP长连接的Web实时通信技术,它允许服务器向客户端主动推送数据,而无需客户端发起新的请求。这种技术打破了传统的HTTP请求-响应模型,极大地提升了Web应用的实时性和交互性,尤其适用于股票...

    3.1.1.Web应用从服务器主动推送Data到客户端有那些方式.doc

    总结起来,Web应用从服务器主动推送数据给客户端的技术主要有AJAX轮询(长轮询)、Comet和WebSocket。根据具体的应用场景和需求,开发者可以选择最适合的方法来实现服务器与客户端的实时通信。WebSocket因其高效性和...

    java 使用 comet4j 主动向客户端推送GPS信息的例子

    基于服务器推送框架 Comet4J ,后台模拟实时生成 gps 坐标信息然后再推送到前端页面显示。...这是客户端主动向服务器发起请求的方式,而采用 comet4j框架来实现正好相反,是服务器主动向客户端来推送消息。

    javaweb实现后台向前台的消息推送 comet4j

    在JavaWeb开发中,实时通信是一项重要的功能,它允许服务器主动向客户端推送数据,而不仅仅是响应客户端的请求。本示例介绍的是如何使用Comet4j这个第三方库来实现后台到前台的消息推送功能。Comet4j是一个专门用于...

    comet4j向客户端推送

    Comet4j是一种Java技术,用于实现在服务器端向客户端进行消息推送的功能。在Web开发中,传统的HTTP协议是基于请求-响应模型的,即客户端发起请求,服务器响应后连接关闭,这使得实时通信变得复杂。而Comet4j正是为了...

    tomcat+comet实现终端与服务端同步的小例子

    Comet是一种Web应用程序设计模式,它允许数据从服务器推送到客户端,而不仅仅是响应客户端的请求,从而实现了双向通信。 **一、Apache Tomcat简介** Apache Tomcat是一个开源的Java Servlet容器,它是Java EE...

    DWR+JAVA进行web消息推送dwr-comet.zip

    在本文中,我们将深入探讨如何使用DWR(Direct Web Remoting)与Java技术结合实现Web消息推送功能。DWR是一种JavaScript库,它允许在浏览器和服务器之间进行实时、双向通信,为Web应用提供了类似AJAX的功能,但更加...

    Comet4J服务器端推送技术

    它分为服务端与客户端两部分,你只要将服务器端(JAR文件,目前仅支持Tomcat6、7)放入WEB-INF\lib,客户端(JavaScript文件)引入到页面,那么你的应用就具备了向客户端推送信息的能力,而你仅需要在服务器端调用Comet4...

    Java 实现 Comet 长连接,服务器主动发送消息给客户端

    Java 实现 Comet 长连接,服务器主动发送消息给客户端是一项关键的技术,它在实时通信、推送服务等领域有着广泛的应用。Comet 是一种基于 HTTP 的持久化连接技术,允许服务器在客户端保持一个打开的 HTTP 连接,直到...

    javaweb消息推送 基于comet实现局域网内部通讯(聊天室)demo

    javaweb消息推送 基于comet实现局域网内部通讯(聊天室)demo 功能特性 推送消息广播。 推送定向消息。 提供连接上线前、上线、下线前、下线、发送消息等多种可处理事件。 消息缓存机制,确保长轮询工作模式下不丢失...

    ssm.rar_comet_java comet_java comet推送_聊天 JAVA SSM

    在描述中提到的"Java comet服务器推送(聊天)实现代码",很可能包含了这些Comet技术的具体实现,包括服务器端的Servlet代码和可能的客户端JavaScript代码,用于处理推送的逻辑和展示消息。 SSM框架与Comet结合,可以...

    comet4j完整包

    - **服务端推送**:与传统的客户端拉取数据不同,服务端推送是服务器主动将数据发送到客户端,这样可以减少延迟,提高实时性,特别适合于实时聊天、股票行情、在线游戏等场景。 在使用comet4j时,开发者需要理解...

    Comet服务器推送技术

    Comet服务器推送技术是一种在Web应用中实现服务器主动向客户端推送数据的技术,它解决了传统HTTP协议下只能由客户端发起请求的局限。随着Ajax技术的普及,开发者希望在浏览器环境中实现更接近桌面应用的实时交互,而...

    Servlet3.0 异步处理 页面推送 Comet 实例

    本实例主要关注的是如何利用Servlet3.0的异步处理来实现页面推送技术——Comet。 Comet是一种Web应用程序架构,用于创建持久连接,允许服务器向客户端(通常是浏览器)实时推送数据。在传统的HTTP协议中,服务器...

Global site tag (gtag.js) - Google Analytics