`
bit1129
  • 浏览: 1069794 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark二十】运行Spark Streaming的NetworkWordCount实例

 
阅读更多

Spark Streaming简介

 

NetworkWordCount代码

 

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
	def main(args: Array[String]) {
		if (args.length < 2) {
		System.err.println("Usage: NetworkWordCount <hostname> <port>")
		System.exit(1)
	}
	StreamingExamples.setStreamingLogLevels()
	// Create the context with a 1 second batch size
	// 创建SparkConf实例
	val sparkConf = new SparkConf().setAppName("NetworkWordCount")
	
	///创建Spark Streaming Context,每隔1秒钟处理一批数据,那么这一秒收集的数据存放在哪,如何将收集的数据推送出去?是生产者主动推出去还是消费者每隔1秒钟来拉取一次数据
	val ssc = new StreamingContext(sparkConf, Seconds(1))
	
	// Create a socket stream on target ip:port and count the
	// words in input stream of \n delimited text (eg. generated by 'nc')
	// Note that no duplication in storage level only for running locally.
	// Replication necessary in distributed scenario for fault tolerance.
	val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
	//flatMap是把将每一行使用空格做分解,那么words对应的数据结构是怎么样的?
        ///words是个集合,每个集合元素依然是个集合,这个集合存放单词
        val words = lines.flatMap(_.split(" "))
	val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
	wordCounts.print()
	//启动计算作业
	ssc.start()
	
	//等待结束,什么时候结束作业,即触发什么条件会让作业执行结束
	ssc.awaitTermination()
   }
}

 

 运行NetworkWordCount

 1. 启动NetCat命令

 

[hadoop@hadoop ~]$ nc -lk 9999
This is a book 

启动后,可以在后面输入文本,比如This is a book

 

2. 在另外一个终端,Spark提交NetworkWordCount任务

 

./bin/run-example streaming.NetworkWordCount localhost 9999

 

启动后,发现每隔一秒钟,打印一行日志,例如

 

15/01/11 00:10:09 INFO scheduler.JobScheduler: Added jobs for time 1420953009000 ms
15/01/11 00:10:10 INFO scheduler.JobScheduler: Added jobs for time 1420953010000 ms
15/01/11 00:10:11 INFO scheduler.JobScheduler: Added jobs for time 1420953011000 ms
15/01/11 00:10:12 INFO scheduler.JobScheduler: Added jobs for time 1420953012000 ms
15/01/11 00:10:13 INFO scheduler.JobScheduler: Added jobs for time 1420953013000 ms
15/01/11 00:10:14 INFO scheduler.JobScheduler: Added jobs for time 1420953014000 ms
15/01/11 00:10:15 INFO scheduler.JobScheduler: Added jobs for time 1420953015000 ms
15/01/11 00:10:16 INFO scheduler.JobScheduler: Added jobs for time 1420953016000 ms
15/01/11 00:10:17 INFO scheduler.JobScheduler: Added jobs for time 1420953017000 ms
15/01/11 00:10:18 INFO scheduler.JobScheduler: Added jobs for time 1420953018000 ms

///Added jobs是个什么概念?不就是一个Job吗
15/01/11 00:10:19 INFO scheduler.JobScheduler: Added jobs for time 1420953019000 ms
15/01/11 00:10:20 INFO scheduler.JobScheduler: Added jobs for time 1420953020000 ms
15/01/11 00:10:21 INFO scheduler.JobScheduler: Added jobs for time 1420953021000 ms
15/01/11 00:10:22 INFO scheduler.JobScheduler: Added jobs for time 1420953022000 ms
15/01/11 00:10:23 INFO scheduler.JobScheduler: Added jobs for time 1420953023000 ms
15/01/11 00:10:24 INFO scheduler.JobScheduler: Added jobs for time 1420953024000 ms
15/01/11 00:10:25 INFO scheduler.JobScheduler: Added jobs for time 1420953025000 ms
15/01/11 00:10:26 INFO scheduler.JobScheduler: Added jobs for time 1420953026000 ms
15/01/11 00:10:27 INFO scheduler.JobScheduler: Added jobs for time 1420953027000 ms
15/01/11 00:10:28 INFO scheduler.JobScheduler: Added jobs for time 1420953028000 ms

 

3. 在nc -lk运行的终端,输入文本,发现Spark作业没有将文本输出

不过通过nc -lk输入一行文本后,控制台会显示如下信息

 

5/01/11 00:29:08 INFO storage.MemoryStore: ensureFreeSpace(11) called with curMem=91198, maxMem=280248975
15/01/11 00:29:08 INFO storage.MemoryStore: Block input-0-1420954147800 stored as bytes in memory (estimated size 11.0 B, free 267.2 MB)
15/01/11 00:29:08 INFO scheduler.JobScheduler: Added jobs for time 1420954148000 ms
15/01/11 00:29:08 INFO storage.BlockManagerInfo: Added input-0-1420954147800 in memory on localhost:57786 (size: 11.0 B, free: 267.2 MB)
15/01/11 00:29:08 INFO storage.BlockManagerMaster: Updated info of block input-0-1420954147800
15/01/11 00:29:08 INFO receiver.BlockGenerator: Pushed block input-0-1420954147800

 

4. 关闭nc命令的执行,此时,Spark报错,提示9999端口连接不上

 

///相对于数据源而言,Spark是Receiver,所以Spark Streaming有Receiver这个模块
15/01/11 00:09:36 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
15/01/11 00:09:37 INFO scheduler.JobScheduler: Added jobs for time 1420952977000 ms
15/01/11 00:09:38 INFO scheduler.JobScheduler: Added jobs for time 1420952978000 ms
///重提开始Receiver
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver again
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Starting receiver
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
15/01/11 00:09:38 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/11 00:09:38 INFO receiver.ReceiverSupervisorImpl: Receiver started again
15/01/11 00:09:38 INFO dstream.SocketReceiver: Connecting to localhost:9999
15/01/11 00:09:38 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:208)
	at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)

 

关于接不到消息

下面的代码收不到消息

 

val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local")

 

而下面的代码则能收到消息

 

val sparkConf = new SparkConf().setAppName("SparkStreamingExample").setMaster("local[2]")

原因来自于http://spark.apache.org/docs/latest/streaming-programming-guide.html:

  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).

  • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.

 关于Receiver的线程数

上面提到,有一个线程用于运行Receiver,实际中,可能需要多个线程运行Receiver:

Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers on two workers, thus allowing data to be received in parallel, and increasing overall throughput. These multiple DStream can be unioned together to create a single DStream. Then the transformations that was being applied on the single input DStream can applied on the unified stream. This is done as follows

 

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

 

关于Spark内部处理数据的并行数

Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like reduceByKey and reduceByKeyAndWindow, the default number of parallel tasks is controlled by thespark.default.parallelism configuration property. You can pass the level of parallelism as an argument (see PairDStreamFunctions documentation), or set the spark.default.parallelism configuration property to change the default.

 

关于谁先启动:

如果SparkStreaming先启动而9999端口后启动,会发生什么样的情况?

首先,SparkStreaming每次到了时间点都会提示连接失败,也就是说,每次操作都是尝试新建连接。

1. 实际上,当SparkStreaming每次时间间隔到了进行提交作业时,都会首先start Receiver,本次Job运行完成时再stop Receiver,如下是一个时间点在没有开启9999的一个情况

[hadoop@hadoop ~]$ cat spark.log | grep "2015-02-20 00:50:04"
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Slicing from 1424411402000 ms to 1424411404000 ms (aligned to 1424411402000 ms and 1424411404000 ms)
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Added jobs for time 1424411404000 ms
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Starting job: foreachRDD at LogAnalyzerStreamingSQL.scala:44
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Job 238 finished: foreachRDD at LogAnalyzerStreamingSQL.scala:44, took 0.000036 s
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Starting job streaming job 1424411404000 ms.0 from job set of time 1424411404000 ms
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Finished job streaming job 1424411404000 ms.0 from job set of time 1424411404000 ms
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Total delay: 0.012 s for time 1424411404000 ms (execution: 0.000 s)
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 711 from persistence list
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 711
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 704 from persistence list
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 704
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 703 from persistence list
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing RDD 703
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Removing blocks of RDD BlockRDD[703] at socketTextStream at LogAnalyzerStreamingSQL.scala:38 of time 1424411404000 ms
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Deleting batches ArrayBuffer()
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Deleting batches ArrayBuffer(1424411400000 ms)
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Starting receiver again
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Starting receiver
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Called receiver onStart
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Registered receiver for stream 0 from akka://sparkDriver
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Receiver started again
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Connecting to hadoop.master:9999
[WARN ] [2015-02-20 00:50:04] [Logging$class:logWarning:92] Restarting receiver with delay 2000 ms: Error connecting to hadoop.master:9999
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to hadoop.master:9999: java.net.ConnectException: Connection refused
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Called receiver onStop
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Deregistering receiver 0
[ERROR] [2015-02-20 00:50:04] [Logging$class:logError:75] Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to hadoop.master:9999 - java.net.ConnectException: Connection refused
[INFO ] [2015-02-20 00:50:04] [Logging$class:logInfo:59] Stopped receiver 0

 

 上面的日志有Starting receiver和Stopped Receiver的输出,同时,也有java.net.Connection refused的异常

 

2. 虽然9999端口没有监听上,但是此时由于提交了spark streaming application,那么spark streaming application已经开始正常运行了,即上面的wordcount例子,words这个RDD为空,下面的代码更能说明问题:

 

package spark.examples.databricks.reference.apps.loganalysis

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * The LogAnalyzerStreamingSQL is similar to LogAnalyzerStreaming, except
 * it computes stats using Spark SQL.
 *
 * To feed the new lines of some logfile into a socket for streaming,
 * Run this command:
 *   % tail -f [YOUR_LOG_FILE] | nc -lk 9999
 *
 * If you don't have a live log file that is being written to,
 * you can add test lines using this command:
 *   % cat ../../data/apache.access.log >> [YOUR_LOG_FILE]
 *
 * Example command to run:
 * % spark-submit
 *   --class "com.databricks.apps.logs.chapter1.LogAnalyzerStreaming"
 *   --master local[4]
 *   target/scala-2.10/spark-logs-analyzer_2.10-1.0.jar
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(4 * 1000)
  val SLIDE_INTERVAL = new Duration(2 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming SQL in Scala")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    //首先应该监听上localhost:9999端口,如果不监听会发生什么情况?
    //下面的逻辑  println("No access com.databricks.app.logs received in this time interval")已经执行了
    //同时日志中报出Connection Refused错误
    val logLinesDStream = streamingContext.socketTextStream("localhost", 9999)

    //转换成DStream[ApacheAccessLog]
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()

    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        accessLogs.registerTempTable("TBL_ACCESS_LOG")

        // Calculate statistics based on the content size.
        val contentSizeStats = sqlContext
          .sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM TBL_ACCESS_LOG")
          .first()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizeStats.getLong(0) / contentSizeStats.getLong(1),
          contentSizeStats(2),
          contentSizeStats(3)))

        // Compute Response Code to Count.
        val responseCodeToCount = sqlContext
          .sql("SELECT responseCode, COUNT(*) FROM TBL_ACCESS_LOG GROUP BY responseCode")
          .map(row => (row.getInt(0), row.getLong(1)))
          .take(1000)
        println(s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")

        // Any IPAddress that has accessed the server more than 10 times.
        val ipAddresses =sqlContext
          .sql("SELECT ipAddress, COUNT(*) AS total FROM TBL_ACCESS_LOG GROUP BY ipAddress HAVING total > 10")
          .map(row => row.getString(0))
          .take(100)
        println(s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")

        val topEndpoints = sqlContext
          .sql("SELECT endpoint, COUNT(*) AS total FROM TBL_ACCESS_LOG GROUP BY endpoint ORDER BY total DESC LIMIT 10")
          .map(row => (row.getString(0), row.getLong(1)))
          .collect()
        println(s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

 虽然9999端口没有开启监听,但是spark streaming application的console,定时的输出No access com.databricks.app.logs received in this time interval

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spark Streaming 示例

    `SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...

    【SparkStreaming篇02】SparkStreaming之Dstream创建1

    需要注意的是,为了使SparkStreaming能够持续运行并处理数据,我们需要调用`ssc.start()`来启动计算任务,并调用`ssc.awaitTermination()`等待任务结束。在这个例子中,我们向队列中添加了5个RDD,每个包含1到300的...

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。

    spark -streaming实例

    spark streaming demo,应用实例代码,可供参考学习整理

    06Spark Streaming原理和实践

    ### Spark Streaming原理与实践 #### 一、为什么需要流处理? 传统的批处理框架如MapReduce在处理实时数据流时存在一些局限性,主要是因为每次处理一批数据就需要启动一个新任务,而任务的启动过程(包括输入切分...

    SparkStreaming和kafka的整合.pdf

    - **配置Spark Streaming**:通过`SparkConf`配置Spark Streaming环境,指定应用名称和运行模式。 - **Kafka参数配置**:`kafkaParams`定义了Kafka的相关参数,包括Broker列表、消费者组ID以及自动偏移重置策略等。 ...

    Spark Streaming实时流处理项目实战.rar.rar

    Spark Streaming是中国大数据技术领域中广泛使用的实时数据处理框架,它基于Apache Spark的核心设计,提供了对持续数据流的微批处理能力。本项目实战旨在帮助读者深入理解和应用Spark Streaming,通过实际操作来掌握...

    7.SparkStreaming(下)--SparkStreaming实战.pdf

    -SparkStreaming原理介绍.pdf7.SparkStreaming(下)--SparkStreaming实战.pdf8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf8.SparkMLlib(下)--SparkMLlib实战.pdf9.SparkGraphX介绍及实例.pdf10.分布式内存...

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    4.Spark运行架构.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    sparkStreaming实战学习资料

    当你使用Java或Scala编写Spark应用程序时,通常会在`main`方法中创建一个`SparkContext`实例。如果是在Spark Shell中开发,则`SparkContext`会被自动创建,并可以通过预定义的变量`sc`访问。 #### Executor 在Spark...

    9.SparkGraphX介绍及实例.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    Spark 入门实战系列

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    8.SparkMLlib(下)--SparkMLlib实战.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    Spark Streaming解析

    为了运行 Spark Streaming 应用程序,首先需要在 IDE(如 IntelliJ IDEA)中设置项目,并添加相应的依赖库。在 `pom.xml` 文件中添加 Spark Streaming 的依赖: ```xml &lt;groupId&gt;org.apache.spark &lt;artifactId&gt;...

    03_尚硅谷大数据技术之SparkStreaming1

    【Spark Streaming 概述】 Spark Streaming 是 Apache Spark 项目中的一个模块,专为实时数据流处理设计。它提供了一个高级抽象,使得开发人员能够轻松构建可扩展且容错的流处理应用。Spark Streaming 支持多种数据...

    Kafka集成Spark Streaming并写入数据到HBase

    **Kafka、Spark Streaming与HBase的集成** 在大数据处理领域,Kafka作为一个高吞吐量的分布式消息系统,常用于实时数据流处理;Spark Streaming则提供了基于微批处理的实时计算框架,能够高效地处理持续的数据流;...

    2.Spark编译与部署(下)--Spark编译安装.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    Spark算子实例maven版

    Spark的核心特性包括对批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib)的支持。 在本项目中,我们重点关注的是Spark的算子应用,这些算子是Spark编程模型的基础,允许开发者...

Global site tag (gtag.js) - Google Analytics