常用命令:
val rdd1 = sc.parallelize(List(('a',1),('a',2))) val rdd = sc.textFile(“/usr/local/spark/tmp/char.data") rdd.count rdd.cache val word_count = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) word_count.saveAsTextFile("/usr/local/spark/tmp/result") val word_count = rdd.flatMap(_.split(" ")).map((_,1)).groupByKey() rdd1.lookup('a') val rdd2 = sc.parallelize(List(1,2,3,4,5)) rdd2.reduce(_+_) word_count.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._1,x._2)).collcet val rdd = sc.textFile(“/usr/local/spark/tmp/SogouQ1.txt") rdd.map(_.split("\\t")(0)).filter(_< "20111230010101").count //parallelize演示 val num=sc.parallelize(1 to 10) //并行化 val doublenum = num.map(_*2) val threenum = doublenum.filter(_ % 3 == 0) threenum.collect threenum.toDebugString val num1=sc.parallelize(1 to 10,6) val doublenum1 = num1.map(_*2) val threenum1 = doublenum1.filter(_ % 3 == 0) threenum1.collect threenum1.toDebugString threenum.cache() val fournum = threenum.map(x=>x*x) fournum.collect fournum.toDebugString threenum.unpersist() num.reduce (_ + _) num.take(5) num.first num.count num.take(5).foreach(println) //K-V演示 val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) kv1.sortByKey().collect //注意sortByKey的小括号不能省 kv1.groupByKey().collect kv1.reduceByKey(_+_).collect val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5))) kv2.distinct.collect kv1.union(kv2).collect val kv3=sc.parallelize(List(("A",10),("B",20),("D",30))) kv1.join(kv3).collect kv1.cogroup(kv3).collect val kv4=sc.parallelize(List(List(1,2),List(3,4))) kv4.flatMap(x=>x.map(_+1)).collect //文件读取演示 val rdd1 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/") rdd1.toDebugString val words=rdd1.flatMap(_.split(" ")) val wordscount=words.map(x=>(x,1)).reduceByKey(_+_) wordscount.collect wordscount.toDebugString val rdd2 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/directory/*.txt") rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect //gzip压缩的文件 val rdd3 = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/test.txt.gz") rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect //日志处理演示 //http://download.labs.sogou.com/dl/q.html 完整版(2GB):gz格式 //访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL //SogouQ1.txt、SogouQ2.txt、SogouQ3.txt分别是用head -n 或者tail -n 从SogouQ数据日志文件中截取 //搜索结果排名第1,但是点击次序排在第2的数据有多少? val rdd1 = sc.textFile("hdfs://hadoop1:8000/dataguru/data/SogouQ1.txt") val rdd2=rdd1.map(_.split("\t")).filter(_.length==6) //非6列数据 rdd2.count() val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2) rdd3.count() rdd3.toDebugString //session查询次数排行榜 val rdd4=rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)) //(x._2,x._1)表示互换 rdd4.toDebugString rdd4.saveAsTextFile("hdfs://hadoop1:8000/dataguru/week2/output1") //cache()演示 //检查block命令:bin/hdfs fsck /dataguru/data/SogouQ3.txt -files -blocks -locations val rdd5 = sc.textFile("hdfs://hadoop1:8000/dataguru/data/SogouQ3.txt") rdd5.cache() rdd5.count() rdd5.count() //比较时间 //join演示 val format = new java.text.SimpleDateFormat("yyyy-MM-dd") case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float) case class Click (d: java.util.Date, uuid: String, landing_page: Int) val reg = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/join/reg.tsv").map(_.split("\t")).map(r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))) val clk = sc.textFile("hdfs://hadoop1:8000/dataguru/week2/join/clk.tsv").map(_.split("\t")).map(c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))) reg.join(clk).take(2)
相关推荐
**Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...
**Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...
Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...
在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....
**Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...
在Spring Boot应用中,我们可以利用Spring Kafka框架来与Apache Kafka进行集成,实现高效的消息传递。本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:...
本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...
《Kafka报文模拟工具深度解析》 在现代大数据处理领域,Apache Kafka作为一个高效、可扩展的实时流处理平台,已经成为了许多企业级应用的核心组件。然而,在开发和测试过程中,有时我们需要模拟Kafka报文的发送,...
**Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
**Kafka Tool 2.0.7 在 Linux 系统中的使用详解** Kafka Tool 是一款功能强大的 Apache Kafka 管理工具,适用于监控、管理、以及数据迁移等任务。在 Linux 系统中,我们可以方便地利用此工具进行各种 Kafka 相关的...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
### 关于Kafka资源下载kafka_2.11-2.0.0.tgz的知识点 #### Kafka简介 Apache Kafka是一种开源的消息队列服务,它最初由LinkedIn开发,并于2011年成为Apache软件基金会的一个顶级项目。Kafka因其高性能、可扩展性和...
在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...
【Kafka基础知识】 Kafka是由Apache开发的分布式流处理平台,它主要被设计用来处理实时数据流。在大数据处理领域,Kafka常被用于构建实时数据管道和流应用,能够高效地处理大量的实时数据。 【Java与Kafka的结合】...
《Apache Kafka 2.8.2:分布式流处理平台详解》 Apache Kafka是一个开源的分布式流处理平台,由LinkedIn开发并捐赠给Apache软件基金会。Kafka 2.8.2是该平台的一个重要版本,提供了丰富的功能和改进,旨在满足大...
**Kafka 2.5.1 知识点详解** Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,广泛应用于大数据实时处理、日志收集、消息系统等多个领域。`kafka_2.12-2.5.1` 是 Kafka 的一个特定版本,针对 Scala 2.12 ...
kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-...
**Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...