`
290434409
  • 浏览: 27214 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

windons下远程提交任务到linux上的spark集群

阅读更多

一、开发环境:

     操作系统:win8 64位

     IDE:IntelliJ IDEA

     JDK:1.7

     scala:scala-2.11.7

     spark:linux上spark集群版本:1.4.1,本地依赖spakr的jar直接拷贝linux上$SPARK_HOME/lib/spark-assembly-1.4.1-hadoop2.4.0.jar

      maven关键依赖:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <!-- 进行LDA 会使用到一下jar,否则可不引入 -->
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.10</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>

 二、环境配置:

      1、在window下配置环境变量:SCALA_HOME,然后引入到Path中

      2、确保linux上spark集群的Master的spark-env.sh中SPARK_MASTER_IP的值本机能ping通,一般有两种配置:

            1)直接配置IP为10.x.x.x,确保能ping同此IP即可,Master即为:spark://10.x.x.x:7077

            2)配置的为linux机器名称如Master1.Hadoop,则需要在windows的hosts文件中将其配置进来,Master为spark://Master1.Hadoop:7077

     

三、提交流程

     1、将spark任务类打jar包,生成d://....//spark-demo.jar,此处打的jar包放在linux使用spark-submit命令行调用也能执行

     2、执行任务类或采用SparkSubmit.main(args)提交

四、代码实测:

       1、spark 圆周率,此程序无须提交数据文件

       新建项目,引入以上pom依赖,且将spark-assembly-1.4.1-hadoop2.4.0.jar手动加入项目中,新建类MyPi.java

   

package com.alleyz.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by Mr on 2016/1/5.
 */
public class MyPI {

    public static void main(String[] atrs){
        SparkConf conf = new SparkConf();
        conf.setAppName("alleyz-lad").setMaster("spark://Master1.Hadoop:7077");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.addJar("G:\\workspace\\idea\\spark-demo\\target\\artifacts\\spark_demo_jar\\spark-demo.jar");
        int slices =  20;
        int n = 100000 * slices;
        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

        int count = dataSet.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        });

        System.out.println("Pi is roughly " + 4.0 * count / n);

        jsc.stop();
    }
}

 

    先打jar包,然后点击右键运行,浏览器打开 http://Master1.Hadoop:8080   则可以看到正在执行的任务,稍后控制台会输入具体的结果,程序结束;

 

         2、LDA

         需要引入pom文件依赖,准备数据,我自己的如下(词组成的一个不规则矩阵):

......
系统 三毛 查询 查询 核实 卡已经 目前欠 块八毛 
核实 核实 账号 开通 业务 一个来电显示
包五 流量包 流量包 核实 非常抱歉 可以包一个 五十 流量包 现在办理 生效 确定办理 需要办理 办理 操作办理 
电话 停机 还有一 毛三 办理一个 停机保 功能 手机 导致 手机
....

         新建java类:

package com.alleyz.spark;

import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.feature.IDF;
import org.apache.spark.mllib.feature.IDFModel;
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

import java.io.*;
import java.util.*;

/**
 * Created by Mr on 2016/1/4.
 */
public class SparkLda {
    public static void main(String[] args) {
        final HashingTF hashingTF=new HashingTF(20000);
        SparkConf conf = new SparkConf();
        conf.setAppName("alleyz-lda").setMaster("spark://Master1.Hadoop:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.addJar("G:\\workspace\\idea\\spark-demo\\target\\artifacts\\spark_demo_jar\\spark-demo.jar");
        // Load and parse the data
//        sc.addFile("D:\\spark\\sample_lda_data.txt");
        String path = "D:\\spark\\sample_lda_data.txt";
        File file = new File(path);
        final List<String> list=new ArrayList<>();
        try{
            BufferedReader fr = new BufferedReader(new InputStreamReader(new FileInputStream(file)));//new FileReader(file);
            String line;
            while((line=fr.readLine())!=null){
                list.add(line);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        //原始数据
        JavaRDD<String> data = sc.parallelize(list);


        JavaPairRDD<Long,List<String>> javaRdd=JavaPairRDD.fromJavaRDD(data.map(new Function<String, Tuple2<Long,List<String>>>() {
            @Override
            public Tuple2<Long,List<String>> call(String s) throws Exception {
                return new Tuple2<Long, List<String>>((long)s.hashCode(), Arrays.asList(s.split(" ")));
            }
        }));
        JavaPairRDD<Long,Vector> tfData=javaRdd.mapValues(new Function<List<String>, Vector>() {
            @Override
            public Vector call(List<String> strings) throws Exception {
                return hashingTF.transform(strings);
            }
        });
        JavaRDD<String> tokens=javaRdd.values().flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterable<String> call(List<String> strings) throws Exception {
                return strings;
            }
        }).distinct();
        Multimap<Integer,String> mapping= Multimaps.index(tokens.toArray(), new com.google.common.base.Function<String, Integer>() {
            public Integer apply(String t){
                return hashingTF.indexOf(t);
            }
        });
        final IDFModel idfModel=new IDF().fit(tfData.values());
        JavaPairRDD<Long,Vector> tIdfData=tfData.mapValues(new Function<Vector, Vector>() {
            @Override
            public Vector call(Vector vector) throws Exception {
                return idfModel.transform(vector);
            }
        });

        DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setK(10).setMaxIterations(100).run(tIdfData);
        Tuple2<int[],double[]>[] d=ldaModel.describeTopics(100);
        String out="D:\\spark\\lda-out";
        BufferedWriter bw=null;
        try {
            File outFile=new File(out);
            if(!outFile.exists())outFile.createNewFile();
            bw= new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile)));
            for (int i = 0; i < d.length; i++) {
                System.out.println("Topic " + i + ":");
                bw.write("Topic "+i+":\r\n");
                for (int j = 0; j < d[i]._1().length; j++) {
                    Collection col = mapping.get(d[i]._1()[j]);
                    if (col.isEmpty()) continue;
                    System.out.println("" + col + " " + d[i]._2()[j]);
                    bw.write("" + col + " " + d[i]._2()[j]+"\r\n");
                }
                bw.write("\r\n-----------------------------------------------");
            }
            bw.flush();
            bw.close();
        }catch (Exception e){
            e.printStackTrace();
        }

        
        sc.stop();
    }
}

 

    代码写完,bulid artifacts 打jar包,然后右键运行,就可以看到结果:

 

Topic 0:
[密码] 0.009866712747027252
[设置] 0.009242580065070845
[手机] 0.007976218361447634
[电脑, 音量] 0.007185679520142035
[声音] 0.006859034109454243
[发给] 0.006323283466899422
[实用, 号码] 0.005875648054184159
[姓名] 0.00514309514424346
[身份证] 0.00442075176344923
[开通, 详细咨询] 0.0044181003876376
[信息] 0.0042643124381648205
[听到] 0.0042532126807515915
[操作] 0.004231734979013577
.....
-----------------------------------------------Topic 2:
[宽带] 0.0193832008376721
[活动] 0.015378671944555595
[账号] 0.009303726013777125
[刷新一下] 0.009030500798620545
[刷新] 0.007823174929503695
[宽带账号] 0.007740455193261026
[实用, 号码] 0.0075097838150508505
[关机] 0.006752236405560853
[不能上网, 开户] 0.0064310542757245156
.....

  

     总结一下:网上大多都是在yarn上传,,这个是不依赖hadoop的,最后那个LDA是借鉴git:https://github.com/igorekpotworek/NLP  ,这是将lambda表达式改为java1.7的方法,其次修改了读取文件的策略,如有问题,请各位路过的大神及时指正,不胜感激

0
0
分享到:
评论

相关推荐

    Spark集群及开发环境搭建(完整版)

    ### Spark集群及开发环境搭建(完整版) #### 一、软件及下载 本文档提供了详细的步骤来指导初学者搭建Spark集群及其开发环境。首先需要准备的软件包括: - **VirtualBox-5.1**:虚拟机软件,用于安装CentOS操作...

    hadoop-Apache2.7.3+Spark2.0集群搭建

    在Linux版本方面,本例采用的是CentOS 6.7版本,并确认JDK版本为1.8.0_77,这是运行Hadoop和Spark集群所必须的Java环境。 接着,集群需要至少三个节点来分别担当Master(主节点)和Slaves(从节点)的角色。本例中...

    linux 搭建 高性能集群

    在Linux环境下搭建高性能集群是一项复杂而关键的任务,它涉及到网络配置、资源管理、负载均衡以及分布式计算等多个领域。本文将深入探讨如何构建这样的集群,并基于提供的文件“马路遥--搭建高性能集群c.pdf”来解析...

    集群搭建与使用文档

    总的来说,这个“集群搭建与使用手册”将为你提供一套全面的指导,涵盖从无到有构建Hadoop和Spark集群的全过程,以及如何在Linux环境中有效利用这些集群进行大数据处理。通过深入学习和实践,你将能够熟练掌握大数据...

    基于Linux的Hadoop集群搭建的探索与实现.docx

    总的来说,这篇论文旨在为读者提供一个全面的Hadoop集群搭建指南,使读者能够从理论到实践,从基础到进阶,逐步掌握在Linux环境下构建和优化Hadoop集群的技能,以应对大数据时代的挑战。通过阅读和实践,读者将能够...

    Ubuntu下安装spark.pdf

    在Ubuntu操作系统中安装Apache Spark的过程涉及到一系列详细的步骤,需要按照既定的顺序进行操作。下面是根据给定文件内容整理出来的知识点: 首先,需要了解Apache Spark是一个快速的、开源的分布式计算系统,它...

    基于Spark云计算技术的Linux实验教学研究.pdf

    在Linux教学中,学生需要学会在多台服务器上部署和运行Spark,这就必须面对如何避免在每台服务器上重复安装配置,以及如何进行远程编程和管理的问题。为了解决这些问题,教学过程中可以采用Docker、VMware等技术,...

    hadoop 2.6.0 及Spark1.3.1平台搭建20150505-优化版

    在 Hadoop 集群上运行 Spark,还需要配置 `spark-defaults.conf`,包括指定 HDFS 的地址和 YARN 的资源管理器。 为了测试 Spark 的功能,可以运行 Spark Shell 或编写简单的 Scala、Python 或 Java 应用来执行数据...

    Hadoop集群搭建部署与MapReduce程序关键点个性化开发.doc

    在Eclipse中直接运行MapReduce程序,可以进行快速的本地测试和调试,减少了实际在集群上运行的时间。 任务3是对开发过程的总结和反思,通常包括遇到的问题、解决策略以及优化建议。在实践中,可能需要根据硬件资源...

    Linux环境下Hadoop搭建与Eclipse配置

    通过这些步骤,开发者可以在本地或远程Linux集群上快速开发和测试Hadoop应用,为大数据处理提供有力的工具支持。记住,每个步骤都可能需要根据实际环境进行微调,确保所有配置正确无误才能保证Hadoop的顺利运行。

    windows版hadoop插件

    在Windows环境下,由于Hadoop最初是为Linux设计的,因此在Windows上安装和配置Hadoop可能会遇到一些挑战。这个“windows版hadoop插件”正是为了解决这个问题,它简化了Windows上的Hadoop开发环境搭建过程。安装该...

    在虚拟机中安装anaconda

    ### 在虚拟机中安装Anaconda并配置Spark集群下的Python开发环境 #### 一、安装Anaconda 在虚拟机环境中安装Anaconda是进行大数据处理及数据分析的重要步骤之一,尤其是在需要使用Python进行Spark集群开发的情况下。...

    大数据Linux基础学习笔记

    学习如何在Linux上配置这些工具,包括环境变量设置、集群部署、配置优化等,是大数据学习的基础。 八、Shell脚本编程 编写Shell脚本可以自动化重复任务,提高工作效率。学习bash语言,掌握变量、条件语句、循环、...

    2022高职 大数据技术与应用 任务书1(赛项赛题).docx

    完成这些任务需要团队具备扎实的Linux系统管理能力,熟悉大数据生态系统,包括Hadoop的HDFS和YARN,以及Spark和Flink的配置和操作。同时,还需要掌握使用SSH进行远程访问和文件传输,以及通过命令行进行环境配置和...

    Centos64位Linux版本的eclipse

    Xshell是一款强大的终端模拟器,允许用户通过SSH协议连接到远程Linux服务器,进行命令行操作。这对于开发者在本地主机上操作远程服务器非常有用,特别是在没有图形界面的情况下。在虚拟机中部署Eclipse,可以利用...

    大数据环境集群环境搭建.pdf

    - 运行测试任务,验证集群工作正常,例如在Hadoop上运行WordCount示例,在Spark上运行简单的数据分析任务。 以上就是搭建大数据环境集群的基本步骤,涉及了从基础环境配置到各个组件的安装和配置。整个过程需要...

    hadoop搭建 zookeeper_hbase_hive_sqoop_mysql_spark_hdfs.doc

    在构建一个完整的Hadoop生态系统时,我们需要搭建多个组件,包括Zookeeper、HBase、Hive、MySQL、Kafka以及Spark,并且它们都要运行在HDFS之上。下面将详细介绍这些组件的安装与配置过程。 1. **Zookeeper**: ...

    spark之Standalone模式部署配置详解

    在 Standalone 模式下,Spark 可以在本地或远程机器上运行,且可以使用 ZooKeeper 实现高可用性。 准备工作 在部署 Spark 之前,需要进行以下准备工作: 1. 下载 Spark 的编译版本或自行编译 Spark。 2. 安装 ...

Global site tag (gtag.js) - Google Analytics