一、前言
storm通过保证数据至少被处理一次来保证数据的完整性,由于元祖可以重发,对于一些需要数据精确的场景,可以考虑用storm trident实现。
传统的事物型拓扑中存在几种bolt:
1.1 BasicBolt
这是最基本的Bolt,BasicBolt每次只能处理一个tuple,而且必须等前一个tuple成功处理后下一个tuple才能继续处理,显然效率不高。
1.2 BatchBolt
storm的一个优势就是能够批量处理tuple,BatchBolt支持批量处理tuple,每一个batch中的tuple都会调用execute(),处理完成后调用finishBatch方法。
1.3 Committer BatchBolt
标记为Committer的BatchBolt和基本的BasicBolt的区别在于二者调用finishBatch()的时机不同,标记为Committer的BatchBolt在提交阶段就会调用finishBatch()。
二、storm trident的使用
storm目前的版本已经将事物拓扑的实现封装trident,trident目前支持3种不同的事物接口,一种是非事物型的(不介绍,因为基本不用),一种是事务性的TransactionalTridentKafkaSpout,而我们比较常用的是透明型事物OpaqueTridentKafkaSpout(事务型应用最重要的一点是要判断一批消息是新的还是已来过的)。
2.1 TransactionalTridentKafkaSpout
原理是每次在数据库中存了txid,IPartitionedTransactionalSpout的每一个tuple都会绑定在固定的批次(batch)中。
一个批次无论重发多少次,它也只有一个唯一且相同的事务ID,它所包含的内容都是完全一致的,而一个tuple无论被重发多少次只会在同一个批次里。
使用方式如下:
TridentTopology topology = new TridentTopology();
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, topic, spoutId);
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new ConvertStringScheme());
/**
* 支持事物,支持失败重发
*
*/
TransactionalTridentKafkaSpout transactionalTridentKafkaSpout = new TransactionalTridentKafkaSpout(
tridentKafkaConfig);
topology.newStream("name",transactionalTridentKafkaSpout)
.shuffle()
.each(new Fields("msg"), new SpilterFunction(), new Fields("sentence"))
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new SumWord(),new Fields("sum"))
.parallelismHint(5)
.each(new Fields("sum"), new PrintFilter_partition());
Config config = new Config();
StormSubmitter.submitTopology("XXX", config,
topology.build());
但貌似目前TransactionalTridentKafkaSpout有个bug,启动会报:classCastException(非代码问题)
具体可参考:
issue:https://issues.apache.org/jira/browse/STORM-1728
然而我们可以想到的是,IPartitionedTransactionalSpout会有一个问题,假设一批消息在被bolt消费过程中失败了,需要spout重发,此时如果正巧遇到消息发送中间件故障,
例如某一个分区不可读,spout为了保证重发时每一批次包含的tuple一致,它只能等待消息中间件恢复,也就是卡在那里无法再继续发送给bolt消息了,直至消息中间件恢复(因为它必须发送一样的Batch)。
2.2 OpaqueTridentKafkaSpout
IOpaquePartitionedTransactionalSpout不保证每次重发一个批次的消息所包含的tuple完全一致。也就是说某个tuple可能第一次在txid=1的批次中出现,后面有可能在txid=3的批次中出现。这种情况只出现在当某一批次消息消费失败需要重发且恰巧消息中间件故障时。这时,IOpaquePartitionedTransactionalSpout不是等待消息中间件故障恢复,而是先读取可读的partition。例如txid=1的批次在消费过程中失败了,需要重发,恰巧消息中间件的16个分区有1个分区(partition=3)因为故障不可读了。这时候IOpaquePartitionedTransactionalSpout会先读另外的15个分区,完成txid=1这个批次的发送,这时候同样的批次其实包含的tuple已经少了。假设在txid=3时消息中间件的故障恢复了,那之前在txid=1且在分区partition=3的还没有被发送的tuple会被重新发送, 包含在txid=3的批次中,所以其不保证每批次的batch包含的tuple是一样的。
2.2.1 实战
首先搭建好zk,kafka,storm的分布式环境,先起zk,然后kafka然后storm.分别启动后效果jps看一下
master机器:
slave1机器:
slave2机器:
hosts里面配置
2.2.1.1 创建topic
2.2.1.2 写storm消费端
main方法
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, AuthorizationException {
TridentTopology topology = new TridentTopology();
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(
kafkaConfig);
topology.newStream("test_kafka2storm_opaqueTrident",
opaqueTridentKafkaSpout)
.parallelismHint(3)
.shuffle()
.each(new Fields("str"), new SpilterFunction(), new Fields("sentence"))
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new SumWord(),
new Fields("sum")).parallelismHint(5)
.each(new Fields("sum"), new PrintFilter_partition());
Config config = new Config();
config.setDebug(false);
config.setNumWorkers(2);
StormSubmitter.submitTopology("test_kafka2storm_opaqueTrident_topology", config,
topology.build());
}
SpilterFunction:
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
public class SpilterFunction extends BaseFunction {
private static final long serialVersionUID = 1L;
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentens = tuple.getString(0);
String[] array = sentens.split("\\s+");
for(int i=0;i<array.length;i++){
System.out.println("spilter emit:" + array[i]);
collector.emit(new Values(array[i]));
}
}
}
SumWord:
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
public class SumWord extends BaseAggregator<Map<String,Integer>> {
private static final long serialVersionUID = 1L;
/**
* 属于哪个batch
*/
private Object batchId;
/**
* 属于哪个分区
*/
private int partitionId;
/**
* 分区数量
*/
private int numPartitions;
/**
* 用来统计
*/
private Map<String,Integer> state;
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TridentOperationContext context) {
state = new HashMap<String,Integer>();
partitionId = context.getPartitionIndex();
numPartitions = context.numPartitions();
}
@Override
public Map<String, Integer> init(Object batchId, TridentCollector collector) {
this.batchId = batchId;
return state;
}
@Override
public void aggregate(Map<String, Integer> val, TridentTuple tuple,
TridentCollector collector) {
System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions
+",batchId:" + batchId);
String word = tuple.getString(0);
val.put(word, MapUtils.getInteger(val, word, 0)+1);
System.out.println("sumWord:" + val);
}
@Override
public void complete(Map<String, Integer> val, TridentCollector collector) {
collector.emit(new Values(val));
}
}
PrintFilter_partition:
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PrintFilter_partition extends BaseFilter {
private static final Logger LOGGER =
LoggerFactory.getLogger(PrintFilter_partition.class);
private static final long serialVersionUID = 1L;
@Override
public boolean isKeep(TridentTuple tuple) {
LOGGER.info("打印出来的tuple:" + tuple);
return true;
}
}
2.2.1.2 写kafka生产端往topic推送数据
Kafka生产者封装
package com.kafka.singleton;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class KafkaProducerSingleton {
private static final Logger LOGGER = LoggerFactory
.getLogger(KafkaProducerSingleton.class);
private static KafkaProducer<String, String> kafkaProducer;
private Random random = new Random();
private String topic;
private int retry;
private KafkaProducerSingleton() {
}
/**
* 静态内部类
*
* @author tanjie
*
*/
private static class LazyHandler {
private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
}
/**
* 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
*
* @return
*/
public static final KafkaProducerSingleton getInstance() {
return LazyHandler.instance;
}
/**
* kafka生产者进行初始化
*
* @return KafkaProducer
*/
public void init(String topic,int retry) {
this.topic = topic;
this.retry = retry;
if (null == kafkaProducer) {
Properties props = new Properties();
InputStream inStream = null;
try {
inStream = this.getClass().getClassLoader()
.getResourceAsStream("kafka.properties");
props.load(inStream);
kafkaProducer = new KafkaProducer<String, String>(props);
} catch (IOException e) {
LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
} finally {
if (null != inStream) {
try {
inStream.close();
} catch (IOException e) {
LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
}
}
}
}
}
/**
* 通过kafkaProducer发送消息
*
* @param topic
* 消息接收主题
* @param partitionNum
* 哪一个分区
* @param retry
* 重试次数
* @param message
* 具体消息值
*/
public void sendKafkaMessage(final String message) {
/**
* 1、如果指定了某个分区,会只讲消息发到这个分区上 2、如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用
* 3、如果没有指定分区和key,那么将会随机发送到topic的分区中 4、如果指定了key,那么将会以hash<key>的方式发送到分区中
*/
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", message);
// send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率
// kafka生产者是线程安全的,可以单实例发送消息
kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata,
Exception exception) {
if (null != exception) {
LOGGER.error("kafka发送消息失败:" + exception.getMessage(),
exception);
retryKakfaMessage(message);
}
}
});
}
/**
* 当kafka消息发送失败后,重试
*
* @param retryMessage
*/
private void retryKakfaMessage(final String retryMessage) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, random.nextInt(3), "", retryMessage);
for (int i = 1; i <= retry; i++) {
try {
kafkaProducer.send(record);
return;
} catch (Exception e) {
LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
retryKakfaMessage(retryMessage);
}
}
}
/**
* kafka实例销毁
*/
public void close() {
if (null != kafkaProducer) {
kafkaProducer.close();
}
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getRetry() {
return retry;
}
public void setRetry(int retry) {
this.retry = retry;
}
}
kafka.properties
# Configure kafka
bootstrap.servers=kafka集群地址
acks=1
retries=0
batch.size=1000
compression.type=gzip
linger.ms=10
#buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
HandlerProducer:
package com.kafka.singleton;
public class HandlerProducer implements Runnable {
private String message;
public HandlerProducer(String message) {
this.message = message;
}
@Override
public void run() {
KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
.getInstance();
kafkaProducerSingleton.init("test_find",3);
kafkaProducerSingleton.sendKafkaMessage("发送消息" + message);
}
}
测试:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class Kafka生产_多线程单实例 {
@Test
public void testSendMessageSingleton() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new HandlerProducer("zs is a good good man it is 18"));
}
}
查看storm的supervisor上的日志:
slave2机器上
2016-12-22 16:37:51.990 STDIO [INFO] spilter emit:zs
2016-12-22 16:37:51.992 STDIO [INFO] spilter emit:is
2016-12-22 16:37:51.993 STDIO [INFO] spilter emit:a
2016-12-22 16:37:51.993 STDIO [INFO] spilter emit:good
2016-12-22 16:37:51.996 STDIO [INFO] spilter emit:good
2016-12-22 16:37:51.997 STDIO [INFO] [zs];partitionId=3;partitions=5,batchId:6087:0
2016-12-22 16:37:51.998 STDIO [INFO] [is];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:51.999 STDIO [INFO] spilter emit:man
2016-12-22 16:37:52.005 STDIO [INFO] spilter emit:it
2016-12-22 16:37:52.009 STDIO [INFO] spilter emit:is
2016-12-22 16:37:52.009 STDIO [INFO] spilter emit:18
2016-12-22 16:37:52.015 STDIO [INFO] sumWord:{zs=1}
2016-12-22 16:37:52.017 STDIO [INFO] [a];partitionId=3;partitions=5,batchId:6087:0
2016-12-22 16:37:52.017 STDIO [INFO] sumWord:{zs=1, a=1}
2016-12-22 16:37:52.018 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{zs=1, a=1}]
2016-12-22 16:37:52.019 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{zs=1, a=1}]
2016-12-22 16:37:52.020 STDIO [INFO] sumWord:{is=1}
2016-12-22 16:37:52.022 STDIO [INFO] [good];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.022 STDIO [INFO] sumWord:{is=1, good=1}
2016-12-22 16:37:52.023 STDIO [INFO] [good];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.029 STDIO [INFO] sumWord:{is=1, good=2}
2016-12-22 16:37:52.029 STDIO [INFO] [is];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.030 STDIO [INFO] sumWord:{is=2, good=2}
2016-12-22 16:37:52.031 STDIO [INFO] [18];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.031 STDIO [INFO] sumWord:{is=2, 18=1, good=2}
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]
slave1机器上
2016-12-22 16:37:54.530 STDIO [INFO] [man];partitionId=2;partitions=5,batchId:6087:0
2016-12-22 16:37:54.543 STDIO [INFO] sumWord:{man=1}
2016-12-22 16:37:54.544 STDIO [INFO] [it];partitionId=2;partitions=5,batchId:6087:0
2016-12-22 16:37:54.544 STDIO [INFO] sumWord:{it=1, man=1}
2016-12-22 16:37:54.545 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{it=1, man=1}]
2016-12-22 16:37:54.545 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{it=1, man=1}]
分享到:
相关推荐
在Trident中,这通过实现`ISpout`接口来完成,可以是简单的消息队列(如Kafka)消费者,也可以是自定义的数据源。 2. **预处理(Transform)**:接收到的数据流可能需要预处理,例如解析日志条目,提取与PV相关的...
首先,storm-kafka是专门为Apache Storm设计的Kafka消费者组件,它允许Storm拓扑从Kafka主题中消费消息。在storm-kafka-0.8-plus版本中,它提供了一种高效且可靠的从Kafka读取数据的方式。开发者可以通过GitHub...
Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...
- 掌握Storm Trident API使用,它为高级流处理提供了状态管理和事务处理机制。 - 了解Storm的事务拓扑和状态管理机制,以及它们对运维调优的影响。 7. 实时数据分析: - 利用Storm的实时分析能力进行数据聚合、...
Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容
- **Storm Trident项目**:特别值得一提的是,其中一个项目是完全使用Storm Trident开发完成的。Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 *...
卡夫卡风暴这是从Kafka中读取并在Elastic Search中建立索引的简单Apache Storm Trident拓扑。 ##运行此Storm拓扑所需的设置### 1)Zookeeper。 Download from ...
三叉戟-kafka-without-mongodb-state-impl ... ./storm jar /home/ec2-user/software/trident-kafka-without-state-0.0.1-SNAPSHOT.jar com.poc.trident.topology.cluster.GeoPubAggByMinuteTopology 0
4. **Storm与Kafka集成**:Kafka作为一个分布式消息系统,常与Storm结合使用,形成实时数据管道。课程将涵盖如何配置和使用Storm消费者连接Kafka,实现数据的实时处理和传输。 5. **HighCharts图表开发与实时数据...
在本项目中,Storm作为分布式实时计算框架,负责处理从Kafka消费的消息,进行业务逻辑处理,并将结果数据存入HBase,同时通过HighCharts生成可视化图表展示在Web端。 在项目的第一阶段,重点在于地区销售额的实时...
在项目中,我们使用了storm-kafka模块,它允许Storm与Kafka进行无缝集成,使Spout能够消费Kafka中的消息。 3. **HighCharts图表**: - HighCharts是一个JavaScript库,用于创建高质量的图表。在项目中,我们利用...
项目中讲解了Kafka的基本概念、集群搭建、Java API开发测试以及storm-kafka插件的使用,强调了Kafka的拉取(Pull)消费模式和分区与副本的概念。 3. **CDH5**:Cloudera Data Hub (CDH5) 是一个全面的Hadoop发行版...
01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka...06.Kafka基本操作和最优设置07.Kafka Java API 简单开发测试08.storm...
01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka...06.Kafka基本操作和最优设置07.Kafka Java API 简单开发测试08.storm...
- **Spout**: Spout 是数据流的源头,它可以是任何数据源,如 Kafka、Kinesis 或者简单的消息队列。在 Storm 0.9 中,理解 Spout 如何发布数据到拓扑至关重要。 - **Topology**: 一个 Storm 应用由多个 Bolt 和 ...
2. **事务处理**:通过 Trident API 提供了可靠的消息处理能力,确保消息的处理一致性。 3. **并发与通信机制**:每个Spout和Bolt可以有多个实例并发运行,通过TCP/IP进行进程间的通信。 **六、Storm应用场景** 1....
01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka...06.Kafka基本操作和最优设置07.Kafka Java API 简单开发测试08.storm...
5. **Storm与其他大数据工具的集成**:Storm可以与其他大数据生态系统中的组件,如Hadoop HDFS、Cassandra、Kafka等无缝集成,形成强大的实时数据处理管道。了解这些集成可以帮助优化整体解决方案。 6. **配置与...
2. **Kafka与Storm集成**:Storm-kafka是Storm用于连接和消费Kafka数据的组件。理解如何配置和使用storm-kafka,确保数据能够正确地从Kafka流到Storm的Spout,是项目实施的关键步骤。 3. **双纵轴图表开发**:...
9. **进阶主题**:如多语言支持、 Trident API的使用、与其他大数据组件(如Hadoop、Kafka)的集成等。 通过这个压缩包的学习,用户可以从基础到进阶全面了解并掌握Apache Storm,从而在实际工作中有效地利用这一...