`

随笔记录

 
阅读更多
读取Kafka中的数据失败,
private transient KafkaStream<byte[], byte[]> kafkaStream ;
原因在于 transient 关键字,不适用序列化,而kafka数据的传输即通过序列化实现(暂时这样理解)
去掉这个关键字 ,程序正常
写入到kafka的数据将写到磁盘(写入时需要序列化)


kafka
topic 与 group
两个消费模式,队列与订阅
队列:单一消费
订阅:广播方式发送,每个消费者维护一个读取位置;
消息维护时间为设置的失效时间,到失效时间无论是否读取,均丢失
若消费者同组,为队列,否则,订阅


Spout
多个Spout向同一个Bolt发送数据
需设置多个Fields在Spout中
Bolt需通过不同的FieldsName进行接收
优化:
添加流名称,Spout中设置相同的FieldsName
修改两处
1.Spout中
declare.declareFields(new Fields("name"))
改为
declarer.declareStream(ACTIVITY_VISIT_PERSON_COUNT_WEB_STREAM_ID,new Fields("userId"));
2.Topology中
Bolt接收数据
builder.setBolt("ActivitySingleCumulateInvestRecordBolt", new ActivitySingleCumulateInvestRecordBolt(), parallelismSpout).shuffleGrouping("ActivitySingleCumulateInvestmentSpout");
改为
builder.setBolt("ActivityInvestAmountBolt", new ActivityInvestAmountBolt(),parallelismSpout)
.shuffleGrouping("ActivitySingleCumulateInvestRecordBolt",ActivitySingleCumulateInvestRecordBolt.MAIN_EVENT_ID_STREAM)
.shuffleGrouping("ActiveDataStatisticsStoringBDBolt",ActiveDataStatisticsStoringBDBolt.MAIN_EVENT_ID_STREAM)
.shuffleGrouping("ActiveDataStatisticsStoringHbBolt",ActiveDataStatisticsStoringHbBolt.MAIN_EVENT_ID_STREAM);

* 1.接收KAFAK中ANDROID与IOS端的数据
* topic 数据标签
* group 与AppDataResolvingAndStoringSpout 不同
* 不同组,为订阅者模式
* 相同组,为队列,单一消费者,会与AppDataResolvingAndStoringSpout消费者同时消费,
* 会出现读取数据为空的情况
* 2.发送数据
* streamId : ActivityVisitPersonCountAppSpout-StreamId
* fields : userId
* 注:
* fields 不允许重复;但Bolt接收数据时为方便按照FieldsName进行接收,需两个Spout中设置同名的Fields
* 通过不同的streamId区分Spout中同名的Fields
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics