`
QING____
  • 浏览: 2253333 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka部署与代码实例

 
阅读更多

    kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.(本示例基于伪分布式部署)

    1) zk-0

    调整配置文件:

clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

./zkServer.sh start

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

clientPort=2182
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

clientPort=2183
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication机制,让每个topic的partitions在kafka-cluster中备份2个
##用来提高cluster的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
##zookeeper.connect指定zookeeper的地址,默认情况下将会在zk的“/”目录下
##创建meta信息和路径,为了对znode进行归类,我们可以在connect之后追加路径,比如
##127.0.0.1:2183/kafka
##不过需要注意,此后的producer、consumer都需要带上此根路径
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency 

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

broker.id=1
port=9093
##其他配置和kafka-0保持一致

    然后和kafka-0一样执行打包命令,然后启动此broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

    仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.

> bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic	partition: 0	leader: 2	replicas: 1,2,0	isr: 2
topic: test	partition: 0	leader: 0	replicas: 0	isr: 0 

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。[配置参数详解]

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.

<dependencies>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.14</version>
	</dependency>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.8.2</artifactId>
		<version>0.8.0</version>
		<exclusions>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-library</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>com.yammer.metrics</groupId>
		<artifactId>metrics-core</artifactId>
		<version>2.2.0</version>
	</dependency>
	<dependency>
		<groupId>com.101tec</groupId>
		<artifactId>zkclient</artifactId>
		<version>0.3</version>
	</dependency>
</dependencies>

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

#partitioner.class=
##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata
##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来
##此值,我们可以在spring中注入过来
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
##同步,建议为async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
##在producer.type=async时有效
#batch.num.messages=100

    2) KafkaProducerClient.java代码样例

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * User: guanqing-liu
 */
public class KafkaProducerClient {

	private Producer<String, String> inner;
	
	private String brokerList;//for metadata discovery,spring setter
	private String location = "kafka-producer.properties";//spring setter
	
	private String defaultTopic;//spring setter

	public void setBrokerList(String brokerList) {
		this.brokerList = brokerList;
	}

	public void setLocation(String location) {
		this.location = location;
	}

	public void setDefaultTopic(String defaultTopic) {
		this.defaultTopic = defaultTopic;
	}

	public KafkaProducerClient(){}
	
	public void init() throws Exception {
		Properties properties = new Properties();
		properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
		
		
		if(brokerList != null) {
			properties.put("metadata.broker.list", brokerList);
		}

		ProducerConfig config = new ProducerConfig(properties);
		inner = new Producer<String, String>(config);
	}

	public void send(String message){
		send(defaultTopic,message);
	}
	
	public void send(Collection<String> messages){
		send(defaultTopic,messages);
	}
	
	public void send(String topicName, String message) {
		if (topicName == null || message == null) {
			return;
		}
		KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
		inner.send(km);
	}

	public void send(String topicName, Collection<String> messages) {
		if (topicName == null || messages == null) {
			return;
		}
		if (messages.isEmpty()) {
			return;
		}
		List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
		int i= 0;
		for (String entry : messages) {
			KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
			kms.add(km);
			i++;
			if(i % 20 == 0){
				inner.send(kms);
				kms.clear();
			}
		}
		
		if(!kms.isEmpty()){
			inner.send(kms);
		}
	}

	public void close() {
		inner.close();
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KafkaProducerClient producer = null;
		try {
			producer = new KafkaProducerClient();
			//producer.setBrokerList("");
			int i = 0;
			while (true) {
				producer.send("test-topic", "this is a sample" + i);
				i++;
				Thread.sleep(2000);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (producer != null) {
				producer.close();
			}
		}

	}

}

    3) spring配置

    <bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">
        <property name="zkConnect" value="${zookeeper_cluster}"></property>
        <property name="defaultTopic" value="${kafka_topic}"></property>
    </bean>

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

## 此值可以配置,也可以通过spring注入
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000

    2) KafkaConsumerClient.java代码样例

package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * User: guanqing-liu 
 */
public class KafkaConsumerClient {

	private String groupid; //can be setting by spring
	private String zkConnect;//can be setting by spring
	private String location = "kafka-consumer.properties";//配置文件位置
	private String topic;
	private int partitionsNum = 1;
	private MessageExecutor executor; //message listener
	private ExecutorService threadPool;
	
	private ConsumerConnector connector;
	
	private Charset charset = Charset.forName("utf8");

	public void setGroupid(String groupid) {
		this.groupid = groupid;
	}

	public void setZkConnect(String zkConnect) {
		this.zkConnect = zkConnect;
	}

	public void setLocation(String location) {
		this.location = location;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	public void setPartitionsNum(int partitionsNum) {
		this.partitionsNum = partitionsNum;
	}

	public void setExecutor(MessageExecutor executor) {
		this.executor = executor;
	}

	public KafkaConsumerClient() {}

	//init consumer,and start connection and listener
	public void init() throws Exception {
		if(executor == null){
			throw new RuntimeException("KafkaConsumer,exectuor cant be null!");
		}
		Properties properties = new Properties();
		properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
		
		if(groupid != null){
			properties.put("groupid", groupid);
		}
		if(zkConnect != null){
			properties.put("zookeeper.connect", zkConnect);
		}
		ConsumerConfig config = new ConsumerConfig(properties);

		connector = Consumer.createJavaConsumerConnector(config);
		Map<String, Integer> topics = new HashMap<String, Integer>();
		topics.put(topic, partitionsNum);
		Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
		List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
		threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
		
		//start
		for (KafkaStream<byte[], byte[]> partition : partitions) {
			threadPool.execute(new MessageRunner(partition));
		}
	}

	public void close() {
		try {
			threadPool.shutdownNow();
		} catch (Exception e) {
			//
		} finally {
			connector.shutdown();
		}

	}

	class MessageRunner implements Runnable {
		private KafkaStream<byte[], byte[]> partition;

		MessageRunner(KafkaStream<byte[], byte[]> partition) {
			this.partition = partition;
		}

		public void run() {
			ConsumerIterator<byte[], byte[]> it = partition.iterator();
			while (it.hasNext()) {
				// connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用
				MessageAndMetadata<byte[], byte[]> item = it.next();
				try{
					executor.execute(new String(item.message(),charset));// UTF-8,注意异常
				}catch(Exception e){
					//
				}
			}
		}
		
		public String getContent(Message message){
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
		}
	}

	public static interface MessageExecutor {

		public void execute(String message);
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KafkaConsumerClient consumer = null;
		try {
			MessageExecutor executor = new MessageExecutor() {

				public void execute(String message) {
					System.out.println(message);
				}
			};
			consumer = new KafkaConsumerClient();
			
			consumer.setTopic("test-topic");
			consumer.setPartitionsNum(2);
			consumer.setExecutor(executor);
			consumer.init();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			 if(consumer != null){
				 consumer.close();
			 }
		}

	}

}

    3) spring配置(略)

 

    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

4
0
分享到:
评论
21 楼 jjshanwei 2014-10-14  
jjshanwei 写道
jjshanwei 写道
你好!

Map<String, Integer> topics = new HashMap<String, Integer>(); 
topics.put(topic, partitionsNum); 
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);

消费者订阅一个topic时为什么还要传入partitionsNum呢,一个 consumer group 里的一个consumer 只对应一个partition. partitionsNum在消费端这里有什么作用呢, 必须和定义topic时指定的parition数一致吗?

谢谢!



如果topic 的partion数量为 3,  consumer的partitionsNum 设为1, 会导致另外两个partion里的消息消费不到吗



这个地方明白了,  这里是为了如果不存在该topic的话会自动创建该topic及指定的Partition数, 如果存在的话无影响.
20 楼 jjshanwei 2014-10-13  
jjshanwei 写道
你好!

Map<String, Integer> topics = new HashMap<String, Integer>(); 
topics.put(topic, partitionsNum); 
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);

消费者订阅一个topic时为什么还要传入partitionsNum呢,一个 consumer group 里的一个consumer 只对应一个partition. partitionsNum在消费端这里有什么作用呢, 必须和定义topic时指定的parition数一致吗?

谢谢!



如果topic 的partion数量为 3,  consumer的partitionsNum 设为1, 会导致另外两个partion里的消息消费不到吗
19 楼 jjshanwei 2014-10-13  
你好!

Map<String, Integer> topics = new HashMap<String, Integer>(); 
topics.put(topic, partitionsNum); 
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);

消费者订阅一个topic时为什么还要传入partitionsNum呢,一个 consumer group 里的一个consumer 只对应一个partition. partitionsNum在消费端这里有什么作用呢, 必须和定义topic时指定的parition数一致吗?

谢谢!
18 楼 QING____ 2014-05-22  
liuzhipassman 写道
QING____ 写道
liuzhipassman 写道
楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。

你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。

我试过这种方式,但是当没有消息过来的时候,其实是block在while (it.hasNext()) {}循环里面,出不去的,所以没法交出这个Set。如果说另起线程去Set里拿数据的话,那么这个问题又更加复杂化了。
不知道有没有其他替代方案?


你可以将kafka中获取的消息放入一个阻塞队列中,让你的worker线程从队列中拿消息,所以worker线程可以决定取出消息的个数和时机。。
目前kafka Api中没有批量获取消息的方式。
17 楼 liuzhipassman 2014-05-22  
QING____ 写道
liuzhipassman 写道
楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。

你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。

我试过这种方式,但是当没有消息过来的时候,其实是block在while (it.hasNext()) {}循环里面,出不去的,所以没法交出这个Set。如果说另起线程去Set里拿数据的话,那么这个问题又更加复杂化了。
不知道有没有其他替代方案?
16 楼 QING____ 2014-05-16  
liuzhipassman 写道
楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。

你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。
15 楼 liuzhipassman 2014-05-16  
楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。
14 楼 lanxiaoshuang 2014-03-11  
学习了,收益匪浅,Love,
13 楼 QING____ 2014-02-08  
heipark 写道
使用楼主的代码发现producer产生的消息只发送到一个partition,导致只有一个consumer线程可以获取数据。
原因在于producer没有使用key将消息hash(不设置key,将导致消息只发送给一个partition),解决办法,使用三个参数的KeyedMessage构造函数,添加key字段,message会根据它进行hash,然后分布到不同partition:

new KeyedMessage<String, String>(topicName, key, message);


非常正确..
12 楼 heipark 2014-01-27  
使用楼主的代码发现producer产生的消息只发送到一个partition,导致只有一个consumer线程可以获取数据。
原因在于producer没有使用key将消息hash(不设置key,将导致消息只发送给一个partition),解决办法,使用三个参数的KeyedMessage构造函数,添加key字段,message会根据它进行hash,然后分布到不同partition:

new KeyedMessage<String, String>(topicName, key, message);
11 楼 gaoshui87 2013-12-30  
我通过看他的日志  Adding fetcher for partition [testg,1], initOffset 6 to broker 0 with fetcherId 0 已经移动了,但是换了分组名还是不出来
10 楼 gaoshui87 2013-12-30  
楼主,为什么我换了分组名称后还是不能重复消费的呢?我在0.7版本里面测试是没有问题的,但是到了0.8就出现这个问题了!
9 楼 QING____ 2013-12-27  
zhangnianli 写道
QING____ 写道
zhangnianli 写道
楼主你好,请问你个问题,
kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?
我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?

无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.
之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.

哦,谢谢!
那就是说Kafka不像传统的MQ那样会对已消费的数据进行处理,而是最大程序的保证数据的吞吐量.



ActiveMQ,本身也会对已经消费的消息进行"dequeue"操作(从store中删除,或者从内存移除),而且当队列深度达到一定程度时,仍然会触发一定的"evictionPolicy";你可以简单的认为"log.retention.hours"是一个单调的evictionPolicy.
8 楼 zhangnianli 2013-12-27  
QING____ 写道
zhangnianli 写道
楼主你好,请问你个问题,
kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?
我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?

无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.
之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.

哦,谢谢!
那就是说Kafka不像传统的MQ那样会对已消费的数据进行处理,而是最大程序的保证数据的吞吐量.
7 楼 QING____ 2013-12-26  
zhangnianli 写道
楼主你好,请问你个问题,
kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?
我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?

无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.
之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.
6 楼 zhangnianli 2013-12-26  
楼主你好,请问你个问题,
kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?
我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?
5 楼 hundange 2013-10-28  
QING____ 写道
hundange 写道
楼主你好,请教一个问题,如果重启一下消费者程序,那就会连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。消费过的数据是不能再消费,但是消费者重启或者出现故障恢复时它只能从实时推送的消息进行消费,或者更改组ID设置重头开始消费,如何对之前的消息进行消费呢?它是有记录offset的,low level 代码里可以改offset值,但是high level 不知道如何修改offset的值,楼主可以指点下么?非常感谢


你这种情况常规情况下是不会出现的,因为kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,当consumer再次启动将会从此offset开始继续消费.
在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.

那么出现你这种情况,可能的问题有:
1)auto.commit.enable=true 

如果"auto.commit.enable = false",那么需要开发者在消费消息之后,手动去提交offset位置,否则offset位置只会在consumer的内存保存,当consumer失效后,将会重新消费.
手动提交的代码:ConsumerConnector.commitOffsets()
2) 
auto.commit.interval.ms=60*1000
此值表示,自动提交offset的时间间隔,如果此间隔时间很长(比如24小时),那么如果在24小时内consumer多次故障,消息任然会重复消费,因为offset值没有提交给zk.


因为kafka的团队迁移,导致kafka在0.7和0.8中各种配置项参数不同,请你 参考源代码或者最新文档来配置相应的参数.谢谢!
祝你好运!!!


非常感谢楼主的详细解答,受益匪浅!
4 楼 QING____ 2013-10-24  
hundange 写道
楼主你好,请教一个问题,如果重启一下消费者程序,那就会连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。消费过的数据是不能再消费,但是消费者重启或者出现故障恢复时它只能从实时推送的消息进行消费,或者更改组ID设置重头开始消费,如何对之前的消息进行消费呢?它是有记录offset的,low level 代码里可以改offset值,但是high level 不知道如何修改offset的值,楼主可以指点下么?非常感谢


你这种情况常规情况下是不会出现的,因为kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,当consumer再次启动将会从此offset开始继续消费.
在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.

那么出现你这种情况,可能的问题有:
1)auto.commit.enable=true 

如果"auto.commit.enable = false",那么需要开发者在消费消息之后,手动去提交offset位置,否则offset位置只会在consumer的内存保存,当consumer失效后,将会重新消费.
手动提交的代码:ConsumerConnector.commitOffsets()
2) 
auto.commit.interval.ms=60*1000
此值表示,自动提交offset的时间间隔,如果此间隔时间很长(比如24小时),那么如果在24小时内consumer多次故障,消息任然会重复消费,因为offset值没有提交给zk.


因为kafka的团队迁移,导致kafka在0.7和0.8中各种配置项参数不同,请你 参考源代码或者最新文档来配置相应的参数.谢谢!
祝你好运!!!
3 楼 hundange 2013-10-24  
楼主你好,请教一个问题,如果重启一下消费者程序,那就会连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。消费过的数据是不能再消费,但是消费者重启或者出现故障恢复时它只能从实时推送的消息进行消费,或者更改组ID设置重头开始消费,如何对之前的消息进行消费呢?它是有记录offset的,low level 代码里可以改offset值,但是high level 不知道如何修改offset的值,楼主可以指点下么?非常感谢
2 楼 QING____ 2013-09-25  
zhangchaotydic 写道
客户端在producer时,报17:58:47,109  WARN DefaultEventHandler:88 - Failed to send producer request with correlation id 11 to broker 1 with data for partitions [test-topic,0]
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:30)错误? 版本用的和楼主一致

UnresolvedAddressException这个是一个底层的错误,"metadata.broker.list="请你检测此配置中IP地址是否正确.如果你使用了hostname等方式翻转成IP地址,就需要检测hosts文件中或者DNS服务中是否进行了相应的解析配置.

相关推荐

    大华无插件播放项目111

    大华无插件播放项目111

    Oracle 19c 数据库备份恢复与导入导出实战指南

    内容概要:本文详细介绍了Oracle 19c数据库的备份恢复和导入导出操作。首先概述了基本命令,然后分别讲述了三种工作方式(交互式、命令行、参数文件)和三种模式(表、用户、全库)。接着介绍了高级选项,如分割成多个文件、增量导出/导入、以SYSDBA进行导出/导入、表空间传输等。最后讨论了优化技巧,包括加快导出和导入速度的方法。还解决了一些常见问题,如字符集问题和版本问题。 适用人群:Oracle数据库管理员和相关技术人员。 使用场景及目标:适合在日常数据库管理和维护中进行数据备份、恢复、导入和导出操作,提高数据安全性和管理效率。 其他说明:文章内容丰富,涉及多种实用技巧,适用于不同场景下的具体操作,有助于提升工作效率。

    大数据旅游酒店大数据可视化项目

    基于Python Flask开发的旅游酒店大数据可视化项目,可以直接运行。 操作步骤: 1. 解压缩项目文件 2. 使用 pycharm打开项目 3. 运行项目中的app.py文件 注意:需要确保项目的Flask Python相关的环境已经搭建完成。

    模拟立体翻转效果,非Gallery实现.zip

    Android 毕业设计,Android 毕业设计,小Android 程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    仿360 浮动小插件效果.zip

    Android 毕业设计,Android 毕业设计,小Android 程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告

    基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告,个人高分设计项目、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的学生和需要项目实战练习的学习者。 基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告个人高分设计项目、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的学生和需要项目实战练习的学习者。 个人高分设计项目、经导师指导并认可通过的高分设

    棉花检测20-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar

    棉花检测20-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar棉-V2释放 ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 它包括406张图像。 以可可格式注释棉花。 将以下预处理应用于每个图像: 没有应用图像增强技术。

    javaweb社区医院挂号系统-lw.zip

    项目包含前后台完整源码。 项目都经过严格调试,确保可以运行! 具体项目介绍可查看博主文章或私聊获取 助力学习实践,提升编程技能,快来获取这份宝贵的资源吧!

    python-3.11.11-amd64.exe

    windwos环境下python 3.11系列64位安装包,仅推荐个人学习、开发、娱乐或者测试环境下使用。

    基于ssm的精品酒销售管理系统+jsp源代码(完整前后端+mysql+说明文档+LW).zip

    使用精品酒销售管理系统的用户分管理员和用户两个角色的权限子模块。 管理员所能使用的功能主要有:主页、个人中心、用户管理、商品分类管理、商品信息管理、系统管理、订单管理等。 用户可以实现主页、个人中心、我的收藏管理、订单管理等。 前台首页可以实现商品信息、新闻资讯、我的、跳转到后台、购物车等。 项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    Video_2024-12-18_000023.wmv

    Video_2024-12-18_000023.wmv

    ppt最終版asasaadd

    ppt最終版asasaadd

    计算机图形学试卷第一套

    计算机图形学期末考试

    springboot-基于SpringBootVue的家具商城系统设计与实现.zip

    springboot-基于SpringBootVue的家具商城系统设计与实现.zip

    PenTablet_5.2.4-5.zip

    PenTablet_5.2.4-5.zip

    基于ssm的企业人力资源管理系统源代码(完整前后端+mysql+说明文档+LW).zip

    考虑了企业管理者的实际工作环境和需求,最终将人力资源系统划分为5个部分,即登录模块、组织发展模块、员工团队模块、合同管理模块、党建管理模块。 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    QT5.12.9音乐播放器MP3 免费下载交流学习

    QT音乐播放器MP3 可点击播放可上一首下一首可调节音量 可暂停可上传音乐

    椅子检测6-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar

    椅子检测6-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar对象检测实验室-V1 2023-08-21 2:28 PM ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包括997张图像。 对象以可可格式注释。 将以下预处理应用于每个图像: *像素数据的自动取向(带有Exif-Arientation剥离) *调整大小为640x640(拉伸) 应用以下扩展来创建每个源图像的3个版本: *将盐和胡椒噪声应用于10%的像素

    Python项目-实例-13 截图工具.zip

    Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    Altas PF拧紧枪 OP协议,开发协议

    Altas PF拧紧枪 OP协议,开发协议

Global site tag (gtag.js) - Google Analytics