一直以来,很好奇amfast是如何实现服务器端基于长/普通轮询通道的主动推送的。最近看源码有了收获,记录如下。
正本清源——主动推送
正所谓欲将取之必先予之,要想理解服务器端的主动推送,就必须要坚定心中对于http协议:
一次请求,一次回复;没有请求,没有回复!
的信念,坚信除了websocket和flash socket,所有的浏览器上的消息更新都是基于浏览器的主动发出的http请求,所谓的“主动推送”只是个美丽的名字。
隐藏的真相——广播即固化
amfast提供的消息广播的接口如下:
from amfast.remoting import flex_messages
#使用channel_set对象的publishMessage方法进行广播
# 创建一个待广播的flex消息对象
msg = flex_messages.AsyncMessage(headers=None,
body="Hello World", destination="topic-name")
#广播刚才创建的flex消息对象
channel_set.publishMessage(msg)
#使用channel_set对象的publishObject方法进行广播
channel_set.publishObject("Hello World", "topic-name",
sub_topic="sub-topic-name", headers=None, ttl=30000)
publishMessage和publishObject接口本质上是一样的,publishiObject会将参数包装成一个flex消息,然后利用publishMessage方法发出去。所以以publishMessage方法为例。
我们可以在amfast/remoting/channel.py里发现这个方法的实现:
def publishMessage(self, msg):
"""Publish a pre-formed message.
arguments:
===========
* msg - AbstractMessage, the Flex message to publish.
"""
#关键一
self.subscription_manager.publishMessage(msg)
if self.notify_connections is True:
topic = msg.destination
if hasattr(msg, 'headers') and \
msg.headers is not None and \
messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
else:
sub_topic = None
#关键二
self.notifyConnections(topic, sub_topic)
publishMessage接到待广播的消息后,第一步,先将转手将这个msg交给了subscription_manager.publishMessage 处理。我们可以在subscription_manager
找到这个方法的实现:
def publishMessage(self, msg):
#对消息过期时间做一些处理
# Update timestamp to current server time.
# Is this the correct thing to do???
msg.timestamp = time.time() * 1000
if msg.timeToLive is None or msg.timeToLive == 0:
# Set timeToLive if it has not been pre-set.
msg.timeToLive = self.ttl
#关键!
self.persistMessage(msg)
这是几道手了?? 待广播的msg在做了一些简单加工后,被传递给了persistMessage方法。
这个方法就比较有意思了,它在subscription_manager的基类中并没有实现,而是在subscription_manager基类的三个子类中实现了,按照oop的叫法,这个persistMessage应该算是一个接口了。我们以这个接口在MemcacheSubscriptionManager中的实现为例:
def persistMessage(self, msg):
"""Store a message."""
topic = msg.destination
if hasattr(msg, 'headers') and \
msg.headers is not None and \
messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
else:
sub_topic = None
topic = self.getTopicKey(topic, sub_topic)
# Remove connection data,
# so that it is not pickled
tmp_connection = getattr(msg, 'connection', None)
if tmp_connection is not None:
msg.connection = None
key = self.getKeyName(topic, self.MSG_ATTR)
lock_name = self.getLockName(key)
self._lock.acquire(lock_name)
try:
messages = self.mc.get(key)
if messages is None:
messages = []
messages.append(msg)
self.mc.set(key, messages)
finally:
self._lock.release(lock_name)
# Restore connection data
if tmp_connection is not None:
msg.connection = tmp_connection
这段代码的作用其实故名思议,就是将这个待广播的msg以topic和subtopic为key存入到memcache缓存系统里。就这么简单。广播来广播去,最后被存起来了。这就是标题:广播即固化的意思。
回归本质,原来神马都是浮云
如果清醒的话,就会回想到,我们刚才是在说channel_set对象的publishMessage方法,然后发现msg被n倒手,最终固化到了缓存系统里,下面接着分析channel_set对象的publishMessage方法。
def publishMessage(self, msg):
"""Publish a pre-formed message.
arguments:
===========
* msg - AbstractMessage, the Flex message to publish.
"""
self.subscription_manager.publishMessage(msg)
if self.notify_connections is True:
topic = msg.destination
if hasattr(msg, 'headers') and \
msg.headers is not None and \
messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
else:
sub_topic = None
self.notifyConnections(topic, sub_topic)
下面就是分支判断,如果你在实例化channel_set对象的时候,设置的notify_connection为True时,就会调用notifyConnections方法。
这个publishMessage方法做的事情很简单,就是把msg固化了一下。下面换个思路,就是从request处理的角度分析。
在客户端发来的request的处理过程中,有一个_pollForMessage的方法会被调用(这个方法之前的调用,在这里就不再说了)。这个方法就是关键所在。
def _pollForMessage(self, packet, message, connection):
"""Repeatedly polls for a new message until
message is available or wait_interval is reached.
This is blocking, and should only be used
for Channels where each connection is a thread.
Synchronous servers should override this method.
"""
# If True, don't store persistent 'last_polled' value every poll operation.
soft_touch = hasattr(self.channel_set.connection_manager, "softTouchPolled")
total_time = 0
poll_secs = float(self.poll_interval) / 1000
wait_secs = float(self.wait_interval) / 1000
while True:
event = threading.Event()
event.wait(poll_secs)
msgs = self.channel_set.subscription_manager.pollConnection(connection, soft_touch)
if len(msgs) > 0:
if soft_touch is True:
# Store 'last_polled' value.
connection.touchPolled()
#这一步,就是传说中的广播了
return msgs
total_time += poll_secs
if total_time > wait_secs or connection.connected is False:
if soft_touch is True:
# Store 'last_polled' value.
connection.touchPolled()
return ()
它会调用pollConnection方法,这个方法的作用和刚才的persistMsg是相反的,这个方法会根据topic和subtopic为key,从缓存中取出属于这个客户端的msg,然后把这些msg们返回。如此,那些待广播的消息终于被“广播”了~
最后可以发现,其实广播也好,主动推送也好,其实是一个异步过程。
一个只负责将待广播的msg固化到数据库,缓存或内存中
另一头只负责按照自己的topic,subtopic去数据库,缓存或内存中将属于自己的msg取走。
另外,我们甚至可以根据amfast的格式,给具体某个客户端存些msg,这样就能实现针对某个客户端而不是某些客户端的消息推送了
分享到:
相关推荐
《SFTP 源码分析:理解客户端与服务器交互机制》 SFTP(Secure File Transfer Protocol)是一种基于SSH协议的安全文件传输协议。本文主要探讨SFTP客户端的源码实现,特别是其整体框架和命令调度机制。 1. **整体...
dlmalloc内存分配器源码浅析 内存分配器dlmalloc_2.8.3源码浅析是学习 Linux 经典代码的重要资源,本文将对dlmalloc的源码进行详细分析,探索其内存分配和回收机制。 边界标记法 dlmalloc使用边界标记法来管理...
《网狐6.6完整源码与内核源码解析:105款游戏源码解密探索》 在IT行业中,源码是程序开发的核心,它揭示了软件的内部工作机制,是程序员进行二次开发、优化和调试的基础。"网狐6.6完整源码+内核源码+105款游戏源码...
本资料“计算机软件-商业源码-浅析桌面精灵的实现.zip”包含了对桌面精灵实现的深入探讨,对于学习和开发此类应用的程序员来说具有很高的参考价值。 首先,我们要了解桌面精灵的基本结构。一个典型的桌面精灵由以下...
SSCOM源码 DELPHI 源码 绝对源码!欢迎下载
本资源包含2000套微信小程序的源码,对于开发者来说是一份宝贵的参考资料,可以用来学习、研究或者作为开发新项目的起点。 源码下载是开发者获取程序原始代码的方式,对于学习和理解编程逻辑至关重要。这些微信小...
订餐网,外卖网源码,带积分商城,商家系统,外卖网站建设! 系统特点: 周密策划、项目为先 "项目指导技术,技术服从项目",这是我们一贯秉承的原则,也是我们与其他系统开发商、网站建设公司的本质区别所在!我们...
Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置-附件资源
易语言源码就是用这种语言编写的程序代码,通过阅读和理解这些源码,开发者可以学习到如何利用易语言来实现特定功能,比如变速齿轮。 在易语言中实现变速齿轮功能,主要涉及到以下几个关键知识点: 1. **系统时间...
移动医疗APP源码是开发医疗健康应用的核心组成部分,它包含了应用程序的所有逻辑和界面设计。在Android平台上,这种源码通常是用Java或Kotlin语言编写的,并使用Android Studio作为集成开发环境(IDE)。在这个案例...
《cocos creator完整麻将源码解析与开发指南》 cocos Creator是一款强大的2D游戏开发引擎,被广泛应用于游戏开发,尤其是休闲娱乐类游戏,如麻将。本篇将深入探讨"麻将源码"这一主题,结合cocos Creator的特性,为...
供应链管理系统源码是一种用于管理企业内部以及与外部合作伙伴之间物流、信息流和资金流的软件解决方案。这个系统的核心目标是优化整个供应链流程,提高效率,降低成本,并确保在正确的时间、正确的地点提供正确的...
在源码层面,53客服系统源码主要包含了以下几个关键知识点: 1. **多平台支持**:53客服系统通常支持网页、手机APP、微信等多种渠道的接入,源码中会有相应的接口实现,以确保用户可以通过不同的终端与客服进行交互...
0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...
标题中的“非常漂亮的个人博客网站源码”表明这是一个关于个人博客网站的设计与开发资源,它包含了一套完整的源代码,可以用于创建一个美观且个性化的博客平台。这种源码通常包括HTML、CSS、JavaScript等前端代码,...
Linux系统下的DHCP(Dynamic Host Configuration Protocol)源码解析 DHCP是一种网络协议,用于自动分配IP地址、子网掩码、默认网关等网络配置信息给网络中的设备。在Linux环境中,DHCP服务器通常使用isc-dhcp-...
分类源码,信息发布网站源码,信息源码,信息港网站源码,asp信息港源码,信息类网站源码,多种分类源码,信息网源码下载,asp信息网源码,信息发布系统源码,物流信息源码,房产信息网源码.net源码,公安信息网源码,家教信息...