`
kanpiaoxue
  • 浏览: 1777432 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

python通过stomp协议和hornetq进行连接

 
阅读更多

===================================== 2014-03-28 =========================================

stomp.py 的官网地址:https://pypi.python.org/pypi/stomp.py

 

例子地址: https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example

stomp.py 4.0.11 见附件

例子代码:

# -*-coding:utf-8-*-
'''
Created on 2014-3-28

@author: xuepeng
'''

from logging.handlers import TimedRotatingFileHandler
import logging
import os
import stomp
import sys
import time


def initLogger(logFileName):
        if not os.path.isfile(logFileName):
            logPath = logFileName[0:logFileName.rfind('/') + 1]
            if not os.path.isdir(logPath):
                os.makedirs(logPath)
            f = open(logFileName, 'w')
            f.close()
        format = '%(asctime)s [%(threadName)s](%(levelname)s) %(pathname)s(%(funcName)s:%(lineno)s) --> %(message)s'
        filemode = 'a'
        # level = logging.DEBUG
        level = logging.INFO
        logging.basicConfig(filemode=filemode, level=level, format=format)
        hdlr0 = TimedRotatingFileHandler(logFileName, when='D', interval=1, backupCount=0, encoding='utf-8', delay=False, utc=False)
        formatter = logging.Formatter(format)
        hdlr0.setFormatter(formatter)
        logger = logging.getLogger()
        logger.addHandler(hdlr0)
        return logger


logger = initLogger('e:/logs/simple.log')

class MyListener(object):
    def on_error(self, headers, message):
        print('received an error %s' % message)
    def on_message(self, headers, message):
        print('----->received a message %s' % message)

def main():         
    dest = 'jms.queue.com.wanmei.mq.test.xuepeng'
    
    conn = stomp.Connection(host_and_ports=[ ('mq-node1', 61613) ])
    conn.set_listener('', MyListener())
    conn.start()
    conn.connect()
    conn.subscribe(destination=dest, id=1, ack='auto')
    
    msg = ' '.join(sys.argv[1:])
    num = 0
    while num < 1000:
        conn.send(body=msg, destination=dest)
        print 'send message ', msg
        num = num + 1 
    
    time.sleep(2)
    conn.disconnect()

if __name__ == '__main__':
    main()

 执行这段代码:

python E:\workspace_python\Demo\src\com\wanmei\stomp\simple.py hello world 123

==============================================================================

再看HornetQ,因为自己学了python,所以不仅仅希望用Java来连接HornetQ,也希望用python来连接,进行开发。看了HornetQ的手册,里面说的很清楚,HornetQ不支持对stomp消息的持久化。这算是很大的一个缺点。但是毕竟支持了跨计算机语言的功能。我尝试了,并且写了简单的测试。发现开始运行的时候,会有数据丢失。这个问题在JBoss那里有提到,具体的我还有继续研究。HornetQ的stomp支持,请参考手册。测试了很久,发现就在开始的短暂时间有数据丢失,中间运行还是很稳定的。如果对于数据丢失不是很敏感的应用,可以进行测试。需要进行深入研究。

下面是例子的代码:

 

#-*-coding:utf-8-*-
'''
Created on 2012-2-20

'''
import logging
import stomp
import time


 
logging.basicConfig()
dest = 'jms.queue.TestQueue' 
#dest = 'jms.topic.TestTopic' 

logging.basicConfig()
class MyListener(stomp.ConnectionListener):     
    def on_error(self, headers, message):         
        print('received an error %s' % message)     
    def on_message(self, headers, message):
        print '--------------------------------------'
        #for k, v in headers.iteritems():             
        #    print('header: key %s , value %s' % (k, v))         
        print('received message\n %s' % message)   

    def on_disconnected(self):
        """
        Called by the STOMP connection when a TCP/IP connection to the
        STOMP server has been lost.  No messages should be sent via
        the connection until it has been reestablished.
        """
        pass
    
    def on_connecting(self, host_and_port):
        """
        Called by the STOMP connection once a TCP/IP connection to the
        STOMP server has been established or re-established. Note that
        at this point, no connection has been established on the STOMP
        protocol level. For this, you need to invoke the "connect"
        method on the connection.

        \param host_and_port a tuple containing the host name and port
        number to which the connection has been established.
        """
        pass

    def on_connected(self, headers, body):
        """
        Called by the STOMP connection when a CONNECTED frame is
        received, that is after a connection has been established or
        re-established.

        \param headers a dictionary containing all headers sent by the
        server as key/value pairs.

        \param body the frame's payload. This is usually empty for
        CONNECTED frames.
        """
        pass
        
    def on_heartbeat_timeout(self):
        """
        Called by the STOMP connection when a heartbeat message has not been
        received beyond the specified period.
        """
        pass

    def on_receipt(self, headers, body):
        """
        Called by the STOMP connection when a RECEIPT frame is
        received, sent by the server if requested by the client using
        the 'receipt' header.

        \param headers a dictionary containing all headers sent by the
        server as key/value pairs.

        \param body the frame's payload. This is usually empty for
        RECEIPT frames.
        """
        pass

    
    def on_send(self, headers, body):
        """
        Called by the STOMP connection when it is in the process of sending a message
        
        \param headers a dictionary containing the headers that will be sent with this message
        
        \param body the message payload
        """
        pass

try:
    conn = stomp.Connection([('192.168.123.74', 61613)]) 
    conn.set_listener('somename', MyListener())
    print('set up Connection') 
    
    conn.start() 
    print('started connection')  
    conn.connect(wait=True) 
    print('connected') 
     
    while True:
        num = 0
        count = 99999
        while num < count:
            try:
                num += 1
                message = 'hello world ' + str(num) 
                conn.send(message=message, destination=dest, headers={'type':'textMessage'}, ack='auto') 
                #print 'sent message:', message
            except Exception , e:
                print '==============', e
        print 'It has produce ' + str(count) + ' messages'
        time.sleep(2) 
except Exception , e:
    print '----------------- ', e
    
print('slept') 
conn.disconnect() 
print('disconnected')
    
    

 

 

#-*-coding:utf-8-*-
'''
Created on 2012-2-20

'''
import logging
import stomp
import time

logging.basicConfig()
class MyListener(stomp.ConnectionListener):
    def __init__(self,conn,headers): 
        super(MyListener,self).__init__()
        self.conn = conn
        self.headers = headers
    def on_error(self, headers, message):         
        print('received an error %s' % message)     
    def on_message(self, headers, message):
        print '--------------------------------------'
        #for k, v in headers.iteritems():             
        #    print('header: key %s , value %s' % (k, v))         
        print('received message\n %s' % message)   


    def on_disconnected(self):
        """
        Called by the STOMP connection when a TCP/IP connection to the
        STOMP server has been lost.  No messages should be sent via
        the connection until it has been reestablished.
        """
        if not  self.conn.is_connected():
            print 'Error: conn failure! try to connection again'
        sleepTime = 5
        print '+++++++++++++++++++++++++++++++++++++++++++'
        print 'it will sleep ' + str(sleepTime) + ' seconds.'
        time.sleep(sleepTime)

        consume()
        pass
    
    def on_connecting(self, host_and_port):
        """
        Called by the STOMP connection once a TCP/IP connection to the
        STOMP server has been established or re-established. Note that
        at this point, no connection has been established on the STOMP
        protocol level. For this, you need to invoke the "connect"
        method on the connection.

        \param host_and_port a tuple containing the host name and port
        number to which the connection has been established.
        """
        pass

    def on_connected(self, headers, body):
        """
        Called by the STOMP connection when a CONNECTED frame is
        received, that is after a connection has been established or
        re-established.

        \param headers a dictionary containing all headers sent by the
        server as key/value pairs.

        \param body the frame's payload. This is usually empty for
        CONNECTED frames.
        """
        pass
        
    def on_heartbeat_timeout(self):
        """
        Called by the STOMP connection when a heartbeat message has not been
        received beyond the specified period.
        """
        pass

    def on_receipt(self, headers, body):
        """
        Called by the STOMP connection when a RECEIPT frame is
        received, sent by the server if requested by the client using
        the 'receipt' header.

        \param headers a dictionary containing all headers sent by the
        server as key/value pairs.

        \param body the frame's payload. This is usually empty for
        RECEIPT frames.
        """
        pass

    
    def on_send(self, headers, body):
        """
        Called by the STOMP connection when it is in the process of sending a message
        
        \param headers a dictionary containing the headers that will be sent with this message
        
        \param body the message payload
        """
        pass


def consume():
    
    dest = 'jms.queue.TestQueue' 
    clientId = 919191
    headers={'client-id':clientId}
    #dest = 'jms.topic.TestTopic' 
    
    conn = stomp.Connection([('192.168.123.74', 61613)]) 
    print('set up Connection') 
    conn.set_listener('somename', MyListener(conn,headers)) 
    print('Set up listener')  
    conn.start()
    print('started connection')  
    conn.connect(wait=True,headers=headers) 
    print('connected')
    
    conn.subscribe(destination=dest, ack='auto') 
    print('subscribed')
    while True:
        pass
    print('slept') 
    conn.disconnect() 
    print('disconnected')
    
if __name__ == '__main__':
    
    consume()

 

分享到:
评论

相关推荐

    如何在微信小程序的websocket上使用stomp协议1

    而 STOMP(Simple Text Oriented Message Protocol)是一种简单文本导向的消息协议,它允许通过 WebSocket 进行消息传递。与 MQTT 相比,STOMP 更易于理解和实现,因为它是基于文本的,而 MQTT 是基于二进制的,这...

    STOMP 协议规范,版本 1.2

    该规范涵盖了 STOMP 协议的设计理念、协议概述、帧结构、命令、标头、错误处理、连接和订阅等方面的内容。 设计理念 STOMP 设计的主要理念是简单性和互操作性。STOMP 被设计为一种轻量级协议,易于在客户端和...

    springboot websocket集群(stomp协议)连接时候传递参数

    在Spring Boot应用中实现WebSocket集群并使用STOMP协议进行通信时,传递参数是一个重要的环节,尤其是在多节点集群环境中。WebSocket允许双向通信,而STOMP(Simple Text Oriented Messaging Protocol)则提供了一种...

    stomp协议客户端调试助手工具

    于是乎自己造一个,详细介绍参见博客:《Python实现基于WebSocket的stomp协议的小巧调试助手工具》,文章地址:https://blog.csdn.net/yyz_1987/article/details/143111710,可以用来调试stomp协议的通信和订阅、...

    STOMP协议详解1

    这些命令通过TCP连接发送,通常是纯文本格式,使得使用telnet或nc(netcat)这样的简单工具就可以直接与STOMP服务器进行交互,这对于测试和调试非常方便。 在实际应用中,有许多开源项目实现了STOMP协议。例如,...

    stomp.min.js(stomp协议的客户端脚本)、sockjs.min.js(SockJS的客户端脚本)以及jQuery

    使用jQuery处理DOM操作和事件,通过SockJS建立到服务器的稳定连接,再利用STOMP协议在客户端和服务器之间传递消息,实现高效的数据同步。 以下是一些关键知识点: 1. **STOMP协议**:了解其基本命令(CONNECT、...

    stomp.py:“ stomp.py”是一个Python客户端库,用于使用STOMP协议(版本1.0、1.1和1.2)访问消息传递服务器(例如ActiveMQ或RabbitMQ)。 它也可以作为独立的命令行客户端运行以进行测试

    “ stomp.py”是一个Python客户端库,用于使用协议( , 和 )访问消息传递服务器(例如 , 或 )。 它也可以作为独立的命令行客户端运行以进行测试。 注意:Stomp.py已正式结束对Python2.x的支持。 有关更多信息,...

    .net 连接HornetQ,需要的dll

    下面将详细介绍如何使用这些库来实现与HornetQ的连接以及进行消息的发送和接收。 1. 引入库:首先,你需要在你的.NET项目中引入Apache.NMS.Stomp.dll这个库。这可以通过NuGet包管理器完成,搜索并添加Apache.NMS....

    基于SpringBoot+STOMP协议实现的web聊天室.zip

    标题 "基于SpringBoot+STOMP协议实现的web聊天室.zip" 描述了一个使用SpringBoot框架和STOMP(Simple Text Oriented Messaging Protocol)协议构建的Web聊天室项目。这个项目不仅包含了源代码,还提供了项目说明文档...

    原生Stomp.js文件,stomp协议,websocket使用tcp协议

    stomp协议

    电信设备-基于STOMP协议的通信方法和装置.zip

    STOMP(Simple (or Streaming) Text Oriented Messaging Protocol)是一种简单文本导向的消息协议,常用于在客户端和消息代理之间进行通信。在电信设备领域,STOMP协议被广泛应用于实现高效、可靠的异步通信,尤其在...

    stomp协议脚本

    STOMP协议有客户端脚本,STOMP协议有客户端脚本。。。。

    基于Java语言的STOMP协议长连接消息收发设计源码

    本项目为基于Java语言的STOMP协议长连接消息收发设计源码,包含66个文件,涉及32个Java源文件、6个JSON配置文件、5个Gradle构建脚本、5个XML配置文件以及少量其他辅助文件。该系统专注于实现STOMP协议的长连接消息...

    Python库 | django-stomp-1.0.4.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:django-stomp-1.0.4.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    rabbitmq+vue+stomp.zip

    STOMP(Simple Text Oriented Messaging Protocol)是一种简单文本消息协议,它允许应用程序通过标准的TCP连接与消息代理进行交互,如RabbitMQ。STOMP提供了一种统一的接口,使得多种语言和平台可以方便地与消息...

    php实现通过stomp协议连接ActiveMQ操作示例

    标题中的"php实现通过stomp协议连接ActiveMQ操作示例"指的是使用PHP编程语言通过STOMP(Simple Text Oriented Messaging Protocol)协议与Apache ActiveMQ消息代理进行交互的实践教程。ActiveMQ是Apache软件基金会...

    stompest-mq python

    标题中的"stompest-mq python"指的是使用Python语言实现的Stomp协议库,用于消息队列(MQ)通信。Stomp是一种简单易用、跨平台的网络协议,专门用于在分布式环境中传输消息。在Python中,stompest库提供了与Stomp...

Global site tag (gtag.js) - Google Analytics