前面,我们已经把Broker存储最重要的一个类具体分析了一遍,接下来,我们分析一下其删除的策略。前面介绍过Messagestore采用的多文件存储的组织方式,而存储空间不可能无限大,得有一定的删除策略对其进行删除以腾出空间给新的消息。
MetaQ允许自定义删除策略,需要实现接口DeletePolicy,默认提供了两种删除策略:过期删除(DiscardDeletePolicy)和过期打包删除(ArchiveDeletePolicy)。DiscardDeletePolicy和ArchiveDeletePolicy都比较简单,DiscardDeletePolicy主要是对于超过一定时期的文件进行删除,ArchiveDeletePolicy则是先打包备份再删除。
自定义策略是如何被识别和使用的呢,MetaQ定义了DeletePolicyFactory,所有删除策略的实例都由DeletePolicyFactory提供,DeletePolicyFactory对外提供了注册机制,利用反射机制生成实例,每个自定义的删除策略都必须有一个无参构造,DeletePolicyFactory生成实例代码如下:
public static DeletePolicy getDeletePolicy(String values) { String[] tmps = values.split(","); String name = tmps[0]; Class<? extends DeletePolicy> clazz = policyMap.get(name); if (clazz == null) { throw new UnknownDeletePolicyException(name); } try { //直接调用class的newInstance()方法,该方法必须要求有一个无参构造 DeletePolicy deletePolicy = clazz.newInstance(); String[] initValues = null; if (tmps.length >= 2) { initValues = new String[tmps.length - 1]; System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1); } deletePolicy.init(initValues); return deletePolicy; } catch (Exception e) { throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e); } }
DeletePolicy和MessageStore如何结合在一起的呢?则是粘合剂MessageStoreManager,MessageStoreManager是存储模块的管家,负责与其他模块联系,也是MessageStore管理器,管理所有的MessageStore以及其删除策略,MessageStoreManager也是要好好分析的一个类。
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>(); //前面的存储组织方式介绍过一个主题对应多一个分区,每个分区对应一个MessageStore实例,分区号使用数值来表示,stores就是按照该方式组织管理的 private final MetaConfig metaConfig; //参数配置 private ScheduledThreadPoolExecutor scheduledExecutorService;// = // Executors.newScheduledThreadPool(2); //调度服务,对不同的MessageStore实例flush,将数据提到到硬盘 private final DeletePolicy deletePolicy; //删除策略选择器,这里采用的一个topic对应一种策略,而不是一个MessageStore对应一个策略实例,一个策略实例在同一个topic的不同MessageStore实例间是重用的 private DeletePolicySelector deletePolicySelector; public static final int HALF_DAY = 1000 * 60 * 60 * 12; //topic 集合 private final Set<Pattern> topicsPatSet = new HashSet<Pattern>(); private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>(); //前面曾介绍过MessageStore的提交方式有两种:组提交和定时提交,unflushIntervalMap是存放 //定时提交的任务 private Scheduler scheduler; //定时调度器,用于定时调度删除任务 public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) { this.metaConfig = metaConfig; this.deletePolicy = deletePolicy; //生成策略选择器 this.newDeletePolicySelector(); //添加匿名监听器,监听topic列表变化,如果列表发生变化,则新增列表并重新生成选择器 this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() { public void propertyChange(final PropertyChangeEvent evt) { MessageStoreManager.this.makeTopicsPatSet(); MessageStoreManager.this.newDeletePolicySelector(); } }); //添加匿名监听,监听unflushInternal变化,如果发生变化 this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() { public void propertyChange(final PropertyChangeEvent evt) { MessageStoreManager.this.scheduleFlushTask(); } }); this.makeTopicsPatSet(); //初始化调度 this.initScheduler(); // 定时flush,该方法作者有详细注释就不在解释了 this.scheduleFlushTask(); }
MessageStoreManager实现接口Service,在启动是会调用init方法,关闭时调用dispose方法
public void init() { // 加载已有数据并校验 try { this.loadMessageStores(this.metaConfig); } catch (final IOException e) { log.error("load message stores failed", e); throw new MetamorphosisServerStartupException("Initilize message store manager failed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } this.startScheduleDeleteJobs(); } // private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException { final Set<String> paths = new HashSet<String>(); // public data path //公共数据目录 paths.add(metaConfig.getDataPath()); // topic data path //私有数据目录 for (final String topic : metaConfig.getTopics()) { final TopicConfig topicConfig = metaConfig.getTopicConfig(topic); if (topicConfig != null) { paths.add(topicConfig.getDataPath()); } } final Set<File> fileSet = new HashSet<File>(); for (final String path : paths) { //验证数据目录是否存在 fileSet.add(this.getDataDir(path)); } return fileSet; } private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException { //加载数据目录列表,再加载每个目录下的数据 for (final File dir : this.getDataDirSet(metaConfig)) { this.loadDataDir(metaConfig, dir); } } private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException { log.warn("Begin to scan data path:" + dir.getAbsolutePath()); final long start = System.currentTimeMillis(); final File[] ls = dir.listFiles(); int nThreads = Runtime.getRuntime().availableProcessors() + 1; ExecutorService executor = Executors.newFixedThreadPool(nThreads); int count = 0; //将加载验证每个分区的数据包装成一个个任务 List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>(); for (final File subDir : ls) { if (!subDir.isDirectory()) { log.warn("Ignore not directory path:" + subDir.getAbsolutePath()); } else { final String name = subDir.getName(); final int index = name.lastIndexOf('-'); if (index < 0) { log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath()); continue; } //包装任务 tasks.add(new Callable<MessageStore>() { //回调方法,方法将具体的加载验证分区数据 @Override public MessageStore call() throws Exception { log.warn("Loading data directory:" + subDir.getAbsolutePath() + "..."); final String topic = name.substring(0, index); final int partition = Integer.parseInt(name.substring(index + 1)); //构造MessageStore实例的时候会自动加载验证数据,在初始化MessageStore实例的时候会给该MessageStore实例选择该topic的删除策略 final MessageStore messageStore = new MessageStore(topic, partition, metaConfig, MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy)); return messageStore; } }); count++; if (count % nThreads == 0 || count == ls.length) { //如果配置了并行加载,则使用并行加载 if (metaConfig.isLoadMessageStoresInParallel()) { this.loadStoresInParallel(executor, tasks); } else { //串行加载验证数据 this.loadStores(tasks); } } } } executor.shutdownNow(); log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs"); }
在init方法中做的一件事情就是加载校验已有的数据,加载校验的方式有两种个,串行和并行。
//串行加载验证数据,则在主线程上完成验证加载工作,其缺点是较慢,好处是不会打乱日志顺序 private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException { for (Callable<MessageStore> task : tasks) { MessageStore messageStore; try { messageStore = task.call(); ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); this.stores.put(messageStore.getTopic(), map); } map.put(messageStore.getPartition(), messageStore); } catch (IOException e) { throw e; } catch (InterruptedException e) { throw e; } catch (Exception e) { throw new IllegalStateException(e); } } tasks.clear(); } //并行加载数据,当数据过多的时候,启动并行加载数据可以加快启动速度;但是会打乱启动的日志顺序,默认不启用。 private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException { CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor); for (Callable<MessageStore> task : tasks) { completionService.submit(task); } for (int i = 0; i < tasks.size(); i++) { try { //确保任务都已经运行完毕 MessageStore messageStore = completionService.take().get(); ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); this.stores.put(messageStore.getTopic(), map); } map.put(messageStore.getPartition(), messageStore); } catch (ExecutionException e) { throw ThreadUtils.launderThrowable(e); } } tasks.clear(); }
MessageStoreManager关闭时调用dispose方法,确保资源都正确释放。
public void dispose() { //关闭调度器和调度池 this.scheduledExecutorService.shutdown(); if (this.scheduler != null) { try { this.scheduler.shutdown(true); } catch (final SchedulerException e) { log.error("Shutdown quartz scheduler failed", e); } } //确保每一个 MessageStore实例都正确关闭 for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores .values()) { if (subMap != null) { for (final MessageStore msgStore : subMap.values()) { if (msgStore != null) { try { msgStore.close(); } catch (final Throwable e) { log.error("Try to run close " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e); } } } } } //清空stores列表 this.stores.clear(); }
MessageStoreManager对外提供了获取的MessageStore的方法getMessageStore(final String topic, final int partition)和getOrCreateMessageStore(final String topic, final int partition) throws IOException。
getMessageStore()从stores列表查找对应的MessageStore,如果不存在则返回空;而getOrCreateMessage()则先检查对应的topic是否曾经配置,如果没有则抛出异常,如果有则判断stores是否已有MessageStore实例,如果没有,则生成MessageStore实例放入到stores列表并返回,如果有,则直接返回。
public MessageStore getMessageStore(final String topic, final int partition) { final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); if (map == null) { //如果topic对应的MessageStore实例列表不存在,则直接返回null return null; } return map.get(partition); } Collection<MessageStore> getMessageStoresByTopic(final String topic) { final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); if (map == null) { return Collections.emptyList(); } return map.values(); } public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException { return this.getOrCreateMessageStoreInner(topic, partition, 0); } public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException { return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate); } private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException { //判断topic是否可用,即是否在topicsPatSet列表中 if (!this.isLegalTopic(topic)) { throw new IllegalTopicException("The server do not accept topic " + topic); } //判断分区号是否正确 if (partition < 0 || partition >= this.getNumPartitions(topic)) { log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")"); throw new WrongPartitionException("wrong partition " + partition); } ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); //如果topic对应的列表不存在,则生成列表,放进stores中 if (map == null) { map = new ConcurrentHashMap<Integer, MessageStore>(); final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map); if (oldMap != null) { map = oldMap; } } //判断列表中是否有存在分区号位partition为的MessageStore实例,如果有,直接返回;如果没有,则生成实例并放进列表中 MessageStore messageStore = map.get(partition); if (messageStore != null) { return messageStore; } else { // 对string加锁,特例 synchronized (topic.intern()) { messageStore = map.get(partition); // double check if (messageStore != null) { return messageStore; } messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate); log.info("Created a new message storage for topic=" + topic + ",partition=" + partition); map.put(partition, messageStore); } } return messageStore; } boolean isLegalTopic(final String topic) { for (final Pattern pat : this.topicsPatSet) { if (pat.matcher(topic).matches()) { return true; } } return false; }
通过MessageStoreManager,我们把MessageStore和删除策略很好的组织在一起,并在MessageStoreManager提供定时提交的功能,提升了数据的可靠性;通过MessageStoreManager也为其他模块访问存储模块提供了接口。
我觉得MessageStoreManager设计不好的地方在于topicsPatSet,在topic列表发生变化的时候,没有先清空topicsPatSet,而是直接添加,而且没有对topic对应的MessageStore实例进行重新初始化,如果MessageStore实例已经存在,新删除策略配置不能生效。个人建议是一旦topic列表发生变化的时候,重新初始化整个存储模块,保证一致性。
至此, Broker的消息存储模块基本分析完毕。下一篇,进入Broker网络相关以及消息处理流程分析。
相关推荐
四、MetaQ的工作流程 1. 生产者:生产者负责创建和发送消息到MetaQ服务器。在1.4.3版本中,客户端库提供了方便的API,开发者可以轻松创建生产者实例,指定主题和分区策略,然后将序列化的对象发送出去。 2. 消息...
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...
1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...
MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...
本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...
最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...
Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...
RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...
MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
阿里消息中间件MetaQ学习Demo
### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...
- **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...
综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...
阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...
【大数据技术概述】 大数据,作为一个现代信息技术的关键领域,是指那些数据量庞大、增长迅速、...随着技术的不断进步,大数据将继续在数据分析、人工智能和机器学习等领域发挥关键作用,为企业和社会创造更大的价值。