Using Spring for Apache Kafka
Sending Messages
KafkaTemplate
The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. Both asynchronous and synchronous methods are provided, with the asynchronous methods returning a ListenableFuture.
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}
The first three methods require that a default topic has been provided to the template. The metrics and partitionsFor methods simply delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.
To use the template, configure a producer factory and provide it in the template’s constructor:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}
The template can also be configured using standard <bean/> definitions. Then, to use the template, simply invoke one of its methods.
When using the methods with a Message<?> parameter, topic, partition and key information is provided in a message header:

- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION_ID
- KafkaHeaders.MESSAGE_KEY

with the message payload being the data.
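For example, a minimal sketch using the spring-messaging MessageBuilder; the topic, partition, and key values are illustrative:

Message<String> message = MessageBuilder.withPayload("someData")
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.PARTITION_ID, 0)
        .setHeader(KafkaHeaders.MESSAGE_KEY, 1)
        .build();
template.send(message);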
Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.
public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}
By default, the template is configured with a LoggingProducerListener which logs errors and does nothing when the send is successful. onSuccess is only called if isInterestedInSuccess returns true. For convenience, the abstract ProducerListenerAdapter is provided in case you only want to implement one of the methods. It returns false for isInterestedInSuccess.
Notice that the send methods return a ListenableFuture<SendResult>. You can register a callback with the listener to receive the result of the send asynchronously.
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "foo");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

    @Override
    public void onSuccess(SendResult<Integer, String> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }

});
The SendResult has two properties, a ProducerRecord and RecordMetadata; refer to the Kafka API documentation for information about those objects.

If you wish to block the sending thread to await the result, you can invoke the future's get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however, that flushing will likely significantly reduce performance.
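For example (a sketch; the 10-second timeout is illustrative, and get() throws checked exceptions that the caller must handle):

// block the sending thread until the broker acknowledges the record (or the timeout elapses)
SendResult<Integer, String> sendResult =
        template.send("myTopic", "foo").get(10, TimeUnit.SECONDS);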
Receiving Messages
Messages can be received by configuring a MessageListenerContainer and providing a Message Listener, or by using the @KafkaListener annotation.
Message Listeners
When using a Message Listener Container you must provide a listener to receive data. There are currently four supported interfaces for message listeners:
public interface MessageListener<K, V> { (1)

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface BatchMessageListener<K, V> { (3)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { (4)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}
- (1) Use this for processing individual ConsumerRecords received from the Kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods (a minimal example follows this list).
- (2) Use this for processing individual ConsumerRecords received from the Kafka consumer poll() operation when using one of the manual commit methods.
- (3) Use this for processing all ConsumerRecords received from the Kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
- (4) Use this for processing all ConsumerRecords received from the Kafka consumer poll() operation when using one of the manual commit methods.
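For example, a minimal sketch of a record listener (interface (1) above); the println stands in for real processing:

public class MyListener implements MessageListener<Integer, String> {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // invoked once for each ConsumerRecord returned by poll()
        System.out.println("received: " + record.value());
    }

}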
Message Listener Containers
Two MessageListenerContainer implementations are provided:

- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainers to provide multi-threaded consumption.
KafkaMessageListenerContainer
The following constructors are available.
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties,
        TopicPartitionInitialOffset... topicPartitions)
Each takes a ConsumerFactory and information about topics and partitions, as well as other configuration, in a ContainerProperties object. The second constructor is used by the ConcurrentMessageListenerContainer (see below) to distribute TopicPartitionInitialOffset across the consumer instances. ContainerProperties has the following constructors:
public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
The first takes an array of TopicPartitionInitialOffset arguments to explicitly instruct the container which partitions to use (using the consumer assign() method), with an optional initial offset: a positive value is an absolute offset by default; a negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionInitialOffset is provided that takes an additional boolean argument. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started. The second takes an array of topics, and Kafka allocates the partitions based on the group.id property, distributing partitions across the group. The third uses a regex Pattern to select the topics.
Refer to the JavaDocs for ContainerProperties for more information about the various properties that can be set.
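Putting this together, a hedged sketch of wiring a container by hand; the topic name is illustrative, and a consumerFactory() bean like the one shown in the @KafkaListener configuration below is assumed:

ContainerProperties containerProps = new ContainerProperties("myTopic");
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("received: " + record.value());
    }

});
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory(), containerProps);
container.start();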
ConcurrentMessageListenerContainer
The single constructor is similar to the first KafkaMessageListenerContainer constructor:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)
It also has a concurrency property; for example, container.setConcurrency(3) will create 3 KafkaMessageListenerContainers.
For the first constructor, Kafka will distribute the partitions across the consumers. For the second constructor, the ConcurrentMessageListenerContainer distributes the TopicPartitions across the delegate KafkaMessageListenerContainers.
If, say, 6 TopicPartitions are provided and the concurrency is 3, each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
Committing Offsets
Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka will auto-commit the offsets according to its configuration. If it is false, the containers support the following AckModes.

The consumer poll() method will return one or more ConsumerRecords; the MessageListener is called for each record; the following describes the action taken by the container for each AckMode:
- RECORD - commit the offset when the listener returns after processing the record.
- BATCH - commit the offset when all the records returned by the poll() have been processed.
- TIME - commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
- COUNT - commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
- COUNT_TIME - similar to TIME and COUNT, but the commit is performed if either condition is true.
- MANUAL - the message listener is responsible for calling acknowledge() on the Acknowledgment; after that, the same semantics as BATCH are applied.
- MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
Note: MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener; see Message Listeners.
The commitSync() or commitAsync() method on the consumer is used, depending on the syncCommits container property.

The Acknowledgment has this method:
public interface Acknowledgment {

    void acknowledge();

}
This gives the listener control over when offsets are committed.
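For example, a sketch selecting manual, immediate commits; this assumes enable.auto.commit is false and that, in this version of the framework, the AckMode enum is nested in AbstractMessageListenerContainer:

ContainerProperties containerProps = new ContainerProperties("myTopic");
// MANUAL_IMMEDIATE commits as soon as the listener calls acknowledge()
containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);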
@KafkaListener Annotation
The @KafkaListener annotation provides a mechanism for simple POJO listeners:
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
        ...
    }

}
This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer: by default, a bean with name kafkaListenerContainerFactory is expected.
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }

}
Notice that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
Each partition can be specified in the partitions or partitionOffsets attribute, but not both.

When using manual AckMode, the listener can also be provided with the Acknowledgment; this example also shows how to use a different container factory.
@KafkaListener(id = "baz", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
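The referenced factory is not shown above; a hedged sketch of what kafkaManualAckListenerContainerFactory might look like, reusing the consumerFactory() bean from the earlier configuration:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // manual ack modes require enable.auto.commit=false in the consumer configuration
    factory.getContainerProperties().setAckMode(
            AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}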
Finally, metadata about the message is available from message headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
To receive a simple list of payloads:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
The topic, partition, offset, and so on are available in headers that parallel the payloads:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
Alternatively, you can receive a List of Message<?> objects with each offset, etc., in each message, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
You can also receive a list of ConsumerRecord<?, ?> objects, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
Filtering Messages
In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed. The framework cannot know whether such a message has been processed or not; that is an application-level function. This is known as the Idempotent Receiver pattern, and Spring Integration provides an implementation of it.
The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy, where you implement the filter method to signal that a message is a duplicate and should be discarded.
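A hedged sketch of wrapping a listener; the process() and alreadyProcessed() helpers are hypothetical and stand in for your own handling and idempotency logic:

MessageListener<Integer, String> delegate = record -> process(record);

// filter() returning true marks the record as a duplicate to be discarded
RecordFilterStrategy<Integer, String> strategy = record -> alreadyProcessed(record.key());

MessageListener<Integer, String> filtering =
        new FilteringMessageListenerAdapter<>(delegate, strategy);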
A FilteringAcknowledgingMessageListenerAdapter is also provided for wrapping an AcknowledgingMessageListener. This has an additional property ackDiscarded, which indicates whether the adapter should acknowledge the discarded record; it is true by default.
When using @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory and the listener will be wrapped in the appropriate filtering adapter.

Finally, FilteringBatchMessageListenerAdapter and FilteringBatchAcknowledgingMessageListenerAdapter are provided, for when using a batch message listener.
Retrying Deliveries
If your listener throws an exception, the default behavior is to invoke the ErrorHandler, if configured, or to log the exception otherwise.

Note: Two error handler interfaces are provided, ErrorHandler and BatchErrorHandler; the appropriate type must be configured to match the Message Listener.
To retry deliveries, convenient listener adapters - RetryingMessageListenerAdapter and RetryingAcknowledgingMessageListenerAdapter - are provided, depending on whether you are using a MessageListener or an AcknowledgingMessageListener.

These can be configured with a RetryTemplate and RecoveryCallback<Void> - see the spring-retry project for information about these components. If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted. In that case, the ErrorHandler will be invoked, if configured, or the error will be logged otherwise.
When using @KafkaListener, set the RetryTemplate (and optionally recoveryCallback) on the container factory and the listener will be wrapped in the appropriate retrying adapter.
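A hedged sketch of such a factory, reusing the earlier consumerFactory() bean; the policy values (3 attempts, 1-second fixed back-off) are illustrative:

@Bean
public KafkaListenerContainerFactory<?> retryingFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // up to 3 delivery attempts
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000L); // 1 second between attempts
    retryTemplate.setBackOffPolicy(backOffPolicy);
    factory.setRetryTemplate(retryTemplate);
    return factory;
}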
A retry adapter is not provided for any of the batch message listeners.
Detecting Idle Asynchronous Consumers
While efficient, one problem with asynchronous consumers is detecting when they are idle - users might want to take some action if no messages arrive for some period of time.
You can configure the listener container to publish a ListenerContainerIdleEvent when some time passes with no message delivery. While the container is idle, an event will be published every idleEventInterval milliseconds.

To configure this feature, set the idleEventInterval on the container:
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}
Or, for a @KafkaListener:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}
In each of these cases, an event will be published once per minute while the container is idle.
Event Consumption
You can capture these events by implementing ApplicationListener - either a general listener, or one narrowed to only receive this specific event. You can also use @EventListener, introduced in Spring Framework 4.2.

The following example combines the @KafkaListener and @EventListener into a single class. It's important to understand that the application listener will get events for all containers, so you may need to check the listener id if you want to take specific action based on which container is idle. You can also use the @EventListener condition for this purpose.
The events have 4 properties:

- source - the listener container instance
- id - the listener id (or container bean name)
- idleTime - the time the container had been idle when the event was published
- topicPartitions - the topics/partitions that the container was assigned at the time the event was generated
public class Listener {

    // fields referenced by the event handler below
    private final CountDownLatch eventLatch = new CountDownLatch(1);

    private volatile ListenerContainerIdleEvent event;

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        this.event = event;
        eventLatch.countDown();
    }

}
Important: Event listeners will see events for all containers; so, in the example above, we narrow the events received based on the listener ID. Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n, where the n is a unique value for each instance to support the concurrency. Hence we use startsWith in the condition.
Caution: If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener - it will cause delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not stop() the container instance in the event if it is a child container; you should stop the concurrent container instead.
Current Positions when Idle
Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware in your listener; see onIdleContainer() in Seeking to a Specific Offset.
Topic/Partition Initial Offset
There are several ways to set the initial offset for a partition.
When manually assigning partitions, simply set the initial offset (if desired) in the configured TopicPartitionInitialOffset arguments (see Message Listener Containers). You can also seek to a specific offset at any time.
When using group management where the broker assigns partitions:
- For a new group.id, the initial offset is determined by the auto.offset.reset consumer property (earliest or latest).
- For an existing group id, the initial offset is the current offset for that group id. You can, however, seek to a specific offset during initialization (or at any time thereafter).
Seeking to a Specific Offset
In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

When using group management, the second method is called when assignments change. You can use this method, for example, to set initial offsets for the partitions by calling the callback; you must use the callback argument, not the one passed into registerSeekCallback. This method will never be called if you explicitly assign partitions yourself; use the TopicPartitionInitialOffset in that case.
The callback has one method:
void seek(String topic, int partition, long offset);
You can also perform seek operations from onIdleContainer() when an idle container is detected; see Detecting Idle Asynchronous Consumers for how to enable idle container detection.

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.
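A hedged sketch of a seek-aware listener; the "rewind every newly assigned partition to offset 0" policy is purely illustrative:

public class SeekToStartListener implements MessageListener<Integer, String>, ConsumerSeekAware {

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // save the callback (e.g. in a ThreadLocal) for arbitrary seeks at runtime
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // use the callback passed to this method, not the registered one
        assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), 0));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // no-op here; seeks could also be performed when the container is idle
    }

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // process the record
    }

}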
Serialization/Deserialization and Message Conversion
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys. It is present with the org.apache.kafka.common.serialization.Serializer<T> and org.apache.kafka.common.serialization.Deserializer<T> abstractions, with some built-in implementations. You can specify simple (de)serializer classes using producer and/or consumer configuration properties, e.g.:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
For more complex or particular cases, the KafkaConsumer, and therefore KafkaProducer, provides overloaded constructors to accept (De)Serializer instances for keys and/or values, respectively.

To meet this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties to allow injecting a custom (De)Serializer into the target Producer/Consumer.
For this purpose, Spring for Apache Kafka also provides JsonSerializer/JsonDeserializer implementations based on the Jackson JSON object mapper. The JsonSerializer is quite simple and just allows writing any Java object as a JSON byte[]; the JsonDeserializer requires an additional Class<?> targetType argument to allow the deserialization of a consumed byte[] to the proper target object.
JsonDeserializer<Bar> barDeserializer = new JsonDeserializer<>(Bar.class);
Both JsonSerializer and JsonDeserializer can be customized with an ObjectMapper. You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey) method.
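For example, a sketch injecting the JSON (de)serializers through the factories; Foo is an illustrative domain type, and the config maps are those shown earlier:

@Bean
public ProducerFactory<Integer, Foo> fooProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(),
            new IntegerSerializer(), new JsonSerializer<Foo>());
}

@Bean
public ConsumerFactory<Integer, Foo> fooConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new IntegerDeserializer(), new JsonDeserializer<>(Foo.class));
}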
Although the Serializer/Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, you might need more flexibility at the Spring Messaging level, either when using @KafkaListener or Spring Integration. To easily convert to/from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction, with the MessagingMessageConverter implementation and its StringJsonMessageConverter customization. The MessageConverter can be injected into a KafkaTemplate instance directly, and via the AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
        containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Foo foo) {
    ...
}
When using a @KafkaListener, the parameter type is provided to the message converter to assist with the conversion.

Note: When using the StringJsonMessageConverter, you should use a StringDeserializer in the Kafka consumer configuration and a StringSerializer in the Kafka producer configuration, when using Spring Integration or the KafkaTemplate.send(Message<?> message) method.
Log Compaction
When using Log Compaction, it is possible to send and receive messages with null payloads, which identify the deletion of a key. Starting with version 1.0.3, this is fully supported.

To send a null payload using the KafkaTemplate, simply pass null into the value argument of the send() methods. One exception to this is the send(Message<?> message) variant. Since a spring-messaging Message<?> cannot have a null payload, a special payload type KafkaNull is used and the framework will send null. For convenience, the static KafkaNull.INSTANCE is provided.
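For example, a sketch of sending a "tombstone" for a key; the topic and key are illustrative, and the key is typed explicitly so the key overload of send(), rather than the partition overload, is selected:

Integer key = 42;
template.send("myTopic", key, null); // a null value marks the key for deletion

// the equivalent using the Message<?> variant and KafkaNull
template.send(MessageBuilder.withPayload(KafkaNull.INSTANCE)
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.MESSAGE_KEY, key)
        .build());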
When using a message listener container, the received ConsumerRecord will have a null value().

To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false; you will usually also need the key so your application knows which key was "deleted":
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// value == null represents key deletion
}
When using a class-level @KafkaListener, some additional configuration is needed - a @KafkaHandler method with a KafkaNull payload:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}