Spark(3)More Examples and Deployment
1. More Spark Examples
1.1 Text Search
val file = spark.textFile()
val errors = file.filter(line => line.contains("ERROR"))
//count all the errors
errors.count()
errors.filter(line => line.contains("MySQL")).count()
errors.filter(line => line.contains("MySQL")).collect()
//show all the error messages related to MYSQL
1.2 IN-Memory text search
errors.cache()
1.3 Word Count
val file = spark.textFile()
val counts = file.flatMap(line =>line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile()
2. We can find more Code Examples
https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/examples
3. My Example
val item1 = Product(None,"CK","good",DateTime.now)
val item2 = Product(None,"apple","good",DateTime.now)
val item3 = Product(None,"nike","bad",DateTime.now)
val item4 = Product(None,"cat","bad",DateTime.now)
val products = Seq(item1,item2,item3,item4)
val name = "good"
val rdd = sc.makeRDD(products,2)
val rdd2 = rdd.groupBy{ s => s.productName == name}
println("rdd first array ============" + rdd2.toArray()(0)._2)
println("rdd second array ============" + rdd2.toArray()(1)._2)
rdd2.toArray()(1)._2.foreach{ s =>
println("Products are good ============== " + s)
//persist
}
sc.stop
The output of this example will be as follow:
Products are good ============== Product(None,CK,good,2013-07-09T10:04:03.949-05:00)
Products are good ============== Product(None,apple,good,2013-07-09T10:04:04.075-05:00)
In my official example, we can also put the DAO layer in another project, spark only deal with the calculate things.
One problem we need to take care about is
Every class we need to use in the closure of spark, we need to make sure it is extends from Serializable
4. Deployment of Spark - Spark Standalone Mode
4.1 Starting a Cluster Manually
>cd SPARK_HOME
>./run spark.deploy.master.Master
We can visit the UI page with URL http://localhost:8080/
This should be the URL of spark master spark://Carls-MacBook-Pro.local:7077
I can also change the URL of spark with these parameters
>./run spark.deploy.master.Master -i localhost
My master URL will be spark://localhost:7077 then.
And I also can start one work node with this command
>./run spark.deploy.worker.Worker spark://localhost:7077
After that we can also see the URL and other information on the UI of master
Carls-MacBook-Pro.local:52875
I also want to define the HOST part of the URL with parameters.
>./run spark.deploy.worker.Worker spark://localhost:7077 -i localhost
There are a lot of other parameters
-i IP, --ip IP
-p PORT, --port PORT
--webui-port PORT Port for web UI(default: 8080 for master, 8081 for worker)
-c CORES, --cores CORES only on worker, Total CPU cores to allow Spark jobs to use
-m MEM, --memory MEM Total amount of memory to allow Spark jobs to use, e.g.: 1000M or 2G, only on worker
-d DIR, --work-dir DIR Default is SPARK_HOME/work
And there is another way to launch the Cluster
4.2 Cluster Launch Scripts
Prepare the Server to Work on
Add user on Remote Server
>/usr/sbin/useradd spark
Give a password
>passwd spark
Generate the Key Pair on Client Server
>ssh-keygen -t rsa
The pairs will be generated here /Users/carl/.ssh/
Place the public key on remote server
>su spark
>mkdir ~/.ssh
>vi authorized_keys
Place the content in public key here
>chmod 711 ~/.ssh
>chmod 644 ~/.ssh/authorized_keys
Try from the client Server
>sudo vi config
Place something like this
Host server1.com
user root
identityFile "~/.ssh/demo1"
Host server2.com
user spark
identityFile "~/.ssh/demo2"
Then I can connect to the server directly now.
The cluster scripts are as follow:
bin/start-master.sh
bin/start-slaves.sh
bin/start-all.sh
bin/stop-master.sh
bin/stop-slaves.sh
bin/stop-all.sh
Need to execute these commands on the master machines.
Watch on the file SPARK_HOME/conf/slaves
There is only localhost right now. And we can config the parameters in SPARK_HOME/conf/spark-env.sh
SCALA_HOME=/opt/scala2.10.0
SPARK_MASTER_IP=localhost
Then I can start the cluster with command
>bin/start-all.sh
Error Message:
Carls-MacBook-Pro:spark carl$ ssh localhost
ssh: connect to host localhost port 22: Connection refused
We can get the same error Message with command
>ssh localhost
Solution:
System Preferences -----> Internet & Wireless -------> Sharing -----> Check 'Remote Login'
System Preferences -----> Internet & Wireless -------> Sharing -----> click 'Edit' to change the name of my MAC book
4.3 Connecting the Job to the Cluster
>MASTER=spark://IP:PORT ./spark-shell
Connecting with App.
val sparkMaster = "spark://192.168.10.115:7070"
val sc = new SparkContext(sparkMaster,
"Complex Job",
"/opt/spark",
List("target/scala-2.10/easysparkserver_2.10-1.0.jar"),
Map())
Try to use app to connect to master
Error Message:
3:40:55 INFO client.Client$ClientActor: Connecting to master spark://192.168.10.115:7070
13/07/09 13:40:55 ERROR client.Client$ClientActor: Connection to master failed; stopping client
Solution:
Error port number, change to 7077
Better, but still get Error Message
13/07/09 13:44:54 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
13/07/09 13:45:09 WARN cluster.ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered
13/07/09 14:31:19 ERROR NettyRemoteTransport(akka://sparkMaster@localhost:7077): RemoteServerError@akka://sparkMaster@localhost:7077] Error[java.io.OptionalDataException]
Solution:
>./run spark.examples.SparkPi spark://localhost:7077
It is working, so the cluster should be fine.
Finally I found the reason. I need to start the spark server with sudo user.
Error Message:
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 0
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2339)
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 1
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime;
at java.lang.Class.getDeclaredFields0(Native Method)
Solution:
>sbt assembly
Error Message
[info] Merging 'javax/servlet/SingleThreadModel.class' with strategy 'deduplicate'
java.lang.RuntimeException: deduplicate: different file contents found in the following:
/Users/carl/.ivy2/cache/javax.servlet/servlet-api/jars/servlet-api-2.5.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api-2.5/jars/servlet-api-2.5-6.1.14.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api/jars/servlet-api-2.5-20081211.jar:javax/servlet/SingleThreadModel.class
javax/servlet/SingleThreadModel
org/apache/jasper/compiler/Node
org/fusesource/jansi/Ansi$1.class' with strategy 'deduplicate'
'org/apache/commons/beanutils/converters/FloatArrayConverter
META-INF/native/osx/libjansi.jnilib' with strategy 'deduplicate'
Merging 'org/apache/commons/collections
Merging 'META-INF/NOTICE.txt' with strategy 'deduplicate'
'META-INF/native/windows32/jansi.dll' with strategy 'reduplicate'
'about.html' with strategy 'reduplicate'
Solution:
import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
case PathList("org", "fusesource", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "commons", "collections", xs @ _*) => MergeStrategy.first
case PathList("META-INF", xs @ _*) =>
(xs map {_.toLowerCase}) match {
case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
MergeStrategy.discard
case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
MergeStrategy.discard
case "plexus" :: xs =>
MergeStrategy.discard
case "services" :: xs =>
MergeStrategy.filterDistinctLines
case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
MergeStrategy.filterDistinctLines
case ps @ (x :: xs) if ps.last.endsWith(".jnilib") || ps.last.endsWith(".dll") =>
MergeStrategy.first
case ps @ (x :: xs) if ps.last.endsWith(".txt") =>
MergeStrategy.discard
case ("notice" :: Nil) | ("license" :: Nil)=>
MergeStrategy.discard
case _ => MergeStrategy.deduplicate
}
case "application.conf" => MergeStrategy.concat
case "about.html" => MergeStrategy.discard
case x => old(x)
}
}
>sbt clean update compile package assembly
>sbt run
That is it. I hope it works fine. Certainly, I need to change the SparkContext as follow:
val sc = new SparkContext(sparkMaster,
"Complex Job", "/opt/spark",
List("/Users/carl/work/easy/easysparkserver/target/easysparkserver-assembly-1.0.jar"), Map())
Alternatively
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
cp filter {_.data.getName == "parboiled-scala_2.10.0-RC5-1.1.4.jar" }
cp filter {_.data.getName == "minlog-1.2.jar" }
}
References:
spark shell
http://spark-project.org/docs/latest/quick-start.html
Spark Grammer and Examples
http://spark-project.org/examples/
cassandra thrift
http://wiki.apache.org/cassandra/ThriftExamples
http://wiki.apache.org/cassandra/ClientExamples
API document
http://spark-project.org/docs/latest/scala-programming-guide.html
Running Configuration
http://spark-project.org/docs/latest/running-on-yarn.html
http://spark-project.org/docs/latest/running-on-mesos.html
http://spark-project.org/docs/latest/spark-standalone.html
private key with SSH
http://sillycat.iteye.com/blog/1100363
http://sillycat.iteye.com/blog/1756114
sbt assembly
https://github.com/sbt/sbt-assembly
- 浏览: 2542375 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
发表评论
-
Update Site will come soon
2021-06-02 04:10 1672I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 288Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 440Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 282Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 244Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 314AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 306Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 333Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 442Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 494Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 364Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 322Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 365Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 433Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 513MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 456RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 316Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 316Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 322ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 393Jetty Server and Cookie Domain ...
相关推荐
本数据集"Dataset for R and data mining examples and case studies"旨在提供一系列R语言编程实例和数据挖掘案例研究,帮助用户深入理解和掌握相关技术。 首先,让我们来探讨R语言在数据处理中的优势。R语言拥有...
文件"original-spark-examples_2.11_hardfixed-2.4.3.jar"即为经过上述步骤定制编译后的Spark示例库,适用于HBase2环境。你可以将其添加到Spark的类路径中,然后使用`pyspark`命令启动Python交互式环境,测试对...
pyspark访问hbase2报错的解决方案,下载spark2.4.3的源码重新打包。
spark-examples-1.6.1-hadoop2.6.0.jar包下载,用于spark开发使用 用于spark开发使用 用于spark开发使用
"learning-spark-examples-master"这一项目主要涵盖了使用Apache Spark进行数据处理和分析的实例代码,旨在帮助学习者深入理解Spark的核心功能和使用方式。Spark作为一个快速、通用且可扩展的大数据处理框架,以其...
Graph Algorithms: Practical Examples in Apache Spark and Neo4j By 作者: Mark Needham – Amy E. Hodler ISBN-10 书号: 1492047686 ISBN-13 书号: 9781492047681 Edition 版本: 1 出版日期: 2019-01-04 pages ...
Filled with practical examples and use cases, this book will hot only help you get up and running with Spark, but will also take you farther down the road to becoming a data scientist.
Frank Kane's Taming Big Data with Apache Spark and Python English | 2017 | ISBN-10: 1787287947 | 296 pages | AZW3/PDF/EPUB (conv) | 6.12 Mb ... - Learning More About Spark and Data Science
7. **spark_examples-master项目解析** - 此项目包含了一系列Java编写的Spark示例程序,覆盖了基本的RDD操作、DataFrame操作以及Spark SQL等功能。 - 通过学习和运行这些示例,开发者可以直观地理解Spark API的...
HbaseAPI资源包,pyspark读写habase必备jar包,有余力的同学可以自己修改定义。
spark_python_ml_examples, Spark 2.0 python 机器学习示例 Spark python-机器学习示例这个库是 Apache Spark 示例系列的一部分,旨在演示如何用Spark支持的不同编程语言实现机器学习解决方案的实现。 Java是唯一未...
解决Spark与Hbase版本升级后Pyspark写入,报错找不到StringToImmutableBytesWritableConverte类和找不到:org.apache.hadoop.hbase.client.Put类中的add方法的问题
3. Spark Streaming:支持实时数据流处理,通过微批次处理实现高吞吐量和容错性。 4. MLlib:Spark的机器学习库,包含了各种常见的机器学习算法和工具。 5. GraphX:用于图处理的API,可以进行图计算和分析。 在...
algorithm examples utilize the Spark and Neo4j platforms, this book will also be helpful for understanding more general graph concepts, regardless of your choice of graph technologies. The first two ...
Companion Software for Digitale Signalverarbeitung: Grundlagen und Anwendungen Beispiele und Ubungen mit MATLAB (Digital Signal Processing: Fundamentals and ... Examples and Exercises with MATLAB)
spark-examples Anagrams、BigramAnalysis、Inverted index、vihicle 使用在这一点上是稳定的,所有其他类都是 WIP。 一些示例依赖于本地文件,我将修复这些输入路径。 为了从本地机器运行示例,请确保您的路径中...
spark-3.5.0-bin-hadoop3.tgz 是Apache Spark的一个特定版本,针对Hadoop 3.x版本进行了优化和构建。Apache Spark是一个强大的分布式计算系统,用于大数据处理和分析。它提供了高效的数据处理能力,支持多种编程语言...
Probability: Theory and Examples,by Durrent,概率论专业经典教材。答案很少见,特意贡献~~~ 教材电子版已更新至第四版,可以在Durrent‘s homepage下载~!