Kafka 2017 Update(2) Kafka Producer/Consumer and Architecture

Blog category: JAVA
 

Spark Streaming/Apache Storm

Input Data Stream —> Spark Streaming —> batches of input data —> Spark Engine —> batches of processed data

Try the Word Count Example
https://spark.apache.org/docs/latest/streaming-programming-guide.html

Try that in Scala; the lines after the StreamingContext creation follow the NetworkWordCount example from the guide above:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
// Read lines from a TCP socket, split them into words, and count the words per 10-second batch
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Exceptions:
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Solution:
Zeppelin has already created a SparkContext (sc) for us, so we can use it directly when building the StreamingContext.
val ssc = new StreamingContext(sc, Seconds(10))

Or we can run the bundled example directly:
>bin/run-example streaming.NetworkWordCount localhost 9999

In another terminal, run netcat as the data source and type some words:
>nc -lk 9999
hello world

The word counts will show up in the console of the running example:
Time: 1515044374000 ms
-------------------------------------------
(hello,1)
(world,1)

Kafka Concepts
Consumer Group - a set of consumers sharing the same group ID; the partitions of a topic are divided among the members of the group.
Consumer Position - the offset of the next record to be read; committed offsets let the group resume from where it left off.

Kafka Producer
pom.xml change is as follows:
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
</dependency>
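
Before running the producer, the topic should exist (unless the brokers allow automatic topic creation). A minimal sketch with the kafka-topics.sh tool that ships with the Kafka distribution; the ZooKeeper host is a placeholder and the partition/replication numbers are only example values:
>bin/kafka-topics.sh --create --zookeeper <zookeeper-host>:2181 --replication-factor 2 --partitions 3 --topic sillycat-topic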

Run the producer to send 10 messages to the Kafka topic:
package com.sillycat.sparkjava.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerApp {

    private final static String TOPIC = "sillycat-topic";

    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";

    private Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // server list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // client ID
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index,
                        "Hello Sillycat " + index);
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) {
        KafkaProducerApp app = new KafkaProducerApp();
        try {
            app.runProducer(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
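
The loop above calls producer.send(record).get(), so each record blocks until the broker acknowledges it. The same producer API can also be used asynchronously with a Callback; a minimal sketch (not part of the original example) that could replace the body of the loop:

// Asynchronous send: onCompletion runs on the producer I/O thread once the broker responds
producer.send(record, new org.apache.kafka.clients.producer.Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed after the producer exhausted its retries
            exception.printStackTrace();
        } else {
            System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                    record.key(), record.value(), metadata.partition(), metadata.offset());
        }
    }
});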

Kafka Consumer to pull the data from the topic:
package com.sillycat.sparkjava.app;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerApp {

    private final static String TOPIC = "sillycat-topic";

    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";

    private Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerApp");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;
        int noRecordsCount = 0;
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp)
                    break;
                else
                    continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
                        record.partition(), record.offset());
            });
            // Commit the offsets returned by the last poll. Note that enable.auto.commit defaults
            // to true, so offsets are also committed automatically unless it is turned off.
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }

    public static void main(String[] args) {
        KafkaConsumerApp app = new KafkaConsumerApp();
        try {
            app.runConsumer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
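
After the consumer has committed offsets, the group's current position and lag can be checked with the consumer-groups tool from the Kafka distribution; a sketch, assuming one of the brokers listed above is reachable:
>bin/kafka-consumer-groups.sh --bootstrap-server fr-stage-api:9092 --describe --group KafkaConsumerApp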

Kafka Architecture
Records - key (optional), value, timestamp
A topic has a log, which is the topic's storage on disk; the log is broken up into partitions and segments.

Kafka appends records from producers to the end of a topic log. A topic log consists of many partitions; the partitions are spread over multiple files, which in turn can be spread across multiple Kafka cluster nodes.

Consumers read from Kafka topics starting from where they left off.
http://cloudurable.com/blog/kafka-architecture/index.html
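
To make "read from where they left off" concrete: besides subscribe(), the consumer API also allows manual partition assignment and seeking to an explicit offset. A minimal sketch (not part of the original example), assuming props is built the same way as in createConsumer() above and that partition 0 of the topic exists:

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Manually assign partition 0 of the topic instead of joining a group rebalance via subscribe()
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("sillycat-topic", 0);
consumer.assign(Collections.singletonList(tp));

// Resume from the last committed offset of this group, or start from the beginning of the partition
OffsetAndMetadata committed = consumer.committed(tp);
if (committed != null) {
    consumer.seek(tp, committed.offset());
} else {
    consumer.seekToBeginning(Collections.singletonList(tp));
}
System.out.println("Next offset to read: " + consumer.position(tp));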

Kafka Topic Architecture
http://cloudurable.com/blog/kafka-architecture-topics/index.html

Kafka Consumer Architecture
http://cloudurable.com/blog/kafka-architecture-consumers/index.html

Kafka Producer Architecture
http://cloudurable.com/blog/kafka-architecture-producers/index.html

References:
http://sillycat.iteye.com/blog/2215237
http://sillycat.iteye.com/blog/2406572
http://sillycat.iteye.com/blog/2370527

http://www.cnblogs.com/gaopeng527/p/4959633.html
https://www.jianshu.com/p/d49460799e5b
http://www.cnblogs.com/huxi2b/p/6223228.html
http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

Kafka update 1
http://sillycat.iteye.com/blog/2406569

http://blog.csdn.net/eric_sunah/article/details/49762839
https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/0.1%20Spark%20Streaming%20%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E4%B8%8E%E6%A8%A1%E5%9D%97%E6%A6%82%E8%BF%B0.md
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/index.html
http://mangocool.com/1479867274843.html
https://spark.apache.org/docs/latest/streaming-programming-guide.html
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html