`

Scala mapreduce

阅读更多

本文只是带你进入 Scala 的世界,包括安装、不可变变量 val、可变变量 var、定义类、集合(包括列表(list)、集(set)、映射(map))以及集合遍历和集合库(能达到并行/并发效果)。

题外话,如果 Java 争气的话,还就真不会出现像 Scala 这些语言。对于函数编程风格的支持,尤其是对于 Lambda 表达式的支持,能够有助于减少必须要编写的逻辑无关样板代码,也许让它可以更简单的关注要面对的任务本身。而 Java 对 Lamdba 表达式的支持到 JavaSE8 才实现(你可以查一下 Java SE8 什么发布的,而其他语言何时支持匿名函数、Lambda 表达式、函数式编程、并行编程……)。

Scala,一门强类型定义的静态类型语言,结合了面向对象编程与函数编程思想,语法简洁,完全兼容Java,运行在 JVM 上。JVM 上的其他语言:Groovy、JRuby、Clojure。那么 Scala 有什么不同?能同时提供函数式风格和良好并发支持的强类型语言,只有 Scala。JRuby 和 Groovy 都是动态语言(Scala 是静态类型语言),它们不是函数式的,也无法提供比 Java 更好的并发解决方案。另一方面,Clojure 是一种混合型的函数式语言,它天生就是动态的,因此不是静态类型。而且它的语法类似 Lisp,除非你很熟悉,否则这可不是一种易于掌握的语言(Lisp 是号称高智商的人才能使用的语言,如果你看过《黑客与画家》,应该记得作者的一句话,大意是,如果竞争对手采用 Lisp 开发 Web,那就应该小心了,言下之意是,Lisp 跟其他语言相比,生产效率太高了,很容易实现一个想法)。

总结起来,Scala 特点体现在以下几方面:

  • Scala 运行在 JVM 上,这使得 Scala 可以和现存的应用同时运行;
  • Scala 可以直接使用 Java 类库,使得开发人员可以利用现有的框架和遗留代码;
  • Scala 和 Java 一样都是静态类型语言。因此,遵循相同的编程哲学;
  • Scala 语法与 Java 比较接近,使得开发人员可以快速掌握语言基础;
  • Scala 既支持面向对象范型,也支持函数式编程范型,这样,开发人员就可以逐步在代码中运用函数式编程的思想。

Scala 对 Java 的不同:

  • 类型推断。在 Java 中,必须声明每个变量、实参或形参的类型。Scala 则会在可能的情况下推断出变量的类型;
  • 函数式编程。Scala 将函数式编程的重要概念引入 Java,包括代码块、高阶函数(high-order function)以及复杂的集合库;
  • 不变量。Java 的确允许使用不变量,不过是通过提供一个很少使用的修饰符实现的。Scala 会要求你明确地决定一个变量是否可变。这些决定将对应程序在并发环境中的行为,产生深远的影响;
  • 高级程序构造。Scala 很好地使用了基础语言,并将有用的概念分层。包括并发应用的 Actor 模型、使用高阶函数的 Ruby 风格的集合以及作为一等对象类型(first-class)的 XML 的处理。

文中代码本人在 Scala 2.11 上编译并运行通过。

作为第一步,先安装好最新的 Scala 发布包 Typesafe stack,打开命令行窗口,键入“scala”:这将启动 REPL(读入-运算 输出 循环)交互式编码环境。然后就可以写下你的第一行 Scala 代码:

scala> val columbus: Int = 1492
 
columbus: Int = 1492
 
scala>

声明了一个类型为 Int 变量,初始值为 1492,就像在Java里 Int columbus = 1492; 一样。

Scala 把类型放在变量之后(反向的声明方式),还使用“val”显性地把变量声明为不可变。如果想修改这个变量:

scala> columbus=1500
 
<console>:8: error: reassignment to val
 
       columbus=1500
 
               ^
 
scala>

错误消息精确地指出了错误位于行的位置。

再尝试声明这个变量,但这次用“var”,让其可变更。这样编译器能推断出 1492 是一个整数,也就不需要指定类型了:

scala> var columbus = 1492
 
columbus: Int = 1492
 
scala> columbus = 1500
 
columbus: Int = 1500
 
scala>

接下来,我们来定义一个类,名为 Employee,有三个不可变更的字段:name、age 和 company,拥有各自的缺省值。

scala> case class Employee(name:String="guest",
 
     | age:Int=30,
 
     | company:String="DevCode")
 
defined class Employee
 
scala>

关键字“case”相当于 Java 里的 switch 语句,只不过更为灵活。它说明该类具有模式匹配的额外机制,以及其他一些特性,包括用来创建实例的工厂方法(不需要使用“new”关键字来构造),同样也不需要创建缺省的 getter 方法。与 Java 中不同的是,变量缺省下的访问控制是 public(而不是protected),而Scala为公开变量创建一个 getter 方法,并命名为变量名。如果你愿意,你也可以把字段定义成可变且/或私有(private)的,只需要在参数之前使用“var”(例如:case class Person(private var name:String))。

我们再来用不同方式创建一些实例,看看其他的特性,像是命名参数和缺省参数(从Scala2.8开始引入):

scala> val guest=Employee()
 
guest: Employee = Employee(guest,30,DevCode)
 
scala> val guestAge=guest.age
 
guestAge: Int = 30
 
scala> val anna=Employee("Anna")
 
anna: Employee = Employee(Anna,30,DevCode)
 
scala> val thomas=Employee("Thomas",41)
 
thomas: Employee = Employee(Thomas,41,DevCode)
 
scala> val luke=Employee("Luke",company="LucasArt")
 
luke: Employee = Employee(Luke,30,LucasArt)
 
scala> val yoda=luke.copy("Yoda",age=800)
 
yoda: Employee = Employee(Yoda,800,LucasArt)
 
scala>

不过,下面的写法是行不通的(可不是因为 Darth 不是 DevCode 的雇员!)

scala> val darth=Employee("Darth","DevCode")
 
<console>:9: error: type mismatch;
 
 found   : String("DevCode")
 
 required: Int
 
       val darth=Employee("Darth","DevCode")
 
                                  ^
 
scala>

这是由于构造函数在这个位置需要 age 作为参数,因为函数参数没有显性地进行命名。

现在我们再来看集合,这才是真正让人兴奋的地方。Scala 主要集合类型包括列表(list)、集(set)和映射(map)。

有了泛型(Java5 以上),Java可以遍历一个列表,比方说整数型列表,用下面代码:

 
List<Integer> numbers = new arrayList<Integer>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
for(Integer n:numbers) {
    System.out.println("Number "+n);
}

运行结果:

Number 1
 
Number 2
 
Number 3

Scala 对于可变集合和不可变集合进行了系统性地区别处理,不过,鼓励使用不可变集合,也因此在缺省情况下创建不可变集合。这些集合是通过模拟的方式实现添加、更新和删除操作,在这些操作中,不是修改集合,而是返回新的集合。

与前面的 Java 代码等价的 Scala 代码可能像下面这样:

scala> val numbers=List(1,2,3)
 
numbers: List[Int] = List(1, 2, 3)
 
scala> for(n<-numbers) println("Number "+n)
 
Number 1
 
Number 2
 
Number 3
 
scala>

这里的“for”循环语法结构非常接近于 Java 的命令式编程风格。在 Scala(以及 Java 虚拟机上其他很多语言如:Groovy、JRuby 或 JPython)里还有另外一种方式来实现上面的逻辑。这种方式使用一种更加偏向函数编程的风格,引入了 Lambda 表达式(有时也称为闭包——closure)。简单地说,Lambda 表达式就是你可以拿来当作参数传递的函数。这些函数使用参数作为输入(在我们的例子中就是“n”整型变量),返回语句作为函数体的最终语句。他们的形式如下:

functionName { input =>
    body
}
scala> numbers.foreach{n:Int=> println("Number "+n) }
 
Number 1
 
Number 2
 
Number 3
 
scala>

上面的例子中,函数体只有一条语句(println……),返回的是单位(Unit,也就是“空结果”),也就是大致相当于 Java 中的 void,不过有一点不同的是——void是不返回任何结果的。

除了打印数值列表外,我们更想做处理和变换这些元素,这时我们需要调用方法来生成结果列表,以便后面接着使用。让我们尝试一些例子:

scala> val reversedList=numbers.reverse
 
reversedList: List[Int] = List(3, 2, 1)
 
scala> val numbersLessThan3=numbers.filter{n=>n<3}
 
numbersLessThan3: List[Int] = List(1, 2)
 
scala> val oddNumbers=numbers.filterNot{n=>n%2==0}
 
oddNumbers: List[Int] = List(1, 3)
 
scala> val highterNumbers=numbers.map{n=>n+10}
 
highterNumbers: List[Int] = List(11, 12, 13)
 
scala>

变换“map”非常有用,它对列表的每个元素应用闭包,结果是一个同样大小的、包含了每个变换后元素的列表。

我们在这里还想介绍最后的一个方法,就是“foldLeft”方法,它把状态从一个元素传播到另一个元素。比如说,要算出一个列表里所有元素的和,你需要累加,并在切换元素的时候保存中间的计数:

scala> val sumOfNumbers=numbers.foldLeft(0) {( total,element)=>
 
     | total+element
 
     | }
 
sumOfNumbers: Int = 6
 
scala>

作为第一个变量传递给 foldLeft 的值0是初始值(也就是说在把函数用到第一个列表元素的时候 total=0)。

(total,element) 代表了一个 Tuple2,在 Scala 里这是一个二元组(就像要表示三维空间坐标,经常要用到 Tuple3(x,y,z) 等等)。

在合计时,Scala 编程接口实际上提供了一个“sum”方法,这样上一条语句就可以写成:

scala> val sumOfNumbers=numbers.sum
 
sumOfNumbers: Int = 6
 
scala>

还有许多其他的类似的集合变换方法,可以参照 scaladoc API。你也可以把这些方法组合起来(例如:numbers.reverse.filter……),让代码更加简洁,不过这样会影响可读性。

最后,{ n => n + 10 } 还可以简单地写成 (_ + 10),也就是说如果输入参数只是用于你调用的方法,则不需要声明它;在我们的例子里,“n”被称为匿名变量,因为你可以把它用任何形式来代替,比如说“x”或者“number”,而下划线则表示一处需要用你的列表的每个元素来填补的空白。(与“_”的功能类似,Groovy保留了关键字“it”,而Python则使用的是“self”)。

scala> val hightNumbers=numbers.map(_+10)
 
hightNumbers: List[Int] = List(11, 12, 13)
 
scala>

在介绍了对整数的基本处理后,我们可以迈入下一个阶段,看看复杂对象集合的变换,例如,使用我们上面所定义的Employee 类:

scala> val allEmployees=List(luke, anna, guest, yoda, thomas)
 
allEmployees: List[Employee] = List(Employee(Luke,30,LucasArt), Employee(Anna,30
 
,DevCode), Employee(guest,30,DevCode), Employee(Yoda,800,LucasArt), Employee(Tho
 
mas,41,DevCode))
 
scala>

包含上面五个元素的列表,我们可以应用匿名方法,用一个条件来过滤,符合条件的员工——比如属于 DevCode 的雇员:

scala>  val devCodeEmployees=allEmployees.filter {_.company=="DevCode"}
 
devCodeEmployees: List[Employee] = List(Employee(Anna,30,DevCode), Employee(gues
 
t,30,DevCode), Employee(Thomas,41,DevCode))
 
scala> val oldEmployees=allEmployees.filter(_.age>100).map(_.name)
 
oldEmployees: List[String] = List(Yoda)
 
scala>

假设我们手头的 allEmployees 集合是我们使用SQL查询获得的结果集,查询语句可能类似于“SELECT * FROM employees WHERE company = ‘DevCode’ ”。现在我们可以把 List[Employee] 变换到以 company 名称作为键、属于该公司的所有员工的列表作为值的 Map 类型,这样就可以把雇员按 company 来排序:

scala> val sortedEmployees=allEmployees.groupBy(_.company)
 
sortedEmployees: scala.collection.immutable.Map[String,List[Employee]] = Map(Dev
 
Code -> List(Employee(Anna,30,DevCode), Employee(guest,30,DevCode), Employee(Tho
 
mas,41,DevCode)), LucasArt -> List(Employee(Luke,30,LucasArt), Employee(Yoda,800
 
,LucasArt)))
 
scala>

每一个列表已经作为一个值存入了(键——值)哈希表,为了示范如何进一步处理这些列表,可以设想我们需要计算每个公司的雇员平均年龄。

这具体意味着我们必须要计算每个列表的每个雇员的的“age”字段的和,然后除以该列表中雇员的数量。让我们先计算一下 DevCode

scala> devCodeEmployees
 
res3: List[Employee] = List(Employee(Anna,30,DevCode), Employee(guest,30,DevCod
 
), Employee(Thomas,41,DevCode))
 
scala> val devCodeAges=devCodeEmployees.map(_.age)
 
devCodeAges: List[Int] = List(30, 30, 41)
 
scala> val devCodeAverageAge=devCodeAges.sum / devCodeAges.size
 
devCodeAverageAge: Int = 33
 
scala>

回到我们的 Map (key:String ->value:List[Employee]),下面是个更加一般性的例子。我们现在可以归并并计算每个公司的平均年龄,要做的只是写几行代码:

scala> val averageAgeByCompany = sortedEmployees.map{ case(key,value)=>
 
     | value(0).copy(name="average",age=(value.map(_.age).sum)/value.size)}
 
averageAgeByCompany: scala.collection.immutable.Iterable[Employee] = List(Employ
 
ee(average,33,DevCode), Employee(average,415,LucasArt))
 
scala>

这里的“case(key,value)”说明了Scala提供的模式匹配机制是多么强大。请参考Scala的文档来获取更多的信息。

到这里我们的任务就完成了。我们实现的是一个简单的Map-Reduce算法。由于每个公司雇员的归并是完全独立于其他公司,这个算法非常直观地实现了并行计算。

在后面的附录里给出了此算法的等价的实现,分为Java版本和Scala版本。

参考资料


附录


Map Reduce.Java

public class Employee {
 
    final String name;
    final Integer age;
    final String company;
 
    public Employee(String name, Integer age, String company) {
        this.name = name == null ? "guest" : name;
        this.age = age == null ? 30 : age;
        this.company = company == null ? "DevCode" : company;
    }
 
    public String getName() {
        return name;
    }
 
    public int getAge() {
        return age;
    }
 
    public String getCompany() {
        return company;
    }
 
    @Override
    public String toString() {
        return "Employee [name=" + name + ", age=" + age + ",
               company="
               + company + "]";
    }
}
 
class Builder {
    String name, company;
    Integer age;
 
    Builder(String name) {
        this.name = name;
 
    }
 
    Employee build() {
        return new Employee(name, age, company);
    }
 
    Builder age(Integer age) {
        this.age = age;
        return this;
    }
 
    Builder company(String company) {
        this.company = company;
        return this;
    }
}
 
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
 
public class MapReduce {
 
    public static final void main(String[] args) {
        Employee guest = new Builder("Guest").build();
        Employee anna = new Builder("Anna").build();
        Employee thomas = new Builder("Thomas").age(41).build();
        Employee luke = new
            Builder("Luke").company("LucasArt").build();
        Employee yoda = new
            Builder("Yoda").age(800).company("LucasArt").build();
 
        Collection<Employee> employees = new ArrayList<Employee>();
        employees.add(guest);
        employees.add(anna);
        employees.add(thomas);
        employees.add(luke);
        employees.add(yoda);
 
        ImmutableListMultimap<String, Employee>
            personsGroupByCompany = Multimaps.index(employees, new Function<Employee,String>() {
 
                public String apply(Employee person) {
                   return person.getCompany();
                 }
 
              });
 
        ImmutableSet<String> companyNamesFromMap =
            personsGroupByCompany.keySet();
 
        List<Employee> averageAgeByCompany = new
            ArrayList<Employee>();
 
        for(String company: companyNamesFromMap) {
             List<Employee> employeesForThisCompany =
                personsGroupByCompany.get(company);
             int sum = 0;
             for(Employee employee: employeesForThisCompany) {
                 sum+= employee.getAge();
             }
             averageAgeByCompany.add(new
                Employee("average",sum/employeesForThisCompany.size(),company));
     }
     System.out.println("Result: "+averageAgeByCompany);
 
    }
}

MapReduce.scala:

case class Employee(name: String = "guest", age: Int = 30, company: String = "DevCode")
 
    object MapReduce {
        def main(args: Array[String]): Unit = {
 
        val guest = Employee()
        val anna = Employee("Anna")
        val thomas = Employee("Thomas", 41)
        val luke = Employee("Luke", company = "LucasArt")
        val yoda = luke.copy("Yoda", age = 800)
 
        val allEmployees = List(luke, anna, guest, yoda, thomas)
        val sortedEmployees = allEmployees.groupBy(_.company)
        val averageAgeByCompany = sortedEmployees.map { case (key, value) =>
            value(0).copy(name = "average", age = (value.map(_.age).sum) / value.size)
      }
        println("Result: "+averageAgeByCompany)
    }
}

关于作者

Scala基础入门翻译

Thomas Alexandre是DevCode的高级咨询顾问,专注于Java和Scala软件开发。他热爱技术,热衷于分享知识,永远在寻求方法、采用新的开源软件和标准来实现更加有效的编程。在十四年的Java开发经验之外,过去几年他集中精力在新的编程语言和Web框架上,例如Groovy/Grails和Scala/Lift。Thomas从法国里尔大学获得了计算机科学博士学位,在卡耐基梅隆大学度过了两年的博士后研究生涯,研究方向是安全和电子商务。

分享到:
评论

相关推荐

    基于Python Java Scala语言的MapReduce及Spark分词及词频统计效率对比

    通过使用三种不同语言编写来编写分词及词频统计程序,比较在大数数据背景下,MapReduce和Spark对三种语言的适应性及其各自的效率对比;项目均采用IDEA+Maven进行构建,相关依赖均在对应pom.xml中给出; 软件架构 ...

    scala-2.12.11.tgz

    Hadoop是另一个在大数据领域广泛使用的框架,它提供了一个分布式文件系统(HDFS)和MapReduce计算模型。虽然Hadoop不直接依赖Scala,但很多基于Hadoop的项目,如Apache Spark,都使用Scala编写,因此理解Scala对于...

    mapreduce项目 数据清洗

    MapReduce的编程模型通常使用Java实现,但也有其他语言如Python和Scala的实现,如Apache Hadoop的MapReduce API(Hadoop Streaming)允许使用任何可执行程序作为mapper和reducer。在本项目中,开发者可以选择最适合...

    scala-2.11.12.tgz

    虽然Hadoop的原生编程模型是基于Java,但Scala提供了更简洁、高效的语法,使得开发人员可以更容易地编写MapReduce作业。 2. **Spark**:Spark是建立在Hadoop之上的数据处理引擎,它最初就是用Scala编写的,并且深度...

    MapReduce和Scalding中的电影推荐等_Scala_下载.zip

    Scalding则是Twitter开源的一个基于Scala的Hadoop MapReduce库,它提供了更加高级和易用的API,使得开发人员可以利用Scala的强大功能来编写MapReduce作业。Scalding利用了Scala的函数式编程特性,使得代码更简洁、...

    scala-2.10.7.zip,scala-2.11.11.zip

    Hadoop是另一个大数据处理框架,虽然Hadoop主要使用Java编写,但通过使用Scala,开发者可以利用其强大的编程特性,如模式匹配和高阶函数,编写出更简洁、可读性强的Hadoop MapReduce作业。 在使用这两个版本时,应...

    MapReduce2.0程序设计多语言编程(理论+实践)

    Scala由于与Java API的兼容性,也是MapReduce的常见选择。 实战部分: 1. **MapReduce实例**:实战部分通常会涵盖不同类型的MapReduce应用,例如网页链接分析、日志分析、文本挖掘等。这些实例将展示如何设计和实现...

    scala-2.12.4_.zip

    Hadoop的核心组件包括HDFS和MapReduce,而Scala作为一门强大且灵活的编程语言,经常被用来编写Hadoop MapReduce作业,因为它提供了比Java更简洁的语法和函数式编程的优势。 Scala 2.12.x系列是一个稳定版本,包含了...

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    Spark以其内存计算模型著称,极大地提高了数据处理速度,比传统的Hadoop MapReduce快上许多倍。Spark的核心组件包括:Spark Core(基础执行引擎)、Spark SQL(用于SQL查询和数据集成)、Spark Streaming(处理实时...

    scala-2.9.3.tgz

    Spark的核心设计目标是提供一种比Hadoop MapReduce更快的并行计算模型。Spark通过内存计算(In-Memory Computing)显著提高了数据处理速度,它可以将数据缓存在内存中,从而避免了Hadoop每次计算都要进行磁盘I/O的...

    读书笔记:用java模拟scala函数式编程模拟实现mapreduce.zip

    读书笔记:用java模拟scala函数式编程模拟实现mapreduce

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    此外,Hadoop 还支持使用其他编程语言,如 Python 和 Scala,通过 Pig 或 Hive 等高级接口编写 MapReduce 作业,简化开发过程。然而,对于更复杂的逻辑,Java 仍然是首选,因为它提供了更大的灵活性和性能。 在项目...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    scala写的第一个wordcount例子

    在Scala中,我们可以实现一个简单的MapReduce模拟,将文本数据分割成单词(map阶段),然后统计每个单词的出现次数(reduce阶段)。 4. 分词(Tokenization): 在WordCount程序中,首先需要将输入的文本分解成...

    Scala程序设计(第2版)

    18.2 用Scala改善MapReduce 387 18.3 超越MapReduce 392 18.4 数学范畴 393 18.5 Scala数据工具列表 394 18.6 本章回顾与下一章提要 394 第19章 Scala动态调用 396 19.1 一个较为激进的示例:...

    大数据MapReduce Ubuntu Linux上的Hadoop Scala by Maven intellj Idea

    标题中的“大数据MapReduce Ubuntu Linux上的Hadoop Scala by Maven intellj Idea”表明,这篇文章将深入探讨如何在Ubuntu Linux操作系统上使用Scala编程语言,通过Maven构建工具来开发Hadoop MapReduce项目,并在...

    scala和spark的安装

    Spark则是基于内存计算的大数据并行处理框架,能够提供比Hadoop MapReduce更快的数据处理速度。本文将详细介绍Scala和Spark的安装过程,包括环境变量配置、启动Spark服务及分发节点等内容。 #### 二、准备工作 在...

    scala与spark文档合集

    Spark的核心特性是其内存计算,通过将数据存储在内存中,实现了比Hadoop MapReduce更快的迭代计算,从而在大数据分析领域取得了显著的优势。 **Scala知识点** 1. **类型系统**:Scala具有强类型和静态类型,支持...

    scala+spark

    它的设计理念是提供比Hadoop MapReduce更快的计算速度,同时保持易用性和灵活性。Spark的核心是弹性分布式数据集(Resilient Distributed Datasets,RDDs),这是一种容错的、可分区的数据结构,可以在内存中进行...

Global site tag (gtag.js) - Google Analytics