package com.chexun.statistic import java.sql.{Connection, DriverManager} import java.util.Date import com.chexun.statistic.RealTimeAdv._ import kafka.serializer.StringDecoder import org.apache.commons.lang.time.DateFormatUtils import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * 记录最近1分钟的数据 * Created by hanyiting on 2015/08/13. */ object RealtimeCount { case class AdvLoging(vtime: Long, userIp: Long, muid: String, uid: String, ucp: String, adurl: String) case class Adv(userIp: Long, muid: String, ucp: String, adurl: String, location: String) def main(args: Array[String]) { val url = "jdbc:mysql://10.0.0.198:3306/test" val usr = "test" val pwd = "test" val sparkConf = new SparkConf().set("spark.streaming.unpersist", "true").set("spark.cleaner.ttl", "43200") .setExecutorEnv("SPARK_JAVA_OPTS", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps").setAppName("RealtimeCount") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(60)) //define the kafka parameters, broker list must be specified val kafkaParams = Map("metadata.broker.list" -> "10.0.0.37:9092,10.0.0.30:9092,10.0.0.35:9092,10.0.0.26:9092,10.0.0.27:9092") //define which topics to read from val topics = Set("chexun1", "chexun2") val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(x => x._2) //过滤掉adurl为空的数据,然后进行转换,提取出location,并过滤掉location为空的数据 val tmpdf = lines.map(_.split("\t")).map(x => AdvLoging(x(9).toLong, x(8).toLong, x(1), x(0), x(3), x(24))).filter(y => (y.adurl != null && !y.adurl.equals("null"))).map(x => Adv(x.userIp, x.muid, x.ucp, getUrl(x.adurl), getLocation(x.adurl))).filter(z => z.location != null && !("").equals(z.location)) tmpdf.foreachRDD { rdd => val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df = rdd.toDF().registerTempTable("adv") //获取当前时间,精确到分 val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:00") //对不同位置的广告进行分组,求pv和uv val rcount = sqlContext.sql("select location,count(*),count(distinct muid) from adv group by location").foreachPartition( datas => { val conn: Connection = DriverManager.getConnection(url, usr, pwd) val pstat = conn.prepareStatement("insert into loging_adv_realtime(stat_time,location,pv,uv) values (?,?,?,?)") for (data <- datas) { pstat.setString(1, stattime) pstat.setString(2, data(0).toString) pstat.setString(3, data(1).toString) pstat.setString(4, data(2).toString) pstat.executeUpdate() } } ) } ssc.start() ssc.awaitTermination() } }
相关推荐
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming
`SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...
《Spark技术深度解析:从SparkCount到SparkSQL与SparkStreaming》 Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...
这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...
此应用程序的主要目的是演示和 DSE 中的 Spark Streaming 功能。 它产生两种类型的结果: ...cd datastax-spark-streaming-demo sbt assembly run 在 DSE 集群上部署 Spark 应用程序: dse spark-submit --c
一、案例简介 使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统 计不同单词出现的次数 。 二、netcat操作 1、虚拟机中安装netcat ...object SparkStreamingDemo { def main(arg
### 大数据Spark技术分享:使用Streaming Spark与FPGAaaS加速实时分析 #### 概述 随着大数据时代的到来,实时数据分析成为了许多业务场景的核心需求。本篇内容将深入探讨如何利用Spark Streaming结合现场可编程...
3. Spark Streaming:处理实时数据流的模块,基于微批次处理,提供高吞吐量和容错性。 4. MLlib:Spark的机器学习库,提供了多种常用的机器学习算法和工具,如分类、回归、聚类等。 5. GraphX:用于图处理的API,...
- **Spark Streaming**:处理实时流数据,通过微批处理实现高吞吐量。 - **MLlib**:机器学习库,提供多种算法和实用工具。 - **GraphX**:处理图形数据的API。 4. **Spark运行模式** - **Local**:在本地单机...
**SparkStreaming-Kafka_demo** 的核心部分将包括以下步骤: 1. **设置和连接**: 首先,我们需要设置Spark和Kafka的连接参数,如Kafka的broker列表、topic名以及Spark的streaming超时时间等。 2. **创建DStream**:...
spark streaming 2.3.0 API 中文翻译介绍,知识点整理,demo介绍
spark_streaming_aggregation 通过Spark Streaming进行事件聚合。 该示例包括基于Kafka或TCP事件流的事件聚合。 这些说明是但是应该在独立群集上工作。 生成并运行Kafka示例 生成程序集./sbt/sbt package 确保您有...
Spark的主要组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)以及GraphX(图计算框架)。 Spark Core是Spark的基础,负责任务调度、资源管理和容错管理。它采用弹性分布式数据集(Resilient...
5. Spark Streaming:用于实时流处理,通过微批处理实现高吞吐和低延迟。 二、Spark操作示例 1. Word Count:Spark最经典的例子,用于统计文本中单词出现的次数。展示了RDD的基本操作,如map、reduceByKey和count。...
spark streaming demo,应用实例代码,可供参考学习整理
Spark是Apache Hadoop生态系统中的一个快速、通用且可扩展的大数据处理框架,它适用于批处理、交互式查询、实时流处理等多种应用场景。本教程将基于Java语言,介绍如何使用Spark进行简单的词频统计(WordCount)操作...
- 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,...
6. **Spark Streaming**:了解如何处理实时数据流,包括DStream的创建、转换和持久化操作,以及如何实现窗口和状态管理。 7. **Spark Job优化**:探讨如何调整配置参数,如executor的数量、内存大小和并行度,以...