`

kafka安装测试

阅读更多

kafka安装测试过程

kafka的性能在此不再赘述,百度一下很多,在此描述一下kafka的安装和测试过程:

  1. 安装kafka:
    #tar -xzf kafka_2.9.2-0.8.1.tgz
    #cd kafka_2.9.2-0.8.1
    #mv kafka_2.9.2-0.8.1 kafka
     
  2. 开启zookeeper服务:
    bin/zookeeper-server-start.sh config/zookeeper.properties
     
  3. 开启kafka服务:
    bin/kafka-server-start.sh config/server.properties
     
  4. 创建话题topic:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
     具体kafka-topics.sh 的参数自行查看--help帮助
  5. 查看kafka服务中的topics:
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    #列出topics如下
    test
     在2.8之前的版本中的shell脚本可能不同
  6. 打开produce,向test话题添加消息:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    xxxxxxxxxxxxxxxxx #输入内容后enter即可发送出消息内容
     
  7. 打开customer读取test话题内容:
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    xxxxxxxxxxxxxxxxx
     kafka的是scala语言编写的服务框架,因此用scala开发produce和custome应用程序应该是非常方便的,但是没有找到相应examples,但kafka也支持java和python以及c编写的客户端应用程序,下面分享一下java的代码片段(网络转载):
  8. 消费者custome:
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class ConsumerTest extends Thread {
    	private final ConsumerConnector consumer;
    	private final String topic;
    
    	public static void main(String[] args) {
    		ConsumerTest consumerThread = new ConsumerTest("1test");
    		consumerThread.start();
    	}
    
    	public ConsumerTest(String topic) {
    		consumer = kafka.consumer.Consumer
    				.createJavaConsumerConnector(createConsumerConfig());
    		this.topic = topic;
    	}
    
    	private static ConsumerConfig createConsumerConfig() {
    		Properties props = new Properties();
    		props.put("zookeeper.connect", "master:2181");
    		props.put("group.id", "0");
    		props.put("zookeeper.session.timeout.ms", "400000");
    		props.put("zookeeper.sync.time.ms", "200");
    		props.put("auto.commit.interval.ms", "1000");
    
    		return new ConsumerConfig(props);
    
    	}
    
    	public void run() {
    		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    		topicCountMap.put(topic, new Integer(1));
    		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
    				.createMessageStreams(topicCountMap);
    		KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    		ConsumerIterator<byte[], byte[]> it = stream.iterator();
    		while (it.hasNext())
    			System.out.println(new String(it.next().message()));
    	}
    }
     消息的生产者produce:
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class ProducerTest {
    	public static void main(String[] args) {
    		Properties props = new Properties();  
            props.put("zk.connect", "master:2181");  // zookeeper的一个节点地址
            props.put("serializer.class", "kafka.serializer.StringEncoder");// kafka序列化方式
            props.put("metadata.broker.list", "master:9092");
            props.put("request.required.acks", "1");
            //props.put("partitioner.class", "com.xq.SimplePartitioner");
            ProducerConfig config = new ProducerConfig(props);  
            Producer<String, String> producer = new Producer<String, String>(config);  
    
            String msg ="this is a messageuuu! XXXmessDageuuu";
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg); 
            for(int i = 0 ; i < 5; i ++){
            	System.out.println("send"+i);
            	producer.send(data);
            }
            producer.close();  
    	}
    }
     分别运行custom和produce即可看到控制台消息发送和接受的内容。
  9. 后续将继续更新kafka的各个参数的说明文档以及与spark集成,与flume集成。
分享到:
评论

相关推荐

    可视化kafka测试工具

    1. **安装与启动**:下载并解压压缩包文件“kafka测试工具”,根据提供的文档安装并启动工具。 2. **配置连接**:在工具界面输入Kafka Broker的地址,设置必要的认证信息(如果有的话)。 3. **创建测试用例**:...

    自用kafka简单测试

    在描述中,“kafka测试”可能是指创建或执行一些基本的Kafka操作,如生产消息、消费消息、检查主题(topics)、分区(partitions)和副本(replicas)等,以验证Kafka集群的正常运行。测试可能包括性能测试、稳定性...

    向kafka插入数据测试

    这个场景中,我们关注的是“向Kafka插入数据”的测试。这涉及到多个知识点,包括Kafka的基本概念、生产者API、数据模型、以及如何进行测试。 1. **Kafka基本概念**:Kafka是一个高吞吐量、低延迟的消息队列系统,它...

    kafka集群安装以及测试

    利用安装zookeeper的三台服务器搭建KAFKA集群,并对其进行验证测试

    Kafka安装与部署指南

    总结来说,Kafka的安装和部署涉及到Java环境的准备、下载与解压Kafka、配置服务器属性、启动ZooKeeper和Kafka服务,以及创建和测试消息传递。了解这些基本步骤和概念对于理解和操作Kafka至关重要。在实际应用中,你...

    kafka及其性能测试

    本知识点将详细阐述Kafka的核心概念、使用场景以及性能测试方法。 首先,Kafka的核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、代理(Broker)、分区(Partition)、偏移量(Offset)、副本...

    kafka的安装和简单实例测试

    ### Kafka的安装与简单实例测试知识点详解 #### 一、Kafka简介 Kafka是一种高吞吐量的分布式发布订阅消息系统,它被设计用于处理大规模网站中的所有动作流数据。Kafka的主要目标之一是通过Hadoop的并行加载机制来...

    Kafka安装(安装与配置).pdf

    ### Kafka安装与配置详解 #### 一、Kafka简介 Apache Kafka是一种分布式流处理平台,主要功能包括发布和订阅记录流、存储...至此,Kafka集群安装与配置的基本流程已经完成,接下来可以进行进一步的功能测试和使用。

    Kafka 测试小程序

    **Kafka测试小程序详解** Kafka是一款开源的分布式消息中间件,由Apache软件基金会开发,主要用于构建实时数据管道和流应用程序。它具有高吞吐量、可扩展性、持久性和容错性等特性,广泛应用于大数据领域。在这个...

    linux安装kafka教程

    Linux 安装 Kafka 教程 Kafka 是一种流行的分布式流处理平台,广泛应用于数据处理、实时数据处理和事件驱动架构等领域。本教程将指导您在 Linux 环境中安装和配置 Kafka。 一、Kafka 安装 Kafka 可以通过两种方式...

    kafka安装手册

    kafka安装手册 Kafka 是一种流行的分布式流处理平台,由 Apache 软件基金会开发和维护。Kafka 通过提供高吞吐量、持久化、多订阅者支持等特性,满足了大数据处理和实时数据处理的需求。 Kafka 集群安装 Kafka ...

    介绍kafka及kafka集群安装

    7. **验证**:使用 Kafka 提供的命令工具 `kafka-topics.sh` 创建 Topic,`kafka-console-producer.sh` 和 `kafka-console-consumer.sh` 测试生产与消费消息。 #### Kafka 单机版部署示例 1. **配置 server....

    Kafka性能测试实例1

    Kafka性能测试实例1 Kafka作为一个分布式流媒体平台,需要在高数据量和高并发情况下保证其性能和稳定性。为了提高Kafka集群的性能和稳定性,需要从生产者和消费者两方面进行性能测试。本文将从生产者和消费者两方...

    kafka资源及测试代码下载.rar

    《Kafka资源与测试代码详解》 在大数据处理和实时流计算领域,Apache Kafka是一款不可或缺的组件。本篇文章将深入探讨Kafka的核心概念、功能特性,并结合Python和Spark的使用,解析提供的测试代码,帮助读者更好地...

    Kafka性能测试报告.pdf

    Kafka性能测试报告.pdf 以下是根据给定文件信息生成的相关知识点: 一、Kafka性能测试报告概述 Kafka性能测试报告主要是测试Kafka的性能,并了解Kafka在虚拟机环境下的性能。本次测试使用的是最新版本的Kafka 0.8...

    kafka集群搭建及测试.docx

    【Kafka集群搭建及测试】 Kafka是一种分布式流处理平台,常用于实时数据处理和大数据管道。本文档将详细介绍如何在三台Ubuntu 16虚拟机上搭建Kafka集群,并进行基本的测试,确保其正常运行。 **1. 准备工作** 在...

    JMeter测试Kafka插件kafkameter-0.2.0.jar

    《JMeter测试Kafka插件kafkameter-0.2.0.jar:性能测试新利器》 在现代大数据处理领域,Apache Kafka作为一个高效、可扩展的消息中间件,广泛应用于实时数据流处理和分布式系统间的数据传递。为了确保Kafka在生产...

    kafka 插件kafka 插件kafka 插件

    1. **安装和配置**: 根据插件文档,将插件添加到项目依赖中,配置相关的Kafka连接信息(如bootstrap servers、topic等)。 2. **编写代码**: 使用插件提供的API编写生产或消费消息的代码,处理异常和错误情况。 3. *...

Global site tag (gtag.js) - Google Analytics