首先maven导入kafka的包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
先来Producter生产者:
public static void main(String[] args) {
//kafka的配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//key的序列化方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//value的序列化方式一般是json方式,这里我自己写了个对象序列化方式
props.put("value.serializer", "com.zfsy.syyx.kafka.MySeri");
Producer<String, RoleBean> producer = new KafkaProducer<String, RoleBean>(props);
RoleBean bean = new RoleBean();
bean.setDm("test");
bean.setMc("测试2");
//topic为linlin
ProducerRecord<String, RoleBean> record = new ProducerRecord<String, RoleBean>("linlin", "test03", bean);
//发送到kafka客户端
Future<RecordMetadata> future = producer.send(record);
try {
System.out.println(future.get().partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
然后就是消费者:
//消费者的配置
Properties props2 = new Properties();
props2.put("bootstrap.servers", "localhost:9092");
props2.put("group.id", "test");
props2.put("enable.auto.commit", "false");
props2.put("auto.commit.interval.ms", "1000");
props2.put("session.timeout.ms", "30000");
//key的反序列化方式
props2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value的反序列化方式
props2.put("value.deserializer", "com.zfsy.syyx.kafka.MySeri");
Consumer<String, RoleBean> consumer = new KafkaConsumer<String, RoleBean>(props2);
//topic为linlin
consumer.subscribe(Lists.newArrayList("linlin"));
//一直获取
while(true){
ConsumerRecords<String, RoleBean> recoreds = consumer.poll(1000);
Iterator<ConsumerRecord<String, RoleBean>> iterator = recoreds.iterator();
while(iterator.hasNext()){
ConsumerRecord<String, RoleBean> record2 = iterator.next();
System.out.println(record2.value().getDm());
System.out.println(record2.value().getMc());
}
}
}
MySeri源码:
public class MySeri implements Deserializer<Object>, Serializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub
}
@Override
public Object deserialize(String topic, byte[] data) {
ByteArrayInputStream in = new ByteArrayInputStream(data);
try {
ObjectInputStream objectInputStream = new ObjectInputStream(in);
return objectInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public byte[] serialize(String topic, Object data) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(out);
objectOutputStream.writeObject(data);
return out.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
分享到:
相关推荐
总结来说,`kafkatool` 是一个强大而实用的 Kafka 管理工具,能够帮助开发者和运维人员更轻松地管理和监控 Kafka 集群,提高工作效率。无论是创建和管理主题、监控消费者组,还是进行数据操作,`kafkatool` 都能提供...
KafkaTool2-64bit是一款专为Kafka设计的强大管理工具,它以其64位架构提供了高效、稳定且用户友好的界面,使得对Apache Kafka集群的管理和监控变得更加简单。这款工具面向的是对大数据处理和消息队列系统有深入需求...
由于其为独立的可执行文件(kafkatool_64bit.exe),因此安装非常简单,只需将文件下载到本地,双击运行即可。无需复杂的配置,大大降低了用户的使用门槛。 在功能方面,Kafkatool_64bit提供了多种实用工具: 1. *...
8. **Kafka Streams**:这是Kafka自带的轻量级流处理库,允许开发者在Kafka集群内进行简单的流处理,无需额外的计算资源。 9. **Zookeeper集成**:Kafka依赖Zookeeper进行集群协调和管理,包括分配分区、维护消费者...
### Kafka 3.2 常用命令详解 #### 一、启动 ZooKeeper 服务 ...这些命令是日常管理和测试 Kafka 集群时非常实用的工具。熟练掌握这些命令可以帮助开发者和运维人员更高效地管理和使用 Kafka 集群。
《图解 Kafka 之实战指南》是一本深入解析Apache Kafka的实用书籍,旨在帮助读者理解和掌握这个分布式消息系统的精髓。Kafka是一个高吞吐、低延迟的开源流处理平台,常用于实时数据管道和流应用的构建。在这个实战...
Kafkatool_64bit是一款专为Windows用户设计的高效且实用的Kafka管理工具,旨在简化Kafka集群的操作与管理,特别是在数据操作和主题管理方面。这款工具以其便捷性和功能强大而受到广大开发人员和运维人员的青睐。 ...
标题中的“kafka的topic小工具”指的是一个用于管理和操作Kafka主题的实用程序,它提供了用户友好的界面或命令行工具,使得在Kafka集群上执行常见的管理任务变得简单。Kafka是一个分布式流处理平台,广泛应用于...
而Kafka Tools 1.0.3则是一款专为Kafka设计的实用工具,它极大地简化了对Kafka集群的管理和监控,使得开发者和运维人员能够更加高效地进行日常操作。 Kafka Tools的核心功能主要体现在以下几个方面: 1. **查看...
总之,Kafkatool_64bit是Windows用户管理Kafka集群的实用工具,它的易用性和强大的功能使得Kafka的运维工作变得更加高效。无论你是初学者还是经验丰富的开发者,都能从中受益,提升Kafka环境的操作体验。
`Kafka Manager`的Web界面提供了一系列实用功能。例如,用户可以通过界面创建和删除主题,调整分区数量,查看消费者组的消费详情,甚至进行简单的性能测试。此外,它还提供了报警配置,可以根据预设的条件触发通知,...
`kafka-demo.zip`包含了一个简单的项目,展示了生产者和消费者的基本用法,以及Kafka配置的关键参数。 首先,让我们了解一下SpringBoot和Kafka的基础。SpringBoot是Spring框架的简化版本,它通过自动配置和起步依赖...
5. **数据导出与导入**:该工具还支持将Kafka主题的数据导出到文件,或者从文件导入到Kafka,这对于数据迁移、备份和测试场景非常实用。 三、使用方法 kafkatool_64bit.exe是基于命令行的工具,因此需要熟悉基本的...
安装Kafka Manager 1.3.3.20非常简单,只需下载解压zip文件`kafka-manager-1.3.3.20`,然后按照官方文档的步骤进行配置即可。通常,你需要配置`app-conf/kafka-manager.conf`文件中的`zkhosts`参数,指定你的...
这时,Kafka Manager便应运而生,它为Kafka提供了一种直观且用户友好的管理界面,使得集群管理变得简单。这里我们关注的是已经编译好的版本——kafka-manager-1.3.3.17,它为用户节省了自行编译的时间和精力。 ...
Lua脚本发送Kafka工具包是一个实用的开发资源,它允许开发者使用Lua语言与Apache Kafka进行交互。Apache Kafka是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。Kafka以其高吞吐量、持久化和容错性著称,...
总的来说,Kafka Manager 1.3.3.18 提供了一个强大的管理平台,使得 Kafka 的运维工作变得更加简单和高效。无论是日常监控还是故障排查,它都是 Kafka 用户不可或缺的工具。通过深入理解和熟练运用 Kafka Manager,...
另一方面,Kafka Manager 2.0.0.2是2019年的旧版本,尽管相对较早,但依然具有稳定性和实用性。该版本可能专注于基本功能的实现和完善,如创建、删除和修改topics,以及监控producer和consumer的行为。对于那些只...
它提供了一系列实用功能,如创建、删除和管理主题,查看生产者和消费者的信息,以及监控集群的健康状态。Kafka Manager 1.3.4是其稳定版本之一,带来了更多的改进和优化。 二、安装与启动 在获得“kafka-manager”...
其次,Kafka Manager支持创建、删除和修改topics,这在日常运维中非常实用。只需几个简单的步骤,你就可以为新服务添加topic,或者根据需求调整现有topic的分区数量和副本数。此外,它还提供了创建和管理consumer ...