网络连接器
网络连接器是两个broker之间的通道,让他们可以互相交互.一个网络连接器默认是单向的称作转发桥.但是ActiveMq提供一种双工连接器能保证用一个通道接送发送信息.
下图是一个包含两种工作方式的网络代理例子
网络连接器的配置通过以下节点来配置
< networkConnectors >
< networkConnector name="default-nc" uri="multicast://default" />
</ networkConnectors >
在深入理解之前,我们需要先理解一个概念 discovery(挖掘).它是一个可以侦测远程broker的进程.Client和broker都需要发现远程的broker,以便生成网络代理.
静态网络连接器
使用静态化配置网络连接需要通过配置一系列可用的broker URL,首先你得知道所有的URL,协议使用一种复合的URL(url中带着url).一个复合url中包含多个broker地址。
例如:static:(uri1,uri2,uri3,...)?key=value,具体的xml配置如下
<networkConnectors> <networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/> </networkConnectors>
以下图片是两个broker在静态网络之间交换数据的例子.消费者附着在brokerB上,生产者附着在brokerA上,生产者生产的消息会被brokerA转发到brokerB上供消费者消费.
让我们来配置这个例子,首先你需要启动两个broker.我们先启动brokerB.brokerB的配置如下.只需修改activemq.xml中transportConnectors节点配置
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://localhost:61617" /> </transportConnectors>
然后再来配置brokerA.brokerA配置如下.
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="BrokerA" dataDirectory="${activemq.base}/data"> ...... <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" /> </transportConnectors> <networkConnectors> <networkConnector uri="static:(tcp://localhost:61617)" /> </networkConnectors> .... </broker>
启动brokerA,会发现报错,因为brokerB启动的时候会启动jetty,brokerA启动的时候也会启动jetty,端口冲突了.只需要改变jetty.xml中的端口8186为8162即可.
这时使用三个java类来进行测试.一个生产者,两个消费者.两个消费者分别附着于不同的broker端口.会发现两个消费者都接收到了生产者的消息.
public class SimpleTopicProducer {
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
private static Destination destination = null;
private static MessageProducer mp = null;
public static void main(String[] args) throws JMSException {
try{
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("topic");
mp = session.createProducer(destination);
//发送10条信息到topic
for(int i = 0 ; i < 10 ;i++){
Message message = session.createTextMessage("hah");
mp.send(message);
}
}finally{
mp.close();
session.close();
connection.close();
}
}
}
/**
* 异步接受消息
* @author cfzhou
*
*/
public class SimpleTopicResumerAsync {
private static String brokerURL = "tcp://localhost:61616";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
public static void main(String[] args) throws Exception {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
new Thread(new ResumerThread()).start();
}
private static class ResumerThread implements Runnable{
@Override
public void run() {
for(int i = 0 ;i < 3;i++){
try {
Destination destination = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
public class SimpleTopicResumerAsync2 {
private static String brokerURL = "tcp://localhost:61617";
private static ConnectionFactory factory= null;
private static Connection connection= null;
private static Session session = null;
public static void main(String[] args) throws Exception {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
new Thread(new ResumerThread()).start();
}
private static class ResumerThread implements Runnable{
@Override
public void run() {
for(int i = 0 ;i < 3;i++){
try {
Destination destination = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
试想一种情况.有一个本地的broker,有许多远程的客户端需要连接这个broker.这样会引起很多不必要的网络开销.为了最小化连接,我们可以在每个远程主机上启动一个broker,然后配置远程broker和本地broker的静态网络连接.这样不仅最小化了远程和本地之间的连接数量,还可以让远程的客户端更有效的操作,消除网络中的长连接意味着更少的出错,以及客户端更少的等待时间.
故障转移协议
之前的例子,一个client都是只连接到一个broker上的.如果这个broker出现故障,client要么失效要么连接到另外一个功能相同的broker上去.
有两种方法可以实现故障转移
1.提供一个可用的broker的列表,用于故障转移传输协议.例如:failover:(uri1,...,uriN)?key=value或者failover:uri1,...,uriN
2.动态的发现可用的broker.
这种协议使用一种随机算法选择连接器,如果连接失败,会自动选择其它的url连接.默认的配置实现了延迟重连的逻辑,意味着在10ms后进行第一次重连,之后的重连间隔的时间是前一次的2倍,知道重连时间等于30000ms.
消费者:
package com.zcf.activemq.failoverexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Consumer { private static Logger logger = Logger.getLogger(Consumer.class); private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)";// private static String brokerURL = private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private String jobs[] = new String[] { "suspend", "delete" }; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException, InterruptedException { Consumer consumer = new Consumer(); for (String job : consumer.jobs) { Destination destination = consumer.getSession().createQueue("JOBS." + job); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { //do something here System.out.println( ((ObjectMessage)message).getObject()); logger.info(((ObjectMessage)message).getObject()); } catch (Exception e) { e.printStackTrace(); } } }); } } public Session getSession() { return session; } }
生产者:
package com.zcf.activemq.failoverexample; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Publisher { private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private static int count = 10; private static int total; private static int id = 1000000; private String jobs[] = new String[]{"suspend", "delete"}; public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); while (total < 1) { for (int i = 0; i < count; i++) { publisher.sendMessage(); } total += count; System.out.println("Published '" + count + "' of '" + total + "' job messages"); try { Thread.sleep(1000); } catch (InterruptedException x) { } } publisher.close(); } public void sendMessage() throws JMSException { int idx = 0; while (true) { idx = (int)Math.round(jobs.length * Math.random()); if (idx < jobs.length) { break; } } String job = jobs[idx]; Destination destination = session.createQueue("JOBS." + job); Message message = session.createObjectMessage(id++); System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination); producer.send(destination, message); } }
log4j配置
log4j.rootLogger= debug, stdout log4j.logger.org.apache.activemq=debug ### \u8F93\u51FA\u5230\u63A7\u5236\u53F0 ### log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern =%d{yyyy-MM-dd HH:mm:ss}%m%n
在没启动Activemq之前,启动Consumer.java,会发现一直提示连接失败(61616,61617),每次连接时间都会翻倍
启动两个broker之后,会提示2016-11-24 17:56:24Successfully connected to tcp://localhost:61616.当我们关闭61616的broker.会提示 Successfully reconnected to tcp://localhost:61617。
默认情况下,如果client与broker直接的connection断开,则client会新起一个线程,不断的从url参数中获取一个url来重试连接。这个机制对于在容器(spring)中使用的connection木有问题。
但是对于简单实现的一个独立运行client,一般重连一次就会出现进程退出的bug.
程序退出不再重连的原因在于重连的线程是daemon的,连接出错以后,其他线程都退出了,这个线程也随即被销毁掉了。
官方修复过一次,在 ActiveMQ Connection Executor 上设置了daemon=false,但是这个线程不一定被创建出来。所以bug依然在。
修复的方法很简单:
FailoverTransport.java 的132 行
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.setDaemon(false); // to set daemon=false by kimmking
reconnectTaskFactory.init();
把重连的线程设置成daemon=false就成。然后再按照上面的步骤来执行,发现多次重启broker,都是可以自动重连的.即使只有一个broker,也可以使用重连机制.
动态网络连接器(用于经常会增加或者删除broker的情况)
ip广播是一种通过ip网络就可以很容易的把数据从源到一组对它感兴趣的接受者的网络机制.ip广播的一个基本概念是所谓的组地址(例如数据源和接受者的IP地址在224.0.0.0和之间的239.255.255.255).源把这些地址作为数据的目的地,接受者使用它来表达对这个组的数据的兴趣.
当ip广播被配置,ActiveMq brokers使用广播协议来公布他们的服务并且定位其它broker服务的地址用来创建代理网络.客户端使用广播来定位brokers以及建议和它们的连接.广播形式的URL如下;
multicast://ipadaddress:port?key=value
具体的配置如下:
<broker xmlns="http://activemq.apache.org/schema/ core" brokerName="multicast" dataDirectory="${activemq.base}/data"> ... <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors> ... </broker>
在上面的例子中,使用群组名称"default"来替代具体的IP地址.上面的配置代码片段中有两个地方比较重要.首先,transport connector的discoveryUri属性用于暴露这个传输连接器的URI到名称为default的群组中.所有的希望查找可用代理的客户端都可以使用这个代理。
network connector的uri属性用于查找可用的代理并与之建立代理网络.这样配置后,代理就像客户端一样,使用多点传送协议来查找其他代理。
,移除discoveryUri属性,客户端就无法通过多点协议扫描到代理,
用多点传送协议的一个缺点是侦测是自动的.如果你不想把某个代理添加到群组中,你必须十分小心的设置。
自动侦测协议
这种传输连接器是配置在客户端的.它和故障重连很相似,唯一不同的地方是它使用广播去发现可用的broker并随机选择其中之一.发现协议的URL如下:
discovery:(multicast://default)
点对点协议(PEER PROTOCOL)
它是VM连接器的一个超集,它会建立一个对等的内嵌网络代理
点对点协议的URI语法如下:
peer://peergroup/brokerName?key=value (peer://group1 )
group1代表组名,内嵌代理会与组名是group1的代理之间建立网络通信
FanOut连接器
Fanout是一种通信器群组,用于使得客户端可以同时连接到多个代理并对这些代理进行相同的操作.
Fanout协议的URI语法如下: fanout:(fanoutURI)?key=value
fanoutURI值可以使用静态的URI或者多点传送URI.参考下面的示例: fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)) 客户端将尝试连接掉3个使用静态协议配置的静态代理.
使用动态效果 fanout:(multicast://default) 此协议的目的是发送消息到多个代理.其次,如果你使用的代理属于同一个代理网络,那么指定的消息消费者可能会接收到重复的消息.因此,通常情况下,fanout协议仅用于发布消息到多个相 互之间没有连接在一起的代理.即多个代理之间的独立的。
各个协议的总结
相关推荐
本文将详细解释计算机网络第五版第四章网络层的相关知识点,涉及网络层向上提供的服务、网络互连、转发器、网桥、路由器和网关的区别、IP、ARP、RARP、ICMP协议、IP地址分类和特点等内容。 一、网络层向上提供的...
计算机网络第五版第四章主要探讨的是网络层的关键概念和服务,包括虚电路服务和数据报服务。这两种服务在互联网中承担着不同的角色。 虚电路服务是一种面向连接的服务,它在数据传输之前需要预先建立连接。这种服务...
本章主要讲解了局域网中的网络连接与主干网络技术,涵盖了FDDI、同步与分组传送模式、ATM技术以及以太网技术等多个方面。 首先,FDDI(Fiber Distributed Data Interface)是一种高速令牌环网络技术,设计用于主干...
【计算机网络习题第二章到第五章答案】的解析涵盖了计算机网络基础知识,特别是物理层的相关概念,以及数据通信系统的模型和通信方式。以下是对这些知识点的详细阐述: 1. 物理层的主要任务:物理层关注如何在各种...
- 局域网通过转发器或网桥连接,仍视为同一网络,共享相同的网络号。 - IP地址区分网络层面的逻辑连接,而MAC地址是物理链路层的标识。 6. **IP地址与MAC地址的区别**: - IP地址是全局唯一的32位逻辑地址,用于...
本章主要聚焦于Java如何处理网络连接,下面将详细阐述相关知识点。 1. **Socket编程**: - **Socket概念**:Socket是Java提供的在网络间进行通信的接口,它是进程间通信的一种方式,可以理解为通信的端点。 - **...
通过学习《谢希仁--计算机网络教程第五版》的第一章,我们可以建立起对计算机网络的初步认识,为后续深入学习网络原理和技术打下坚实的基础。无论是对计算机科学的学生还是IT从业者来说,理解和掌握这些基础知识都是...
《计算机网络与通信技术》的第四章和第五章主要探讨了网络互联与广域网的相关知识。网络互联是现代通信和信息技术的基础,它使得分布在不同地理位置的网络和设备能够相互连接,形成一个更大的网络系统,实现资源共享...
光纤通信系统是现代通信技术的重要组成部分,它...了解并掌握不同类型的连接器及其性能参数,对于设计、安装和维护光纤通信网络至关重要。随着技术的进步,光纤连接器的性能将持续优化,满足不断增长的数据传输需求。
总的来说,华为内部培训资料之第二章“常见网络接口与电缆”涵盖了网络连接的基础知识,包括接口类型、电缆种类及其应用,这些都是网络工程师必备的专业技能。对于网络学习爱好者来说,掌握这些内容将极大地提升对...
网络设备主要分为物理层设备和数据链路层设备,物理层设备包括中继器和集线器,它们共同作用于网络信号的放大和传输,而数据链路层设备主要是网卡,它负责设备与网络介质的物理连接。 具体到网络设备的应用,我们...
【谢希仁--计算机网络教程第五版--第二章】这一章节主要聚焦于计算机网络的基础知识,特别是物理层的理论与实践。物理层是OSI(开放系统互连)模型的最底层,它负责数据的原始比特传输,是网络通信的起点。 在...
《计算机网络(第五版 谢希仁)》是一本深入浅出地介绍计算机网络的教材,涵盖了网络领域的核心概念和技术。课后习题答案则是对书中理论知识的实践检验和深化理解的重要辅助资料。以下是根据标题和描述所涉及的知识点...
综上所述,这份技术规范书为馈线连接器的产品采购设定了严格的性能和质量标准,旨在确保中标产品能满足中国移动通信集团北京有限公司的运营需求,保证通信网络的高效、稳定运行。投标方需严格遵循这些标准,提供符合...
【第5章 远程I/O连接及PLC通信网络组态】 本章主要探讨了在工业自动化领域中,如何实现远程I/O连接以及PLC(可编程逻辑控制器)之间的通信网络组态。以下是相关知识点的详细说明: 1. **CIP(Control and ...
第六章网络基础知识复习题主要涵盖了计算机网络的基本概念、硬件组成、网络分类、拓扑结构、操作系统、资源共享、网络协议以及电子邮件等内容。 1. 局域网络(LAN)硬件组成包括网络服务器、个人计算机工作站、网络...