gaojingsong

[Spark Word Count in Java]


1. Word Count Source Code

package cn.com.sparkdemo.myspark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Demo {

    /**
     * Suggested JVM options: -Xms256m -Xmx1024m
     * @param args
     */
    public static void main(String[] args) {
        // Optionally cap the memory used by each executor serving this application
        // System.setProperty("spark.executor.memory", "512m");
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        conf.set("spark.testing.memory", "2147480000");

        // On Windows without Hadoop installed you may see:
        // Could not locate executable null\bin\winutils.exe
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile("F:/BigData_workspace/myspark/src/main/resources/1.txt");

        // flatMap: split each line into words
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // Count the occurrences of each word
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Uncomment to keep the driver alive so that http://192.168.1.101:4040 stays reachable
        // try {
        //     Thread.sleep(1000 * 4000);
        // } catch (InterruptedException e) {
        //     e.printStackTrace();
        // }

        List<Tuple2<String, Integer>> output = wordCounts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }
}
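If the project is built with Java 8, the same pipeline can also be written with lambdas against the Spark 1.6 Java API used above. The following is only a sketch of that alternative form; the class name LambdaDemo is illustrative, and the input path and configuration are copied from the demo.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class LambdaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> wordCounts = sc
                .textFile("F:/BigData_workspace/myspark/src/main/resources/1.txt")
                .flatMap(s -> Arrays.asList(s.split(" ")))   // split each line into words
                .mapToPair(s -> new Tuple2<>(s, 1))          // pair every word with a count of 1
                .reduceByKey((i1, i2) -> i1 + i2);           // sum the counts per word

        for (Tuple2<String, Integer> t : wordCounts.collect()) {
            System.out.println(t._1() + ": " + t._2());
        }
        sc.stop();
    }
}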

 

2. Console Log Analysis

The log shows the job executing as two stages: ShuffleMapStage 0 covers the flatMap/mapToPair work up to the shuffle, and ResultStage 1 runs the reduceByKey followed by the collect. The whole job finishes in about 0.4 s, after which the per-word counts are printed.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

17/02/17 20:07:30 INFO SparkContext: Running Spark version 1.6.0

17/02/17 20:07:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

17/02/17 20:07:31 INFO SecurityManager: Changing view acls to: Administrator

17/02/17 20:07:31 INFO SecurityManager: Changing modify acls to: Administrator

17/02/17 20:07:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); users with modify permissions: Set(Administrator)

17/02/17 20:07:31 INFO Utils: Successfully started service 'sparkDriver' on port 2955.

17/02/17 20:07:32 INFO Slf4jLogger: Slf4jLogger started

17/02/17 20:07:32 INFO Remoting: Starting remoting

17/02/17 20:07:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.101:2968]

17/02/17 20:07:32 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 2968.

17/02/17 20:07:32 INFO SparkEnv: Registering MapOutputTracker

17/02/17 20:07:32 INFO SparkEnv: Registering BlockManagerMaster

17/02/17 20:07:32 INFO DiskBlockManager: Created local directory at C:\Documents and Settings\Administrator\Local Settings\Temp\blockmgr-8687bffc-358c-45de-9f90-bcf5f0279dd6

17/02/17 20:07:32 INFO MemoryStore: MemoryStore started with capacity 1311.0 MB

17/02/17 20:07:32 INFO SparkEnv: Registering OutputCommitCoordinator

17/02/17 20:07:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.

17/02/17 20:07:32 INFO SparkUI: Started SparkUI at http://192.168.1.101:4040

17/02/17 20:07:32 INFO Executor: Starting executor ID driver on host localhost

17/02/17 20:07:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 2975.

17/02/17 20:07:32 INFO NettyBlockTransferService: Server created on 2975

17/02/17 20:07:32 INFO BlockManagerMaster: Trying to register BlockManager

17/02/17 20:07:32 INFO BlockManagerMasterEndpoint: Registering block manager localhost:2975 with 1311.0 MB RAM, BlockManagerId(driver, localhost, 2975)

17/02/17 20:07:32 INFO BlockManagerMaster: Registered BlockManager

17/02/17 20:07:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes

17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 98.8 KB, free 98.8 KB)

17/02/17 20:07:33 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 108.6 KB)

17/02/17 20:07:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:2975 (size: 9.8 KB, free: 1311.0 MB)

17/02/17 20:07:33 INFO SparkContext: Created broadcast 0 from textFile at Demo.java:34

17/02/17 20:07:35 WARN : Your hostname, MS-201609261921 resolves to a loopback/non-reachable address: 192.168.56.1, but we couldn't find any external IP address!

17/02/17 20:07:35 INFO FileInputFormat: Total input paths to process : 1

17/02/17 20:07:35 INFO SparkContext: Starting job: collect at Demo.java:54

17/02/17 20:07:35 INFO DAGScheduler: Registering RDD 3 (mapToPair at Demo.java:43)

17/02/17 20:07:35 INFO DAGScheduler: Got job 0 (collect at Demo.java:54) with 1 output partitions

17/02/17 20:07:35 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Demo.java:54)

17/02/17 20:07:35 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

17/02/17 20:07:35 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

17/02/17 20:07:35 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43), which has no missing parents

17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 113.4 KB)

17/02/17 20:07:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 116.0 KB)

17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:2975 (size: 2.7 KB, free: 1311.0 MB)

17/02/17 20:07:35 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at Demo.java:43)

17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2150 bytes)

17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

17/02/17 20:07:35 INFO HadoopRDD: Input split: file:/F:/BigData_workspace/myspark/src/main/resources/1.txt:0+169

17/02/17 20:07:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

17/02/17 20:07:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

17/02/17 20:07:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

17/02/17 20:07:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

17/02/17 20:07:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver

17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (1/1)

17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 

17/02/17 20:07:35 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at Demo.java:43) finished in 0.172 s

17/02/17 20:07:35 INFO DAGScheduler: looking for newly runnable stages

17/02/17 20:07:35 INFO DAGScheduler: running: Set()

17/02/17 20:07:35 INFO DAGScheduler: waiting: Set(ResultStage 1)

17/02/17 20:07:35 INFO DAGScheduler: failed: Set()

17/02/17 20:07:35 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48), which has no missing parents

17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 118.9 KB)

17/02/17 20:07:35 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1748.0 B, free 120.6 KB)

17/02/17 20:07:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:2975 (size: 1748.0 B, free: 1311.0 MB)

17/02/17 20:07:35 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

17/02/17 20:07:35 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at Demo.java:48)

17/02/17 20:07:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

17/02/17 20:07:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)

17/02/17 20:07:35 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

17/02/17 20:07:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 16 ms

17/02/17 20:07:35 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1739 bytes result sent to driver

17/02/17 20:07:35 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 31 ms on localhost (1/1)

17/02/17 20:07:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 

17/02/17 20:07:35 INFO DAGScheduler: ResultStage 1 (collect at Demo.java:54) finished in 0.031 s

17/02/17 20:07:35 INFO DAGScheduler: Job 0 finished: collect at Demo.java:54, took 0.401950 s

this: 1

Spark: 1

spark: 1

is: 3

LZ4: 1

general: 1

a: 3

fast: 2

Apache: 1

processing: 1

data: 1

large-scale: 1

very: 1

hello: 1

compression: 1

for: 1

and: 2

engine: 1

decompression: 1

algorithm.: 1

test: 1

world: 1

17/02/17 20:07:35 INFO SparkContext: Invoking stop() from shutdown hook

17/02/17 20:07:35 INFO SparkUI: Stopped Spark web UI at http://192.168.1.101:4040

17/02/17 20:07:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

17/02/17 20:07:35 INFO MemoryStore: MemoryStore cleared

17/02/17 20:07:35 INFO BlockManager: BlockManager stopped

17/02/17 20:07:35 INFO BlockManagerMaster: BlockManagerMaster stopped

17/02/17 20:07:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

17/02/17 20:07:35 INFO SparkContext: Successfully stopped SparkContext

17/02/17 20:07:35 INFO ShutdownHookManager: Shutdown hook called

17/02/17 20:07:35 INFO ShutdownHookManager: Deleting directory C:\Documents and Settings\Administrator\Local Settings\Temp\spark-11aa6dd4-c479-45ed-9929-c2ed7053d137

17/02/17 20:07:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

 

3. Troubleshooting

1) Memory problem

In local mode, Spark 1.6 can refuse to start with an error along the lines of "System memory ... must be at least ...". That is why the demo sets spark.testing.memory; alternatively the driver heap can simply be raised, as sketched below.
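A minimal sketch of both workarounds, assuming Spark 1.6 in local mode; the class name MemoryFixSketch and the exact values are illustrative.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MemoryFixSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");

        // Option 1 (Spark 1.6 local mode): satisfy the driver-memory check directly,
        // exactly as the demo above does.
        conf.set("spark.testing.memory", "2147480000");   // roughly 2 GB

        // Option 2: raise the driver heap instead, e.g. run the JVM with
        //   -Xms256m -Xmx1024m   (the options noted in the demo's Javadoc).

        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println("Spark version: " + sc.version());
        sc.stop();
    }
}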



 

2) Missing winutils.exe for Hadoop on Windows

Running Spark locally on Windows without a Hadoop installation produces the warning noted in the source above: Could not locate executable null\bin\winutils.exe. The usual fix is to download winutils.exe and point hadoop.home.dir (or the HADOOP_HOME environment variable) at the directory that contains bin\winutils.exe, as sketched below.
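A minimal sketch of that fix, applied before the JavaSparkContext is created; the path D:/hadoop and the class name WinutilsFixSketch are placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WinutilsFixSketch {
    public static void main(String[] args) {
        // Point Hadoop at a directory containing bin\winutils.exe.
        // "D:/hadoop" is a placeholder; use the actual location on your machine.
        System.setProperty("hadoop.home.dir", "D:/hadoop");

        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.stop();
    }
}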



 

4. Result Verification

The collect() output at the end of the console log above lists the count for every word in 1.txt (for example is: 3, fast: 2, hello: 1).
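Beyond eyeballing the printed output, the counts can be spot-checked in code. A minimal sketch, assuming the wordCounts JavaPairRDD from the demo is still in scope:

// Collect the results into a map and compare a few entries against the
// values seen in the console output above (is: 3, fast: 2).
java.util.Map<String, Integer> counts = wordCounts.collectAsMap();

if (Integer.valueOf(3).equals(counts.get("is")) && Integer.valueOf(2).equals(counts.get("fast"))) {
    System.out.println("Word counts match the values printed in the log.");
} else {
    System.out.println("Unexpected counts: is=" + counts.get("is") + ", fast=" + counts.get("fast"));
}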



5. Spark Web UI

While the application is running, its jobs, stages, and storage can be inspected in the SparkUI at http://192.168.1.101:4040 (the address reported in the log above).
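Because the UI disappears as soon as the SparkContext stops, a short local job ends before there is time to open it. The commented-out Thread.sleep in the source exists for exactly this reason; a sketch of that idea, placed just before the end of main():

// Keep the driver (and therefore the SparkUI on port 4040) alive after the job completes.
try {
    System.out.println("SparkUI available at http://192.168.1.101:4040 - sleeping...");
    Thread.sleep(1000L * 4000);   // same duration as the commented-out code in the demo
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}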



 

 

