`
simomo
  • 浏览: 26254 次
  • 性别: Icon_minigender_1
  • 来自: 郑州
社区版块
存档分类
最新评论

AmFast源码浅析

 
阅读更多

一直以来,很好奇amfast是如何实现服务器端基于长/普通轮询通道的主动推送的。最近看源码有了收获,记录如下。

 

 

正本清源——主动推送

 

      正所谓欲将取之必先予之,要想理解服务器端的主动推送,就必须要坚定心中对于http协议:

一次请求,一次回复;没有请求,没有回复!

的信念,坚信除了websocket和flash socket,所有的浏览器上的消息更新都是基于浏览器的主动发出的http请求,所谓的“主动推送”只是个美丽的名字。

 

隐藏的真相——广播即固化

amfast提供的消息广播的接口如下:

 

from amfast.remoting import flex_messages

#使用channel_set对象的publishMessage方法进行广播
# 创建一个待广播的flex消息对象
msg = flex_messages.AsyncMessage(headers=None,
    body="Hello World", destination="topic-name")
#广播刚才创建的flex消息对象
channel_set.publishMessage(msg)


#使用channel_set对象的publishObject方法进行广播
channel_set.publishObject("Hello World", "topic-name",
    sub_topic="sub-topic-name", headers=None, ttl=30000)

 publishMessage和publishObject接口本质上是一样的,publishiObject会将参数包装成一个flex消息,然后利用publishMessage方法发出去。所以以publishMessage方法为例。

我们可以在amfast/remoting/channel.py里发现这个方法的实现:

 

    def publishMessage(self, msg):
        """Publish a pre-formed message.

        arguments:
        ===========
         * msg - AbstractMessage, the Flex message to publish.
        """
        #关键一
        self.subscription_manager.publishMessage(msg)

        if self.notify_connections is True:
            topic = msg.destination
            if hasattr(msg, 'headers') and \
                msg.headers is not None and \
                messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
                sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
            else:
                sub_topic = None
            #关键二
            self.notifyConnections(topic, sub_topic)

 publishMessage接到待广播的消息后,第一步,先将转手将这个msg交给了subscription_manager.publishMessage 处理。我们可以在subscription_manager

找到这个方法的实现:

 

    def publishMessage(self, msg):
        #对消息过期时间做一些处理
        # Update timestamp to current server time.
        # Is this the correct thing to do???
        msg.timestamp = time.time() * 1000
        if msg.timeToLive is None or msg.timeToLive == 0:
            # Set timeToLive if it has not been pre-set.
            msg.timeToLive = self.ttl
        #关键!
        self.persistMessage(msg)

 这是几道手了?? 待广播的msg在做了一些简单加工后,被传递给了persistMessage方法。

 

这个方法就比较有意思了,它在subscription_manager的基类中并没有实现,而是在subscription_manager基类的三个子类中实现了,按照oop的叫法,这个persistMessage应该算是一个接口了。我们以这个接口在MemcacheSubscriptionManager中的实现为例:

 

    def persistMessage(self, msg):
        """Store a message."""

        topic = msg.destination
        if hasattr(msg, 'headers') and \
            msg.headers is not None and \
            messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
            sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
        else:
            sub_topic = None
        topic = self.getTopicKey(topic, sub_topic)

        # Remove connection data,
        # so that it is not pickled
        tmp_connection = getattr(msg, 'connection', None)
        if tmp_connection is not None:
            msg.connection = None

        key = self.getKeyName(topic, self.MSG_ATTR)
        lock_name = self.getLockName(key)
        self._lock.acquire(lock_name)
        try:
            messages = self.mc.get(key)
            if messages is None:
                messages = []
            messages.append(msg)
            self.mc.set(key, messages)
        finally:
            self._lock.release(lock_name)

            # Restore connection data
            if tmp_connection is not None:
                msg.connection = tmp_connection

 这段代码的作用其实故名思议,就是将这个待广播的msg以topic和subtopic为key存入到memcache缓存系统里。就这么简单。广播来广播去,最后被存起来了。这就是标题:广播即固化的意思。

 

回归本质,原来神马都是浮云

如果清醒的话,就会回想到,我们刚才是在说channel_set对象的publishMessage方法,然后发现msg被n倒手,最终固化到了缓存系统里,下面接着分析channel_set对象的publishMessage方法。

 

    def publishMessage(self, msg):
        """Publish a pre-formed message.

        arguments:
        ===========
         * msg - AbstractMessage, the Flex message to publish.
        """

        self.subscription_manager.publishMessage(msg)

        if self.notify_connections is True:
            topic = msg.destination
            if hasattr(msg, 'headers') and \
                msg.headers is not None and \
                messaging.AsyncMessage.SUBTOPIC_HEADER in msg.headers:
                sub_topic = msg.headers[messaging.AsyncMessage.SUBTOPIC_HEADER]
            else:
                sub_topic = None

            self.notifyConnections(topic, sub_topic)

 下面就是分支判断,如果你在实例化channel_set对象的时候,设置的notify_connection为True时,就会调用notifyConnections方法。

这个publishMessage方法做的事情很简单,就是把msg固化了一下。下面换个思路,就是从request处理的角度分析。

在客户端发来的request的处理过程中,有一个_pollForMessage的方法会被调用(这个方法之前的调用,在这里就不再说了)。这个方法就是关键所在。

 

def _pollForMessage(self, packet, message, connection):
        """Repeatedly polls for a new message until
        message is available or wait_interval is reached.

        This is blocking, and should only be used
        for Channels where each connection is a thread.

        Synchronous servers should override this method.
        """
        # If True, don't store persistent 'last_polled' value every poll operation.
        soft_touch = hasattr(self.channel_set.connection_manager, "softTouchPolled")

        total_time = 0
        poll_secs = float(self.poll_interval) / 1000
        wait_secs = float(self.wait_interval) / 1000
        while True:
            event = threading.Event()
            event.wait(poll_secs)
            msgs = self.channel_set.subscription_manager.pollConnection(connection, soft_touch)
            if len(msgs) > 0:
                if soft_touch is True:
                    # Store 'last_polled' value.
                    connection.touchPolled()
                #这一步,就是传说中的广播了            
                return msgs
            
            total_time += poll_secs
            if total_time > wait_secs or connection.connected is False:
                if soft_touch is True:
                    # Store 'last_polled' value.
                    connection.touchPolled()
                return ()

 它会调用pollConnection方法,这个方法的作用和刚才的persistMsg是相反的,这个方法会根据topic和subtopic为key,从缓存中取出属于这个客户端的msg,然后把这些msg们返回。如此,那些待广播的消息终于被“广播”了~

 

最后可以发现,其实广播也好,主动推送也好,其实是一个异步过程。

一个只负责将待广播的msg固化到数据库,缓存或内存中

另一头只负责按照自己的topic,subtopic去数据库,缓存或内存中将属于自己的msg取走。

 

另外,我们甚至可以根据amfast的格式,给具体某个客户端存些msg,这样就能实现针对某个客户端而不是某些客户端的消息推送了

 

0
0
分享到:
评论

相关推荐

    SFTP 源码浅析1

    SFTP 源码浅析 1 SFTP(Secure File Transfer Protocol,安全文件传输协议)是一种安全的文件传输协议,主要用于在网络中安全地传输文件。该协议基于 SSH(Secure Shell,安全shell)协议,提供了身份验证、加密和...

    内存分配器dlmalloc_2.8.3源码浅析

    dlmalloc内存分配器源码浅析 内存分配器dlmalloc_2.8.3源码浅析是学习 Linux 经典代码的重要资源,本文将对dlmalloc的源码进行详细分析,探索其内存分配和回收机制。 边界标记法 dlmalloc使用边界标记法来管理...

    微信小程序商城系统源码

    微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)...

    SSCOM源码 DELPHI 源码

    SSCOM源码 DELPHI 源码 绝对源码!欢迎下载

    微信小程序源码下载 微信小程序源码下载 2000套微信小程序源码

    本资源包含2000套微信小程序的源码,对于开发者来说是一份宝贵的参考资料,可以用来学习、研究或者作为开发新项目的起点。 源码下载是开发者获取程序原始代码的方式,对于学习和理解编程逻辑至关重要。这些微信小...

    电商微信小程序源码+后台

    电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序...

    饿了么源码 百度外卖源码 美团外卖源码 外卖系统源码

    订餐网,外卖网源码,带积分商城,商家系统,外卖网站建设! 系统特点: 周密策划、项目为先 "项目指导技术,技术服从项目",这是我们一贯秉承的原则,也是我们与其他系统开发商、网站建设公司的本质区别所在!我们...

    Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置-附件资源

    Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置-附件资源

    变速齿轮 易语言源码 变速齿轮源码 变速器源码

    易语言源码就是用这种语言编写的程序代码,通过阅读和理解这些源码,开发者可以学习到如何利用易语言来实现特定功能,比如变速齿轮。 在易语言中实现变速齿轮功能,主要涉及到以下几个关键知识点: 1. **系统时间...

    移动医疗APP源码 android (安卓版)妙手医生源码

    移动医疗APP源码是开发医疗健康应用的核心组成部分,它包含了应用程序的所有逻辑和界面设计。在Android平台上,这种源码通常是用Java或Kotlin语言编写的,并使用Android Studio作为集成开发环境(IDE)。在这个案例...

    ssh框架项目源码ssh框架项目源码ssh框架项目源码

    ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh...

    cocos creator完整麻将源码下载

    《cocos creator完整麻将源码解析与开发指南》 cocos Creator是一款强大的2D游戏开发引擎,被广泛应用于游戏开发,尤其是休闲娱乐类游戏,如麻将。本篇将深入探讨"麻将源码"这一主题,结合cocos Creator的特性,为...

    捕鱼游戏源码 下载 最新完整版

    捕鱼游戏源码是一种基于计算机编程技术,用于开发模拟海洋捕鱼场景的电子游戏的代码集合。这类源码通常包含了游戏逻辑、图形渲染、音频处理、用户交互等多个方面的详细实现,为开发者提供了一个深入理解游戏开发过程...

    非常漂亮的个人博客网站源码

    标题中的“非常漂亮的个人博客网站源码”表明这是一个关于个人博客网站的设计与开发资源,它包含了一套完整的源代码,可以用于创建一个美观且个性化的博客平台。这种源码通常包括HTML、CSS、JavaScript等前端代码,...

    51套经典企业网站源码(一)

    0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...

    Linux系统下dhcp源码

    Linux系统下的DHCP(Dynamic Host Configuration Protocol)源码解析 DHCP是一种网络协议,用于自动分配IP地址、子网掩码、默认网关等网络配置信息给网络中的设备。在Linux环境中,DHCP服务器通常使用isc-dhcp-...

    DIY个性T恤定制网站源码

    【DIY个性T恤定制网站源码】是一个用于创建在线个性化商品定制平台的软件系统,主要专注于T恤、杯子、台历和挂历等产品。这个源码允许用户通过简单的界面设计自己的产品,体现个人风格和创意。接下来,我们将深入...

    仿58同城赶集网源码

    分类源码,信息发布网站源码,信息源码,信息港网站源码,asp信息港源码,信息类网站源码,多种分类源码,信息网源码下载,asp信息网源码,信息发布系统源码,物流信息源码,房产信息网源码.net源码,公安信息网源码,家教信息...

Global site tag (gtag.js) - Google Analytics