Spark 2017 BigData Update(4)Spark Core in JAVA
Spark in JAVA
I set up a project called sillycat-spark-java, and I plan to get it running in Java.
Documentation for the Spark Streaming + Kafka integration:
https://spark.apache.org/docs/2.2.1/streaming-kafka-0-10-integration.html
All the dependencies go into pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sillycat</groupId>
    <artifactId>sillycat-spark-java</artifactId>
    <version>1.0</version>
    <description>Fetch the Events from Kafka</description>
    <name>Spark Streaming System</name>
    <packaging>jar</packaging>
    <properties>
        <jackson.version>2.9.2</jackson.version>
        <logging.version>1.7.25</logging.version>
        <springframework.version>5.0.1.RELEASE</springframework.version>
        <spark.version>2.2.1</spark.version>
    </properties>
    <dependencies>
        <!-- spark framework -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.sillycat.sparkjava.SparkJavaApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
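To build the fat jar, the usual Maven command should do (a sketch; the jar name below assumes the assembly plugin's default naming of artifactId-version-jar-with-dependencies):
>mvn clean package
That should produce target/sillycat-spark-java-1.0-jar-with-dependencies.jar, with com.sillycat.sparkjava.SparkJavaApp set as the manifest main class.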
The main class for common usage, SparkJavaApp.java:
package com.sillycat.sparkjava;

import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sillycat.sparkjava.base.SparkBaseApp;

public class SparkJavaApp {

    protected final static Logger logger = LoggerFactory.getLogger(SparkJavaApp.class);

    public static void main(String[] args) {
        logger.info("Starting Spark tasks------");
        // Default task to run when no class name is passed on the command line
        String className = "com.sillycat.sparkjava.app.CountLinesOfKeywordApp";
        if (args != null && args.length > 0) {
            for (int i = 0; i < args.length; i++) {
                logger.info("args: " + args[i]);
            }
            logger.info("first arg: " + args[0]);
            className = args[0];
        }
        logger.info("className: " + className);
        logger.info("--------------------------");
        // Load the task class by name and execute it
        Class<?> classInstance;
        try {
            classInstance = Class.forName(className);
            SparkBaseApp task = (SparkBaseApp) classInstance.newInstance();
            task.executeTask(Arrays.asList(args));
        } catch (ClassNotFoundException e) {
            logger.error("ClassNotFoundException:", e);
        } catch (InstantiationException e) {
            logger.error("InstantiationException:", e);
        } catch (IllegalAccessException e) {
            logger.error("IllegalAccessException:", e);
        }
    }
}
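To run a task, something like this spark-submit call should work (a sketch; it assumes spark-submit is on the PATH and the fat jar from the build step above; any arguments after the jar are passed to main, and the first one is taken as the task class name):
>spark-submit --class com.sillycat.sparkjava.SparkJavaApp target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
If no --master is given and spark.master is not set elsewhere, the base class below falls back to local[4].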
The base class, SparkBaseApp.java:
package com.sillycat.sparkjava.base;

import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkBaseApp implements Serializable {

    private static final long serialVersionUID = 3926361971198654215L;

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    public void executeTask(List<String> params) {
    }

    protected String getAppName() {
        return "defaultJob";
    }

    protected SparkConf getSparkConf() {
        SparkConf conf = new SparkConf();
        conf.setAppName(this.getAppName());
        // Fall back to a local master with 4 threads if spark.master is not set
        conf.setIfMissing("spark.master", "local[4]");
        conf.setSparkHome("/opt/spark");
        // Ship the jar that contains this class to the executors
        conf.setJars(SparkContext.jarOfClass(this.getClass()).toList());
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        return conf;
    }
}
The first Spark Core example, CountLinesOfKeywordApp.java:
package com.sillycat.sparkjava.app;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import com.sillycat.sparkjava.base.SparkBaseApp;

public class CountLinesOfKeywordApp extends SparkBaseApp {

    private static final long serialVersionUID = 7912776380212093020L;

    protected String getAppName() {
        return "CountLinesOfKeywordApp";
    }

    public void executeTask(List<String> params) {
        SparkConf conf = this.getSparkConf();
        SparkContext sc = new SparkContext(conf);
        String logFile = "file:///opt/spark/README.md";
        String keyword = "a";
        logger.info("Prepare the resource from " + logFile);
        JavaRDD<String> rdd = this.generateRdd(sc, logFile);
        logger.info("Executing the calculation based on keyword " + keyword);
        long result = processRows(rdd, keyword);
        logger.info("Lines with keyword " + keyword + ": " + result);
        sc.stop();
    }

    private JavaRDD<String> generateRdd(SparkContext sc, String logFile) {
        // Read the text file into 10 partitions and wrap it as a JavaRDD
        JavaRDD<String> logData = sc.textFile(logFile, 10).toJavaRDD();
        return logData;
    }

    private long processRows(JavaRDD<String> rows, String keyword) {
        // Keep only the non-blank lines that contain the keyword
        JavaRDD<String> lines = rows.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String s) throws Exception {
                if (s == null || s.trim().length() < 1) {
                    return false;
                }
                if (!s.contains(keyword)) {
                    return false;
                }
                return true;
            }
        });
        long count = lines.count();
        return count;
    }
}
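Since the compiler plugin targets 1.8, the anonymous Function above can also be written as a lambda. This is just an alternative sketch of processRows, not part of the original project:
private long processRows(JavaRDD<String> rows, String keyword) {
    // Same filter as above, expressed as a Java 8 lambda;
    // org.apache.spark.api.java.function.Function is a functional interface.
    return rows.filter(s -> s != null && s.trim().length() > 0 && s.contains(keyword)).count();
}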
References:
https://spark.apache.org/docs/latest/streaming-programming-guide.html
http://www.cnblogs.com/gaopeng527/p/4959633.html