Consumer(不管pull,push) 获得数据都会发送 topic,queueId,brokerName offset ,请求数据量。
来分析分析传递的参数
topic,这个明显的不会有变化。
brokerName 每个broker是无状态的,如果broker挂了,consumer可以重新拿去其他borker的 MessageQueue 信息。所以影响也不大。
queueId 每个topic都有读写 queue,写queue是一个逻辑行为没有实际行为。一个读队列对应一个写队列。
看下 queueId在Producer的行为,每个线程轮询MessageQueue的list,queue数据不均衡而已。
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
@Override
public String toString() {
return "ThreadLocalIndex{" +
"threadLocalIndex=" + threadLocalIndex.get() +
'}';
}
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
PushConsumer模式主动去请求了数据,正常情况不需要我们关心
PullConsumer模式需要开发自己轮询MessageQueue,一个错误行为是: 一个个messagequeue消费完。
关于queueId需要注意的地方 就是queue的增删
1. 读写队列长度必须一致
2. 不要随意增删,如果增删队列需要通知 consumer与Producer 重新拉取 queue信息,如果你有额外的操作,可能需要更多处理,所以不建议增删队列。按照最大并发量与数据库合理初始化最大 queue。
offset 最大的问题。broker 不会对 offset进行维护,也无法进行持久化。
client 需要自己维护。而维护的两个类RemoteBrokerOffsetStore与LocalFileOffsetStore。都无法正真解决这些问题。
LocalFileOffsetStore (PullComsumer 默认实现方式)直接保存在本地
RemoteBrokerOffsetStore,(PushConsumer 默认实现方式)
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
//发送到服务端
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
SlaveSynchronize, 会把 salve保存的 offset 数据同步到 master
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
问题
- 但是 broker 不会对 offset数据持久化,对于我们这些小公司来说,就是一个问题。
- slave offset 同步 master 是有延迟的。
3. 如果是单Comsumer 使用RemoteBrokerOffsetStore是没有问题的。如果多个comsumer,无法保证原子操作,如果请求重复,需要过滤数据。得不偿失。无法持久化offset
这里几段很有意思的代码
brokerAllowSuspend 永远为true
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
······
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
······
·····
}
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
final boolean subscription, final boolean classFilter) {
int flag = 0;
if (commitOffset) {
flag |= FLAG_COMMIT_OFFSET;
}
if (suspend) {
flag |= FLAG_SUSPEND;
}
if (subscription) {
flag |= FLAG_SUBSCRIPTION;
}
if (classFilter) {
flag |= FLAG_CLASS_FILTER;
}
return flag;
}
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,...
if (findBrokerResult != null) {
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setSysFlag(sysFlagInner);
public static int clearCommitOffsetFlag(final int sysFlag) {
return sysFlag & (~FLAG_COMMIT_OFFSET);
}
class PullMessageProcessor{
private RemotingCommand processRequest(final Channel channel ....
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
}
}
// 第一段代码使 storeOffsetEnable = true
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
hasCommitOffsetFlag 在client基本被应变成了 偶数。所以 storeOffsetEnable 是 false,不懂移位,所以可能哪里疏忽了。
总结来说,broker 与 client维护offset 太复杂了,太麻烦了。
首先关注 offset的持久化。
- 写入MySQL 性能方面复杂可能比较大
- 写入redis性能负载比较小,选择redis
操作原子性 首选 redis。
Comsumer设计方案
不管哪个方案,都要对 MessageQueue 进行维护
第一个方案 Producer主导。缺点是需要producer进行主动,一个消息需要一个redis请求,麻烦。
-- 每次启动消费者,获得消费队列,就去缓存看是否存在对应的队列,如果存在什么都不做,如果不存在就初始化为0,目前是足够了。
-- 这样一个生产组,对应一个消费组
-- 目前只支持当maset模式,消费者一个请求线程,几个处理线程
-- 初始化 topic 数据 ,topic+brokerName 值为 队列id,生产多少个
redis:zadd("topic_a",0,"1",1,"2" ,0,"3")
-- 查询操作 得到最大值得队列
local rek = redis:zrevrange( "topic_a" , 0 , 0 , "WITHSCORES" )
redis.log( redis.LOG_NOTICE , rek)
local production = rek[1][2]
if production > 0 then
--一次消费多少个? 是全部消费,还是固定消费
local consume = ( production >= 100 and {100} or { production })[1]
redis:zincrby("test_topic_a" , 0 - consume, rek[1][1])
--得到 是否
local queneStr = redis:hget("")
local quene = cjson.decode( queneStr )
quene["consume"] = consume
queneStr = cjson.encode( quene )
quene["offset"] = quene["offset"] + consume
redis:hset( "" , cjson.encode( quene ) )
return queneStr
else
--1
return "false"
end
相关推荐
总结来说,"d2rq-0.8.1.zip"提供了连接数据库与知识图谱的关键桥梁,使得丰富的结构化数据能够参与到Web的语义化进程中,为大数据分析、智能检索和决策支持提供了新的可能。了解并熟练运用D2RQ,能够极大地推动企业...
此外,对于研究者来说,D2RQ也是将学术数据库转换为RDF以便于进行知识发现的有力工具。 综上所述,D2RQ是一个强大且灵活的工具,用于连接传统数据库和语义网,它提供了Windows和Linux的双平台支持,满足不同用户的...
《D2RQ系统详解与知识图谱构建》 D2RQ系统是连接数据库与RDF(Resource Description Framework)模型的重要桥梁,尤其...对于希望将企业内部数据转化为可共享的开放知识资源的组织来说,理解和掌握D2RQ是至关重要的。
总的来说,《联想服务器RQ940系统用户手册》是用户了解、配置和管理RQ940服务器不可或缺的工具,它涵盖了从初始设置到日常运维的所有环节,确保用户能够有效地利用这款强大服务器,提升业务效率。
《D2RQ-0.8.1:链接数据库与语义网的桥梁》 D2RQ系统是连接传统关系数据库(RDB)与语义网(Semantic Web)的重要工具,其...对于那些希望将内部数据公开或与全球信息网络融合的组织来说,D2RQ是一个值得考虑的工具。
在IT行业中,数据处理是一项至关重要的任务...总结来说,Excel转换为RQ文件的过程涉及到数据的预处理、格式转换以及对目标系统数据规范的理解。理解这些步骤并熟练掌握,将有助于在esale系统中高效地进行数据导入工作。
d2rq.zip安装包,官网上下不下来的,我用github下下来的,需要的朋友可以下载。 D2RQ exposes the contents of relational databases as RDF. It consists of: The D2RQ Mapping Language. Use it to write ...
RQ3.0修正版 RQ3.0修正版 RQ3.0修正版 RQ3.0修正版 RQ3.0修正版
综上所述,“瑞基资料RQ系列接线”文档详细介绍了RQ系列产品的接线方式及其各部分的功能,这对于从事自动化控制领域的技术人员来说是一份非常宝贵的参考资料。通过深入理解文档中的每个知识点,可以更好地掌握RQ系列...
此外,电气特性部分的数据对于电路设计人员来说至关重要,其中包括了晶体管的开关特性、漏电流等关键参数。所有这些参数对于确定器件在特定应用中的性能表现至关重要。 RQ3E120AT的这些特性使其成为电源管理和开关...
总结来说,JENA和D2RQ是构建语义网应用的重要工具,它们帮助企业或组织利用现有的关系数据库资源,参与到语义网的世界中。通过将数据库中的数据转换为RDF,开发者可以利用JENA提供的各种功能,创建出更智能、更具互...
松下超薄磁带随身听RQ-SX70v说明书PDF版,随身听是指体积小、重量轻便于随身携带的媒体播放器,由索尼创造的Walkman为代表,松下,爱华等日系品牌,飞利浦等欧系品牌,Bose,苹果等美系品牌见证了随身听的由盛到衰。...
### 联想服务rq940:ThinkServer RQ940系统用户手册解析 #### 一、产品概述 **联想ThinkServer RQ940**是一款高性能服务器产品,适用于需要强大计算能力和高可靠性的业务场景。此款服务器不仅具备出色的性能,还...
### RQ940系统用户手册关键知识点解析 #### 一、产品概述 **RQ940系统**是一款由联想公司推出的服务器解决方案,旨在为用户提供高性能、高可靠性和易于管理的服务体验。根据用户手册提供的信息,我们可以了解到该...
开利中央空调30RB、30RQ电气维修手册
QFN32 Allergro封装(FT232RQ) 带有散热焊盘的.dra文件。可以直接用到pcb
《Rinnai林内睿擎系列壁挂炉RBS-25RQ85A-CY使用及安装详解》 Rinnai林内,作为知名的燃气设备制造商,其睿擎系列壁挂炉以其高效节能、安全可靠而备受赞誉。本文将深入解析RBS-25RQ85A-CY型号的燃气采暖热水炉的使用...
对于想要自行维修或研究RQ-SX77V的用户来说,这份资料是非常宝贵的资源。它能帮助他们理解设备的工作机制,排查并解决可能出现的问题。同时,对于专业维修人员,它提高了工作效率,减少了因不熟悉设备而导致的误操作...