前面,我们已经把Broker存储最重要的一个类具体分析了一遍,接下来,我们分析一下其删除的策略。前面介绍过Messagestore采用的多文件存储的组织方式,而存储空间不可能无限大,得有一定的删除策略对其进行删除以腾出空间给新的消息。
MetaQ允许自定义删除策略,需要实现接口DeletePolicy,默认提供了两种删除策略:过期删除(DiscardDeletePolicy)和过期打包删除(ArchiveDeletePolicy)。DiscardDeletePolicy和ArchiveDeletePolicy都比较简单,DiscardDeletePolicy主要是对于超过一定时期的文件进行删除,ArchiveDeletePolicy则是先打包备份再删除。
自定义策略是如何被识别和使用的呢,MetaQ定义了DeletePolicyFactory,所有删除策略的实例都由DeletePolicyFactory提供,DeletePolicyFactory对外提供了注册机制,利用反射机制生成实例,每个自定义的删除策略都必须有一个无参构造,DeletePolicyFactory生成实例代码如下:
public static DeletePolicy getDeletePolicy(String values) { String[] tmps = values.split(","); String name = tmps[0]; Class<? extends DeletePolicy> clazz = policyMap.get(name); if (clazz == null) { throw new UnknownDeletePolicyException(name); } try { //直接调用class的newInstance()方法,该方法必须要求有一个无参构造 DeletePolicy deletePolicy = clazz.newInstance(); String[] initValues = null; if (tmps.length >= 2) { initValues = new String[tmps.length - 1]; System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1); } deletePolicy.init(initValues); return deletePolicy; } catch (Exception e) { throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e); } }
DeletePolicy和MessageStore如何结合在一起的呢?则是粘合剂MessageStoreManager,MessageStoreManager是存储模块的管家,负责与其他模块联系,也是MessageStore管理器,管理所有的MessageStore以及其删除策略,MessageStoreManager也是要好好分析的一个类。
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>(); //前面的存储组织方式介绍过一个主题对应多一个分区,每个分区对应一个MessageStore实例,分区号使用数值来表示,stores就是按照该方式组织管理的 private final MetaConfig metaConfig; //参数配置 private ScheduledThreadPoolExecutor scheduledExecutorService;// = // Executors.newScheduledThreadPool(2); //调度服务,对不同的MessageStore实例flush,将数据提到到硬盘 private final DeletePolicy deletePolicy; //删除策略选择器,这里采用的一个topic对应一种策略,而不是一个MessageStore对应一个策略实例,一个策略实例在同一个topic的不同MessageStore实例间是重用的 private DeletePolicySelector deletePolicySelector; public static final int HALF_DAY = 1000 * 60 * 60 * 12; //topic 集合 private final Set<Pattern> topicsPatSet = new HashSet<Pattern>(); private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>(); //前面曾介绍过MessageStore的提交方式有两种:组提交和定时提交,unflushIntervalMap是存放 //定时提交的任务 private Scheduler scheduler; //定时调度器,用于定时调度删除任务 public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) { this.metaConfig = metaConfig; this.deletePolicy = deletePolicy; //生成策略选择器 this.newDeletePolicySelector(); //添加匿名监听器,监听topic列表变化,如果列表发生变化,则新增列表并重新生成选择器 this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() { public void propertyChange(final PropertyChangeEvent evt) { MessageStoreManager.this.makeTopicsPatSet(); MessageStoreManager.this.newDeletePolicySelector(); } }); //添加匿名监听,监听unflushInternal变化,如果发生变化 this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() { public void propertyChange(final PropertyChangeEvent evt) { MessageStoreManager.this.scheduleFlushTask(); } }); this.makeTopicsPatSet(); //初始化调度 this.initScheduler(); // 定时flush,该方法作者有详细注释就不在解释了 this.scheduleFlushTask(); }
MessageStoreManager实现接口Service,在启动是会调用init方法,关闭时调用dispose方法
public void init() { // 加载已有数据并校验 try { this.loadMessageStores(this.metaConfig); } catch (final IOException e) { log.error("load message stores failed", e); throw new MetamorphosisServerStartupException("Initilize message store manager failed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } this.startScheduleDeleteJobs(); } // private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException { final Set<String> paths = new HashSet<String>(); // public data path //公共数据目录 paths.add(metaConfig.getDataPath()); // topic data path //私有数据目录 for (final String topic : metaConfig.getTopics()) { final TopicConfig topicConfig = metaConfig.getTopicConfig(topic); if (topicConfig != null) { paths.add(topicConfig.getDataPath()); } } final Set<File> fileSet = new HashSet<File>(); for (final String path : paths) { //验证数据目录是否存在 fileSet.add(this.getDataDir(path)); } return fileSet; } private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException { //加载数据目录列表,再加载每个目录下的数据 for (final File dir : this.getDataDirSet(metaConfig)) { this.loadDataDir(metaConfig, dir); } } private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException { log.warn("Begin to scan data path:" + dir.getAbsolutePath()); final long start = System.currentTimeMillis(); final File[] ls = dir.listFiles(); int nThreads = Runtime.getRuntime().availableProcessors() + 1; ExecutorService executor = Executors.newFixedThreadPool(nThreads); int count = 0; //将加载验证每个分区的数据包装成一个个任务 List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>(); for (final File subDir : ls) { if (!subDir.isDirectory()) { log.warn("Ignore not directory path:" + subDir.getAbsolutePath()); } else { final String name = subDir.getName(); final int index = name.lastIndexOf('-'); if (index < 0) { log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath()); continue; } //包装任务 tasks.add(new Callable<MessageStore>() { //回调方法,方法将具体的加载验证分区数据 @Override public MessageStore call() throws Exception { log.warn("Loading data directory:" + subDir.getAbsolutePath() + "..."); final String topic = name.substring(0, index); final int partition = Integer.parseInt(name.substring(index + 1)); //构造MessageStore实例的时候会自动加载验证数据,在初始化MessageStore实例的时候会给该MessageStore实例选择该topic的删除策略 final MessageStore messageStore = new MessageStore(topic, partition, metaConfig, MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy)); return messageStore; } }); count++; if (count % nThreads == 0 || count == ls.length) { //如果配置了并行加载,则使用并行加载 if (metaConfig.isLoadMessageStoresInParallel()) { this.loadStoresInParallel(executor, tasks); } else { //串行加载验证数据 this.loadStores(tasks); } } } } executor.shutdownNow(); log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs"); }
在init方法中做的一件事情就是加载校验已有的数据,加载校验的方式有两种个,串行和并行。
//串行加载验证数据,则在主线程上完成验证加载工作,其缺点是较慢,好处是不会打乱日志顺序 private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException { for (Callable<MessageStore> task : tasks) { MessageStore messageStore; try { messageStore = task.call(); ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); this.stores.put(messageStore.getTopic(), map); } map.put(messageStore.getPartition(), messageStore); } catch (IOException e) { throw e; } catch (InterruptedException e) { throw e; } catch (Exception e) { throw new IllegalStateException(e); } } tasks.clear(); } //并行加载数据,当数据过多的时候,启动并行加载数据可以加快启动速度;但是会打乱启动的日志顺序,默认不启用。 private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException { CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor); for (Callable<MessageStore> task : tasks) { completionService.submit(task); } for (int i = 0; i < tasks.size(); i++) { try { //确保任务都已经运行完毕 MessageStore messageStore = completionService.take().get(); ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); this.stores.put(messageStore.getTopic(), map); } map.put(messageStore.getPartition(), messageStore); } catch (ExecutionException e) { throw ThreadUtils.launderThrowable(e); } } tasks.clear(); }
MessageStoreManager关闭时调用dispose方法,确保资源都正确释放。
public void dispose() { //关闭调度器和调度池 this.scheduledExecutorService.shutdown(); if (this.scheduler != null) { try { this.scheduler.shutdown(true); } catch (final SchedulerException e) { log.error("Shutdown quartz scheduler failed", e); } } //确保每一个 MessageStore实例都正确关闭 for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores .values()) { if (subMap != null) { for (final MessageStore msgStore : subMap.values()) { if (msgStore != null) { try { msgStore.close(); } catch (final Throwable e) { log.error("Try to run close " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e); } } } } } //清空stores列表 this.stores.clear(); }
MessageStoreManager对外提供了获取的MessageStore的方法getMessageStore(final String topic, final int partition)和getOrCreateMessageStore(final String topic, final int partition) throws IOException。
getMessageStore()从stores列表查找对应的MessageStore,如果不存在则返回空;而getOrCreateMessage()则先检查对应的topic是否曾经配置,如果没有则抛出异常,如果有则判断stores是否已有MessageStore实例,如果没有,则生成MessageStore实例放入到stores列表并返回,如果有,则直接返回。
public MessageStore getMessageStore(final String topic, final int partition) { final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); if (map == null) { //如果topic对应的MessageStore实例列表不存在,则直接返回null return null; } return map.get(partition); } Collection<MessageStore> getMessageStoresByTopic(final String topic) { final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); if (map == null) { return Collections.emptyList(); } return map.values(); } public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException { return this.getOrCreateMessageStoreInner(topic, partition, 0); } public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException { return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate); } private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException { //判断topic是否可用,即是否在topicsPatSet列表中 if (!this.isLegalTopic(topic)) { throw new IllegalTopicException("The server do not accept topic " + topic); } //判断分区号是否正确 if (partition < 0 || partition >= this.getNumPartitions(topic)) { log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")"); throw new WrongPartitionException("wrong partition " + partition); } ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); //如果topic对应的列表不存在,则生成列表,放进stores中 if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map); if (oldMap != null) { map = oldMap; } } //判断列表中是否有存在分区号位partition为的MessageStore实例,如果有,直接返回;如果没有,则生成实例并放进列表中 MessageStore messageStore = map.get(partition); if (messageStore != null) { return messageStore; } else { // 对string加锁,特例 synchronized (topic.intern()) { messageStore = map.get(partition); // double check if (messageStore != null) { return messageStore; } messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate); log.info("Created a new message storage for topic=" + topic + ",partition=" + partition); map.put(partition, messageStore); } } return messageStore; } boolean isLegalTopic(final String topic) { for (final Pattern pat : this.topicsPatSet) { if (pat.matcher(topic).matches()) { return true; } } return false; }
通过MessageStoreManager,我们把MessageStore和删除策略很好的组织在一起,并在MessageStoreManager提供定时提交的功能,提升了数据的可靠性;通过MessageStoreManager也为其他模块访问存储模块提供了接口。
我觉得MessageStoreManager设计不好的地方在于topicsPatSet,在topic列表发生变化的时候,没有先清空topicsPatSet,而是直接添加,而且没有对topic对应的MessageStore实例进行重新初始化,如果MessageStore实例已经存在,新删除策略配置不能生效。个人建议是一旦topic列表发生变化的时候,重新初始化整个存储模块,保证一致性。
至此, Broker的消息存储模块基本分析完毕。下一篇,进入Broker网络相关以及消息处理流程分析。
相关推荐
内容概要:本文详细介绍了如何利用Matlab构建、优化和应用决策分类树。首先,讲解了数据准备阶段,将数据与程序分离,确保灵活性。接着,通过具体实例展示了如何使用Matlab内置函数如fitctree快速构建决策树模型,并通过可视化工具直观呈现决策树结构。针对可能出现的过拟合问题,提出了基于成本复杂度的剪枝方法,以提高模型的泛化能力。此外,还分享了一些实用技巧,如处理连续特征、保存模型、并行计算等,帮助用户更好地理解和应用决策树。 适合人群:具有一定编程基础的数据分析师、机器学习爱好者及科研工作者。 使用场景及目标:适用于需要进行数据分类任务的场景,特别是当需要解释性强的模型时。主要目标是教会读者如何在Matlab环境中高效地构建和优化决策分类树,从而应用于实际项目中。 其他说明:文中不仅提供了完整的代码示例,还强调了代码模块化的重要性,便于后续维护和扩展。同时,对于初学者来说,建议从简单的鸢尾花数据集开始练习,逐步掌握决策树的各项技能。
《营销调研》第7章-探索性调研数据采集.pptx
Assignment1_search_final(1).ipynb
美团优惠券小程序带举牌小人带菜谱+流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、在pages/index/index.js文件搜流量主广告改成自己的广告ID 5、到微信公众平台登陆自己的小程序-开发管理-开发设置-服务器域名修改成
《计算机录入技术》第十八章-常用外文输入法.pptx
基于Andorid的跨屏拖动应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。
《网站建设与维护》项目4-在线购物商城用户管理功能.pptx
区块链_房屋转租系统_去中心化存储_数据防篡改_智能合约_S_1744435730
《计算机应用基础实训指导》实训五-Word-2010的文字编辑操作.pptx
《移动通信(第4版)》第5章-组网技术.ppt
ABB机器人基础.pdf
《综合布线施工技术》第9章-综合布线实训指导.ppt
很不错的一套站群系统源码,后台配置采集节点,输入目标站地址即可全自动智能转换自动全站采集!支持 https、支持 POST 获取、支持搜索、支持 cookie、支持代理、支持破解防盗链、支持破解防采集 全自动分析,内外链接自动转换、图片地址、css、js,自动分析 CSS 内的图片使得页面风格不丢失: 广告标签,方便在规则里直接替换广告代码 支持自定义标签,标签可自定义内容、自由截取、内容正则截取。可以放在模板里,也可以在规则里替换 支持自定义模板,可使用标签 diy 个性模板,真正做到内容上移花接木 调试模式,可观察采集性能,便于发现和解决各种错误 多条采集规则一键切换,支持导入导出 内置强大替换和过滤功能,标签过滤、站内外过滤、字符串替换、等等 IP 屏蔽功能,屏蔽想要屏蔽 IP 地址让它无法访问 ****高级功能*****· url 过滤功能,可过滤屏蔽不采集指定链接· 伪原创,近义词替换有利于 seo· 伪静态,url 伪静态化,有利于 seo· 自动缓存自动更新,可设置缓存时间达到自动更新,css 缓存· 支持演示有阿三源码简繁体互转· 代理 IP、伪造 IP、随机 IP、伪造 user-agent、伪造 referer 来路、自定义 cookie,以便应对防采集措施· url 地址加密转换,个性化 url,让你的 url 地址与众不同· 关键词内链功能· 还有更多功能等你发现…… 程序使用非常简单,仅需在后台输入一个域名即可建站,不限子域名,站群利器,无授权,无绑定限制,使用后台功能可对页面进行自定义修改,在程序后台开启生 成功能,只要访问页面就会生成一个本地文件。当用户再次访问的时候就直接访问网站本地的页面,所以目标站点无法访问了也没关系,我们的站点依然可以访问, 支持伪静态、伪原创、生成静态文件、自定义替换、广告管理、友情链接管理、自动下载 CSS 内的图。
【自然语言处理】文本分类方法综述:从基础模型到深度学习的情感分析系统设计
基于Andorid的下拉浏览应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。
内容概要:本文详细介绍了一个原创的P2插电式混合动力系统Simulink模型,该模型基于逻辑门限值控制策略,涵盖了多个关键模块如工况输入、驾驶员模型、发动机模型、电机模型、制动能量回收模型、转矩分配模型、运行模式切换模型、档位切换模型以及纵向动力学模型。模型支持多种标准工况(WLTC、UDDS、EUDC、NEDC)和自定义工况,并展示了丰富的仿真结果,包括发动机和电机转矩变化、工作模式切换、档位变化、电池SOC变化、燃油消耗量、速度跟随和最大爬坡度等。此外,文章还深入探讨了逻辑门限值控制策略的具体实现及其效果,提供了详细的代码示例和技术细节。 适合人群:汽车工程专业学生、研究人员、混动汽车开发者及爱好者。 使用场景及目标:①用于教学和科研,帮助理解和掌握P2混动系统的原理和控制策略;②作为开发工具,辅助设计和优化混动汽车控制系统;③提供仿真平台,评估不同工况下的混动系统性能。 其他说明:文中不仅介绍了模型的整体架构和各模块的功能,还分享了许多实用的调试技巧和优化方法,使读者能够更好地理解和应用该模型。
内容概要:本文详细介绍了基于ADMM(交替方向乘子法)算法在电力系统分布式调度中的应用,特别是并行(Jacobi)和串行(Gauss-Seidel)两种不同更新模式的实现。文中通过MATLAB代码展示了这两种模式的具体实现方法,并比较了它们的优劣。并行模式适用于多核计算环境,能够充分利用硬件资源,尽管迭代次数较多,但总体计算时间较短;串行模式则由于“接力式”更新机制,通常收敛更快,但在计算资源有限的情况下可能会形成瓶颈。此外,文章还讨论了惩罚系数rho的自适应调整策略以及在电-气耦合系统优化中的应用实例。 适合人群:从事电力系统优化、分布式计算研究的专业人士,尤其是有一定MATLAB编程基础的研究人员和技术人员。 使用场景及目标:①理解和实现ADMM算法在电力系统分布式调度中的应用;②评估并行和串行模式在不同应用场景下的性能表现;③掌握惩罚系数rho的自适应调整技巧,提高算法收敛速度和稳定性。 其他说明:文章提供了详细的MATLAB代码示例,帮助读者更好地理解和实践ADMM算法。同时,强调了在实际工程应用中需要注意的关键技术和优化策略。
内容概要:本文深入研究了交错并联Buck变换器的工作原理、性能优势及其具体实现。文章首先介绍了交错并联Buck变换器相较于传统Buck变换器的优势,包括减小输出电流和电压纹波、降低开关管和二极管的电流应力、减小输出滤波电容容量等。接着,文章详细展示了如何通过MATLAB/Simulink建立该变换器的仿真模型,包括参数设置、电路元件添加、PWM信号生成及连接、电压电流测量模块的添加等。此外,还探讨了PID控制器的设计与实现,通过理论分析和仿真验证了其有效性。最后,文章通过多个仿真实验验证了交错并联Buck变换器在纹波性能、器件应力等方面的优势,并分析了不同控制策略的效果,如P、PI、PID控制等。 适合人群:具备一定电力电子基础,对DC-DC变换器特别是交错并联Buck变换器感兴趣的工程师和技术人员。 使用场景及目标:①理解交错并联Buck变换器的工作原理及其相对于传统Buck变换器的优势;②掌握使用MATLAB/Simulink搭建交错并联Buck变换器仿真模型的方法;③学习PID控制器的设计与实现,了解其在电源系统中的应用;④通过仿真实验验证交错并联Buck变换器的性能,评估不同控制策略的效果。 其他说明:本文不仅提供了详细的理论分析,还给出了大量可运行的MATLAB代码,帮助读者更好地理解和实践交错并联Buck变换器的设计与实现。同时,通过对不同控制策略的对比分析,为实际工程应用提供了有价值的参考。
《综合布线施工技术》第8章-综合布线工程案例.ppt
内容概要:本文详细介绍了基于STM32F103C8T6的K型热电偶温度控制仪的设计与实现。硬件部分涵盖了热电偶采集电路、OLED显示模块、蜂鸣器电路、风扇控制电路以及EEPROM存储模块。软件部分则涉及ADC配置、OLED刷新、PID控温算法、EEPROM参数存储、风扇PWM控制等多个方面的具体实现。文中不仅提供了详细的代码示例,还分享了许多调试经验和注意事项,如冷端补偿、DMA传输优化、I2C时钟配置、PWM频率选择等。 适合人群:具有一定嵌入式系统开发经验的工程师和技术爱好者。 使用场景及目标:适用于需要进行温度监测与控制的应用场景,如工业自动化、实验室设备等。目标是帮助读者掌握STM32F103C8T6在温度控制领域的应用技巧,提升硬件设计和软件编程能力。 其他说明:本文提供的工程文件包含Altium Designer的原理图PCB文件,便于二次开发。此外,文中还提到了一些扩展功能,如加入Modbus通信协议,供有兴趣的读者进一步探索。