`

spark streaming准实时计算demo

阅读更多

 

 

package com.chexun.statistic

import java.sql.{Connection, DriverManager}
import java.util.Date

import com.chexun.statistic.RealTimeAdv._
import kafka.serializer.StringDecoder
import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 记录最近1分钟的数据
 * Created by hanyiting on 2015/08/13.
 */
object  RealtimeCount {

  case class AdvLoging(vtime: Long, userIp: Long, muid: String, uid: String, ucp: String, adurl: String)

  case class Adv(userIp: Long, muid: String, ucp: String, adurl: String, location: String)

  def main(args: Array[String]) {

    val url = "jdbc:mysql://10.0.0.198:3306/test"
    val usr = "test"
    val pwd = "test"

    val sparkConf = new SparkConf().set("spark.streaming.unpersist", "true").set("spark.cleaner.ttl", "43200")
      .setExecutorEnv("SPARK_JAVA_OPTS", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps").setAppName("RealtimeCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(60))


    //define the kafka parameters, broker list must be specified
    val kafkaParams = Map("metadata.broker.list" -> "10.0.0.37:9092,10.0.0.30:9092,10.0.0.35:9092,10.0.0.26:9092,10.0.0.27:9092")

    //define which topics to read from
    val topics = Set("chexun1", "chexun2")

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(x => x._2)

    //过滤掉adurl为空的数据,然后进行转换,提取出location,并过滤掉location为空的数据
    val tmpdf = lines.map(_.split("\t")).map(x => AdvLoging(x(9).toLong, x(8).toLong, x(1), x(0), x(3), x(24))).filter(y => (y.adurl != null && !y.adurl.equals("null"))).map(x => Adv(x.userIp, x.muid, x.ucp, getUrl(x.adurl), getLocation(x.adurl))).filter(z => z.location != null && !("").equals(z.location))
    tmpdf.foreachRDD { rdd =>
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      val df = rdd.toDF().registerTempTable("adv")
      //获取当前时间,精确到分
      val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:00")
      //对不同位置的广告进行分组,求pv和uv
      val rcount = sqlContext.sql("select location,count(*),count(distinct muid) from adv group by location").foreachPartition(
        datas => {
          val conn: Connection = DriverManager.getConnection(url, usr, pwd)
          val pstat = conn.prepareStatement("insert into loging_adv_realtime(stat_time,location,pv,uv) values (?,?,?,?)")
          for (data <- datas) {
            pstat.setString(1, stattime)
            pstat.setString(2, data(0).toString)
            pstat.setString(3, data(1).toString)
            pstat.setString(4, data(2).toString)
            pstat.executeUpdate()
          }
        }
      )
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

 

分享到:
评论

相关推荐

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming

    Spark Streaming 示例

    `SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...

    SparkDemo.rar

    《Spark技术深度解析:从SparkCount到SparkSQL与SparkStreaming》 Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark...

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例

    Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。

    基于Scala的Spark RDD、Spark SQL、Spark Streaming相关Demo设计源码

    这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...

    阿里云emr spark kafka redis MongoDB例子demo

    在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...

    spark-streaming-demo

    此应用程序的主要目的是演示和 DSE 中的 Spark Streaming 功能。 它产生两种类型的结果: ...cd datastax-spark-streaming-demo sbt assembly run 在 DSE 集群上部署 Spark 应用程序: dse spark-submit --c

    SparkStreaming之WordCount案例

    一、案例简介 使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统 计不同单词出现的次数 。 二、netcat操作 1、虚拟机中安装netcat ...object SparkStreamingDemo { def main(arg

    大数据Spark技术分享 使用Streaming Spark和FPGAaaS加速实时分析 共25页.pdf

    ### 大数据Spark技术分享:使用Streaming Spark与FPGAaaS加速实时分析 #### 概述 随着大数据时代的到来,实时数据分析成为了许多业务场景的核心需求。本篇内容将深入探讨如何利用Spark Streaming结合现场可编程...

    sparkdemo_202108.7z

    3. Spark Streaming:处理实时数据流的模块,基于微批次处理,提供高吞吐量和容错性。 4. MLlib:Spark的机器学习库,提供了多种常用的机器学习算法和工具,如分类、回归、聚类等。 5. GraphX:用于图处理的API,...

    SparkDemo12

    - **Spark Streaming**:处理实时流数据,通过微批处理实现高吞吐量。 - **MLlib**:机器学习库,提供多种算法和实用工具。 - **GraphX**:处理图形数据的API。 4. **Spark运行模式** - **Local**:在本地单机...

    JavaSparkStreaming-kafkaDemo

    **SparkStreaming-Kafka_demo** 的核心部分将包括以下步骤: 1. **设置和连接**: 首先,我们需要设置Spark和Kafka的连接参数,如Kafka的broker列表、topic名以及Spark的streaming超时时间等。 2. **创建DStream**:...

    spark streaming 2.3.0

    spark streaming 2.3.0 API 中文翻译介绍,知识点整理,demo介绍

    spark_streaming_aggregation:使用Spark Streaming进行事件聚合

    spark_streaming_aggregation 通过Spark Streaming进行事件聚合。 该示例包括基于Kafka或TCP事件流的事件聚合。 这些说明是但是应该在独立群集上工作。 生成并运行Kafka示例 生成程序集./sbt/sbt package 确保您有...

    spark-demo.7z

    Spark的主要组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)以及GraphX(图计算框架)。 Spark Core是Spark的基础,负责任务调度、资源管理和容错管理。它采用弹性分布式数据集(Resilient...

    Spark各种demo学习

    5. Spark Streaming:用于实时流处理,通过微批处理实现高吞吐和低延迟。 二、Spark操作示例 1. Word Count:Spark最经典的例子,用于统计文本中单词出现的次数。展示了RDD的基本操作,如map、reduceByKey和count。...

    spark -streaming实例

    spark streaming demo,应用实例代码,可供参考学习整理

    spark计数demo

    Spark是Apache Hadoop生态系统中的一个快速、通用且可扩展的大数据处理框架,它适用于批处理、交互式查询、实时流处理等多种应用场景。本教程将基于Java语言,介绍如何使用Spark进行简单的词频统计(WordCount)操作...

    基于Flume+Kafka+Spark Streaming的大数据处理Demo+源代码+文档说明

    - 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! &lt;项目介绍&gt; 1、该资源内项目代码都经过测试运行成功,...

    SparkDemo学习样例

    6. **Spark Streaming**:了解如何处理实时数据流,包括DStream的创建、转换和持久化操作,以及如何实现窗口和状态管理。 7. **Spark Job优化**:探讨如何调整配置参数,如executor的数量、内存大小和并行度,以...

Global site tag (gtag.js) - Google Analytics