`
lizhuang
  • 浏览: 907756 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

apache kafka0.10.20 搭建简单环境并运行javaDemo

阅读更多
官网下载apache kafka 0.10.20版本,本例子基于jdk1.8环境,mac os el captain.

第一步:下载0.10.2.0压缩包,解压缩
官网http://kafka.apache.org/下载
> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0

第二步 启动服务器
启动zookeeper服务器
> bin/zookeeper-server-start.sh config/zookeeper.properties


启动Kafka服务器
> bin/kafka-server-start.sh config/server.properties


第三步 创建主题
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

> bin/kafka-topics.sh --list --zookeeper localhost:2181


第四步 给主题发信息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

第五步 创建用户(consumer)接收信息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message


使用java创建demo主要针对创建主题,发信息,这主要靠kafka生产者(Producer).
//连接配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //生产者接口及相关实现类
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            //发送消息
            /**
             * ProducerRecord(String topic, K key, V value) Create a record to
             * be sent to Kafka
             */
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        }

        producer.close();


从服务器主动拉取信息需要Consumer.
Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n", 
                        record.topic(),record.partition(),record.offset(), record.key(), record.value());
            }
        }


使用main方法启动consumerTester类再启动producerTester,在console就可以看到接收到的信息了。
--- exec-maven-plugin:1.2.1:exec (default-cli) @ KafkaDemo ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details
topic = my-topic, partition = 0, offset = 300, key = 0, value = 0
topic = my-topic, partition = 0, offset = 301, key = 1, value = 1
topic = my-topic, partition = 0, offset = 302, key = 2, value = 2
topic = my-topic, partition = 0, offset = 303, key = 3, value = 3
topic = my-topic, partition = 0, offset = 304, key = 4, value = 4
topic = my-topic, partition = 0, offset = 305, key = 5, value = 5
topic = my-topic, partition = 0, offset = 306, key = 6, value = 6
topic = my-topic, partition = 0, offset = 307, key = 7, value = 7
topic = my-topic, partition = 0, offset = 308, key = 8, value = 8
topic = my-topic, partition = 0, offset = 309, key = 9, value = 9


具体相关的配置可以看附件maven项目。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics