`
cjcrobin
  • 浏览: 12725 次
  • 性别: Icon_minigender_1
  • 来自: 厦门
社区版块
存档分类
最新评论

在Spark上使用CLI读取Cassandra数据

阅读更多

最近在研究将Spark架设到Cassandra之上。发现这方面的信息比较少,在学习的过程中也遇到了不少问题,因此在此记录下,也和大家分享。此例为最经典的WordCount示例。

首先我先说下我所使用的各种环境和版本。由于Spark和Cassandra更新较快,如果之后版本有异可能运行不能成功需要一些微调。

暂时使用的是Windows 7, 之后会转到Linux平台,但是这个影响不大。使用的是Scala2.9.3,Spark 0.8, Cassandra 1.2.10,sbt 0.13.0,Java 7。

 

首先需要我们自己生成下Spark的jar包。这个需要我们运行sbt命令来得到。转到Spark所在的目录然后运行

sbt\sbt assembly(如果是Linux的话,应该是sbt/sbt assembly)。运行结束后,可以在spark\assembly\target\scala-2.9.3下面发现一个名字类似于spark-assembly-0.8.0-*.jar的包。我们需要将这个包加入到编译路径中。

 

其次我们需要在Cassandra中插入一些数据。首先需要在命令行中运行cassandra/bin下面的cassandra命令来开启服务。然后运行cassandra-cli,这样我们就能够输入我们需要的数据。本文结尾可以找到此例使用的数据示例。

 

然后我们就可以开始了。

val sc = new SparkContext("local[3]", "casDemo")

 新建一个Spark上下文对象,第一个参数是将要连接到的Cluster地址,这里我们仅仅使用localhost来运行,所以可以简单设置为local[*],*为1,2,3之类的数字。第二个只是一个显示参数,可以随意。

val job = new Job()

 新建一个Hadoop job。因为Spark是没有提供直接的API访问Cassandra,但是Spark是建于Hadoop之上,Cassandra提供了访问Hadoop的接口所以我们需要先创建一个Hadoop的job来连接它两。

job.setInputFormatClass(classOf[ColumnFamilyInputFormat])

 设置input的类,这个没有什么其他可选项,这是Cassandra默认的jar包中提供的接口。

 

ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

通过Cassandra提供的静态类ConfigHelper来设置相对应的一些参数。“casDemo"是这个例子使用的keyspace,words是column family。9160是Cassandra默认使用的端口。最后一个是设置多台机器运行时使用的Partitioner hash算法。有三个值可以选择,分别是org.apache.cassandra.dht.Murmur3Partitioner,org.apache.cassandra.dht.RandomPartitioner和ByteOrderedPartitioner。第三个已经不推荐使用了。在这里我们使用第一个。

 

此外需要说明的是,还有其他参数可以设置比如slice predicate之类的,这里略过了,仅仅介绍了最简单的设置。

 

然后我们就能够去创建Spark独有的RDD对象了,并使用它来完成我们要的工作。

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
            classOf[ColumnFamilyInputFormat],
            classOf[ByteBuffer],//key
            classOf[SortedMap[ByteBuffer, IColumn]]) //value

 可以看到这里创建了一个RDD对象,第一个参数就是将之间我们配置好的参数,第二个就是之前提到的Cassandra提供的接口,第三和第四个参数其他没有其他的可选项,这两个参数是被ColumnFamilyInputFormat所限制的。这个其实大家看SparkAPI就能了解到,这里不多说。

 

val paraRdd = casRdd flatMap {
    case (key, value) => {
        value.filter(v => {
            ByteBufferUtil.string(v._1).compareTo("paragraph") == 0
        }).map(v => ByteBufferUtil.string(v._2.value()))
    }
}

这里就是运行mapper方法了,就和hadoop中的mapper一个概念。需要说明的是,这里的key就是一个ByteBuffer对象,value就是一个SortedMap[ByteBuffer, IColumn],有没有觉得很熟悉,是的,这两个就是之前创建RDD设置的最后两个参数。这里我做的就是过滤掉colomn name不等于paragraph的,然后将colomn是paragraph的值由Icolumn变成ByteBuffer类型。

 

val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

最后就是reduce方法了。 先用空格将段落打散成单词,然后将单个单词word转化成元组(word, 1)。最后统计每个word出现次数。

 

这样我们就完成了简单的WordCount方法。

我们可以通过以下代码打印出结果来观看。

counts.collect() foreach {
    case (word, count) => println(word + ":" + count)
}

 

这里介绍了通过Spark读取Cassandra数据并进行处理。下一节,会介绍写入数据。

分享到:
评论

相关推荐

    cassandra cli 命令 大全

    Cassandra CLI是Apache Cassandra数据库系统的一个命令行工具,它提供了与Cassandra集群交互的能力,包括连接到远程节点、创建或更新模式(schema)、设置和检索记录及列,以及查询节点和集群元数据。这个工具主要...

    vbay#big-data#14.4Spark-SQL基于Cassandra数据分析编程实例1

    创建Gradle项目,引入依赖创建Spark Session连接写入Cassandra数据库读取Cassandra数据库Spark注册SQL 临时视图执行Dis

    许鹏:使用Spark+Cassandra打造高性能数据分析平台(一)

    Spark,强大的迭代计算框架,在内存数据计算上无可匹敌。Cassandra,优异的列式存储NoSQL,在写入操作上难逢敌手。自本期《问底》,许鹏将结合实际实践,带大家打造一个由Spark和Cassandra组成的大数据分析平台。...

    spark-cassandra-bulkloader:使用 spark 将数据上传到 cassandra 的测试应用程序

    使用 spark 将数据上传到 cassandra 的测试应用程序,例如使用或 。 此示例中的代码取自 Cassandra 2.0.11,但具有对该项目中输出格式和记录编写器副本的。 笔记 示例数据和 schema.cql 位于 src/main/resources/...

    Cassandra的数据模型介绍

    关于Cassandra数据模型的简单介绍

    使用Spark+Cassandra打造高性能数据分析平台

    摘要:Spark,强大的迭代计算框架,在内存数据计算上无可匹敌。Cassandra,优异的列式存储NoSQL,在写入操作上难逢敌手。自本期《问底》,许鹏将结合实际实践,带大家打造一个由Spark和Cassandra组成的大数据分析...

    cassandra-spark-jdbc-bridge:如果要通过JDBC查询Cassandra数据,但想使用Spark SQL的强大功能进行数据处理,则需要此应用程序

    如果要通过JDBC查询Cassandra数据,但要使用Spark SQL的功能进行数据处理,则需要此应用程序。 此应用程序(CSJB)是Spark应用程序,它将在Spark SQL中自动将所有Cassandra表注册为架构RDD,并启动嵌入式Apache ...

    java导出cassandra数据

    Cassandra的数据存储基于表(Table),这些表分布在多个节点上,形成一个分区(Partition)。每个分区由键(Partition Key)决定,而行(Row)由主键(Primary Key)确定。在Java中,我们可以使用DataStax的...

    StratioDeep:一个在Spark 和Cassandra之间的集成层

    Stratio高级架构师Luca Rosellini:在此次峰会上和其同事Oscar Méndez、Alvaro Agea重点介绍Stratio的主要客户、为什么使用Cassandra、为什么使用Spark以及举例说明。

    基于Cassandra的实时气象数据分布式存储系统.pdf

    每个数据项都会被复制到N个节点(N是通过参数配置的副本因子),系统利用数据的复制机制将存储在各节点上的数据复制到其他结点上,实现了数据的高度可获得性和安全性。 1.2 数据模型 Cassandra使用宽列存储模型,...

    Big Data SMACK: A Guide to Apache Spark, Mesos, Akka, Cassandra, and Kafka

    Big Data SMACK: A Guide to Apache Spark, Mesos, Akka, Cassandra, and Kafka by Raul Estrada, Isaac Ruiz English | ISBN: 1484221745 | 2016 | EPUB | 264 pages | 2.35 MB This book is about how to ...

    spark-cassandra-connector:DataStax Spark Cassandra连接器

    该库使您可以将Cassandra表公开为Spark RDD和数据集/数据框架,将Spark RDD和数据集/数据框架写入Cassandra表,并在Spark应用程序中执行任意CQL查询。 与Apache Cassandra 2.1或更高版本兼容(请参见下表) 与...

    java8看不到源码-Spark-Cassandra-Collabfiltering:基于Cassandra中的数据在Spark上与MLLib

    看不到源码spark-cassandra-collabfiltering 此代码与我的 . 它使用基于协作过滤公司员工评级的示例说明了 Spark 上的 MLLib。 它显示了用 Java 7 和 Java 8 编写的完全相同的 Spark 客户端功能。新的 Java 8 特性使...

    Spark大数据分析与实战课后练习答案.rar

    2. **Spark SQL与DataFrame/Dataset API**:介绍如何使用Spark SQL进行结构化数据处理,DataFrame和Dataset API的优势,以及如何与其他数据源(如HDFS、Hive、Cassandra等)集成。 3. **Spark Streaming**:讲解...

    Spark大数据处理数据性能优化学习

    在大数据处理领域,Apache Spark因其高效、易用和弹性等特点,成为了许多企业和开发者首选的工具。本主题聚焦于"Spark大数据处理数据性能优化学习",旨在深入探讨如何在处理海量数据时,通过优化策略提升Spark的运行...

    Cassandra数据模型

    - **分区(Partitioning)**:基于行键对数据进行分区,确保数据分布在集群的不同节点上,提高读写性能。 - **复制(Replication)**:Cassandra通过复制策略确保数据的高可用性。数据会在多个节点上保存副本,以...

    spark项目代码以及数据

    这个压缩包文件包含了Spark项目相关的代码和数据,这意味着我们可以深入探讨Spark在实际项目中的应用,以及如何处理和分析数据。 一、Spark概述 Spark的核心设计理念是提供内存计算,以加速大规模数据处理。相比...

    spark快速数据处理_完整中文版

    Spark是Apache软件基金会下的一个开源大数据处理框架,它以其高效的并行计算能力、内存计算以及易用性在大数据领域广受赞誉。Spark的核心设计理念是提供一个统一的平台,支持多种数据处理模式,包括批处理、交互式...

Global site tag (gtag.js) - Google Analytics