`
m635674608
  • 浏览: 5061643 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

SparkStream demo

 
阅读更多

炼数成金 课程

1、监控本地文件夹下的文件信息

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._


object HdfsWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")//这里指在本地运行,2个线程,一个监听,一个处理数据
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(20))// 时间划分为20秒

    val lines = ssc.textFileStream("/home/mmicky/temp/")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

   2、 网络socket监控

 

   1)构建socket模拟周期发送数据

   

import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object SaleSimulation {
  def index(length: Int) = { //销售模拟器:参数1:读入的文件;参数2:端口;参数3:发送时间间隔ms
    import java.util.Random
    val rdm = new Random

    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    val filename = args(0)
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length

    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

   

运行:java -cp week5.jar week5.SaleSimulation /home/mmicky/data/spark/people.txt 9999 1000 //从people文件随机读取,发送端口9999,间隔1秒

   2)sparkStream 监控端

   

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))//5秒间隔
 
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)// 服务器地址,端口,序列化方案
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

    3、监控有状态(stateful)

     

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]) {

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { //StateFul需要定义的处理函数,第一个参数是本次进来的值,第二个是过去处理后保存的值
      val currentCount = values.foldLeft(0)(_ + _)<span style="white-space:pre">	</span>//求和
      val previousCount = state.getOrElse(0)<span style="white-space:pre">		</span>// 如果过去没有 即取0
      Some(currentCount + previousCount)<span style="white-space:pre">		</span>// 求和<span style="white-space:pre">	</span>
    }

    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(".")<span style="white-space:pre">		</span>//因为是有状态的,需要保存之前的信息,所以这里设定了 checkpoint的目录,以防断电后内存数据丢失。
<span style="white-space:pre">				</span>//这里因为没有设置checkpoint的时间间隔,所以会发现每一次数据块过来 即切分一次,产生一个 .checkpoint 文件
    //获取数据
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1))

    //使用updateStateByKey来更新状态
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)<span style="white-space:pre">	</span>//调用 处理函数 updateFunc
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()  
  }
}

    4、windows操作

    

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(".")

    // //获取数据
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))

    //windows操作
    val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
<span style="white-space:pre">		</span>//第二个参数是 windows的窗口时间间隔,比如是 监听间隔的 倍数,上面是 5秒,这里必须是5的倍数。eg :30
<span style="white-space:pre">		</span>//第三个参数是 windows的滑动时间间隔,也必须是监听间隔的倍数。eg :10
<span style="white-space:pre">		</span>//那么这里的作用是, 每隔10秒钟,对前30秒的数据, 进行一次处理,这里的处理就是 word count。
    //val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))
<span style="white-space:pre">		</span>//这个是优化方法, 即加上上一次的结果,减去 上一次存在又不在这一次的数据块的部分。

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
} 

   http://blog.csdn.net/escaflone/article/details/43341275

   http://www.aboutyun.com/thread-8900-1-1.html

分享到:
评论

相关推荐

    SparkDemo.rar

    Spark Streaming通过DStream(Discretized Stream)来抽象时间序列的数据,允许开发者处理来自各种源(如Kafka、Flume等)的实时数据。在SparkDemo中,你可以学习如何设置流处理作业,接收数据,以及执行实时分析。...

    JavaSparkStreaming-kafkaDemo

    2. **创建DStream**: 使用`JavaInputDStream`从Kafka创建一个Discretized Stream(DStream),这是Spark Streaming中的基本抽象,代表了一个连续的数据流。 3. **数据处理**: 对DStream进行操作,这可能包括转换...

    demo-kafka-sparkstream-hbase

    演示-kafka-sparkstream-hbase 通过 SparkStreaming 从 Kafka 加载数据到 HBase 表的演示。 以分钟为基础计算 MIN、MAX、AVG(SUM、CNT)。 Kafka 主题:demo-stream-topic HBase 表:演示日志HBase 家族:demo-ts...

    阿里云emr spark kafka redis MongoDB例子demo

    在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...

    Spark Streaming 示例

    在创建 Spark Streaming 应用时,你需要定义 DStream(Discretized Stream),这是 Spark Streaming 对实时数据流的基本抽象。DStream 可以通过从各种源(如 Kafka)接收数据,然后进行转换和操作。在与 Kafka 集成...

    test-spark.zip

    demo包含了spark streaming 相关的kafkaStream、队列Stream、socketStream、目录文本Stream测试demo,及单词统计,累加器、广播变量,hutool-setting配置库实例,idea或eclipse导入maven工程后,下载相关库后即可...

    spark大数据案例

    总结,本压缩包中的SparkDemo案例全面展示了Spark在大数据处理中的应用,包括基础的Spark Core操作,高级的SQL查询,实时流处理以及与Hadoop的紧密集成。通过深入学习和实践这些案例,你将能够熟练掌握Spark,并应用...

    spark streaming 2.3.0

    spark streaming 2.3.0 API 中文翻译介绍,知识点整理,demo介绍

    达内Java大数据 Day03练习题及答案

    10. **大数据技术**:虽然题目主要关注Java编程,但考虑到标签“达内大数据”,可能涉及到Hadoop、Spark、Flink等大数据处理框架的基础知识,如MapReduce编程模型、DataFrame API等。 以上是根据给定的文件名和主题...

    数据开发过程辅助文档(开发规范)

    接下来,文档提到了组件概览,涵盖了大数据开发中常用的一些工具和技术,如YARN、Spark、Kafka、Kudu、Hive、Doris、Streamsets、Azkaban、IDEA、Gitlab、Jenkins、RedMine等。这些组件在数据处理、任务调度、代码...

Global site tag (gtag.js) - Google Analytics