KAFKA
一、概念
Kafka是一个分布式的、可分区的、可复制的消息系统。
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息。
客户端和服务端通过TCP协议通信。
二、伪分布式安装
1.解压安装包
tar -zxvf kafka.tar.gz
2.cd /conf
kafka自带zookeeper
修改server.properties文件
log.dirs=/tmp/kafka-logs
修改zookeeper.properties
dataDir=/tmp/zookeeper
两个路径是并行的关系,不可存在父子关系,否则,启动报错
3.启动
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka:
bin/kafka-server-start.sh config/server.properties
4.测试
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
三、完全分布式安装
1.删除上述伪分布式测试后的日志
cd tmp ,rm -fr *.*
2.拷贝到其他两台机器上
scp -r kafka root@linux02
scp -r kafka root@linux03
3.修改配置文件
vim server.properties
broker.id=0 #当前server编号
修改每台机器上的配置文件中的内容,各不相同
4.测试
四、Java api
package com.study.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaDemo {
@Test
public void put(){
Properties props = new Properties();
// 固定,序列化的类
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 机器地址
props.put("metadata.broker.list", "linux01:9092");
// 写入数据
Producer<Integer, String> producer = new Producer<>(new ProducerConfig(props));
producer.send(new KeyedMessage<Integer, String>("park", "from java~~~"));
producer.close();
}
@Test
public void get(){
// 声明连接属性
Properties properties = new Properties();
// zookeeper 地址
properties.put("zookeeper.connect", "linux01:2181,linux02:2181,linux03:2181");//声明zk
// 组名称,向相同的组名称中发送数据,topic 间竞争 -- 队列模式
// 不同的组名,消息间并行 -- 发布订阅模式
properties.put("group.id", "g_1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
properties.put("auto.offset.reset", "smallest");
//连接kafka
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
//消费数据
Map<String, Integer> confMap = new HashMap<>();
confMap.put("park", 1); // topic 名称及读取数据的量
Map<String, List<KafkaStream<byte[], byte[]>>> ms = consumer.createMessageStreams(confMap);
KafkaStream<byte[], byte[]> ks = ms.get("park").get(0);
ConsumerIterator<byte[], byte[]> it = ks.iterator();
while(it.hasNext()){
MessageAndMetadata<byte[], byte[]> next = it.next();
byte[] message = next.message();
String str = new String(message);
System.out.println(str);
}
//断开连接
consumer.shutdown();
}
public static void main(String[] args) {
}
}
分享到:
相关推荐
本文来自于csdn,文章简单的介绍了ActiveMQ的概念,下载,安装,启动及优缺点。ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现,它...
Terraform:Terraform基础概念与安装
A-Frame引擎开发:WebVR基础概念与设置_(3).A-Frame安装与环境配置.docx A-Frame引擎开发:WebVR基础概念与设置_(4).A-Frame基本场景创建.docx A-Frame引擎开发:WebVR基础概念与设置_(5).A-Frame实体组件系统...
Ansible:Ansible基础概念与安装.docx
Webpack:Webpack基础概念与安装.docx
Git:Git基础概念与安装.docx
Oracle 安装、Oracle基本概念介绍
"Maven 基础概念及使用" Maven 是 Apache 软件基金会组织维护的一款自动化构建工具,专注服务于 Java 平台的项目构建和依赖管理。它可以将项目分成多个模块,方便开发。通过配置 Maven,可以使用大量的工具和 jar ...
全书共13章,内容主要包括计算机网络基础、局域网基本概念、网络规划和设计、网络布线、Windows2000Server的安装和配置、网络客户机的安装和配置、Windows2000应用服务器的安装和配置、Windows网络应用、邮件服务器...
IBM 虚拟化基本概念 VIOS 的安装与配置 IBM 虚拟化基本概念是指通过软件和硬件的虚拟化技术,来实现对计算资源的优化配置和管理的一种解决方案。其中,VIOS(Virtual I/O Server)是 IBM 提供的一种虚拟化解决方案...
Godot引擎开发:VR基础概念与设置_(1).Godot引擎简介与安装.docx Godot引擎开发:VR基础概念与设置_(2).VR技术基础概念.docx Godot引擎开发:VR基础概念与设置_(3).Godot中的VR模块介绍.docx Godot引擎开发:...
python 基础概念、软件安装和代码运行
安装、编码及amos基本概念和功能介绍.pptx
Monado引擎开发:VR基础概念与设置_(3).Monado引擎安装与配置.docx Monado引擎开发:VR基础概念与设置_(4).VR头显与设备支持.docx Monado引擎开发:VR基础概念与设置_(5).场景与环境搭建.docx Monado引擎开发...
二次开发基础概念.docx PDF编辑软件:Adobe Acrobat二次开发_10.调试和测试二次开发应用.docx PDF编辑软件:Adobe Acrobat二次开发_11.发布和安装自定义应用程序.docx PDF编辑软件:Adobe Acrobat二次开发_12.最佳...
mysql安装配置教程 02 MySQL ...第3节 SQL基本概念与通用语法 第2节 MySQL安装与使用 第1节 数据库概念 第16节 用户管理和权限管理 第15节 事务 第14节 多表查询练习 第13节 子查询 第12节 多表查询
"云计算基本概念、传统虚拟化技术和ESXi安装" 云计算基本概念: 云计算是指通过网络提供的、可根据需求扩展或缩减的计算资源和存储资源的模式。云计算可以提供基础设施即服务(IaaS)、平台即服务(PaaS)和软件即...