- 浏览: 467559 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (371)
- Java (44)
- Flex (25)
- Design Pattern / UML (5)
- JVM (14)
- Software Engineer (4)
- Testing (30)
- Web Backend (60)
- Linux (7)
- Database (11)
- SOA (28)
- English (6)
- FX (5)
- SAP Support (25)
- SAP Development (5)
- SAP Impl & Oprn (8)
- Articles (15)
- Music (4)
- Recipe (14)
- Multi-thread (4)
- Lucene (2)
- CS basic / Algorithm (12)
- Tour (3)
- C C++ (21)
- Others (10)
- Code Repository C++ (1)
- Code Repository Java (1)
- Code Repository Algorithm (1)
- Code Repository SQL (1)
- Web Frontend (4)
最新评论
-
xiangxm:
Java 强引用、 软引用、 弱引用、虚引用 -
nml-lyb:
123
Mule 的第一个例子 -
nml-lyb:
http://www.baidu.com
Mule 的第一个例子 -
white___ant:
...
Java 强引用、 软引用、 弱引用、虚引用 -
joeyon:
...
Java NIO介绍
例一:
1、首先 启动 jms server。 通过 bin/activemq.bat
2、在 server 上 创建一个queue:
先登陆 http://localhost:8161/admin, 再选
(在 这个 queue 中,通过控制台,可以看到具体的收到的 信息)
3、producer 和 consumer 的代码如下:
TCPQueueSender.java
package com.bobo.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPQueueSender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("zcy_queue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPQueueSender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("zcy_queue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("This is the ActiveMq Message " + i);
System.out.println("system out " + i);
producer.send(message);
}
}
}
TCPQueueReceiver.java
package com.bobo.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPQueueReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("zcy_queue");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println("Receive " + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPQueueReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("zcy_queue");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println("Receive " + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
--------------------------------------------------------------------
TCPTopicSender.java
package test.jmsnew;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPTopicSender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//destination = session.createQueue("zcy_queue");
destination = session.createTopic("zcy_topic");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("This is the ActiveMq Message " + i);
System.out.println("system out " + i);
producer.send(message);
}
}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPTopicSender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//destination = session.createQueue("zcy_queue");
destination = session.createTopic("zcy_topic");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("This is the ActiveMq Message " + i);
System.out.println("system out " + i);
producer.send(message);
}
}
}
TCPTopicReceiver.java
package test.jmsnew;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPTopicReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//destination = session.createQueue("zcy_queue");
destination = session.createTopic("zcy_topic");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println("Receive " + message.getText());
} else {
//break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TCPTopicReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//destination = session.createQueue("zcy_queue");
destination = session.createTopic("zcy_topic");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println("Receive " + message.getText());
} else {
//break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
对于queue来说
,produce 和 consume 的顺序没关系。 先produce一个消息,后consume这个消息 可以。反之,也可。
对于topic来说
,先注册consume,再produce; consume才可以收到消息。
-------------------------------------------------------------------------------------
需要的4个jar 包:
activemq-core-5.2.0.jar
commons-logging.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
jms.jar
可以直接运行例子。
例二: QueueTest.java
package test.jms;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// consumer2
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// producer
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// consumer2
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// producer
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
例三:TopicTest.java
package test.jms;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Topic topic= new ActiveMQTopic("topic_test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// consumer2
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// producer
MessageProducer producer = session.createProducer(topic);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
Thread.sleep(3000);
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Topic topic= new ActiveMQTopic("topic_test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// consumer2
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// producer
MessageProducer producer = session.createProducer(topic);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
Thread.sleep(3000);
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
例四
VMQueueTestProducer.java
package test.jmsnew;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class VMQueueTestProducer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// producer
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class VMQueueTestProducer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// producer
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
VMQueueTestConsumer.java
package test.jmsnew;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class VMQueueTestConsumer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class VMQueueTestConsumer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// consumer1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
例五: DeliveryMode用法
DeliveryModeSendTest.java
package test.jms;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeSendTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeSendTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
// 这里 设置了 persistent, 在关掉这个connection后,consumer仍然可以收到这条消息。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("A persistent Message"));
// 这里 设置了 non_persistent, 在关掉这个connection后,consumer就收不到这条消息了。
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage("A non persistent Message"));
System.out.println("Send messages sucessfully!");
}
}
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("A persistent Message"));
// 这里 设置了 non_persistent, 在关掉这个connection后,consumer就收不到这条消息了。
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage("A non persistent Message"));
System.out.println("Send messages sucessfully!");
}
}
DeliveryModeReceiveTest
package test.jms;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeReceiveTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(2000);
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeReceiveTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(2000);
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
connection.close();
}
}
}
例六:JMSReplyTo的用法
package test.jmsnew;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendReceiveAndReply {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendReceiveAndReply {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
// 第一个 queue
Queue queue = new ActiveMQQueue("testQueue");
Queue queue = new ActiveMQQueue("testQueue");
// 用来接受回复的 queue
Queue replyQueue = new ActiveMQQueue("replyQueue");
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 设置 往 第一个 queue 发的消息
Message message = session.createTextMessage("Andy");
Queue replyQueue = new ActiveMQQueue("replyQueue");
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 设置 往 第一个 queue 发的消息
Message message = session.createTextMessage("Andy");
// 给 消息 设定 返回的 queue
message.setJMSReplyTo(replyQueue);
MessageProducer producer = session.createProducer(queue);
message.setJMSReplyTo(replyQueue);
MessageProducer producer = session.createProducer(queue);
// 发送 这个 消息
producer.send(message);
// 在 第一个 queue 上,消费这个消息:“Andy”
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
producer.send(message);
// 在 第一个 queue 上,消费这个消息:“Andy”
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
// 往 replyQueue上,发送返回的消息
MessageProducer producer = session.createProducer(m.getJMSReplyTo());
producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
} catch (JMSException e1) {
e1.printStackTrace();
}
}
});
// 在 replyQueue上,消费 消息
MessageConsumer comsumer2 = session.createConsumer(replyQueue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println(((TextMessage) m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
MessageProducer producer = session.createProducer(m.getJMSReplyTo());
producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
} catch (JMSException e1) {
e1.printStackTrace();
}
}
});
// 在 replyQueue上,消费 消息
MessageConsumer comsumer2 = session.createConsumer(replyQueue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println(((TextMessage) m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
例七:Selector 的用法
一种过滤接受消息的方法
package test.jms;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class JMSSelectorTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer comsumerA = session.createConsumer(queue, "receiver = 'A'");
comsumerA.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("ConsumerA get " + ((TextMessage) m).getText());
} catch (JMSException e1) {
}
}
});
MessageConsumer comsumerB = session.createConsumer(queue, "receiver = 'B'");
comsumerB.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("ConsumerB get " + ((TextMessage) m).getText());
} catch (JMSException e) {
}
}
});
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
String receiver = (i % 3 == 0 ? "A" : "B");
TextMessage message = session.createTextMessage("Message" + i + ", receiver:" + receiver);
message.setStringProperty("receiver", receiver);
producer.send(message);
}
}
}
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class JMSSelectorTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Queue queue = new ActiveMQQueue("testQueue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer comsumerA = session.createConsumer(queue, "receiver = 'A'");
comsumerA.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("ConsumerA get " + ((TextMessage) m).getText());
} catch (JMSException e1) {
}
}
});
MessageConsumer comsumerB = session.createConsumer(queue, "receiver = 'B'");
comsumerB.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
System.out.println("ConsumerB get " + ((TextMessage) m).getText());
} catch (JMSException e) {
}
}
});
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
String receiver = (i % 3 == 0 ? "A" : "B");
TextMessage message = session.createTextMessage("Message" + i + ", receiver:" + receiver);
message.setStringProperty("receiver", receiver);
producer.send(message);
}
}
}
发表评论
-
java8 Stream Lazy 解释 (非实现原理)
2015-07-22 07:49 1582http://blog.csdn.net/dm_vincen ... -
java8 系列2 深入理解Java 8 Lambda(类库篇——Streams API,Collectors和并行)
2015-07-22 07:00 2481http://zh.lucida.me/blog/java ... -
java8 系列1 深入理解Java 8 Lambda(语言篇——lambda,方法引用,目标类型和默认方法)
2015-07-22 06:58 1067http://zh.lucida.me/blog/java- ... -
Java 8 Stream探秘 具体实现
2015-07-22 06:31 896http://colobu.com/2014/11/18/J ... -
Arrays.asList() 的了解
2015-07-21 23:05 1161为什么Arrays.asList() 返回的list是 i ... -
java8 Lambda expressions
2015-07-14 13:09 876The single most important chan ... -
java8 new features
2015-07-08 02:06 700http://www.javacodegeeks.com/2 ... -
ReentrantLock与synchronized
2014-09-02 23:34 548http://uule.iteye.com/blog/148 ... -
PriorityQueue的使用
2014-05-29 12:14 958http://blog.csdn.net/yangzhon ... -
java的移位操作详解,左移和右移 << >>, 补码计算
2014-04-17 10:28 807---------------------补码--- ... -
深入Java集合学习系列:ArrayList的实现原理
2014-02-13 09:04 565http://zhangshixi.iteye.com/bl ... -
java集合类 ------------ 实现 细节
2014-02-12 23:12 532深入Java集合学习系列:ArrayList的 ... -
两种简单的方式快速实现hashCode 和 equals方法
2014-02-04 10:46 647http://hi.baidu.com/coolcat_p ... -
集合类 Set
2013-12-30 03:50 504http://www.cnblogs.com/yuchao ... -
java1.5新特性
2013-10-23 10:59 6701.自动包装和解包(Autoboxing and unboxi ... -
正则表达式30分钟入门教程
2013-10-22 00:21 543http://blog.csdn.net/cumt168/a ... -
总结Java方法(函数)传值和传引用的问题
2013-09-16 08:38 614http://java.chinaitlab.com/bas ... -
有状态,无状态对象是什么概念
2013-06-21 06:00 1119http://blog.csdn.net/showwair/ ... -
Dom4j的使用(全而好的文章)
2013-06-09 21:12 541http://xhy0422.iteye.com/blog/ ... -
serialVersionUID 用来表明类的不同版本间的兼容性
2013-06-03 23:12 815http://blog.csdn.net/hulefei29 ...
相关推荐
下面我们将通过一个简单的Java JMS代码示例来理解其基本用法: 1. **JMS提供者选择** 在开始之前,你需要选择一个JMS提供者,例如Apache ActiveMQ、IBM WebSphere MQ或RabbitMQ。这里假设我们使用ActiveMQ,首先...
在实际开发中,开发者会使用JMS API或者第三方库(如Spring JMS)来简化上述步骤,提高代码的可读性和可维护性。通过学习和理解JMS的基础知识,我们可以构建出稳定、可靠的分布式系统,实现不同服务间的解耦和异步...
`javax.jms.jar` 包是实现JMS规范的核心库,包含了JMS接口和实现类,使得开发者可以轻松地在Java应用程序中集成消息服务。 JMS主要包含两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe...
3. **JMS服务器集成**:JMS Storage库集成了JMS协议,使得前端应用可以直接与JMS服务器通信,实现消息的发布和订阅,这对于实时数据更新和分布式系统中的数据同步至关重要。 4. **安全与加密**:考虑到数据的安全性...
5. **示例代码**:示例代码可以帮助开发者快速理解和开始使用库,展示如何初始化JMS连接,发送和接收消息。 在实际使用`jms-deploy`时,开发者需要注意以下几点: - **安全**:确保JMS服务器的连接信息在传输过程...
`jms-1.1.jar` 是一个包含了JMS 1.1版本规范实现的库文件,是开发基于JMS的应用程序所必需的依赖。 JMS的核心概念主要包括以下几点: 1. **消息**:在JMS中,消息是数据传输的基本单元,它封装了要传递的信息。...
标题中的"jms.rar_jar j_java jms_jms_jms jar_jms.j"可能是指一个包含JMS相关代码和资源的压缩文件,其中包含了`.jar`文件,可能是实现JMS功能的库或示例代码。"jms.jar"可能是一个封装了JMS API的Java类库,而"jms...
6. **编写生产者和消费者代码**:使用JMS API(如`javax.jms.Connection`,`Session`,`MessageProducer`,`MessageConsumer`等)来创建和发送消息,以及接收和处理消息。 在描述中提到的jar包下载,通常包括以下...
5. **配置应用程序**:在应用中引入JMS相关的依赖库,如JMS API、Spring框架的JMS支持和其他必要的库(如WebLogic客户端库)。 6. **编写接收端和发送端代码**:接收端通过监听器等待消息,发送端则负责创建消息并...
“工具”标签则可能意味着ActiveMQ作为工具被介绍,同时博主可能也提到了其他与JMS相关的工具或库。 **文件名称列表:** 1. **JMS、XML和异构企业--使用JMS和XML来促进企业应用的互操作性.doc** - 这份文档可能...
Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它允许应用程序创建、发送、...记得在部署环境中添加这些依赖,并在代码中正确引用,以实现高效、可靠的JMS通信。
以上代码展示了如何通过JMS API与ActiveMQ交互,创建一个连接,发送一条消息到名为"MyQueue"的队列,并从该队列接收一条消息。在实际应用中,你可能需要处理异常、使用事务、设置消息属性等更复杂的功能。 总结起来...
在上述代码中,我们首先从数据库获取模型,然后使用 JMS 序列化器将模型对象转换为 JSON 格式,最后通过 Laravel 的内置响应助手返回给客户端。 总之,JMS Format Laravel 为 Laravel 开发者提供了一种强大的工具,...
在标签中提到的"jms-1.1.jar"是JMS 1.1版本的实现库,它是Java应用程序使用JMS时需要的依赖。而"消息队列"和"ActiveMQ"是实际的消息中间件产品。消息队列是一种软件系统,用于存储和转发消息,确保消息的可靠传输。...
1. 添加依赖:在项目中,我们需要引入ActiveMQ的JMS客户端库。这通常可以通过Maven或Gradle的依赖管理来完成。 2. 创建连接工厂:使用`ActiveMQConnectionFactory`创建一个连接工厂,这将用于建立与ActiveMQ服务器...
在描述中提到的"activemq实现的jms",意味着JavaScript代码可能使用了ActiveMQ提供的API或者基于Stomp协议的库,来创建连接、发布/订阅消息、创建队列或主题等。 在**标签**中,"js activemq 源码"进一步确认了我们...
标题中的"JMS所需jar包"指的是实现JMS规范所必需的库文件,这些jar包通常包含了JMS API和其他相关的类库,使得开发者可以在Java环境中使用JMS功能。在描述中提到的"官方最新版 JMS 5.11jar包",可能是指ActiveMQ的一...
首先,我们需要导入相关的JMS和Java标准库。在代码中,`Main`类展示了如何创建一个JMS Topic消息生产者。通过`InitialContext`实例化上下文,我们可以查找并获取连接工厂和目的地。 2. **获取连接工厂**: 这是...
4. **增强应用程序的便携性**:JMS的应用程序可以更容易地移植到不同的消息中间件上,提高了代码的复用性。 #### 二、JMS模型详解 JMS定义了一组接口和相关的语义,这些接口和语义规定了Java程序如何与消息中间件...
ActiveMQ提供了多种客户端库,包括Java、C++、Python等,这些库实现了JMS接口,使得开发人员可以方便地在各种环境中使用ActiveMQ。在视频教程中,你将看到如何在Java项目中集成ActiveMQ客户端,以及如何编写代码来...