4.6 Topic
概要:
Hazelcast 提供了发布消息给多个消费者的分布式机制.即众所周知的publish/subscribe(pub/sub)消息模型.
在cluster层面进行生产及消费操作.在topic中,当一个新的member加入后,你需要为其添加一个监听器,实际上是为在cluster中的一些member注册消息的发布机制.
NOTE: Publish operation is async. It does not wait for operations to run in remote nodes, it works as fire and forget.
简单的topic例子
import com.hazelcast.core.Topic; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.MessageListener; public class Sample implements MessageListener<MyEvent> { public static void main( String[] args ) { Sample sample = new Sample(); HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(); ITopic topic = hazelcastInstance.getTopic( "default" ); topic.addMessageListener( sample ); topic.publish( new MyEvent() ); } public void onMessage( Message<MyEvent> message ) { MyEvent myEvent = message.getMessageObject(); System.out.println( "Message received = " + myEvent.toString() ); if ( myEvent.isHeavyweight() ) { messageExecutor.execute( new Runnable() { public void run() { doHeavyweightStuff( myEvent ); } } ); } } // ... private final Executor messageExecutor = Executors.newSingleThreadExecutor(); }
4.6.2 统计 Statistics
在Topic中有两种统计变量可以进行访问查询操作.这些值是由本地成员负责维护的,一般为增加.
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(); ITopic<Object> myTopic = hazelcastInstance.getTopic( "myTopicName" ); myTopic.getLocalTopicStats().getPublishOperationCount(); myTopic.getLocalTopicStats().getReceiveOperationCount();
getPublishOperationCount()
and getReceiveOperationCount
( ) 方法会分别返回在此节点启动后生产的消息数量以及接收到的消息数量.请注意,这些值是不会被备份的,此节点关闭后,这些值也会丢失.
关于Topic的设置特性请参照 Topic Configuration.
NOTE: These statistics values can be also viewed in Management Center. Please see Topics.
4.6.3 内部构件 Internals
每一个节点拥有一个本cluster中所有节点的注册列表.当Topic注册一个新的节点时,它将给本cluster中的所有成员发送一条注册信息.同样的,当一个新节点加入cluster中时,它将会收到截止到它注册时,本cluster中所有节点的注册信息.
配置globalOrderEnabled可改变topic中的节点行为.
- If
globalOrderEnabled
is disabled:
消息将是有序的,举个例子,如果消费者发布一条消息时,它会将一条消息推送到order队列.假如cluster成员M发布了很多消息 m1, m2, m3,...,mn到topic T,接下来Hazelcast会确保所有topic T中的消费者接收到的信息顺序也为m1, m2, m3,...,mn.
这就是它的工作原理.比如说我们现在有三个节点(node1,node2,node3),node1和node2注册到名为news的topic中.请注意,所有的三个节点都会知道node1和node2注册到了topic news中喔.
在这个例子中node1推送了两条消息:a1和a2.node3推送了两条消息:c1和c2.当node1和node3发布消息的时候,他们会检查他们各自本地的注册节点清单.他们发现node1和node2在这个列表中.于是,他们将消息发送到这些在列表中的节点.下面是他们可能受到的消息顺序:
Node1 -> c1, a1, a2, c2
Node2 -> c1, c2, a1, a2
- If
globalOrderEnabled
is enabled:
当globalOrderEnabled设置为可用时,它将保证所有监听相同topic的节点,收到的消息顺序相同.
我们再看看这次会怎么样.现在我们还是有三个节点(node1,node2,node3),node1和node2也注册到名为news的topic中.注意,所有的节点都知道node1和node2注册到了news中.
在这个例子里,node1发布两天消息:a1,a2.node3发布两天消息:c1,c2.当一个节点通过topic news发布消息时,它首先会计算news的id与哪一个分区相符.接下来给该分区的拥有者发送一个操作,于是该节点发布消息.我们假设news与node2拥有的分区一致.接下来node1
和node3首先将发送给node2所有信息.假设将用以下顺序发送信息:
Node1 -> a1, c1, a2, c2
此时,node2将把这些小心发送给本地存储的节点注册列表中的各个节点.它实际上会将这些消息发送给node1和node2(它将为它本身建立一个分配机制).
Node1 -> a1, c1, a2, c2
Node2 -> a1, c1, a2, c2
使用这种方式时,会保证所有节点接收到消息事件的顺序相同.
在这两种情况下, 都会使用EventService的StripedExecutor来分配接收到的信息.对于Hazelcast中的所有事件,都通过StripedExecutor来处理,以保证它们是有序的.
In StripedExecutor
, there are as much threads specified in the property hazelcast.event.thread.count
(default is 5). For a specific event source (for topic, for a particular topic name), hash of that source's name % 5gives the ID of responsible thread. Note that, there can be another event source (entry listener of a map, item listener of a collection, etc.) corresponding to the same thread. In order not to make other messages to block, heavy process should not be done in this thread. If there is a time consuming work needs to be done, the work should be handed over to another thread. Please see Sample Topic Code.
4.6.4 Topic配置 Topic Configuration
注解式配置:
<hazelcast> ... <topic name="yourTopicName"> <global-ordering-enabled>true</global-ordering-enabled> <statistics-enabled>true</statistics-enabled> <message-listeners> <message-listener>MessageListenerImpl</message-listener> </message-listeners> </topic> ... </hazelcast>
在程序中配置:
TopicConfig topicConfig = new TopicConfig(); topicConfig.setGlobalOrderingEnabled( true ); topicConfig.setStatisticsEnabled( true ); topicConfig.setName( "yourTopicName" ); MessageListener<String> implementation = new MessageListener<String>() { @Override public void onMessage( Message<String> message ) { // process the message } }; topicConfig.addMessageListenerConfig( new ListenerConfig( implementation ) ); HazelcastInstance instance = Hazelcast.newHazelcastInstance()
缺省值:
-
Global ordering is false, meaning there is no global order guarantee by default.
-
Statistics are true, meaning statistics are calculated by default.
Topic有关的设置,但不是topic特有的配置变量:
-
hazelcast.event.queue.capacity
: default value is 1,000,000 -
hazelcast.event.queue.timeout.millis
: default value is 250 -
hazelcast.event.thread.count
: default value is 5
RELATED INFORMATION
For description of these parameters, please see Global Event Configuration
相关推荐
包含翻译后的API文档:hazelcast-3.7.2-javadoc-API文档-中文(简体)版.zip; Maven坐标:com.hazelcast:hazelcast:3.7.2; 标签:hazelcast、中文文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开...
包含翻译后的API文档:shiro-hazelcast-1.4.0-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.shiro:shiro-hazelcast:1.4.0; 标签:apache、shiro、hazelcast、中英对照文档、jar包、java; ...
包含翻译后的API文档:shiro-hazelcast-1.4.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.shiro:shiro-hazelcast:1.4.0; 标签:apache、shiro、hazelcast、中文文档、jar包、java; 使用方法:解压...
包含翻译后的API文档:hazelcast-3.7.2-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:com.hazelcast:hazelcast:3.7.2; 标签:hazelcast、中英对照文档、jar包、java; 使用方法:解压翻译后的API文档...
Hazelcast是一款开源的内存数据网格,它提供了一个分布式内存计算平台,用于处理大量数据并提高应用程序的性能。Hazelcast Center是其配套的管理工具,可以帮助用户监控、管理和配置运行中的Hazelcast实例。在你提供...
Hazelcast是一个开源内存数据网格,提供分布式内存计算能力,包括内存存储、分布式计算、分布式事件处理和缓存功能。版本3.12.9是一个稳定版本,包含了对Hazelcast平台的优化和改进。 在"Management Center"中,你...
1. **Hazelcast**: 了解 Hazelcast 的核心概念,如分布式数据结构(如分布式Map、Queue、Topic等)、分布式计算和缓存机制,以及如何在微服务架构中使用它来提高可伸缩性和性能。 2. **STAX API**: 学习 STAX 解析...
用于Kubernetes的Hazelcast Discovery插件... 版本兼容性: hazelcast-kubernetes 2.0+与hazelcast 4+兼容hazelcast-kubernetes 1.3+与hazelcast 3.11.x,3.12.x兼容对于较旧的hazelcast版本,您需要使用hazelcast-kub
2. **分布式 Map**:Hazelcast 的分布式 Map 支持多节点间的共享数据存储,确保数据的一致性和可用性。 3. **分布式队列和列表**:允许在集群中的不同节点之间同步数据,实现任务调度和数据共享。 4. **分布式事件...
Hazelcast 是一款流行的开源内存数据网格解决方案,它提供分布式缓存、分布式Map、队列、 topic 和其他并发数据结构。Spring 框架是Java开发中的常用企业级应用框架,提供了依赖注入、AOP(面向切面编程)等功能。...
Hazelcast是一个内存数据网格,能够帮助提升应用程序的性能和可扩展性,而Hibernate是Java领域广泛使用的对象关系映射(ORM)框架。这个版本3.1.5的Hazelcast-Hibernate3适配器使得开发者可以利用Hazelcast的分布式...
Hazelcast是一个开源的、无服务器的数据管理平台,它提供了分布式内存数据存储、计算以及消息传递功能。对于Python开发者来说,`hazelcast-python-client`提供了与Hazelcast集群通信的接口,使Python应用程序能够...
atmosphere-hazelcast-1.0.15-sources.jar
atmosphere-hazelcast-1.0.14-sources.jar
atmosphere-hazelcast-1.0.13-sources.jar
atmosphere-hazelcast-1.0.9-sources.jar
atmosphere-hazelcast-1.0.8-sources.jar
atmosphere-hazelcast-1.0.7-sources.jar
atmosphere-hazelcast-1.0.6-sources.jar