`

Kafka JAVA客户端代码示例

 
阅读更多

介绍

     http://kafka.apache.org 
    kafka是一种高吞吐量的分布式发布订阅消息系统 
    kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态) 

    当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。 

高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

测试环境

    kafka_2.10-0.8.1.1 3个节点做的集群

    zookeeper-3.4.5 一个实例节点

代码示例

消息生产者代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
/**
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 * @author Fung
 *
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Random rnd = new Random();
        int events=100;
 
        // 设置配置属性
        Properties props = new Properties();
        props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class默认为serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // 可选配置,如果不配置,则使用默认的partitioner
        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
        // 值为0,1,-1,可以参考
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
 
        // 创建producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // 产生并发送消息
        long start=System.currentTimeMillis();
        for (long i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + i;//rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    "page_visits", ip, msg);
            producer.send(data);
        }
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
        // 关闭producer
        producer.close();
    }
}

消息消费者代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
/**
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 *
 */
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
 
    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }
 
    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        executor = Executors.newFixedThreadPool(numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] arg) {
        String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        demo.shutdown();
    }
}

消息处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
 
public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
 
    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
        m_threadNumber = threadNumber;
        m_stream = stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Partitioner类示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {
 
    }
 
    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;
        if (obj instanceof String) {
            String key=(String)obj;
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            }
        }else{
            partition = obj.toString().length() % numPartitions;
        }
         
        return partition;
    }
 
}

pom.xml文件

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.xxx</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>kafka-demo</name>
    <url>http://maven.apache.org</url>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>mail</artifactId>
                    <groupId>javax.mail</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
分享到:
评论

相关推荐

    kafka-java-demo 基于java的kafka生产消费者示例

    在Maven的pom.xml文件中,你需要添加Kafka的Java客户端依赖,以便项目可以编译和运行Kafka相关的代码。 【Kafka的其他特性】 除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、...

    Kafka使用Java客户端进行访问的示例代码

    Kafka 使用 Java 客户端进行访问的示例代码 Kafka 是一种流行的分布式消息队列系统,广泛应用于大数据实时处理、日志聚合、消息队列等领域。Java 是一种广泛使用的编程语言,很多开发者使用 Java 进行 Kafka 的开发...

    kafka学习代码(java开发kafka)

    在" kafka-study "压缩包中,你可能会找到一些示例代码,这些代码展示了如何使用Java API创建Kafka生产者和消费者,以及如何处理错误和配置选项。通过研究这些示例,你可以更好地理解和应用Kafka的Java开发实践。...

    kafka java 下载的jar

    Kafka的Java客户端库使得Java开发者能够方便地与Kafka集群进行交互,包括生产消息、消费消息以及管理主题等操作。在这个场景中,"kafka java 下载的jar"指的是下载的Java客户端库的JAR文件,它包含了所有必要的类和...

    c#项目下使用kafka.zip

    软件开发设计:应用软件开发、系统软件开发、移动应用开发、网站开发C++、Java、python、web、C#等语言的项目开发与学习资料 硬件与设备:单片机、EDA、proteus、RTOS、包括计算机硬件、服务器、网络设备、存储设备...

    Kafka客户端开发实例java源码.zip

    在本压缩包“Kafka客户端开发实例java源码.zip”中,包含的是使用Java语言编写的Kafka客户端应用示例代码。Kafka是一款高吞吐量、分布式的消息系统,广泛应用于大数据实时处理、日志收集等领域。这个实例将帮助我们...

    kafka安装相关文件以及java调用kafka示例项目

    3. **创建Topic**:创建Topic是通过Kafka的命令行工具kafka-topics.sh完成的,或者在Java代码中使用AdminClient接口。你需要指定主题名、分区数、副本数等参数。 4. **配置管理**:Kafka的配置文件(server....

    spark streamming消费kafka数据存入hbase示例代码

    在这个示例中,Maven 用于管理项目的依赖关系,如 Spark、Kafka 和 HBase 的客户端库。 整个流程大致如下: 1. 设置 Spark Streaming 和 Kafka 连接参数。 2. 创建 DStream 从 Kafka 消费数据。 3. 处理 DStream ...

    apache-kafka-1.0.0 java Demo

    在Java中,我们需要引入Kafka的客户端库。1.0.0版本的Kafka提供了`kafka-clients`包,其中包含生产者和消费者的API。在项目中,你需要将对应的JAR文件添加到类路径中,或者使用Maven或Gradle等构建工具管理依赖。 ...

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    - 它涉及到三个主要角色:客户端(Kafka消费者或生产者),服务端(Kafka Broker)和Kerberos域控制器(KDC)。 - 用户首先向KDC获取服务票证,然后使用该票证与服务端进行通信。 2. **配置Flink与Kafka的...

    Kafka中生产者和消费者java实现

    Kafka的Java客户端库通常包括`kafka-clients`,你可以添加以下依赖: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients &lt;version&gt;2.8.1 ``` **生产者实现** 生产者是Kafka系统中消息的源头...

    Kafka生产者向kafka发送消息Java代码.zip

    首先,要创建一个Kafka生产者,我们需要引入Kafka的Java客户端库。在Maven项目中,这可以通过在`pom.xml`文件中添加以下依赖完成: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients &lt;version&gt;...

    Java实现Kafka生产者消费者代码实例

    在Java中实现Kafka的生产者和消费者,需要依赖Kafka提供的客户端库。Maven项目中通常需要在pom.xml文件中加入kafka-clients依赖,如下: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients ...

    java kafka 生产者/消费者demo

    在Java中,我们可以使用Kafka官方提供的Java客户端库来编写生产者和消费者程序。下面将详细介绍如何创建Java Kafka的生产者和消费者示例。 一、Kafka生产者 1. 添加依赖:在项目中添加Kafka的Maven依赖,如`org....

    2、java调用kafka api

    在项目中添加Maven依赖,以使用Kafka的Java客户端库。在`pom.xml`文件中,加入以下代码: ```xml &lt;!-- Kafka客户端工具 --&gt; &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients &lt;version&gt;2.4.1 &lt;!-- ...

    kafka-java-demo_java连接卡夫卡_steamyx8_DEMO_kafka_breathsru_

    在Java环境中,我们通常使用Kafka的官方Java客户端库来与Kafka服务端进行交互。 对于Java连接Kafka,我们首先需要在项目中引入相关的依赖。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖: ```xml ...

    kafka生产者和消费者的javaAPI的示例代码

    "Kafka 生产者和消费者的 Java API 示例代码" 在本文中,我们将详细介绍 Kafka 生产者和消费者的 Java API 示例代码,以及相关的知识点和概念。 Kafka 概述 Apache Kafka 是一个分布式流媒体平台,用于构建实时...

    kafkademo代码

    【描述】提到的"java"是指该示例是用Java编程语言编写的,Java是通用且跨平台的编程语言,特别适合开发企业级应用程序,如Kafka客户端。在这个kafkademo中,我们可能可以看到如何使用Java API与Kafka集群进行交互,...

    java实现tcp客户端发送服务端解析程序

    这需要引入Apache Kafka的Java客户端库,并创建一个Producer实例: ```java Properties props = new Properties(); props.put("bootstrap.servers", "kafka服务器地址:端口"); props.put("key.serializer", "org....

    kafka-consumer:Kafka消费者示例

    本文将深入探讨 `Kafka消费者` 的概念、工作原理以及如何在 Java 中实现消费者示例。 ### Kafka 消费者简介 Kafka 消费者是 Kafka 集群中的客户端程序,负责从主题(Topic)中读取和消费消息。它们通过消费组...

Global site tag (gtag.js) - Google Analytics