KAFKA
一、概念
Kafka是一个分布式的、可分区的、可复制的消息系统。
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息。
客户端和服务端通过TCP协议通信。
二、伪分布式安装
1.解压安装包
tar -zxvf kafka.tar.gz
2.cd /conf
kafka自带zookeeper
修改server.properties文件
log.dirs=/tmp/kafka-logs
修改zookeeper.properties
dataDir=/tmp/zookeeper
两个路径是并行的关系,不可存在父子关系,否则,启动报错
3.启动
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka:
bin/kafka-server-start.sh config/server.properties
4.测试
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
三、完全分布式安装
1.删除上述伪分布式测试后的日志
cd tmp ,rm -fr *.*
2.拷贝到其他两台机器上
scp -r kafka root@linux02
scp -r kafka root@linux03
3.修改配置文件
vim server.properties
broker.id=0 #当前server编号
修改每台机器上的配置文件中的内容,各不相同
4.测试
四、Java api
package com.study.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaDemo {
@Test
public void put(){
Properties props = new Properties();
// 固定,序列化的类
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 机器地址
props.put("metadata.broker.list", "linux01:9092");
// 写入数据
Producer<Integer, String> producer = new Producer<>(new ProducerConfig(props));
producer.send(new KeyedMessage<Integer, String>("park", "from java~~~"));
producer.close();
}
@Test
public void get(){
// 声明连接属性
Properties properties = new Properties();
// zookeeper 地址
properties.put("zookeeper.connect", "linux01:2181,linux02:2181,linux03:2181");//声明zk
// 组名称,向相同的组名称中发送数据,topic 间竞争 -- 队列模式
// 不同的组名,消息间并行 -- 发布订阅模式
properties.put("group.id", "g_1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
properties.put("auto.offset.reset", "smallest");
//连接kafka
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
//消费数据
Map<String, Integer> confMap = new HashMap<>();
confMap.put("park", 1); // topic 名称及读取数据的量
Map<String, List<KafkaStream<byte[], byte[]>>> ms = consumer.createMessageStreams(confMap);
KafkaStream<byte[], byte[]> ks = ms.get("park").get(0);
ConsumerIterator<byte[], byte[]> it = ks.iterator();
while(it.hasNext()){
MessageAndMetadata<byte[], byte[]> next = it.next();
byte[] message = next.message();
String str = new String(message);
System.out.println(str);
}
//断开连接
consumer.shutdown();
}
public static void main(String[] args) {
}
}
分享到:
相关推荐
本文来自于csdn,文章简单的介绍了ActiveMQ的概念,下载,安装,启动及优缺点。ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现,它...
Ansible:Ansible基础概念与安装.docx
Webpack:Webpack基础概念与安装.docx
Git:Git基础概念与安装.docx
Oracle 安装、Oracle基本概念介绍
"Maven 基础概念及使用" Maven 是 Apache 软件基金会组织维护的一款自动化构建工具,专注服务于 Java 平台的项目构建和依赖管理。它可以将项目分成多个模块,方便开发。通过配置 Maven,可以使用大量的工具和 jar ...
全书共13章,内容主要包括计算机网络基础、局域网基本概念、网络规划和设计、网络布线、Windows2000Server的安装和配置、网络客户机的安装和配置、Windows2000应用服务器的安装和配置、Windows网络应用、邮件服务器...
IBM 虚拟化基本概念 VIOS 的安装与配置 IBM 虚拟化基本概念是指通过软件和硬件的虚拟化技术,来实现对计算资源的优化配置和管理的一种解决方案。其中,VIOS(Virtual I/O Server)是 IBM 提供的一种虚拟化解决方案...
python 基础概念、软件安装和代码运行
安装、编码及amos基本概念和功能介绍.pptx
二次开发基础概念.docx PDF编辑软件:Adobe Acrobat二次开发_10.调试和测试二次开发应用.docx PDF编辑软件:Adobe Acrobat二次开发_11.发布和安装自定义应用程序.docx PDF编辑软件:Adobe Acrobat二次开发_12.最佳...
mysql安装配置教程 02 MySQL ...第3节 SQL基本概念与通用语法 第2节 MySQL安装与使用 第1节 数据库概念 第16节 用户管理和权限管理 第15节 事务 第14节 多表查询练习 第13节 子查询 第12节 多表查询
"云计算基本概念、传统虚拟化技术和ESXi安装" 云计算基本概念: 云计算是指通过网络提供的、可根据需求扩展或缩减的计算资源和存储资源的模式。云计算可以提供基础设施即服务(IaaS)、平台即服务(PaaS)和软件即...
IBM虚拟化基本概念(2)---VIOS的安装与配置
内容概要:本文介绍了Nginx的基本概念及其安装方法,指导了如何配置Nginx的基本参数、搭建反向代理以及如何执行负载平衡的操作流程,同时分享了一些常见的性能优化手段以及增强Nginx安全性的策略。并且提供了丰富的...
dockerDocker教程主要包括Docker的安装、基本概念、镜像管理、容器管理、Docker Compose的使用以及Docker网络等内容。以下是一个详细的Docker教程概述: 一、Docker安装 CentOS系统安装Docker 卸载旧版本(如果已...