0.8.0 Producer Example
Once you have confirmed you have a basic Kafka cluster setup (see the 0.8 Quick Start), it is time to write some code!

Producers

The Producer class is used to create new messages for a specific Topic and optional Partition. 

If using Java you need to import a few classes for the Producer and its supporting types:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

The first step in your code is to define properties for how the Producer finds the cluster, serializes the messages, and, if appropriate, directs the message to a specific Partition. 

These properties are defined in the standard Java Properties object:

Properties props = new Properties();
 
props.put("metadata.broker.list""broker1:9092,broker2:9092");
props.put("serializer.class""kafka.serializer.StringEncoder");
props.put("partitioner.class""example.producer.SimplePartitioner");
props.put("request.required.acks""1");
 
ProducerConfig config = new ProducerConfig(props);

The first property, "metadata.broker.list", defines where the Producer can find one or more Brokers to determine the Leader for each topic. This does not need to be the full set of Brokers in your cluster, but should include at least two in case the first Broker is unavailable. There is no need to figure out which Broker is the Leader for the topic (and partition): the Producer knows how to connect to a Broker, ask for the metadata, and then connect to the correct Broker. 

The second property, "serializer.class", defines what Serializer to use when preparing the message for transmission to the Broker. In our example we use a simple String encoder provided as part of Kafka. Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step. 

It is possible to change the Serializer for the Key of the message (see below) by defining "key.serializer.class" appropriately. By default it is set to the same value as "serializer.class". 
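The two settings can diverge when keys and payloads have different types. A minimal sketch, assuming you want String keys but raw byte[] payloads (kafka.serializer.DefaultEncoder ships with Kafka and passes byte[] through unchanged):

// Hypothetical variant: raw byte[] message payloads with String keys.
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");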

The third property, "partitioner.class", defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a "partitioner.class", Kafka will use the default partitioner. If the key is null, the Producer will assign the message to a random Partition. 

The last property, "request.required.acks", tells Kafka that you want your Producer to require an acknowledgement from the Broker that the message was received. Without this setting the Producer will 'fire and forget', possibly leading to data loss. Additional information can be found in the Kafka producer configuration documentation. 
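In 0.8 this property accepts three values, trading latency against durability; a quick sketch:

// "0"  : fire and forget; lowest latency, no acknowledgement.
// "1"  : the Leader acknowledges the write (the value used in this example).
// "-1" : wait until the message is committed to all in-sync replicas.
props.put("request.required.acks", "-1");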

Next you define the Producer object itself:

Producer<String, String> producer = new Producer<String, String>(config);

Note that the Producer is a Java Generic and you need to tell it the type of two parameters. The first is the type of the Partition key, the second the type of the message. In this example they are both Strings, which also matches what we defined in the Properties above. 
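If you switch encoders, the type parameters must change to match. A sketch, assuming the DefaultEncoder variant mentioned above (String keys, byte[] payloads):

Properties byteProps = new Properties();
byteProps.put("metadata.broker.list", "broker1:9092,broker2:9092");
byteProps.put("serializer.class", "kafka.serializer.DefaultEncoder");    // byte[] messages
byteProps.put("key.serializer.class", "kafka.serializer.StringEncoder"); // String keys

Producer<String, byte[]> byteProducer = new Producer<String, byte[]>(new ProducerConfig(byteProps));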

Now build your message:

Random rnd = new Random();
 
long runtime = new Date().getTime();
 
String ip = "192.168.2." + rnd.nextInt(255);
 
String msg = runtime + ",www.example.com," + ip;

 

In this example we are faking a message for a website visit by IP address. The first part of the comma-separated message is the timestamp of the event, the second is the website, and the third is the IP address of the requester. We use the Java Random class here to make the last octet of the IP vary so we can see how Partitioning works.

Finally write the message to the Broker:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
 
producer.send(data);

"page_visits" is the Topic to write to. Here we are passing the IP as the partition key. Note that if you do not include a key, even if you've defined a partitioner class, Kafka will assign the message to a random partition. 
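KeyedMessage also has a two-argument constructor for exactly this keyless case; a short sketch:

// No key supplied: the message is assigned to a random Partition.
KeyedMessage<String, String> keyless = new KeyedMessage<String, String>("page_visits", msg);
producer.send(keyless);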

Full Source:

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list""broker1:9092,broker2:9092 ");
        props.put("serializer.class""kafka.serializer.StringEncoder");
        props.put("partitioner.class""example.producer.SimplePartitioner");
        props.put("request.required.acks""1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

 

Partitioning Code: 

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner(VerifiableProperties props) {
        // No configuration needed for this simple scheme.
    }
 
    // Route each key (an IP address) to a Partition based on its last octet.
    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

The logic takes the key, which we expect to be the IP address, finds the last octet, and does a modulo operation on the number of partitions defined within Kafka for the topic. The benefit of this partitioning logic is that all web visits from the same source IP end up in the same Partition. Of course other IPs will land there too, so your consumer logic will need to know how to handle that.
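To make the arithmetic concrete, here is a worked example for a hypothetical key, assuming the five partitions created below:

String key = "192.168.2.42";   // hypothetical source IP
int numPartitions = 5;         // matches the topic created below
int lastOctet = Integer.parseInt(key.substring(key.lastIndexOf('.') + 1)); // 42
int partition = lastOctet % numPartitions; // 42 % 5 = 2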

Before running this, make sure you have created the Topic page_visits. From the command line:

bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5

Make sure you include a --partition option so you create more than one. 

Now compile and run your Producer and data will be written to Kafka. 

To confirm you have data, use the command line tool to see what was written:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning

 

 