`
jiq408694711
  • 浏览: 36515 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

利用MSMQ发送消息(对象)到NServiceBus终结点(不采用Send-Only方式)

 
阅读更多

待解决的问题:

某些应用程序需要异步发送消息到远端机器的NServiceBus终结点,但是又不希望知道接收并处理异步消息的是NServiceBus这种ESB终结点,也许异步消息处理者将来会换成其他的ESB终结点,所以不能采用Send-Only的方式,否则应用程序(消息发送方)还是需要引入NserviceBus.dll等类库。


所以我的思路是:

应用程序调用我的接口发送消息到指定的私有消息队列,然后NServiceBus终结点有一个线程,接收这样一个消息,这个消息没有实现IComman/Imessage接口,所以还要再将这个消息转换为NServiceBus能够处理的消息类型(实现ICommand,IMessage等接口,可以出发MessageHandler调用),然后将这个消息通过:

Bus.Send("EndPointName@localhost", message);

发送给本机的NServiceBus终结点。

后面会展示详细的代码。。。。


其实我还想过提供给应用程序的接口是一个WCF服务,服务的实现中采用Send-Only的方式将接收到的消息转换为NServiceBus能够识别的消息,然后采用Send方式发送到本机的NServiceBus终结点,但是我试了一下,WCF服务的实现中配置Send-Only方式那段代码会抛出一个很奇怪的异常,到现在我还没有分析出来为什么。


所以综上所述,提供给应用程序一个接口,将消息发送到我的ESB终结点,同时又没有任何耦合,我们采用的方式是直接操作消息队列,代码如下(关键片段),如要下载我的完整代码,地址为百度云盘


提供给应用程序的接口,包括消息定义,和消息发送接口:

namespace MSMQMessageSender
{
    [Serializable]
    public class AsynMessage
    {
        public AsynMessage() { }

        public int id { set; get; }
        public string body { set; get; }
    }
}
public static bool SendMessage(AsynMessage msg)
        {
            try
            {
                MessageQueue msmq = new MessageQueue(@"FormatName:Direct=TCP:192.1.11.186\Private$\NServiceBus.EndPoint"); //这里的消息队列地址是能够将消息发送到远端机器的消息队列的关键。
                msmq.Formatter = new XmlMessageFormatter(new Type[] { typeof(AsynMessage) });

                System.Messaging.Message message = new System.Messaging.Message();
                message.Label = "消息标题";
                message.Body = msg;
                msmq.Send(message);
            }
            catch (Exception ee)
            {
                Console.WriteLine("发送消息出现异常,原因是:" + ee.Message);
                return false;
            }
            return true;
        }


下面是NserviceBus中,首先是启动的时候需要创建消息队列:

namespace CBIP.Server
{
    class EndConfigPoint : IConfigureThisEndpoint, AsA_Publisher { }

    class ConfiguringTheDistributorWithTheFluentApi : INeedInitialization
    {
        public void Init()
        {
            if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
            {
                System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
            }
        }
    }
}

然后定义NserviceBus中能够识别的消息:

namespace NServiceBus.Messages
{
    [Serializable]
    public class CBIPAsynMessage : ICommand
    {
        public int id { get; set; }
        public string body { get; set; }
    }
}

然后是在NServiceBus的Start()方法中开启一个轮训线程,轮训消息队列中的消息:

namespace CBIP.Server
{
    public class ServerEndPoint : IWantToRunWhenBusStartsAndStops
    {
        public IBus Bus { get; set; }

        public void Start()
        {
            Thread thread = new Thread(MyThread);
            thread.Start();        
        }

        public void MyThread()
        {

            while (true)
            {
                MessageQueue MQ = new MessageQueue(@".\Private$\NServiceBus.EndPoint");

                //调用MessageQueue的Receive方法接收消息
                System.Messaging.Message message = null;
                try
                {
                    message = MQ.Receive(TimeSpan.FromSeconds(5));
                }
                catch (MessageQueueException ee)
                {
                    message = null;
                    Console.WriteLine("超时:" + ee.Message);
                }

                if (message != null)
                {                    
                    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(MSMQMessageSender.AsynMessage) });
                    AsynMessage msg = (AsynMessage)message.Body;
                    Console.WriteLine(msg.id + ", " + msg.body);
                    
                    CBIPAsynMessage cbipMsg = new CBIPAsynMessage() //转换成NServiceBus能够识别的消息
                    {
                        id = msg.id,
                        body = msg.body
                    };
                    Bus.Send("CBIP.Server@localhost", cbipMsg);	//发送到本机NServiceBus节点
                    Console.WriteLine("发送到ESB完成");
                    
                }
                else
                {
                    Console.WriteLine("没有找到消息!");
                }                
            }
        }

        public void Stop()
        {

        }

    }
}

就这样在NServiceBus的消息处理者中就能正确收到消息了:

namespace CBIP.Server
{
    public class AsynMessageHandler : IHandleMessages<CBIPAsynMessage>
    {
        public void Handle(CBIPAsynMessage message)
        {
            Console.WriteLine("AsynMessageHandler收到一个消息");
        }
    }
}


直接以进程的方式启动NServiceBus终结点,上面的代码是可以正确工作的,但是当我将我的NServiceBus终结点安装位Windows NT服务之后,问题来了,轮询线程无论如何也收不到消息,然后我去查看消息队列NServiceBus.EndPoint,发现里面是有消息的,那就是轮询线程没办法取出来。


后来调试才发现时没有访问权限,错误是“Access to Message Queuing System id denied”

网上查了一下是需要在创建消息队列的时候,给指定用户配置访问权限的,NServiceBus中创建队列的代码改为下面这样就可以了:

if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
{
        MessageQueue mq = System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
	mq.SetPermissions("Administrator", MessageQueueAccessRights.FullControl);
}



======================================= 华丽的分割线 ==============================

另外,正常来说,NserviceBus的Master终结点安装了NServiceBus的软件,有实现了IConfigureThisEndpoint, AsA_Publisher { }这两个接口的类,有消息处理者,寄宿到NServiceBus.Host.exe,填写好命令行参数NServiceBus.Integration和NServiceBus.Master,并且电脑安装了消息队列之后,是可以没有问题地启动起来,并自动创建消息队列的,我的项目中开始也是这样,但是不知道为什么一段时间之后启动报错:“无法创建消息队列,或者没有对应权限”。


这个问题还没找到原因,目前采用的办法只能是显示判断是否存在消息队列,不存在就显示创建一个。

分享到:
评论

相关推荐

    MSMQ使用总结(事务性、线程监控、文件发送接收和远程信息交互)

    在本文中,我们将深入探讨MSMQ的核心特性,包括事务性消息传输、单线程监控、文件发送与接收以及远程信息交互。** ### 1. 事务性消息传输 事务性消息传输是MSMQ的关键特性之一,它确保了消息的可靠性和一致性。在...

    msmq.rar_java msmq_java 消息队列_java消息队列_msmq_消息队列

    接着,你可以创建一个QueueSender对象来发送消息,和一个QueueReceiver对象来接收消息。以下是一个简单的示例: ```java import javax.jms.*; QueueConnectionFactory factory = new QpidJmsConnectionFactory(...

    ESB--NServiceBus

    5. **消息分发和路由**:NServiceBus可以智能地路由消息到正确的接收方,同时支持多级消息转发和消息代理。 6. **版本控制和迁移**:NServiceBus允许服务独立升级,通过兼容旧版本的消息格式,确保在服务更新时不会...

    基于消息队列-MSMQ 的开发

    - 发送消息:首先需要建立与目标队列的连接,设定消息格式,然后提供消息内容,最后调用`Send`方法发送消息。 - 序列化消息:MSMQ支持多种消息格式,如XML、二进制和ActiveX。通常,我们使用`XmlMessageFormatter`...

    WPF实现MSMQ的接收和发送

    使用MSMQ开发分布式应用  首先,实现服务器端。创建一个控制台项目,添加System.Messaging引用,因为消息队列的类全部封装在System.Messaging.dll程序集里。 界面采用WPF开发

    C#MSMQ 消息队列工具

    使用`MessageQueue.Send`方法可以将消息发送到队列,该方法接受`Message`对象和可选的队列传输选项。消息内容可以是任何.NET兼容的数据类型,通过`Message.Body`属性设置。 5. **接收消息** 接收消息通常有两种...

    NServiceBus-develop

    1. **配置**: 配置是使用 NServiceBus 的第一步,包括设置消息传输机制(如 MSMQ 或 RabbitMQ)、错误处理策略等。 2. **消息契约**: 定义消息(Messages)和事件(Events)的契约,确保服务间通信的数据一致性。 3....

    MSMQ微软消息队列收发信息源码

    它与电子邮件很相似,它们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址,而且它们都不需要发送者和接收者同时在场,就可以将其信息存储在消息队列或邮件服务器中。但是它们是大有区别,其...

    利用windows组件(MSMQ)实现消息队列

    消息队列(Message Queuing,简称MSMQ)是Windows操作系统内置的一个组件,它提供了一种可靠且异步的通信方式,使得应用程序之间可以交换数据,即使它们不在同一时间或者网络状态不佳的情况下也能正常工作。...

    java msmq 接收和发送消息(支持64位)

    MSMQ是一种消息队列系统,允许不同进程或计算机之间异步通信,即使在发送方或接收方不可用时也能确保消息传递的可靠性。在64位操作系统上运行Java应用并与其交互时,需要特定的库来处理兼容性问题。 **Java MSMQ库...

    MSMQ消息队列演示程序

    **MSMQ消息队列演示程序**是一个在Visual Studio 2005环境下使用C#编写的实例,旨在帮助初学者理解和应用消息队列(Message Queuing,简称MSMQ)技术。该程序包含了完整的源代码,以及一个封装好的类,使得消息的...

    msmq消息队列相关

    它允许应用程序在不可用的网络环境中发送消息,确保数据的可靠传输,即使接收方暂时离线也能正常工作。MSMQ的核心概念是消息队列,它通过将消息存储在队列中,待接收方恢复时再进行处理,从而实现了网络的容错性和高...

    MSMQ应用--传命令

    4. **发送消息**:使用`MessageQueue`对象的`Send`方法将消息放入指定的队列中。这一步骤通常是异步的,意味着即使接收端应用程序当前不可用,消息也会被保存在队列中等待处理。 5. **接收消息**:在接收端,应用...

    Msmq自用-请勿下载

    在Java中使用MSMQ通常需要依赖于第三方库,如`jmsmq`或`msmq-jms`,这些库提供了API,使得Java应用能够创建、读取、删除MSMQ队列,并发送和接收消息。64位版本可能意味着这个库或代码是为了在64位操作系统上运行而...

    VC++MSMQ微软消息队列简单使用举例

    在VC++环境中,我们可以利用MSMQ的API接口来实现对消息队列的操作,包括创建队列、发送消息和读取消息。以下是一个简单的VC++使用MSMQ的例子: 1. **创建队列** - 首先,需要确定队列的类型,例如私有队列或公共...

    MSMQ实例源码 C#

    - **Send方法**:向队列中发送消息,可以选择同步或异步发送。 - **Peek方法**:查看队列顶部的消息,但不移除它。 - **TransactionScope类**:用于事务处理,确保消息操作的一致性。 3. MSMQ的高级特性: - **...

    msmq消息队列相关demo

    MSMQ,全称为Microsoft Message Queuing,是微软提供的一种异步通信技术,它允许应用程序在不同时刻通过网络进行通信,即使发送方和接收方在通信时不在线也能完成消息传递。这种技术对于构建可靠且容错的分布式系统...

    MSMQ.rar_MSMQ 取完_msmq_消息队列

    MSMQ允许应用程序在不同时刻、甚至在网络不稳定的情况下发送和接收消息,确保数据传输的可靠性和一致性。在本案例中,"MSMQ.rar_MSMQ 取完_msmq_消息队列"是一个压缩包,包含有关如何使用MSMQ进行业务处理的代码示例...

    .net使用和配置MSMQ(消息队列)的范例和详细讲解

    .NET中的MSMQ(Message Queuing,消息队列)是一种可靠的消息传递机制,它允许应用程序在不同时刻异步通信,即使发送方和接收方在发送或接收消息时可能不在线。MSMQ提供了一种安全、高效的方式,确保消息在分布式...

    微软-消息队列,msmq

    8. **Send方法**:将消息发送到指定的队列。 9. **Purge方法**:清空队列中的所有消息。 #### 四、发送和序列化消息 **消息**是由一个主体和若干属性组成的结构,它可以包含文本、二进制数据等形式的内容,并且...

Global site tag (gtag.js) - Google Analytics