Kafka 2017 Update(2)Kafka Producer/Consumer and Architecture
Spark Streaming/Apache Storm
Input Data Stream -> Spark Streaming -> batches of input data -> Spark Engine -> batches of processed data
Try the Word Count Example
https://spark.apache.org/docs/latest/streaming-programming-guide.html
Try that in Scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
Exceptions:
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Solution:
Zeppelin has already created a SparkContext (sc) for us, so we can use it directly:
val ssc = new StreamingContext(sc, Seconds(10))
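With the streaming context in place, the rest of the word count follows the official guide linked above: read lines from the socket, split them into words, count each word per 10-second batch, and print the result. A minimal sketch of that continuation, assuming netcat is feeding text on localhost:9999:
// Listen on localhost:9999 for lines of text
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words and count each word within the batch
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten counts of every batch to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate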
Or we can run the built-in example directly. Start a netcat server in one terminal and feed it some text, then run the example in another terminal:
>nc -lk 9999
hello world
>bin/run-example streaming.NetworkWordCount localhost 9999
The word counts will show up in the example's console:
-------------------------------------------
Time: 1515044374000 ms
-------------------------------------------
(hello,1)
(world,1)
Kafka Concepts
Consumer Group - a set of consumers sharing a group ID; the partitions of a topic are divided among the members of the group
Consumer Position - the offset of the next record a consumer will read, committed back to Kafka so the consumer can resume where it left off
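To see a group's committed position from the command line, Kafka ships a consumer-groups tool. A sketch, assuming the group ID KafkaConsumerApp used by the consumer below and a broker reachable on fr-stage-api:9092:
>bin/kafka-consumer-groups.sh --bootstrap-server fr-stage-api:9092 --describe --group KafkaConsumerApp
Per partition it lists the current committed offset, the log end offset, and the resulting lag.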
Kafka Producer
The pom.xml change is as follows:
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
</dependency>
Run the producer to send 10 messages to the Kafka topic:
package com.sillycat.sparkjava.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerApp {

    private final static String TOPIC = "sillycat-topic";
    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";

    private Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // broker server list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // client ID
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerApp");
        // keys are Long, values are String
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index,
                        "Hello Sillycat " + index);
                // send() is asynchronous; get() blocks until the broker acknowledges the record
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) {
        KafkaProducerApp app = new KafkaProducerApp();
        try {
            app.runProducer(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Kafka consumer to pull the data from the topic:
package com.sillycat.sparkjava.app;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerApp {

    private final static String TOPIC = "sillycat-topic";
    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";

    private Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // consumers with the same group ID share the partitions of the topic
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerApp");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // cap the number of records returned by a single poll()
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    public void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                // give up after 100 empty polls
                noRecordsCount++;
                if (noRecordsCount > giveUp)
                    break;
                else
                    continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
                        record.partition(), record.offset());
            });
            // commit the offsets returned by the last call to consumer.poll
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }

    public static void main(String[] args) {
        KafkaConsumerApp app = new KafkaConsumerApp();
        try {
            app.runConsumer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
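Since this post covers both Spark Streaming and Kafka, the same topic can also be consumed as a DStream through the spark-streaming-kafka-0-10 integration (last link in the references). A minimal sketch in Scala, assuming that dependency is on the classpath and ssc is the StreamingContext created earlier:
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer settings matching the Java classes above (Long keys, String values)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092",
  "key.deserializer" -> classOf[LongDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "KafkaConsumerApp",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Direct stream: each Kafka partition maps to one Spark partition
val stream = KafkaUtils.createDirectStream[java.lang.Long, String](
  ssc,
  PreferConsistent,
  Subscribe[java.lang.Long, String](Array("sillycat-topic"), kafkaParams)
)

stream.map(record => (record.key, record.value)).print()
ssc.start()
ssc.awaitTermination()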
Kafka Architecture
Records - key (optional), value, and timestamp
A topic has a log, which is the topic's storage on disk; the log can be broken up into partitions and segments.
Kafka appends records from producers to the end of a topic log. A topic log consists of many partitions that are spread over multiple files, which in turn can be spread across multiple nodes of the Kafka cluster.
Consumers read from Kafka topics starting from where they left off, as tracked by their committed offsets.
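The partition count is chosen when the topic is created, so it is worth planning upfront how many partitions and replicas a topic needs. For example, creating the topic used above with 3 partitions and a replication factor of 3 (an assumed setup; adjust the ZooKeeper address and the counts for your cluster):
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic sillycat-topic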
http://cloudurable.com/blog/kafka-architecture/index.html
Kafka Topic Architecture
http://cloudurable.com/blog/kafka-architecture-topics/index.html
Kafka Consumer Architecture
http://cloudurable.com/blog/kafka-architecture-consumers/index.html
Kafka Producer Architecture
http://cloudurable.com/blog/kafka-architecture-producers/index.html
References:
http://sillycat.iteye.com/blog/2215237
http://sillycat.iteye.com/blog/2406572
http://sillycat.iteye.com/blog/2370527
http://www.cnblogs.com/gaopeng527/p/4959633.html
https://www.jianshu.com/p/d49460799e5b
http://www.cnblogs.com/huxi2b/p/6223228.html
http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html
Kafka update 1
http://sillycat.iteye.com/blog/2406569
http://blog.csdn.net/eric_sunah/article/details/49762839
https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/0.1%20Spark%20Streaming%20%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E4%B8%8E%E6%A8%A1%E5%9D%97%E6%A6%82%E8%BF%B0.md
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/index.html
http://mangocool.com/1479867274843.html
https://spark.apache.org/docs/latest/streaming-programming-guide.html
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html