`

基础概念及安装

阅读更多
KAFKA


一、概念

Kafka是一个分布式的、可分区的、可复制的消息系统。
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息。
客户端和服务端通过TCP协议通信。

二、伪分布式安装

1.解压安装包
tar -zxvf kafka.tar.gz

2.cd /conf
kafka自带zookeeper

修改server.properties文件
log.dirs=/tmp/kafka-logs
修改zookeeper.properties
dataDir=/tmp/zookeeper

两个路径是并行的关系,不可存在父子关系,否则,启动报错

3.启动

启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka:
bin/kafka-server-start.sh config/server.properties

4.测试

创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

三、完全分布式安装

1.删除上述伪分布式测试后的日志

cd tmp ,rm -fr *.*

2.拷贝到其他两台机器上
scp -r kafka root@linux02
scp -r kafka root@linux03

3.修改配置文件

vim server.properties

broker.id=0 #当前server编号 
修改每台机器上的配置文件中的内容,各不相同

4.测试

四、Java api

package com.study.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.junit.Test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaDemo {

	@Test
	public void put(){
		Properties props = new Properties();
		// 固定,序列化的类
		props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 机器地址
		props.put("metadata.broker.list", "linux01:9092");
        // 写入数据
        Producer<Integer, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<Integer, String>("park", "from java~~~"));
        
        producer.close();
	}
	
	@Test
	public void get(){
		// 声明连接属性
		Properties properties = new Properties();  
		// zookeeper 地址
		properties.put("zookeeper.connect", "linux01:2181,linux02:2181,linux03:2181");//声明zk  
		// 组名称,向相同的组名称中发送数据,topic 间竞争 -- 队列模式
		//        不同的组名,消息间并行 -- 发布订阅模式
		properties.put("group.id", "g_1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据  
		properties.put("auto.offset.reset", "smallest");
		
		//连接kafka
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

		//消费数据
		Map<String, Integer> confMap = new HashMap<>();
		confMap.put("park", 1); // topic 名称及读取数据的量
		Map<String, List<KafkaStream<byte[], byte[]>>> ms = consumer.createMessageStreams(confMap);
		KafkaStream<byte[], byte[]> ks = ms.get("park").get(0);
		ConsumerIterator<byte[], byte[]> it = ks.iterator();
		while(it.hasNext()){
			MessageAndMetadata<byte[], byte[]> next = it.next();
			byte[] message = next.message();
			String str = new String(message);
			System.out.println(str);
		}
		
		//断开连接
		consumer.shutdown();
	}
	
	public static void main(String[] args) {

	}

}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics