本指南以1.4.5版本的java客户端为起点编写。
- 简单例子
- 消息
- 客户端配置
- 会话工厂MessageSessionFactory
- 发送消息MessageProducer
- 订阅消息MessageConsumer
- 遍历消息TopicBrowser
- Spring框架支持
- 高级主题
这里包括一些MetaQ的高级应用,比如使用log4j appender发送消息作为日志框架,Twitter storm集成以及发送顺序消息等。
消息过滤
在一些场景里,你可能希望消费者只消费一个topic下的部分满足特定要求的消息,而不是全部消费。通常,我会建议使用消息属性(attribute)来过滤消息,在MessageListener
接收到消息的时候,判断message.getAttribute()
返回是否符合要求来决定是否消费。也就是将过滤做到客户端。这样的代价是客户端还是会拉取不想消费的消息,浪费带宽。从1.4.6开始,MetaQ同时提供服务端和客户端过滤消息的接口ConsumerMessageFilter
,用来过滤消息。
客户端过滤
具体见订阅消息MessageConsumer的消息过滤一节。
服务端过滤
同样,你需要实现ConsumerMessageFilter
接口,并将你的实现打包成jar文件,放到服务器Broker的provided目录,接下来,配置server.ini文件,假设你的实现是com.xxxx.MyMessageFilter
类,你想为消费分组log-processor
过滤topic是log
下的消息,那么你应该这样配置:
[topic=log]
group.log-processor=com.xxxx.MyMessageFilter
配置之后,重启Broker,消息过滤将立即生效。
总结来说,服务端消息过滤需要五个步骤:
- 实现
ConsumerMessageFilter
接口,实现你的消息过滤器。 - 打包实现成jar文件,可以用maven等构建工具,也可以用eclipse导出,如果你的过滤器实现用到了第三方库,也请一起打包进jar包,或者拷贝到服务器的provided目录。
- 将打包后的jar和依赖包,拷贝到服务器的provided目录。
- 配置server.ini,找到你想过滤的topic配置,添加
group.xxx=MyFilter
,其中xxx
是你的消费分组名称,而MyFilter
就是你的过滤器实现类名。 - 重启Broker,过滤即时生效。
消息去重
首先,MetaQ会尽量避免消息重复,每个topic的每个分区都只会被一个consumer消费,但是在consumer做负载均衡的过程中,可能因为consumer列表的变更,导致分区分配规则不一致,从而导致部分消息会被重复消费。这种情况可以通过下列手段来避免:
- 合理设置订阅的maxSize,这个缓冲区大小,最好只是略大于你的最大的消息大小(包括消息头部20个字节)。比如你的最大消息是1024字节,那么建议maxSize可以设置成1044字节以上。如果有消息属性,这个值还应该加上消息属性的长度,并加上4个字节的大小。
- 通过1.4.6引入的
MessageIdCache
接口的消息缓冲来去重。通过将消费过的消息id在缓冲中标示为已经处理,来避免重复消费。
我们重点介绍下MessageIdCache
,这个接口如下:
package com.taobao.metamorphosis.client.consumer;
/**
* Message id cache to prevent duplicated messages for the same consumer group.
*
* @author dennis<killme2008@gmail.com>
* @since 1.4.6
*
*/
public interface MessageIdCache {
/**
* Added key value to cache
*
* @param key
* @param exists
*/
public void put(String key, Byte exists);
/**
* Get value from cache,it the item is exists,it must be returned.
*
* @param key
* @return
*/
public Byte get(String key);
}
默认1.4.6版本有一个实现ConcurrentLRUHashMap
,使用LRU算法维护一个缓存map。默认启用这个实现,固定大小为4096,可以通过metaq.consumer.message_ids.lru_cache.size
环境变量修改这个大小。这个实现是全局共享的,也就是所有的MessageConsumer
都使用同一个缓存来做消息去重。
默认的这个实现仍然是JVM级别的去重,如果你的消费者是分布式的,那么可能需要一个集中式的全局缓冲来去重,比如在example里我们提供了一个基于memcached的实现:
package com.taobao.metamorphosis.example.cache;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.taobao.metamorphosis.client.consumer.MessageIdCache;
public class MemcachedMessageIdCache implements MessageIdCache {
private final MemcachedClient memcachedClient;
private int expireInSeconds = 60;
private static final Log log = LogFactory.getLog(MemcachedMessageIdCache.class);
public MemcachedMessageIdCache(MemcachedClient client) {
this.memcachedClient = client;
}
public void setExpireInSeconds(int expireInSeconds) {
this.expireInSeconds = expireInSeconds;
}
public int getExpireInSeconds() {
return this.expireInSeconds;
}
@Override
public void put(String key, Byte exists) {
try {
this.memcachedClient.set(key, this.expireInSeconds, exists);
}
catch (MemcachedException e) {
log.error("Added message id cache failed", e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (TimeoutException e) {
log.error("Added message id cache timeout", e);
}
}
@Override
public Byte get(String key) {
try {
return this.memcachedClient.get(key);
}
catch (MemcachedException e) {
log.error("Get item from message id cache failed", e);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (TimeoutException e) {
log.error("Get item from message id cache timeout", e);
}
return null;
}
}
默认会将处理过的消息保存在memcached里,并设置过期时间为1分钟。你也可以实现自己的MessageIdCache
,实现后通过SimpleFetchManager.setMessageIdCache
这个静态方法设置进去就可以使用你的实现。
使用数据库事务去重
如果你消费消息的目的是操作某个数据库,比如将消息的内容写入数据库,或者根据消息内容更新数据库。那么通过将offset存储到同一个数据库,在消费消息的同时更新offset到数据库,也可以实现消息去重。
我们已经提供了MysqlOffsetStorage
,你也可以实现其他数据库的offset存储,在MessageListener.recieveMessages
方法接收消息的时候,你可以通过SimpleFetchManager.currentTopicRegInfo
静态方法,获取当前消费消息的offset信息,并在一个事务里同时更新offset和消费消息。
使用log4j扩展发送消息
参见 使用Log4j发送消息
使用MetaQ作为twitter storm数据源
Maven引用MetaQ storm spout:
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-storm-spout</artifactId>
<version>1.4.6.2</version>
</dependency>
一个示范性的Topology(在example工程里):
package com.taobao.metamorphosis.example.storm;
import static com.taobao.metamorphosis.example.Help.initMetaConfig;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.storm.scheme.StringScheme;
import com.taobao.metamorphosis.storm.spout.MetaSpout;
public class TestTopology {
public static class FailEveryOther extends BaseRichBolt {
OutputCollector _collector;
int i = 0;
@Override
public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
this._collector = collector;
}
@Override
public void execute(Tuple tuple) {
this.i++;
if (this.i % 2 == 0) {
this._collector.fail(tuple);
}
else {
this._collector.ack(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",
new MetaSpout(new MetaConfig(), new ConsumerConfig("storm-spout"), new StringScheme()), 10);
builder.setBolt("bolt", new FailEveryOther()).shuffleGrouping("spout");
Config conf = new Config();
// Set the consume topic
conf.put(MetaSpout.TOPIC, "neta-test");
// Set the max buffer size in bytes to fetch messages.
conf.put(MetaSpout.FETCH_MAX_SIZE, 1024 * 1024);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
}
}
MetaSpout
接收三个参数,首先是MetaClientConfig
和ConsumerConfig
,这跟配置一个普通的消息消费者没有什么区别,具体见前面的章节。第三个参数scheme
除了用于declareOutputFields
之外,还用来反序列化MetaQ的消息data:
//MetaSpout.java
@Override
public void nextTuple() {
if (this.messageConsumer != null) {
try {
final MetaMessageWrapper wrapper = this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
if (wrapper == null) {
return;
}
final Message message = wrapper.message;
this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
}
catch (final InterruptedException e) {
// interrupted while waiting for message, big deal
}
}
}
默认提供了一个StringScheme
:
package com.taobao.metamorphosis.storm.scheme;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class StringScheme implements Scheme {
public List<Object> deserialize(byte[] bytes) {
try {
return new Values(new String(bytes, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
public Fields getOutputFields() {
return new Fields("str");
}
}
声明output fields为str
,并且认为消息的data是一个字符串。
Topology需要配置订阅的topic
和fetchSize
,最终提交到storm集群。
转自 metaq's wiki
相关推荐
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
总结,MetaQ-server-1.4.6.2版本提供了一个完整的消息中间件解决方案,包括服务端、客户端和相应的文档支持。通过使用MetaQ,开发者可以构建出高效、可靠的分布式系统,同时利用Javadoc文档来加速开发过程,确保代码...
《MetaQ服务器1.4.6.2版的深度解析》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于提供高可靠、高可用的消息传输服务。在本文中,我们将深入探讨MetaQ Server 1.4.6.2版本的核心特性、架构设计以及使用...
《MetaQ服务器1.4.6.2版本详解》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。在1.4.6.2这个版本中,它继续保持着与原版一致的核心特性,提供高效、稳定、可扩展的...
《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...
Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...
在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...
3. **安装环境**:MetaQ 的运行需要 Java 运行环境(JRE),因此在安装前需确保系统已安装 JRE 或 JDK。此外,由于 MetaQ 使用 ZooKeeper,所以也需要安装并配置好 ZooKeeper 服务。 4. **安装步骤**: - 下载 ...
异常的根本原因在于,Metaq的代码设计依赖于`DirectByteBuffer`的私有方法`viewedBuffer()`,而在JDK 7的某个更新版本中,这个方法的访问权限被限制,仅限于`java.nio`包内部使用。当Metaq在JDK 7环境下运行时,由于...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
- Metaq 1.x: 由开源社区killme2008维护,较为活跃。 - Metaq 2.x: 为后续版本。 - RocketMQ 3.x: 在2012年10月推出,被淘宝内部广泛使用,并开源。 9. RocketMQ衍生项目 RocketMQ衍生项目包括结合特定业务需求...
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
- **RocketMQ**的前身是MetaQ,最初可以看作是LinkedIn的Kafka(Scala版)的一个Java版本,并在此基础上增加了事务支持。 - **RocketMQ**相对于原生Kafka的特点在于除了基本的日志收集功能外,还支持高可用(HA)、...
- **纯Java实现**:无论是通信层还是存储层,MetaQ均使用Java语言实现,这对于支付宝这样的大型企业而言非常重要,因为Java是业界广泛使用的编程语言之一,这意味着更容易找到熟练掌握该语言的开发人员。 - **事务...
Java开发的特点是可以很早地进行TDDL,METAQ等等的对接,而脚本开发的特点是可以在进行批处理的时候非常方便。 在实际开发中,我们可能会遇到一些问题,例如,如何将抓取的数据进行打包,如何通过脚本进行抓取,...
Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...
Metaq在JDk 7下的异常及解决方案.docx mqvsmq.pdf RocketMQ_原理简介.pdf RocketMQ_admin.pdf RocketMQ_benchmark.pdf RocketMQ_calvinzhan - 类图.pdf RocketMQ_calvinzhan.pdf RocketMQ_design-整体设计文档.pdf ...
这些消息队列在互联网应用中得到了广泛应用,如淘宝的MetaQ等开源产品。 JMS是Java消息服务的简称,它是一种消息中间件,主要用于两个应用程序间或分布式系统间发送消息,进行异步消息通信。JMS是与平台无关的实现...