Spark 2017 BigData Update(5)Spark Streaming in Java
The small streaming example class, WordCountStreamingApp.java:
package com.sillycat.sparkjava.app;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.sillycat.sparkjava.base.SparkBaseApp;

public class WordCountStreamingApp extends SparkBaseApp {

    private static final long serialVersionUID = 7401544141510431796L;

    protected String getAppName() {
        return "WordCountStreamingApp";
    }

    public void executeTask(List<String> params) {
        SparkConf conf = this.getSparkConf();
        logger.info("Start the streaming context");
        // The batch interval: streaming data is divided into 30-second batches.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(30000));
        ssc.checkpoint(this.getAppName());

        logger.info("Prepare the resources for streaming");
        processStream(ssc, "carl");

        logger.info("Streaming is working");
        try {
            ssc.start();
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            logger.error("InterruptedException:", e);
        }
    }

    private void processStream(JavaStreamingContext ssc, String keyword) {
        // Kafka consumer configuration for the direct stream (kafka010 integration).
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "WordCountStreamingApp");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        Collection<String> topics = Arrays.asList("sillycat-topic");

        logger.info("Init the Kafka client to fetch lines");
        JavaInputDStream<ConsumerRecord<String, String>> dStream = KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Count the matching lines in every micro-batch.
        dStream.foreachRDD(rdd -> {
            processRows(rdd, keyword);
        });
    }

    private void processRows(JavaRDD<ConsumerRecord<String, String>> rdds, String keyword) {
        // Drop the Kafka metadata and keep only the message values.
        JavaRDD<String> rows = rdds.map(record -> record.value());
        // Keep only non-empty lines that contain the keyword.
        JavaRDD<String> lines = rows.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String s) throws Exception {
                if (s == null || s.trim().length() < 1) {
                    return false;
                }
                if (!s.contains(keyword)) {
                    return false;
                }
                return true;
            }
        });
        long count = lines.count();
        logger.info("Kafka received " + count + " " + keyword);
    }
}
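The class above extends a SparkBaseApp base class that is not shown in this post. Below is a minimal sketch of what such a base class might look like; the member names (logger, getAppName, getSparkConf, executeTask) are taken from the calls in WordCountStreamingApp, but the implementation details, including the local[2] default master, are my assumptions and not the original class.

package com.sillycat.sparkjava.base;

import java.io.Serializable;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;

// Hypothetical sketch of the base class used by WordCountStreamingApp.
// Only the members referenced above are modeled; the real SparkBaseApp may differ.
public abstract class SparkBaseApp implements Serializable {

    private static final long serialVersionUID = 1L;

    // Driver-side logger; transient so it is not shipped to executors.
    protected transient Logger logger = Logger.getLogger(this.getClass());

    // Subclasses override this to name the Spark application.
    protected String getAppName() {
        return "SparkBaseApp";
    }

    // Build a SparkConf; "local[2]" is an assumed default for local runs,
    // and spark-submit --master overrides it on a cluster.
    protected SparkConf getSparkConf() {
        return new SparkConf()
                .setAppName(getAppName())
                .setIfMissing("spark.master", "local[2]");
    }

    // Each application implements its own job logic here.
    public abstract void executeTask(List<String> params);
}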
Here is how we run these tests:
#Run locally with the fat JAR#
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run the binary locally with spark-submit#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
#Run the binary on the remote YARN cluster#
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
>bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
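The streaming job only logs a count when messages containing the keyword "carl" arrive on sillycat-topic, so the topic needs some test traffic. Here is a hedged sketch of a small Java producer that pushes a few lines into the topic; the broker list mirrors the one in the streaming code, while the class name and message contents are made up for illustration and are not part of the original project.

package com.sillycat.sparkjava.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper that sends a few test lines to sillycat-topic so the
// streaming job has something to count.
public class TestMessageProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker list as the streaming consumer above.
        props.put("bootstrap.servers", "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Half of the messages contain the keyword "carl" used in processStream.
                String line = (i % 2 == 0) ? "hello carl number " + i : "hello world number " + i;
                producer.send(new ProducerRecord<>("sillycat-topic", Integer.toString(i), line));
            }
            producer.flush();
        }
    }
}

With the streaming job running, each 30-second batch that receives these messages should then log a non-zero count for "carl".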
References:
http://sillycat.iteye.com/blog/2407639