Spark(3) More Examples and Deployment

 

1. More Spark Examples
1.1 Text Search
val file = spark.textFile()
val errors = file.filter(line => line.contains("ERROR"))
// count all the errors
errors.count()

// count, then show, all the error messages related to MySQL
errors.filter(line => line.contains("MySQL")).count()
errors.filter(line => line.contains("MySQL")).collect()

1.2 In-Memory Text Search
errors.cache()
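
As a hedged sketch of what caching buys us (the second filter term below is only an illustrative query): the first action after cache() computes and materializes errors in memory, and later queries reuse that in-memory copy instead of re-reading the file.

// the first action after cache() computes the RDD and keeps it in memory
errors.filter(line => line.contains("MySQL")).count()
// later queries are served from the in-memory data
errors.filter(line => line.contains("Timeout")).count()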

1.3 Word Count
val file = spark.textFile()
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile()

2. More Code Examples
We can find more code examples here:
https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/examples

3. My Example
val item1 = Product(None,"CK","good",DateTime.now)
val item2 = Product(None,"apple","good",DateTime.now)
val item3 = Product(None,"nike","bad",DateTime.now)
val item4 = Product(None,"cat","bad",DateTime.now)
val products = Seq(item1,item2,item3,item4)

val name = "good"

// distribute the four products across 2 partitions
val rdd = sc.makeRDD(products, 2)

// group into two buckets: key true for the "good" products, key false for the rest
val rdd2 = rdd.groupBy{ s => s.productName == name }

// toArray() collects the grouped RDD back to the driver
println("rdd first array ============" + rdd2.toArray()(0)._2)
println("rdd second array ============" + rdd2.toArray()(1)._2)

rdd2.toArray()(1)._2.foreach { s =>
  println("Products are good ============== " + s)
  //persist
}

sc.stop


The output of this example will be as follows:
Products are good ============== Product(None,CK,good,2013-07-09T10:04:03.949-05:00)
Products are good ============== Product(None,apple,good,2013-07-09T10:04:04.075-05:00)

In my actual project example, we can also put the DAO layer in another module, so that Spark only deals with the calculation work.

One thing we need to take care of: every class we use inside a Spark closure must be serializable.
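
For reference, the Product class used in section 3 could be declared roughly like the sketch below. Only productName is taken from the example (the groupBy above compares it with "good"/"bad"), and DateTime comes from joda-time; the other field names are assumptions on my side.

import org.joda.time.DateTime

// field names other than productName are assumed for illustration;
// a Scala case class is serializable by default, while a hand-written class
// would need to extend Serializable before it can be shipped inside a Spark closure
case class Product(id: Option[Long], brand: String, productName: String, created: DateTime)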

4. Deployment of Spark - Spark Standalone Mode
4.1 Starting a Cluster Manually
>cd SPARK_HOME
>./run spark.deploy.master.Master

We can visit the master web UI at http://localhost:8080/

The master URL should look like spark://Carls-MacBook-Pro.local:7077

I can also change the host part of the master URL with the -i parameter:
>./run spark.deploy.master.Master -i localhost

My master URL will be spark://localhost:7077 then.

And I can also start a worker node with this command:
>./run spark.deploy.worker.Worker spark://localhost:7077 

After that we can see the worker's address and other information on the master UI, e.g.:
Carls-MacBook-Pro.local:52875

I can also set the host part of the worker's address with the same parameter:
>./run spark.deploy.worker.Worker spark://localhost:7077 -i localhost

There are several other parameters:
-i IP, --ip IP
-p PORT, --port PORT
--webui-port PORT          Port for the web UI (default: 8080 for master, 8081 for worker)
-c CORES, --cores CORES    Total CPU cores to allow Spark jobs to use (worker only)
-m MEM, --memory MEM       Total amount of memory to allow Spark jobs to use, e.g. 1000M or 2G (worker only)
-d DIR, --work-dir DIR     Working directory (default: SPARK_HOME/work; worker only)
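
For example (the flag values here are only illustrative), a worker can be started with explicit core and memory limits:
>./run spark.deploy.worker.Worker spark://localhost:7077 -i localhost -c 2 -m 2G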

There is another way to launch the cluster.
4.2 Cluster Launch Scripts
Prepare the server to work on.
Add a user on the remote server:
>/usr/sbin/useradd spark

Give it a password:
>passwd spark

Generate the key pair on the client machine:
>ssh-keygen -t rsa

The key pair will be generated under /Users/carl/.ssh/

Place the public key on the remote server:
>su spark
>mkdir ~/.ssh
>vi ~/.ssh/authorized_keys
Paste the content of the public key here.

>chmod 711 ~/.ssh
>chmod 644 ~/.ssh/authorized_keys

Try it from the client machine. Edit the SSH client config:
>vi ~/.ssh/config
Place something like this in it:

Host server1.com
  User root
  IdentityFile ~/.ssh/demo1
Host server2.com
  User spark
  IdentityFile ~/.ssh/demo2

Then I can connect to the servers directly without typing a password.
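
For example, with the config above in place (server2.com is just the placeholder host from that config), the login should go straight through:
>ssh server2.com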

The cluster scripts are as follows:
bin/start-master.sh
bin/start-slaves.sh
bin/start-all.sh
bin/stop-master.sh
bin/stop-slaves.sh
bin/stop-all.sh

These commands need to be executed on the master machine.
Pay attention to the file SPARK_HOME/conf/slaves.

There is only localhost in it right now. We can also configure parameters in SPARK_HOME/conf/spark-env.sh:
SCALA_HOME=/opt/scala2.10.0 
SPARK_MASTER_IP=localhost 

Then I can start the cluster with this command:
>bin/start-all.sh

Error Message:
Carls-MacBook-Pro:spark carl$ ssh localhost
ssh: connect to host localhost port 22: Connection refused

We can reproduce the same error message with this command:
>ssh localhost

Solution:
System Preferences -----> Internet & Wireless -----> Sharing -----> check 'Remote Login'
System Preferences -----> Internet & Wireless -----> Sharing -----> click 'Edit' to change the name of my MacBook

4.3 Connecting the Job to the Cluster
>MASTER=spark://IP:PORT ./spark-shell

Connecting from an application:
val sparkMaster = "spark://192.168.10.115:7070"
val sc = new SparkContext(sparkMaster,
  "Complex Job",
  "/opt/spark",
  List("target/scala-2.10/easysparkserver_2.10-1.0.jar"),
  Map())
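
Once the context comes up, a tiny job like the following (a hedged sketch; any small action would do) can verify that the cluster actually accepts work:

// hypothetical smoke test: distribute a small range and count it on the cluster
val data = sc.parallelize(1 to 1000)
println("count = " + data.count())
sc.stop()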

Try to use the app to connect to the master.
Error Message:
13/07/09 13:40:55 INFO client.Client$ClientActor: Connecting to master spark://192.168.10.115:7070
13/07/09 13:40:55 ERROR client.Client$ClientActor: Connection to master failed; stopping client

Solution:
Wrong port number; change it to 7077.

Better, but I still get error messages:
13/07/09 13:44:54 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
13/07/09 13:45:09 WARN cluster.ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered

13/07/09 14:31:19 ERROR NettyRemoteTransport(akka://sparkMaster@localhost:7077): RemoteServerError@akka://sparkMaster@localhost:7077] Error[java.io.OptionalDataException] 

Solution:
>./run spark.examples.SparkPi spark://localhost:7077
It is working, so the cluster should be fine.

Finally I found the reason: I need to start the Spark server as the sudo user.

Error Message:
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 0
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime;
        at java.lang.Class.getDeclaredFields0(Native Method) 
        at java.lang.Class.privateGetDeclaredFields(Class.java:2339)
13/07/09 14:45:03 ERROR executor.Executor: Exception in task ID 1
java.lang.NoClassDefFoundError: Lorg/joda/time/DateTime; 
        at java.lang.Class.getDeclaredFields0(Native Method)

Solution:
>sbt assembly 
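
For `sbt assembly` to be available at all, the sbt-assembly plugin has to be declared in the build. A minimal sketch of project/plugins.sbt, where the version number is only an assumption for the sbt 0.12 era used here:

// project/plugins.sbt -- the version below is an assumption; pick the one matching your sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.8")

The build definition then typically also mixes in sbt-assembly's assemblySettings; the sbtassembly.Plugin._ and AssemblyKeys._ imports in the solution below come from that plugin.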

Error Message:
[info] Merging 'javax/servlet/SingleThreadModel.class' with strategy 'deduplicate'
java.lang.RuntimeException: deduplicate: different file contents found in the following:
/Users/carl/.ivy2/cache/javax.servlet/servlet-api/jars/servlet-api-2.5.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api-2.5/jars/servlet-api-2.5-6.1.14.jar:javax/servlet/SingleThreadModel.class
/Users/carl/.ivy2/cache/org.mortbay.jetty/servlet-api/jars/servlet-api-2.5-20081211.jar:javax/servlet/SingleThreadModel.class

Similar deduplicate conflicts are reported for, among others:
javax/servlet/SingleThreadModel
org/apache/jasper/compiler/Node
org/fusesource/jansi/Ansi$1.class
org/apache/commons/beanutils/converters/FloatArrayConverter
org/apache/commons/collections
META-INF/NOTICE.txt
META-INF/native/osx/libjansi.jnilib
META-INF/native/windows32/jansi.dll
about.html

Solution: add a merge strategy to the sbt build definition:
import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
    case PathList("org", "fusesource", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", "collections", xs @ _*) => MergeStrategy.first
    case PathList("META-INF", xs @ _*) =>
      (xs map {_.toLowerCase}) match {
            case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
              MergeStrategy.discard
            case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
              MergeStrategy.discard
            case "plexus" :: xs =>
              MergeStrategy.discard
            case "services" :: xs =>
              MergeStrategy.filterDistinctLines
            case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
              MergeStrategy.filterDistinctLines
            case ps @ (x :: xs) if ps.last.endsWith(".jnilib") || ps.last.endsWith(".dll") =>
              MergeStrategy.first
            case ps @ (x :: xs) if ps.last.endsWith(".txt") =>
              MergeStrategy.discard
            case ("notice" :: Nil) | ("license" :: Nil)=>
              MergeStrategy.discard
            case _ => MergeStrategy.deduplicate
      }
    case "application.conf" => MergeStrategy.concat
    case "about.html" => MergeStrategy.discard
    case x => old(x)
  }
}

>sbt clean update compile package assembly
>sbt run

That is it. I hope it works fine. Of course, I need to change the SparkContext as follows:
val sc = new SparkContext(sparkMaster,
  "Complex Job", "/opt/spark",
  List("/Users/carl/work/easy/easysparkserver/target/easysparkserver-assembly-1.0.jar"), Map())

Alternatively, the conflicting jars can be excluded from the assembly:

excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { c =>
    c.data.getName == "parboiled-scala_2.10.0-RC5-1.1.4.jar" ||
    c.data.getName == "minlog-1.2.jar"
  }
}

References:
spark shell
http://spark-project.org/docs/latest/quick-start.html

Spark Grammar and Examples
http://spark-project.org/examples/

cassandra thrift
http://wiki.apache.org/cassandra/ThriftExamples
http://wiki.apache.org/cassandra/ClientExamples

API document
http://spark-project.org/docs/latest/scala-programming-guide.html

Running Configuration
http://spark-project.org/docs/latest/running-on-yarn.html
http://spark-project.org/docs/latest/running-on-mesos.html
http://spark-project.org/docs/latest/spark-standalone.html

private key with SSH
http://sillycat.iteye.com/blog/1100363
http://sillycat.iteye.com/blog/1756114

sbt assembly
https://github.com/sbt/sbt-assembly
