The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.
This pattern is implemented using the IdempotentConsumer class. This uses an Expression to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the IdempotentRepository to see if it has been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.
Idempotent Conumser用来处理重复的Message。它使用IdempotentConsumer进行Message的重复检测。检测原理很简单:每一个Message都有一个唯一的ID,在Message被处理之前,先在IdempotentRepository 里搜索ID,看看是否已经处理了该条Message,如果没有的,处理该条Message,然后将ID存到IdempotentRepository 中。
The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.
Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.
On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.
Camel 提供的几个已有的IdempotentRepository:
看看几个主要的属性:
Option
Default
Description
eager |
true |
Camel 2.0: Eager controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled before then Camel will be able to detect duplicate messages even when messages are currently in progress. By disabling Camel will only detect duplicates when a message has successfully been processed.
这个主要用来配置ID重复检测与Message处理的关系,如果启用,在消息在处理的时候,就可以检测ID是否重复,否则就等到消息处理完毕再检测。
|
messageIdRepositoryRef |
null |
A reference to a IdempotentRepository to lookup in the registry. This option is mandatory when using XML DSL.
存放ID和进行ID重复检测的IdempotentRepository
|
skipDuplicate |
true |
Camel 2.8: Sets whether to skip duplicate messages. If set to false then the message will be continued. However the Exchange has been marked as a duplicate by having the Exchange.DUPLICATE_MESSAG exchange property set to a Boolean.TRUE value.
设置是否处理重复的消息,如果设为false,消息会继续执行,但是Exchange.DUPLICATE_MESSAG属性将被设为TRUE
|
removeOnFailure |
true |
Camel 2.9: Sets whether to remove the id of an Exchange that failed.
是否删除执行失败的消息ID
|
注意:消息的ID是由一个表达式提供的,比如Header("someHeader")
使用MemoryIdempotentRepository:
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("seda:a")
.idempotentConsumer(header("myMessageId"),
MemoryIdempotentRepository.memoryIdempotentRepository(200))
.to("seda:b");
}
};
这里使用myMessageId作为消息的ID
使用jpaMessageIdRepository:
from("direct:start").idempotentConsumer(
header("messageId"),
jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME)
).to("mock:result");
这里使用myMessageId作为消息的ID,而PROCESSOR_NAME则作为消息ID存储的集合的标识(内部也是一个key-value结构?可能考虑到要支持多个Repository公用)。
下面看一个如何处理重复消息的示例:
from("direct:start")
// instruct idempotent consumer to not skip duplicates as we will filter then our self
.idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
.filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
// filter out duplicate messages by sending them to someplace else and then stop
.to("mock:duplicate")
.stop()
.end()
// and here we process only new messages (no duplicates)
.to("mock:result");
在处理完以后,对于Exchange.DUPLICATE_MESSAGE为ture的消息,转发到mock:duplicate。
分享到:
相关推荐
《Camel in Action》是一本由 Claus Ibsen 和 Jonathan Anstey 共同撰写的书籍,这本书是学习 Apache Camel 的重要参考资料之一。书中涵盖了以下内容: - **基础知识介绍:** 第一部分“First Steps”通过两个章节...
3. **JDBC**: JDBC(Java Database Connectivity)组件允许 Camel 直接与数据库进行交互,执行 SQL 查询、存储过程和其他数据库操作。这为数据集成提供了便利,使得 Camel 可以整合不同来源的数据。 4. **Jetty**: ...
Apache Camel 的强大之处在于其灵活的集成能力。JDBC组件可以与其他组件(如定时任务、消息队列等)配合使用,形成更复杂的业务流程。 通过学习和实践"Apache Camel-JDBC",开发者可以更好地利用Camel的灵活性和...
- **模块化**:Camel 允许你按需选择和组合组件,降低复杂性。 - **高可测试性**:由于其声明式的路由定义,单元测试和模拟变得更加简单。 - **社区支持**:拥有广泛的社区支持和丰富的文档资源。 5. **实战应用...
- 路由是 Camel 的核心功能之一,它允许开发者定义消息应该如何在不同的端点之间流动。这包括简单的单向路由、复杂的分发路由等。 **3. 数据转换** - Camel 提供了丰富的工具来转换消息的数据格式。无论是简单的...
apache骆驼23核心组件下载,bean browse calss contreol bus date format 等等等等23核心组件,谷歌翻译后手动修改常用语法,耗时一个月成果,markdown文件
基于 Eclipse MQTT Paho 的 MQTT Apache Camel 组件 以下项目为基于 Eclipse Foundation 的 MQTT Paho 项目的 Apache Camel 组件提供源代码(请参阅 )。 该组件是作为研究工作的一部分开发的,因为原始可用的 MQTT ...
Camel拥有丰富的组件库,每个组件都封装了特定的集成技术。例如,HTTP组件用于处理HTTP请求,FTP组件则用于与FTP服务器交互。通过这些组件,开发者无需深入了解底层技术细节,即可轻松实现集成。 6. **Camel EIPs*...
### Apache Camel 实战知识点概述 #### 一、Apache Camel 简介 - **定义**:Apache Camel 是一个强大的开源框架...书中不仅提供了丰富的理论知识,还有大量的实践案例和代码示例,是学习Camel不可或缺的经典教材之一。
用法(尚未工作) 您可以像大多数其他 Camel 组件一样通过 URI 参数配置此组件。 下面列出了可能的参数: String host = "localhost";Integer port = 5432;String database;String channel;String user = "postgres...
标题中的“该组件现在在Camel2.19.0中可用”指的是Apache Camel项目中的一个特定组件在版本2.19.0中已经可以使用。Apache Camel 是一个流行的开源集成框架,它允许开发者通过一系列预定义的组件(如HTTP、FTP、MQTT...
Apache Camel 框架之 JMS 路由 Apache Camel 框架实现的 JMS 路由是指使用 Apache Camel 框架来实现 Java 消息服务(JMS)的路由。JMS 是一个 Java API,用于在 Java 应用程序之间进行消息传递。Apache Camel 框架...
这些模式是解决企业系统集成中常见问题的最佳实践,而Apache Camel则将这些模式实现为可重用的组件,使得开发者可以通过简单的DSL(Domain Specific Language)来应用这些模式。 1. **DSL与路由定义**:Apache ...
Camel MyBatis组件将Camel与MyBatis集成,使得在Camel路由中可以直接执行数据库操作。示例可能包含如何定义数据源、编写XML映射文件、调用SQL查询或更新,并将结果转换为Camel可处理的数据格式。 3. **Camel ...
Timer 组件是 Apache Camel 中的一个重要部分,它提供了定时触发事件的功能,类似于 Java 中的 javax.swing.Timer 或者 Quartz Scheduler。 在 Apache Camel 中,Timer 组件主要用于创建周期性的事件源,这些事件...
Camel 的基本原则之一是不会假设任何你需要处理的数据,这是很重要的一点,因为它给你们开发者一个集成任何系统的一个机会,不需要转换你的数据为另外的一种公认格式。 1.2 Camel 的主要特点 Camel 提供了高水平的...
它允许开发者使用一系列预定义的组件(如 camel-dropbox)来轻松连接不同的系统、服务和协议。在本篇文章中,我们将深入探讨“camel-dropbox”组件,它使得与 Dropbox 的集成变得简单高效。 camel-dropbox 是 ...
- 组件附录:详细介绍各种Camel组件的功能和使用方法。 - 索引:方便用户查找文档中的关键词和短语。 如果读者需要更详细的信息,建议访问StackOverflow参与关于Camel的讨论,阅读相关帖子和评论,并浏览推荐的链接...
最后,我们来看 `mybatis` 组件,它结合了流行的 MyBatis ORM 框架,使得 Camel 能够无缝地集成到基于 MyBatis 的数据访问层。源码分析将揭示 Camel 是如何与 MyBatis 的映射器接口协作,以及如何处理 SQL 查询和...
Camel提供了丰富的组件来处理各种数据源,例如File组件用于读取本地文件,而JMS组件则用于连接到ActiveMQ并发布消息。开发者可以使用简单的DSL(Domain Specific Language)或者XML配置来定义路由规则。 在这个...