1)KahaDb和AMQ Message Store两种持久方式如何选择?
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.
推荐: Amq持久方式
2)错误:Channel was inactive for too long
在建立连接的Uri中加入: wireFormat.maxInactivityDuration=0
You can do the following to fix the issues:
1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/
ActiveMQ的tcp url:tcp://localhost:61616后面要加入?wireFormat.maxInactivityDuration=0 这样的参数,否则当一段时间没有消息发送时会抛出 "Channel was inactive for too long"异常
3)错误:Wire format negotiation timeout: peer did not send his wire format.
将:log4j.rootLogger=INFO, console, logfile
2)在建立连接的Uri中加入: maxInactivityDurationInitalDelay=30000
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.
For example
will use 30 sec timeout.(貌似有问题!!!)
4)错误:Out of memory
1) 设置Java最大内存限制为合适大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)
2)Activemq.xml配置节:systemUsage/ systemUsage配置大小合适,并且特别注意:大于所有durable desitination设置的memoryUsage之和。
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有durable desitination设置之和
3)SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,还是很重要的。
参考-- http://activemq.apache.org/javalangoutofmemory.html
1. 用bin/activemq命令在独立JVM中运行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可);
2. 默认情况下,MQ用512M的JVM;
1. broker使用的内存并不是由JVM的内存决定的。虽然受到JVM的限制,但broker确实独立管理器内存;
2. systemUsage和destination的内存限制与broker内存息息相关;
3. MQ中内存的关系是:JVM->Broker->broker features;
4. 所有destination的内存总量不能超过broker的总内存;
1. 由于消息大小可以配置,prefetch limit往往是导致内存溢出的主要原因;
2. 减少prefetch limit的大小,会减少消费者内存中存储的消息数量;
1. 除非消息数量超过了broker资源的限制,否则生产者不会导致内存溢出;
2. 当内存溢出后,生产者会收到broker的阻塞信息提示;
1. 只有当消息在内存中存储时,才允许消息的快速匹配与分发,而当消费者很慢或者离开时,内存可能会耗尽;
2. 当destination到达它的内存临界值时,broker会用消息游标来缓存非持久化的消息到硬盘。
3. 临界值在broker中通过memoryUsage和systemUsage两个属性配置,请参考activemq.xml;
4. 对于缓慢的消费者,当尚未耗尽内存或者转变为生产者并发控制模式前,这个特性允许生产者继续发送消息到broker;
5. 当有多个destination的时候,默认的内存临界值可能被打破,而这种情况将消息缓存到硬盘就显得很有意义;
6. precentUsage配置:使用百分比来控制内存使用情况;
1. 默认情况下,MQ每个destination都对应唯一的线程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可),用线程池来限制线程的数量,从而减少内存消耗;
1. destination policies--maxPageSize:控制进入内存中的消息数量;lazyDispatch:增加控制使用当前消费者列表的预取值;
2. 使用blogMessage或者streamsMessage类型来进行大量文件的传输;
1. 当session或者producer或者consumer大量存在而没有关闭的时候;
2. 使用PooledConnectionFactory;
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
indexWriteBatchSize |
1000 |
number of indexes written in a batch |
indexCacheSize |
10000 |
number of index pages cached in memory |
enableIndexWriteAsync |
false |
if set, will asynchronously write indexes |
journalMaxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
enableJournalDiskSyncs |
true |
ensure every non transactional journal write is followed by a disk sync (JMS durability requirement) |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
checkpointInterval |
5000 |
time (ms) before checkpointing the journal |
ignoreMissingJournalfiles |
false |
If enabled, will ignore a missing message log file |
checkForCorruptJournalFiles |
false |
If enabled, will check for corrupted Journal files on startup and try and recover them |
checksumJournalFiles |
false |
create a checksum for a journal file - to enable checking for corrupted journals |
Available since version 5.4: |
archiveDataLogs |
false |
If enabled, will move a message data log to the archive directory instead of deleting it. |
directoryArchive |
null |
Define the directory to move data logs to when they all the messages they contain have been consumed. |
databaseLockedWaitDelay |
10000 |
time (ms) before trying to get acquire a the database lock (used by shared master/slave) |
maxAsyncJobs |
10000 |
the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers) |
concurrentStoreAndDispatchTopics |
false |
enable the dispatching of Topic messages to interested clients to happen concurrently with message storage |
concurrentStoreAndDispatchQueues |
true |
enable the dispatching of Queue messages to interested clients to happen concurrently with message storage |
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
useNIO |
true |
use NIO to write messages to the data logs |
syncOnWrite |
false |
sync every write to disk |
maxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
persistentIndex |
true |
use a persistent index for the message logs. If this is false, an in-memory structure is maintained |
maxCheckpointMessageAddSize |
4kb |
the maximum number of messages to keep in a transaction before automatically committing |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
indexBinSize |
1024 |
default number of bins used by the index. The bigger the bin size - the better the relative performance of the index |
indexKeySize |
96 |
the size of the index key - the key is the message id |
indexPageSize |
16kb |
the size of the index page - the bigger the page - the better the write performance of the index |
directoryArchive |
archive |
the path to the directory to use to store discarded data logs |
archiveDataLogs |
false |
if true data logs are moved to the archive directory instead of being deleted |
property name |
default value |
Comments |
memoryUsage |
20M |
amq使用内存大小,照amq论坛上说,这个值应该大于所有durable desitination设置的 memoryUsage之和,否则会导致硬盘swap,影响性能。 |
storeUsage |
1G |
kaha数据存储大小,如果设置不足,性能会下降到1个1个发 |
tempUsage |
100M |
非persistent的消息存储在temp区域 |
4.1)Failover Transport Options
Option Name |
Default |
Description |
transport.timeout |
-1 |
Time that a send operation blocks before failing. |
transport.initialReconnectDelay |
10 |
Time in Milliseconds that the transport waits before attempting to reconnect the first time. |
transport.maxReconnectDelay |
30000 |
The max time in Milliseconds that the transport will wait before attempting to reconnect. |
transport.backOffMultiplier |
2 |
The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled. |
transport.useExponentialBackOff |
true |
Should the delay between connection attempt grow on each try up to the max reconnect delay. |
transport.randomize |
true |
Should the Uri to connect to be chosen at random from the list of available Uris. |
transport.maxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries) |
transport.startupMaxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ v1.5.0+) |
transport.reconnectDelay |
10 |
The delay in milliseconds that the transport waits before attempting a reconnection. |
transport.backup |
false |
Should the Failover transport maintain hot backups. |
transport.backupPoolSize |
1 |
If enabled, how many hot backup connections are made. |
transport.trackMessages |
false |
keep a cache of in-flight messages that will flushed to a broker on reconnect |
transport.maxCacheSize |
256 |
Number of messages that are cached if trackMessages is enabled. |
transport.updateURIsSupported |
true |
Update the list of known brokers based on BrokerInfo messages sent to the client. |
4.2)Connection Options
Option Name |
Default |
Description |
connection.AsyncSend |
false |
Are message sent Asynchronously. |
connection.AsyncClose |
true |
Should the close command be sent Asynchronously |
connection.AlwaysSyncSend |
false |
Causes all messages a Producer sends to be sent Asynchronously. |
connection.CopyMessageOnSend |
true |
Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message. |
connection.ProducerWindowSize |
0 |
The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control |
connection.useCompression |
false |
Should message bodies be compressed before being sent. |
connection.sendAcksAsync |
false |
Should message acks be sent asynchronously |
connection.messagePrioritySupported |
true |
Should messages be delivered to the client based on the value of the Message Priority header. |
connection.dispatchAsync |
false |
Should the broker dispatch messages asynchronously to the connection's consumers. |
4.3)OpenWire Options
Option Name |
Default |
Description |
wireFormat.stackTraceEnabled |
false |
Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol. |
wireFormat.cacheEnabled |
false |
Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol. |
wireFormat.tcpNoDelayEnabled |
false |
Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol. |
wireFormat.sizePrefixDisabled |
false |
Should serialized messages include a payload length prefix? Only used by openwire protocol. |
wireFormat.tightEncodingEnabled |
false |
Should wire size be optimized over CPU usage? Only used by the openwire protocol. |
wireFormat.maxInactivityDuration |
30000 |
The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring. |
maxInactivityDurationInitalDelay |
10000 |
The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that) |
<bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<property name="authenticate" value="false" />
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
activemq思维导图2019 当前使用较多的消息中间件有RabbitMQ、RocketMQ、ActiveMQ、Kafka、...能够结合Spring/SpringBoot进行实际开发配置并能够进行MQ多节点集群的部署,最后学习MQ的高级特性和高频面试题的分析。
**ActiveMQ 概述** ActiveMQ 是 Apache 开源组织提供的一款高效、强大的消息...通过这个“MQ之ActiveMQ思维导图”,读者可以系统地理解和掌握 ActiveMQ 的核心功能和应用场景,进一步提升在消息队列领域的专业技能。
总结来说,这个压缩包的内容很可能是关于如何使用Java和ActiveMQ实现消息转发的教程或示例代码,涉及到ActiveMQ的基础知识、JMS API的使用、消息转发的配置与编程实现,以及在实际应用中的价值。对于想要学习和掌握...
Spring集成ActiveMQ配置详解 Spring框架与ActiveMQ的集成,为开发者提供了一种高效、可靠的JMS消息处理机制。在企业级应用中,这种集成能够极大地提升系统的响应速度和容错能力,特别是在需要异步通信和分布式事务...
### ActiveMQ教程与配置详解 #### 一、ActiveMQ简介 **ActiveMQ**是目前最受欢迎且功能强大的开源消息中间件之一。它完全遵循了JMS 1.1和J2EE 1.4规范,这使得它能够在各种企业级应用程序中发挥重要作用。尽管JMS...
1. 配置 activemq.xml 文件。 2. 批量发送消息。 3. 等待若干秒,消息减少。 4. 等待若干秒,消息队列清除(设置了抛弃死信队列)。 5. 等待若干秒,消息队列清除(未设置抛弃死信队列,设置了死信队列过期时间)。 ...
当我们需要在Spring应用中集成ActiveMQ时,就需要进行相应的配置。本文将深入讲解ActiveMQ与Spring的整合配置方案。 首先,我们需要在项目中引入ActiveMQ的相关依赖。这通常通过在`pom.xml`文件中添加Maven依赖来...
- **内存配置**:确保运行ActiveMQ的JVM有足够的内存,并合理配置ActiveMQ代理的内存使用限制。 ##### 2. 水平扩展 当单一的ActiveMQ实例无法满足需求时,可以考虑采用水平扩展的方式来增加系统的吞吐量和可用性。...
Spring Boot以其简化配置和快速开发的特性受到广泛欢迎,而ActiveMQ作为一款开源的消息中间件,提供了消息队列(Message Queue)功能,帮助解耦应用组件,提高系统的可靠性和可扩展性。下面我们将深入探讨这两个技术...
本篇文章将详细讲解如何配置ActiveMQ以实现静态集群。 静态集群是ActiveMQ提供的一种高可用性解决方案,通过在多个节点间共享数据,确保即使某个节点故障,服务也能不间断地运行。以下我们将按照步骤介绍配置静态...
尽管ActiveMQ 在开源JMS提供商中脱颖而出,但仍有其他竞争对手,如JBoss的jBossMQ和jBoss Messaging,OpenJMS,以及IBM WebSphereMQ、BEA WebLogic JMS等商业产品。尽管这些产品都有各自的优点,但ActiveMQ的独立性...
与重量级且需付费的 IBM MQ 相比,ActiveMQ 更适合初学者及预算有限的项目使用。 **特点**: - **开源免费**:基于 Apache 许可证发布,无需支付任何费用。 - **功能强大**:完全支持 JMS 1.1 和 J2EE 1.4 规范,...
总结起来,这个Spring Boot整合ActiveMQ的案例涵盖了如何配置和使用Queue与Topic,以及通过定时任务和Controller请求来发送消息。理解并掌握这些知识点,有助于我们在实际项目中构建高效、可靠的分布式消息传递系统...
2. **配置ActiveMQ**:在ActiveMQ的配置文件(通常是`conf/activemq.xml`)中,你需要设置`transportConnectors`元素以启用SSL。你需要指定SSL端口,并将服务器的证书和私钥加载到Keystore中,同时设置Truststore以...