`
农村外出务工男JAVA
  • 浏览: 105614 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

使用storm trident消费kafka消息

阅读更多

一、前言
    storm通过保证数据至少被处理一次来保证数据的完整性,由于元祖可以重发,对于一些需要数据精确的场景,可以考虑用storm trident实现。
    传统的事物型拓扑中存在几种bolt:
 1.1 BasicBolt
   这是最基本的Bolt,BasicBolt每次只能处理一个tuple,而且必须等前一个tuple成功处理后下一个tuple才能继续处理,显然效率不高。
 1.2 BatchBolt
   storm的一个优势就是能够批量处理tuple,BatchBolt支持批量处理tuple,每一个batch中的tuple都会调用execute(),处理完成后调用finishBatch方法。
 1.3 Committer BatchBolt
    标记为Committer的BatchBolt和基本的BasicBolt的区别在于二者调用finishBatch()的时机不同,标记为Committer的BatchBolt在提交阶段就会调用finishBatch()。

二、storm trident的使用
    storm目前的版本已经将事物拓扑的实现封装trident,trident目前支持3种不同的事物接口,一种是非事物型的(不介绍,因为基本不用),一种是事务性的TransactionalTridentKafkaSpout,而我们比较常用的是透明型事物OpaqueTridentKafkaSpout(事务型应用最重要的一点是要判断一批消息是新的还是已来过的)。
 2.1 TransactionalTridentKafkaSpout   

   原理是每次在数据库中存了txid,IPartitionedTransactionalSpout的每一个tuple都会绑定在固定的批次(batch)中。
   一个批次无论重发多少次,它也只有一个唯一且相同的事务ID,它所包含的内容都是完全一致的,而一个tuple无论被重发多少次只会在同一个批次里。
    使用方式如下:

    TridentTopology topology = new TridentTopology();
        TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, topic, spoutId);
        tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new ConvertStringScheme());
        /**
         * 支持事物,支持失败重发
         *
         */
        TransactionalTridentKafkaSpout transactionalTridentKafkaSpout = new TransactionalTridentKafkaSpout(
                tridentKafkaConfig);
        topology.newStream("name",transactionalTridentKafkaSpout)
                .shuffle()
                .each(new Fields("msg"), new SpilterFunction(), new Fields("sentence"))
                .groupBy(new Fields("sentence"))
                .aggregate(new Fields("sentence"), new SumWord(),new Fields("sum"))
                .parallelismHint(5)
                .each(new Fields("sum"), new PrintFilter_partition());
        Config config = new Config();
        StormSubmitter.submitTopology("XXX", config,
                topology.build());

    但貌似目前TransactionalTridentKafkaSpout有个bug,启动会报:classCastException(非代码问题)

   具体可参考:

issue:https://issues.apache.org/jira/browse/STORM-1728

   然而我们可以想到的是,IPartitionedTransactionalSpout会有一个问题,假设一批消息在被bolt消费过程中失败了,需要spout重发,此时如果正巧遇到消息发送中间件故障,
例如某一个分区不可读,spout为了保证重发时每一批次包含的tuple一致,它只能等待消息中间件恢复,也就是卡在那里无法再继续发送给bolt消息了,直至消息中间件恢复(因为它必须发送一样的Batch)。
 2.2 OpaqueTridentKafkaSpout
      IOpaquePartitionedTransactionalSpout不保证每次重发一个批次的消息所包含的tuple完全一致。也就是说某个tuple可能第一次在txid=1的批次中出现,后面有可能在txid=3的批次中出现。这种情况只出现在当某一批次消息消费失败需要重发且恰巧消息中间件故障时。这时,IOpaquePartitionedTransactionalSpout不是等待消息中间件故障恢复,而是先读取可读的partition。例如txid=1的批次在消费过程中失败了,需要重发,恰巧消息中间件的16个分区有1个分区(partition=3)因为故障不可读了。这时候IOpaquePartitionedTransactionalSpout会先读另外的15个分区,完成txid=1这个批次的发送,这时候同样的批次其实包含的tuple已经少了。假设在txid=3时消息中间件的故障恢复了,那之前在txid=1且在分区partition=3的还没有被发送的tuple会被重新发送, 包含在txid=3的批次中,所以其不保证每批次的batch包含的tuple是一样的。
  2.2.1 实战
   首先搭建好zk,kafka,storm的分布式环境,先起zk,然后kafka然后storm.分别启动后效果jps看一下

  master机器:

  slave1机器:

  slave2机器:

 hosts里面配置

  2.2.1.1 创建topic

  2.2.1.2 写storm消费端

  main方法

  public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, AuthorizationException {
        TridentTopology topology = new TridentTopology();
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(
                kafkaConfig);
        topology.newStream("test_kafka2storm_opaqueTrident",
                opaqueTridentKafkaSpout)
                .parallelismHint(3)
                .shuffle()
                .each(new Fields("str"), new SpilterFunction(), new Fields("sentence"))
                .groupBy(new Fields("sentence"))
                .aggregate(new Fields("sentence"), new SumWord(),
                        new Fields("sum")).parallelismHint(5)
                .each(new Fields("sum"), new PrintFilter_partition());
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);
        StormSubmitter.submitTopology("test_kafka2storm_opaqueTrident_topology", config,
                topology.build());
    }

   SpilterFunction:

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SpilterFunction extends BaseFunction {

	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String sentens = tuple.getString(0);
		String[] array = sentens.split("\\s+");
		for(int i=0;i<array.length;i++){
			System.out.println("spilter emit:" + array[i]);
			collector.emit(new Values(array[i]));
		}
	}

}
   SumWord:
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumWord extends BaseAggregator<Map<String,Integer>> {

	private static final long serialVersionUID = 1L;

	/**
	 * 属于哪个batch
	 */
	private Object batchId;
	
	/**
	 * 属于哪个分区
	 */
	private int partitionId;
	
	/**
	 * 分区数量
	 */
	private int numPartitions;
	
	/**
	 * 用来统计
	 */
	private Map<String,Integer> state;
	
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		state = new HashMap<String,Integer>();
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
	}
	@Override
	public Map<String, Integer> init(Object batchId, TridentCollector collector) {
		this.batchId = batchId;
		return state;
	}
	@Override
	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions
				+",batchId:" + batchId);
		String word = tuple.getString(0);
		val.put(word, MapUtils.getInteger(val, word, 0)+1);
		System.out.println("sumWord:" + val);
	}
	@Override
	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}

}
    PrintFilter_partition:
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFilter_partition extends BaseFilter {
	
	 private static final Logger LOGGER = 

			 LoggerFactory.getLogger(PrintFilter_partition.class);

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		LOGGER.info("打印出来的tuple:" + tuple);
		return true;
	}
}

 

2.2.1.2 写kafka生产端往topic推送数据

  Kafka生产者封装

package com.kafka.singleton;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaProducerSingleton {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(KafkaProducerSingleton.class);

	private static KafkaProducer<String, String> kafkaProducer;

	private Random random = new Random();

	private String topic;

	private int retry;

	private KafkaProducerSingleton() {

	}
	

	/**
	 * 静态内部类
	 * 
	 * @author tanjie
	 *
	 */
	private static class LazyHandler {

		private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
	}

	/**
	 * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
	 * 
	 * @return
	 */
	public static final KafkaProducerSingleton getInstance() {
		return LazyHandler.instance;
	}

	/**
	 * kafka生产者进行初始化
	 * 
	 * @return KafkaProducer
	 */
	public void init(String topic,int retry) {
		this.topic = topic;
	    this.retry = retry;
		if (null == kafkaProducer) {
			Properties props = new Properties();
			InputStream inStream = null;
			try {
				inStream = this.getClass().getClassLoader()
						.getResourceAsStream("kafka.properties");
				props.load(inStream);
				kafkaProducer = new KafkaProducer<String, String>(props);
			} catch (IOException e) {
				LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
			} finally {
				if (null != inStream) {
					try {
						inStream.close();
					} catch (IOException e) {
						LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
					}
				}
			}
		}
	}

	/**
	 * 通过kafkaProducer发送消息
	 * 
	 * @param topic
	 *            消息接收主题
	 * @param partitionNum
	 *            哪一个分区
	 * @param retry
	 *            重试次数
	 * @param message
	 *            具体消息值
	 */
	public void sendKafkaMessage(final String message) {
		/**
		 * 1、如果指定了某个分区,会只讲消息发到这个分区上 2、如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用
		 * 3、如果没有指定分区和key,那么将会随机发送到topic的分区中 4、如果指定了key,那么将会以hash<key>的方式发送到分区中
		 */
		ProducerRecord<String, String> record = new ProducerRecord<String, String>(
				topic, random.nextInt(3), "", message);
		// send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率
		// kafka生产者是线程安全的,可以单实例发送消息
		kafkaProducer.send(record, new Callback() {
			public void onCompletion(RecordMetadata recordMetadata,
					Exception exception) {
				if (null != exception) {
					LOGGER.error("kafka发送消息失败:" + exception.getMessage(),
							exception);
					retryKakfaMessage(message);
				}
			}
		});
	}

	/**
	 * 当kafka消息发送失败后,重试
	 * 
	 * @param retryMessage
	 */
	private void retryKakfaMessage(final String retryMessage) {
		ProducerRecord<String, String> record = new ProducerRecord<String, String>(
				topic, random.nextInt(3), "", retryMessage);
		for (int i = 1; i <= retry; i++) {
			try {
				kafkaProducer.send(record);
				return;
			} catch (Exception e) {
				LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
				retryKakfaMessage(retryMessage);
			}
		}
	}

	/**
	 * kafka实例销毁
	 */
	public void close() {
		if (null != kafkaProducer) {
			kafkaProducer.close();
		}
	}

	public String getTopic() {
		return topic;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	public int getRetry() {
		return retry;
	}

	public void setRetry(int retry) {
		this.retry = retry;
	}

}

   kafka.properties

# Configure kafka
bootstrap.servers=kafka集群地址
acks=1
retries=0
batch.size=1000
compression.type=gzip
linger.ms=10
#buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

   HandlerProducer:

package com.kafka.singleton;

public class HandlerProducer implements Runnable {

	private String message;

	public HandlerProducer(String message) {
		this.message = message;
	}

	@Override
	public void run() {
		KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
				.getInstance();
		kafkaProducerSingleton.init("test_find",3);
		kafkaProducerSingleton.sendKafkaMessage("发送消息" + message);
	}

}

    测试:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class Kafka生产_多线程单实例 {
	
	@Test
	public void testSendMessageSingleton() throws InterruptedException {
		ExecutorService executor = Executors.newFixedThreadPool(3);
		executor.submit(new HandlerProducer("zs is a good good man it is 18"));
	}
}

   查看storm的supervisor上的日志:

   slave2机器上

2016-12-22 16:37:51.990 STDIO [INFO] spilter emit:zs
2016-12-22 16:37:51.992 STDIO [INFO] spilter emit:is
2016-12-22 16:37:51.993 STDIO [INFO] spilter emit:a
2016-12-22 16:37:51.993 STDIO [INFO] spilter emit:good
2016-12-22 16:37:51.996 STDIO [INFO] spilter emit:good
2016-12-22 16:37:51.997 STDIO [INFO] [zs];partitionId=3;partitions=5,batchId:6087:0
2016-12-22 16:37:51.998 STDIO [INFO] [is];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:51.999 STDIO [INFO] spilter emit:man
2016-12-22 16:37:52.005 STDIO [INFO] spilter emit:it
2016-12-22 16:37:52.009 STDIO [INFO] spilter emit:is
2016-12-22 16:37:52.009 STDIO [INFO] spilter emit:18
2016-12-22 16:37:52.015 STDIO [INFO] sumWord:{zs=1}
2016-12-22 16:37:52.017 STDIO [INFO] [a];partitionId=3;partitions=5,batchId:6087:0
2016-12-22 16:37:52.017 STDIO [INFO] sumWord:{zs=1, a=1}
2016-12-22 16:37:52.018 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{zs=1, a=1}]
2016-12-22 16:37:52.019 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{zs=1, a=1}]
2016-12-22 16:37:52.020 STDIO [INFO] sumWord:{is=1}
2016-12-22 16:37:52.022 STDIO [INFO] [good];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.022 STDIO [INFO] sumWord:{is=1, good=1}
2016-12-22 16:37:52.023 STDIO [INFO] [good];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.029 STDIO [INFO] sumWord:{is=1, good=2}
2016-12-22 16:37:52.029 STDIO [INFO] [is];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.030 STDIO [INFO] sumWord:{is=2, good=2}
2016-12-22 16:37:52.031 STDIO [INFO] [18];partitionId=1;partitions=5,batchId:6087:0
2016-12-22 16:37:52.031 STDIO [INFO] sumWord:{is=2, 18=1, good=2}
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]
2016-12-22 16:37:52.032 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{is=2, 18=1, good=2}]

    slave1机器上

2016-12-22 16:37:54.530 STDIO [INFO] [man];partitionId=2;partitions=5,batchId:6087:0
2016-12-22 16:37:54.543 STDIO [INFO] sumWord:{man=1}
2016-12-22 16:37:54.544 STDIO [INFO] [it];partitionId=2;partitions=5,batchId:6087:0
2016-12-22 16:37:54.544 STDIO [INFO] sumWord:{it=1, man=1}
2016-12-22 16:37:54.545 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{it=1, man=1}]
2016-12-22 16:37:54.545 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{it=1, man=1}]
2
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics