`
sillycat
  • 浏览: 2551851 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Kafka(4)Multiple Kafka and Scala Client

 
阅读更多

Kafka(4)Multiple Kafka and Scala Client

1. Create Multiple Nodes
>cp server.properties server1.properties
>cp server.properties server2.properties
The content are as follow:
config/sever1.properties
broker.id = 1
port =9093
log.dir=/tmp/kafka-logs-1

config/server2.properties
broker.id = 2
port = 9094
log.dir=/tmp/kafka-logs-2

Start the 2 nodes
>JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &
>JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &

Create the topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 2 --partition 1 --topic my-replicated-topic

List the topics
>bin/kafka-list-topic.sh --zookeeper localhost:2181

topic: my-replicated-topicpartition: 0leader: 1replicas: 1,2isr: 1,2 topic: testpartition: 0leader: 0replicas: 0isr: 0

Here is the command to kill one node
>pkill -9 -f config/server1.properties

2. The Scala Client
The Scala Class will be as follow:
        "com.rabbitmq"        %   "amqp-client"               % "3.1.4",
        "org.apache.kafka"          %   "kafka_2.10"            % "0.8.0" intransitive(),
        "com.yammer.metrics"        %   "metrics-core"          % "2.2.0",
        "com.twitter"         %%  "util-collection"           % "6.3.6",
        "com.101tec"          %   "zkclient"                  % "0.3"

The producer class
package com.sillycat.superduty.jobs.producer

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }

object NewTaskKafka extends App {
  val props2: Properties = new Properties()
  props2.put("zk.connect", "localhost:2181")
  props2.put("metadata.broker.list", "localhost:9092");
  props2.put("serializer.class", "kafka.serializer.StringEncoder")
  props2.put("zk.connectiontimeout.ms", "15000")

  val config: ProducerConfig = new ProducerConfig(props2)

  val producer: Producer[String, String] = new Producer[String, String](config)

  val data = new KeyedMessage[String, String]("test", "test-message, it is ok")
  producer.send(data)
  producer.close
}

The Consumer class
package com.sillycat.superduty.jobs.consumer

import kafka.api.{ FetchRequestBuilder, FetchRequest }
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.FetchResponse
import kafka.javaapi.message.ByteBufferMessageSet
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import com.typesafe.scalalogging.slf4j.Logging
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import scala.collection.JavaConverters._

object WorkerKafka extends App with Logging {

  val props = new Properties()

  props.put("group.id", "console-consumer-2222222")
  props.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
  props.put("socket.timeout.ms", (ConsumerConfig.SocketTimeout).toString)
  props.put("fetch.message.max.bytes", (1024 * 1024).toString)
  props.put("fetch.min.bytes", (1).toString)
  props.put("fetch.wait.max.ms", (100).toString)
  props.put("auto.commit.enable", "true")
  props.put("auto.commit.interval.ms", (ConsumerConfig.AutoCommitInterval).toString)
  props.put("auto.offset.reset", "smallest")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("consumer.timeout.ms", (-1).toString)
  props.put("refresh.leader.backoff.ms", (ConsumerConfig.RefreshMetadataBackoffMs).toString)


  val config   = new ConsumerConfig(props)
  val consumer = Consumer.createJavaConsumerConnector(config)

  val topicMap =  Map[String, Integer]("test" -> 1)

  println("about to get the comsumerMsgStreams")
  val consumerMap = consumer.createMessageStreams(topicMap.asJava)

  val streamz = consumerMap.get("test")

  val stream = streamz.iterator().next()

  println("listening... (?) ")

  val consumerIter = stream.iterator()
  while(consumerIter.hasNext()){
    System.out.println("MSG -> " + new String(consumerIter.next().message))
  }

}


We can use zkCli.sh to view some of the configuration.
>zkCli.sh -server localhost:2181
zkCli>ls /
zkCli>get /brokers/topics/my-replicated-topic
zkCli>get /brokers/topics/test

And here is the document for the zookeeper data structure
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

We can also configure the multiple nodes for zookeeper in server.properties, there is a configuration like this.
zookeeper.connect=localhost:2182,localhost:2183


References:
http://kafka.apache.org/08/quickstart.html

https://cwiki.apache.org/confluence/display/KAFKA/Index

分享到:
评论

相关推荐

    Scala安装包和kafka安装包

    Scala是编写Kafka服务端和客户端程序的常用语言,因此两者在大数据领域的结合十分紧密。 Scala安装包: Scala2.11.4是Scala的一个版本,它的主要特点是提供了更好的类型推断和性能优化。Scala的安装过程通常包括...

    kafka kafka与sparkStreaming kafka与Scala

    Kafka的API也提供了Scala版本,使得开发者能用自然且强大的方式与Kafka交互。 **分布式和后端开发** Kafka和Spark Streaming都是分布式系统的重要组成部分,它们在后端架构中起到关键作用。Kafka作为消息中间件,...

    kafka-2.13-3.5.0+scala-2.13.2

    描述中的 "kafka_2.13-3.5.0+scala-2.13.2" 与标题相呼应,进一步强调了Kafka的构建依赖于Scala 2.13.2,并且是针对Java平台的2.13版本的Scala库编译的。这意味着此版本的Kafka可能包含对Scala语言新特性的支持,...

    kafka_2.12-2.5.0 s scala-2-12.11.tg

    4. 分区与并行处理:Kafka的消息存储和消费基于分区,每个分区只能被一个消费者消费,这样允许多个消费者并行处理消息,提高处理效率。 二、Kafka_2.12-2.5.0更新亮点 1. 支持Java和Scala:Kafka的客户端库支持...

    scala-kafka-client:用于运行Apache Kafka客户端库的Scala帮助器模块(0.9.x-2.1.0)

    4. **自动分区分配**:Scala-Kafka-Client支持自动分区分配策略,允许开发者配置如何在消费者组内的成员之间均匀分配主题分区,从而实现负载均衡。 5. **幂等性与事务**:对于高一致性需求的应用,Scala-Kafka-...

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    kafka4s:使用Kafka和Scala进行函数式编程

    kafka4s-使用Kafka和Scala进行函数式编程 kafka4s提供了纯净,参照透明的功能来使用Kafka,并与和等FP库集成。 快速开始 要在具有Scala 2.12或更高版本的现有SBT项目中使用kafka4s,请根据需要将以下依赖项添加到...

    kafka-consumer:简单的 Scala Kafka 消费者

    在本文中,我们将深入探讨如何使用 Scala 创建一个简单的 Kafka 消费者,以便与 Kafka 集群进行交互。Kafka 是一个分布式流处理平台,广泛用于实时数据处理和消息传递。而 Scala 是一种功能丰富的编程语言,它与 ...

    kafka client依赖包和示例代码

    版本:kafka_2.12-0.10.2.1,生产者消费者示例代码,以及client相关包。 部分代码: public static KafkaProducer, String> getProducer() { if (kp == null) { Properties props = new Properties(); props....

    Apache Kafka 3.2.0 (Scala 2.12 :kafka_2.12-3.2.0.tgz)

    Apache Kafka 3.2.0 (Scala 2.12 :kafka_2.12-3.2.0.tgz) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。) 是一个开源分布式事件流平台,被数千家公司用于高...

    阿里云消息队列kafka demo

    4. `Main.scala`或`Runner.scala`:主程序,启动Producer和Consumer。 5. 可能还有其他辅助文件,如`build.sbt`(Scala构建文件)或`pom.xml`(Maven项目文件)。 要运行这个Demo,你需要确保已经安装了Scala和相关...

    Apache Kafka 3.2.0 (Scala 2.13 :kafka_2.13-3.2.0.tgz)

    Apache Kafka 3.2.0 (Scala 2.13 :kafka_2.13-3.2.0.tgz) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。) 是一个开源分布式事件流平台,被数千家公司用于高...

    kafka-streams-scala:Kafka Streams API的Thin Scala包装器

    `kafka-streams-scala`是一个项目,它为Apache Kafka的Kafka Streams API提供了轻量级的Scala封装,使得Scala开发者可以更自然、更便捷地利用Kafka Streams的功能。Kafka Streams是一个用于在Java和Scala应用程序中...

    kafka-kstream-udemy-scala

    在这个"Kafka-KStream-Udemy-Scala"项目中,@kagemomiji分享了使用Scala语言和Kafka Streams在Udemy课程中的示例代码。 **Kafka核心概念** 1. **主题(Topic)**:Kafka中的数据存储单元,类似于数据库中的表,数据...

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    java开发kafka-clients所需要的所有jar包以及源码

    4. **Java开发Kafka消费者**: - `KafkaConsumer`类用于创建消费者实例,配置包括bootstrap servers、group id、offset管理策略等。 - `subscribe()`方法用于订阅一个或多个主题,`poll()`方法用于轮询获取新消息...

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    kafka的java依赖包

    4. `zookeeper`: Kafka使用ZooKeeper进行集群协调和管理,因此ZooKeeper的客户端库也是必不可少的。 5. `lz4`和`snappy`: 这些是高效的压缩库,Kafka默认使用它们对消息进行压缩,以节省存储空间。 使用这些jar包...

Global site tag (gtag.js) - Google Analytics