Kafka(4)Multiple Kafka and Scala Client
1. Create Multiple Nodes
>cp server.properties server1.properties
>cp server.properties server2.properties
The content are as follow:
config/sever1.properties
broker.id = 1
port =9093
log.dir=/tmp/kafka-logs-1
config/server2.properties
broker.id = 2
port = 9094
log.dir=/tmp/kafka-logs-2
Start the 2 nodes
>JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &
>JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &
Create the topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 2 --partition 1 --topic my-replicated-topic
List the topics
>bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topicpartition: 0leader: 1replicas: 1,2isr: 1,2 topic: testpartition: 0leader: 0replicas: 0isr: 0
Here is the command to kill one node
>pkill -9 -f config/server1.properties
2. The Scala Client
The Scala Class will be as follow:
"com.rabbitmq" % "amqp-client" % "3.1.4",
"org.apache.kafka" % "kafka_2.10" % "0.8.0" intransitive(),
"com.yammer.metrics" % "metrics-core" % "2.2.0",
"com.twitter" %% "util-collection" % "6.3.6",
"com.101tec" % "zkclient" % "0.3"
The producer class
package com.sillycat.superduty.jobs.producer
import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }
object NewTaskKafka extends App {
val props2: Properties = new Properties()
props2.put("zk.connect", "localhost:2181")
props2.put("metadata.broker.list", "localhost:9092");
props2.put("serializer.class", "kafka.serializer.StringEncoder")
props2.put("zk.connectiontimeout.ms", "15000")
val config: ProducerConfig = new ProducerConfig(props2)
val producer: Producer[String, String] = new Producer[String, String](config)
val data = new KeyedMessage[String, String]("test", "test-message, it is ok")
producer.send(data)
producer.close
}
The Consumer class
package com.sillycat.superduty.jobs.consumer
import kafka.api.{ FetchRequestBuilder, FetchRequest }
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.FetchResponse
import kafka.javaapi.message.ByteBufferMessageSet
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import com.typesafe.scalalogging.slf4j.Logging
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import scala.collection.JavaConverters._
object WorkerKafka extends App with Logging {
val props = new Properties()
props.put("group.id", "console-consumer-2222222")
props.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
props.put("socket.timeout.ms", (ConsumerConfig.SocketTimeout).toString)
props.put("fetch.message.max.bytes", (1024 * 1024).toString)
props.put("fetch.min.bytes", (1).toString)
props.put("fetch.wait.max.ms", (100).toString)
props.put("auto.commit.enable", "true")
props.put("auto.commit.interval.ms", (ConsumerConfig.AutoCommitInterval).toString)
props.put("auto.offset.reset", "smallest")
props.put("zookeeper.connect", "localhost:2181")
props.put("consumer.timeout.ms", (-1).toString)
props.put("refresh.leader.backoff.ms", (ConsumerConfig.RefreshMetadataBackoffMs).toString)
val config = new ConsumerConfig(props)
val consumer = Consumer.createJavaConsumerConnector(config)
val topicMap = Map[String, Integer]("test" -> 1)
println("about to get the comsumerMsgStreams")
val consumerMap = consumer.createMessageStreams(topicMap.asJava)
val streamz = consumerMap.get("test")
val stream = streamz.iterator().next()
println("listening... (?) ")
val consumerIter = stream.iterator()
while(consumerIter.hasNext()){
System.out.println("MSG -> " + new String(consumerIter.next().message))
}
}
We can use zkCli.sh to view some of the configuration.
>zkCli.sh -server localhost:2181
zkCli>ls /
zkCli>get /brokers/topics/my-replicated-topic
zkCli>get /brokers/topics/test
And here is the document for the zookeeper data structure
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
We can also configure the multiple nodes for zookeeper in server.properties, there is a configuration like this.
zookeeper.connect=localhost:2182,localhost:2183
References:
http://kafka.apache.org/08/quickstart.html
https://cwiki.apache.org/confluence/display/KAFKA/Index
- 浏览: 2543199 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
发表评论
-
Update Site will come soon
2021-06-02 04:10 1672I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 289Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 441Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 284Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 245Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 315AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 308Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 336Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 447Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 495Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 366Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 325Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 367Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 433Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 516MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 456RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 318Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 316Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 322ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 396Jetty Server and Cookie Domain ...
相关推荐
Scala是编写Kafka服务端和客户端程序的常用语言,因此两者在大数据领域的结合十分紧密。 Scala安装包: Scala2.11.4是Scala的一个版本,它的主要特点是提供了更好的类型推断和性能优化。Scala的安装过程通常包括...
Kafka的API也提供了Scala版本,使得开发者能用自然且强大的方式与Kafka交互。 **分布式和后端开发** Kafka和Spark Streaming都是分布式系统的重要组成部分,它们在后端架构中起到关键作用。Kafka作为消息中间件,...
描述中的 "kafka_2.13-3.5.0+scala-2.13.2" 与标题相呼应,进一步强调了Kafka的构建依赖于Scala 2.13.2,并且是针对Java平台的2.13版本的Scala库编译的。这意味着此版本的Kafka可能包含对Scala语言新特性的支持,...
4. 分区与并行处理:Kafka的消息存储和消费基于分区,每个分区只能被一个消费者消费,这样允许多个消费者并行处理消息,提高处理效率。 二、Kafka_2.12-2.5.0更新亮点 1. 支持Java和Scala:Kafka的客户端库支持...
4. **自动分区分配**:Scala-Kafka-Client支持自动分区分配策略,允许开发者配置如何在消费者组内的成员之间均匀分配主题分区,从而实现负载均衡。 5. **幂等性与事务**:对于高一致性需求的应用,Scala-Kafka-...
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
kafka4s-使用Kafka和Scala进行函数式编程 kafka4s提供了纯净,参照透明的功能来使用Kafka,并与和等FP库集成。 快速开始 要在具有Scala 2.12或更高版本的现有SBT项目中使用kafka4s,请根据需要将以下依赖项添加到...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
在本文中,我们将深入探讨如何使用 Scala 创建一个简单的 Kafka 消费者,以便与 Kafka 集群进行交互。Kafka 是一个分布式流处理平台,广泛用于实时数据处理和消息传递。而 Scala 是一种功能丰富的编程语言,它与 ...
版本:kafka_2.12-0.10.2.1,生产者消费者示例代码,以及client相关包。 部分代码: public static KafkaProducer, String> getProducer() { if (kp == null) { Properties props = new Properties(); props....
Apache Kafka 3.2.0 (Scala 2.12 :kafka_2.12-3.2.0.tgz) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。) 是一个开源分布式事件流平台,被数千家公司用于高...
4. `Main.scala`或`Runner.scala`:主程序,启动Producer和Consumer。 5. 可能还有其他辅助文件,如`build.sbt`(Scala构建文件)或`pom.xml`(Maven项目文件)。 要运行这个Demo,你需要确保已经安装了Scala和相关...
Apache Kafka 3.2.0 (Scala 2.13 :kafka_2.13-3.2.0.tgz) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。) 是一个开源分布式事件流平台,被数千家公司用于高...
`kafka-streams-scala`是一个项目,它为Apache Kafka的Kafka Streams API提供了轻量级的Scala封装,使得Scala开发者可以更自然、更便捷地利用Kafka Streams的功能。Kafka Streams是一个用于在Java和Scala应用程序中...
在这个"Kafka-KStream-Udemy-Scala"项目中,@kagemomiji分享了使用Scala语言和Kafka Streams在Udemy课程中的示例代码。 **Kafka核心概念** 1. **主题(Topic)**:Kafka中的数据存储单元,类似于数据库中的表,数据...
赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...
4. **Java开发Kafka消费者**: - `KafkaConsumer`类用于创建消费者实例,配置包括bootstrap servers、group id、offset管理策略等。 - `subscribe()`方法用于订阅一个或多个主题,`poll()`方法用于轮询获取新消息...
4. **性能监控**:这些工具不仅可以帮助我们创建和管理主题,还能进行性能监控,查看消息的生产和消费速率,以及Brokers的CPU和内存使用情况。这对于诊断性能问题和优化Kafka配置至关重要。 5. **安全配置**:在...
kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala