`
sillycat
  • 浏览: 2536121 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

ActiveMQ and Java Consumer

 
阅读更多
ActiveMQ and Java Consumer
Start My Local Server from Console
> bin/activemq console
We can visit the Web Console then
http://127.0.0.1:8161/admin/
Default username/password is admin/admin. We can view and check the Queues.
The project name is  activemq-consumer
The pom.xml is as follow:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sillycat</groupId>
<artifactId>activemq-consumer</artifactId>
<version>1.0.0</version>
<name>ActiveMQ Consumer</name>
<description>ActiveMQ Consumer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath />
</parent>
<properties>
<orika.version>1.5.1</orika.version>
</properties>
<dependencies>
<!-- spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<!-- tools -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>ma.glasnost.orika</groupId>
<artifactId>orika-core</artifactId>
<version>${orika.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-discovery</groupId>
<artifactId>commons-discovery</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<executable>true</executable>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<url>http://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>http://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>
The implementation class is as follow, ActiveMQServiceImpl.java
package com.sillycat.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class ActiveMQServiceImpl implements ActiveMQService {
    private static String url = "failover:(tcp://localhost:61616)";
    Session session;
    Connection connection;
    MessageProducer producer;
    MessageConsumer consumer;
    private void buildConnection() {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        try {
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("dwh_datawarehouse" + "," + "dwh_datawarehouse-raw");
            this.producer = session.createProducer(queue);
            this.consumer = session.createConsumer(queue);
        } catch (JMSException e) {
            LOGGER.error("JMSException:", e);
        }
    }
    public String consumerMessage() {
        if (consumer == null) {
            this.buildConnection();
        }
        Message message;
        try {
            message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                LOGGER.info("Received: " + text);
                return text;
            } else {
                LOGGER.info("Received: " + message);
                return message.toString();
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException:", e);
        }
        return null;
    }
    public void produceMessage(String message) {
        if (producer == null) {
            this.buildConnection();
        }
        try {
            Message msg = session.createTextMessage(message);
            producer.send(msg);
        } catch (JMSException e) {
            LOGGER.error("JMSException:", e);
        }
    }
    public void releaseResource() {
        try {
            producer.close();
            consumer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            LOGGER.error("JMSException:", e);
        }
    }
    protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
}
No need to put the Interface here. Haha, I copy the project from a SpringBoot Project. So I need this class to run it.
package com.sillycat.activemq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
@SpringBootApplication
public class ActiveMQApplication extends SpringBootServletInitializer {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQApplication.class);
    public static void main(String[] args) throws Exception {
        logger.info("ActiveMQApplication init! ");
        SpringApplication.run(ActiveMQApplication.class);
        logger.info("ActiveMQApplication started! ");
    }
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(ActiveMQApplication.class);
    }
}
The Unit Class ActiveMQServiceTest.java is  as follow:
package com.sillycat.activemq;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActiveMQServiceTest {
    @Autowired
    ActiveMQService activeMQService;
    @Test
    public void testBeanNotNull() {
        Assert.assertNotNull(activeMQService);
    }
    //@Test
    public void testProducer() {
        String message = "{ 'name': 'carl', 'testing':true}";
        activeMQService.produceMessage(message);
        activeMQService.releaseResource();
    }
   
    @Test
    public void testConsumer() {
        String message = activeMQService.consumerMessage();
        LOGGER.info("message:" + message);
        activeMQService.releaseResource();
    }
    protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
}



References:
https://sillycat.iteye.com/blog/2433359
分享到:
评论

相关推荐

    ActiveMQ接受和发送工具.rar

    8. **安全性**:ActiveMQ支持多种身份验证和授权机制,包括JAAS(Java Authentication and Authorization Service),可以对用户和连接进行精细的权限控制。 在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个...

    apache-activemq-5.9.0 下载

    8. **安全性**:ActiveMQ提供了身份验证和授权功能,支持JAAS(Java Authentication and Authorization Service)和基于角色的访问控制(RBAC),确保只有授权的用户和应用程序才能访问消息。 9. **消息过滤**:...

    7道消息队列ActiveMQ面试题!.zip

    消息队列ActiveMQ是Java开发中的重要组件,尤其在分布式系统和高并发场景下,它扮演着关键角色。本文将围绕ActiveMQ展开,基于提供的标题和描述,详细讲解与ActiveMQ相关的七个面试知识点。 1. **什么是消息队列...

    activemq.rar

    Apache ActiveMQ是开源社区中最流行的Java消息代理,也是企业级集成模式的事实标准。它是一个强大的消息中间件,用于处理应用程序之间的通信,使数据能够在分布式环境中可靠地传递。在这个"activemq.rar"压缩包中,...

    ActiveMQ基础知识

    ActiveMQ是Apache软件基金会下的一个开源消息中间件,提供了基于Java的消息传递机制。ActiveMQ是一个消息代理服务器,能够将消息从生产者传递到消费者,提供了高效、可靠、灵活的消息传递机制。下面是.ActiveMQ基础...

    ActiveMQ消息中间件.zip

    5. **安全性**:ActiveMQ支持用户认证和授权,可以通过JAAS(Java Authentication and Authorization Service)配置安全策略。 6. **性能优化**:包括预取策略、消息压缩、批量发送和批量消费等,以提高消息处理...

    ActiveMQ 初识

    6. **安全机制**:ActiveMQ支持多种认证和授权机制,如JAAS(Java Authentication and Authorization Service),可以对用户和资源进行精细的权限控制。 7. **消息分发策略**:ActiveMQ支持点对点(Queue)和发布/...

    spring整合activemq

    Spring整合ActiveMQ是Java开发中常见的一种技术组合,主要用于实现应用程序间的异步消息通信。Spring框架提供了对ActiveMQ的高度集成,使得开发者能够轻松地在应用中加入消息队列功能,提高系统的可扩展性和可靠性。...

    activemq的使用

    此外,还可以通过JAAS(Java Authentication and Authorization Service)进行更复杂的安全管理。 总结,ActiveMQ是一个强大的消息中间件,提供了一套完整的消息传递解决方案,通过理解并应用上述知识点,你可以...

    ActiveMQProducer

    9. **配置与连接参数**:配置ActiveMQProducer时,需要提供服务器URL、用户名、密码等连接参数,这些可以通过JNDI(Java Naming and Directory Interface)或者直接在代码中设置。 10. **性能优化**:在使用...

    ActiveMQInAction

    ActiveMQ支持多种身份验证和授权机制,如JAAS(Java Authentication and Authorization Service)、Kerberos和简单的用户名/密码认证。通过设置用户角色和权限,可以限制客户端对资源的访问。 **9. 性能与监控** ...

    camel-manual-2.0

    Each route starts with a producer that sends messages to a consumer, and the message travels through a series of processors until it reaches the end of the route. Endpoints are the starting and ...

    Jms做的一些的demo

    1. **创建ConnectionFactory**:这是连接到消息服务器的工厂对象,你可以通过JNDI(Java Naming and Directory Interface)查找或直接实例化。 2. **创建Connection**:ConnectionFactory用于创建与消息代理的连接...

    深入掌握 JMS(java message service)

    - 在Java EE环境中,通常通过JNDI(Java Naming and Directory Interface)来查找和获取`ConnectionFactory`和`Destination`。 2. **代码实现**: - 创建一个`ConnectionFactory`实例。 - 使用`...

    网络架构师148讲视频课程

    java架构师148讲视频教程 │ ├─1-148视频教程 │ 第01节:整体课程概览.flv │ 第02节:分模块、分工程管理.avi │ 第03节:多模块多Web应用合并War包.avi │ 第04节:Git基本原理和安装配置使用.avi │ 第05节...

    J2EE中的JMS 消息服务

    在开发JMS应用时,开发者需要编写代码来创建`ConnectionFactory`,这通常是通过JNDI(Java Naming and Directory Interface)查找完成的。接着,创建`Connection`,并从连接中获取`Session`,`Session`是线程安全的...

    《activmq in action 》

    An Introduction to Messaging and ActiveMQ ............................................. 1 1. Introduction to Apache ActiveMQ ....................................................... 2 1.1. What is ...

    struts2文件上传(转)

    例如,除了Struts2的核心库,还需要Apache Commons IO和Commons FileUpload库,以及可能的JMS提供者的jar,如ActiveMQ或IBM MQ。 总结,Struts2的文件上传功能结合了Java Web开发的多个方面,包括表单提交、文件流...

Global site tag (gtag.js) - Google Analytics