目录
Storm项目:流数据监控 <4>. 1
---流数据监控MetaQ接口... 1
1 文档说明... 1
2 MetaQ与Storm接口... 1
2.1 MetaqSpout. 1
2.1.1 接口说明... 1
2.1.2 上代码... 2
2.2 MetaqBolt. 5
2.2.1 接口说明... 5
2.2.1 上代码... 5
3 代码改动说明... 6
4 关于Metaq的报错... 7
4.1 报错... 7
4.2 解决... 7
1 文档说明
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。
关于MetaQ生产者及消费者实例,参考自官方生产者与消费者实例:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout
关于MetaQ与Storm之间的spout接口参考自官方MetaQ与storm的接口:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout
MetaQ作为一种开源的消息中间件,有完善的开源社区,并且版本更新稳定,作为国人的开源软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。
该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。
2 MetaQ与Storm接口
2.1 MetaqSpout
2.1.1 接口说明
该接口参考自Github,作了部分修改。项目设计中,使用storm.xml.MetaqSpoutXml读取MetaqSpout对应的配置文件MetaqSpout.xml
配置文件中,指明zkconnect的地址及端口号、metaq的root目录、对应的消费topic及其消费组(这个很重要)。
读取配置之后,将配置传递到spout的open部分进行初始化工作,主要是进行消费者参数设定(包括zkconnect、root目录、Topic及Group设置)等。
在nextTuple方法中,进行消息(message)拉取(poll),每次拉取一条记录,发布到下一个拓扑节点中。
2.1.2 上代码
贴部分主要代码(详细参考代码包):
//构造函数,传递xml地址
public MetaqSpout(String MetaqSpoutXml) {
super();
this.metaqspoutxml = MetaqSpoutXml;
}
//实例化参数配置类
private ZKConfig zkConfig = new ZKConfig();
private MetaClientConfig metaClientConfig = new MetaClientConfig();
private final Scheme scheme = new StringScheme();
//初始化调用
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
//从xml中获取参数
new MetaqSpoutXml(this.metaqspoutxml).read();
this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;//"192.168.2.240:2181";
this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;//"/meta";
String topic = MetaqSpoutXml.topic;
String group = MetaqSpoutXml.group;
this.metaClientConfig.setZkConfig(this.zkConfig);
this.consumerConfig = new ConsumerConfig(group);
//final String topic = (String) conf.get(TOPIC);
if (topic == null) {
throw new IllegalArgumentException(TOPIC + " is null");
}
Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE);
if (maxSize == null) {
log.warn("Using default FETCH_MAX_SIZE");
maxSize = DEFAULT_MAX_SIZE;
}
this.id2wrapperMap = new ConcurrentHashMap();
this.messageQueue = new LinkedTransferQueue();
try {
this.collector = collector;
this.setUpMeta(topic, maxSize);
}
catch (final MetaClientException e) {
log.error("Setup meta consumer failed", e);
}
}
private void setUpMeta(final String topic, final Integer maxSize) throws MetaClientException {
this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
this.messageConsumer.subscribe(topic, maxSize, new MessageListener() {
public void recieveMessages(final Message message) {
final MetaMessageWrapper wrapper = new MetaMessageWrapper(message);
MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper);
MetaqSpout.this.messageQueue.offer(wrapper);
try {
wrapper.latch.await();
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
// 消费失败,抛出运行时异常
if (!wrapper.success) {
throw new RuntimeException("Consume message failed");
}
}
public Executor getExecutor() {
return null;
}
}).completeSubscribe();
}
//关闭时调用,进行consumer的shutdown操作
public void close() {
try {
this.messageConsumer.shutdown();
}
catch (final MetaClientException e) {
log.error("Shutdown consumer failed", e);
}
try {
this.sessionFactory.shutdown();
}
catch (final MetaClientException e) {
log.error("Shutdown session factory failed", e);
}
}
//消息发布
public void nextTuple() {
if (this.messageConsumer != null) {
try {
//进行消息拉取
final MetaMessageWrapper wrapper = this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
if (wrapper == null) {
return;
}
final Message message = wrapper.message;
this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
}
catch (final InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消息操作成功确认机制
public void ack(final Object msgId) {
if (msgId instanceof Long) {
final long id = (Long) msgId;
final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = true;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
}
}
//消费失败时返回
public void fail(final Object msgId) {
if (msgId instanceof Long) {
final long id = (Long) msgId;
final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = false;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
}
}
2.2 MetaqBolt
2.2.1 接口说明
该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有该Topic),则可以把数据写入metaq中。
2.2.1 上代码
该部分代码较简单,可以参考AsyncConsumer代码。
//构造,传递配置路径
public MetaqBolt(String MetaqSpoutXml) {
super();
this.metaqspoutxml = MetaqSpoutXml;
}
//初始化操作
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
System.out.println("MetaqBolt -- Start!");
this.collector = collector;
// 初始化metaq的一些设置,包括zk链接地址,根目录等
this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;// "192.168.2.240:2181";
this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;// "/meta";
this.topic = MetaqSpoutXml.topic;
this.metaClientConfig.setZkConfig(this.zkConfig);
try {
this.sessionFactory = new MetaMessageSessionFactory(
this.metaClientConfig);
} catch (MetaClientException e) {
e.printStackTrace();
}
this.producer = this.sessionFactory.createProducer();
this.producer.publish(this.topic);// 发布topic
}
public void execute(Tuple input) {
String str = input.getString(0);
try {
this.sendResult = producer.sendMessage(new Message(this.topic, str
.getBytes()));
} catch (MetaClientException | InterruptedException e) {
e.printStackTrace();
}
//当生产失败时打印失败数据
if (!this.sendResult.isSuccess()) {
System.err.println("Send message failed,error message:"
+ sendResult.getErrorMessage());
}
}
3 代码改动说明
关于此次代码变动较大,加了一个spout源的接口,一个bolt的数据落地接口,对topology进行了优化。
具体如下:
(1) 增加了MetaqSpout接口,实现从MetaQ中读取数据(重点)
(2) 增加了MetaqBolt接口,实现新的数据落地接口,将数据写入MetaQ中
(3) 修改了Topology主类,实现了节点可配置,通过配置文件列表,即不同类型的spout及bolt可动态搭配,想要实现不同拓扑功能,不用修改代码,而只需修改配置即可(重点)
4 关于Metaq的报错
4.1 报错
根据错误提示,总是没找到其原因。
4.2 解决
才发现metaq往zk注册的服务器ip是192.168.122.1不是我本机的ip,之前对metaq进行配置的时候,并没有进行hostName配置,因为metaq据说默认的注册ip是localhost所以就没有注意了,但是好像这种情况来看,他进行zk注册的时候使用的是其代码内部的预留ip进行注册。
我在metaq的server.ini中进行了hostName配置,这个问题就解决了。
代码未能上传,有需要邮件留给我
转自新浪blog
相关推荐
此外,MetaQ还支持多级消费,允许消费者组内的不同实例并行消费消息,提升处理效率。 三、序列化对象的发送 在1.4.3版本中,MetaQ客户端支持发送序列化对象。序列化是将对象转换为字节流的过程,以便在网络间传输...
- **集群消费**:Consumer Group 内的实例平均分摊消息消费,类似于 JMS 中的点对点消息传递,保证每条消息只有一个消费者,且消费者和发送者之间无时间依赖性。 - **主动消费**:消费者主动向 Broker 请求消息,...
在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...
3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5《zookeeper统一配置》 6、storm项目-流数据监控系列6《最新代码树及详解》。希望能对大家有所...
3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5《zookeeper统一配置》 6、storm项目-流数据监控系列6《最新代码树及详解》。希望能对大家有所...
3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5《zookeeper统一配置》 6、storm项目-流数据监控系列6《最新代码树及详解》。希望能对大家有所...
《MetaQ服务器1.4.6.2版的深度解析》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于提供高可靠、高可用的消息传输服务。在本文中,我们将深入探讨MetaQ Server 1.4.6.2版本的核心特性、架构设计以及使用...
MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...
《Metaq在JDK 7下的异常及其解决策略》 Metaq是一款高性能的消息中间件,广泛应用于分布式系统中,提供高效、稳定的消息传递服务。然而,在JDK 7环境下,Metaq可能会遇到一些运行异常,其中最常见的就是与物理文件...
《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...
3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5《zookeeper统一配置》 6、storm项目-流数据监控系列6《最新代码树及详解》。希望能对大家有所...
《MetaQ服务器1.4.6.2版本详解》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。在1.4.6.2这个版本中,它继续保持着与原版一致的核心特性,提供高效、稳定、可扩展的...
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...
软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的...
阿里消息中间件MetaQ学习Demo
"Storm实时处理方案架构" 本文档介绍了基于Storm的实时处理架构,该架构包括数据收集部分、实时处理部分和数据落地部分。本文将详细解释每个部分的技术选型和业务需求,并对相关技术的熟悉度进行分析。 1. 数据...