`
cjcrobin
  • 浏览: 12859 次
  • 性别: Icon_minigender_1
  • 来自: 厦门
社区版块
存档分类
最新评论

Spark通过CQL读取写入Cassandra数据

阅读更多

之前两篇文章,简单的介绍了使用Spark通过CLI来进行读写Cassandra数据。在这一篇中,将介绍使用新的CQL来进行读取写入数据。

 

第一步,还是一样的去配置SparkContext,唯一的区别是使用的InputFormat不同。在Cli中使用的是ColumnFamilyInputFormat,而在这里将使用的是CqlPagingInputFormat。除了这两个类之外,还有CqlRagingRecordReader。所有的这些类都可以在apache-cassandra-<version>.jar中的org.apache.cassandra.haddop中找到。还有就是除了跟CLI一样有ConfigHelper可以使用,CQL还有自己的CqlConfigHelper。

具体如下:

 

val sc = new SparkContext("local[4]", "whitebox_test")
val job = new Job()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "whitebox_test", "words")
ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")
CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),"paragraph=english")

上面这段代码跟之前的不同之处在于多了最后一行的CqlConfigHelper,CQL其实形式上面有点类似于SQL的,所以最后添加的一句就类似于SQL中的"where paragraph=english"。

 

 

然后我们需要获取从Cassandra中读入数据的RDD,这步也和之前说道的一样,只是outputFormat不同而已,如下:

 

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
    classOf[CqlPagingInputFormat],
    classOf[Map[String, ByteBuffer]],//key
    classOf[Map[String, ByteBuffer]])//value

 相关的信息可以在这里找到。这样这里casRdd的格式应该是RDD[(Map[String, ByteBuffer],Map[String, ByteBuffer])]。第一个map是key的对应关系,这里的key包括了partition key和cluster columns。Cassandra和关系型数据库一样在使用CQL的时候可以指定一个组为key。在关系型数据库中,可以指定如"CONSTRAINT pk_personid primary key (id, lastname)"。Cassandra中也可以做出类似的指定如"primary key(a, b, c, d)",这其中a就是partition key,bcd就是cluster columns。第二个map是非key部分的对应关系。然后就可以使用这个RDD了,如下

 

val paraRdd = casRdd flatMap {
    case (key, value) => {
        value.filter(v => {
            (v._1).compareTo("content") == 0
        }).map(v => ByteBufferUtil.string(v._2))
    }
}.map(content => (content, 1)).reduceByKey(_+_)

 以上的代码完成了最基本的字数统计的功能,和之前一样,就是计算了每个单词出现的次数。

 

接着如果需要写入数据到Cassandra,还得需要设定Output的config,跟之前的类似唯一不同的是需要指定输入语句。具体如下:

 

job.setOutputFormatClass(classOf[CqlOutputFormat])
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setOutputColumnFamily(job.getConfiguration, KEYSPACE, "stats")
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")
val ps = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET num = ?, largerfive=?, content = ?, ip=?"
CqlConfigHelper.setOutputCql(job.getConfiguration(), ps)

 这里比较有意思的是,无法使用insert语句,只能使用update语句,而且也不需要指定key之类的信息,key的信息是在RDD中进行指定的。这里的?其实就是preparedStatement中的?一样的,只是占位符。在后面的RDD中才去指定值。如下

 

counts.map{
    case (word, count) => {

        val partitionMap = new util.LinkedHashMap[String, ByteBuffer]{}
        if(word!="") {
            partitionMap.put("word", ByteBufferUtil.bytes(word))
        } else {
            partitionMap.put("word", ByteBufferUtil.bytes("empty"))
        }
        val columnList = new ArrayList[ByteBuffer]
        columnList.add(ByteBufferUtil.bytes(count))
        if(count>5){
            columnList.add(ByteBuffer.wrap(TRUE))
        } else {
            columnList.add(ByteBuffer.wrap(FALSE))
        }
        columnList.add(ByteBufferUtil.bytes("Statistics for "+word))
        val address = InetAddress.getByAddress(ip);

        columnList.add(ByteBufferUtil.bytes(address))
        (partitionMap, columnList)
    }
}.saveAsNewAPIHadoopFile(KEYSPACE,classOf[Map[String, ByteBuffer]],
    classOf[List[ByteBuffer]],classOf[CqlOutputFormat], job.getConfiguration()

这里也是需要注意的,partitionMap就是key的对应,这个还是比较好理解的。但是非key部分这里使用的是List[ByteBuffer]。这样子的话,这个list中的顺序就必须和之前那个update语句中声明的顺序一致了。否则将会抛出错误。还有值得说的就是,ByteBufferUtil里面有个function是给IP地址使用的,但是我们需要先将ip String转换成InetAddress的形式才能够使用。

 

到了这里,就基本到spark和cassandra通讯的部分基本说完了。还有一些复杂的设定,可以根据自己需求来设置。

周六晚上,enjoy酷 

 

 

 

 

0
0
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    在Spark上使用CLI读取Cassandra数据

    通过这个连接器,用户可以直接读取和写入Cassandra表,无需将数据加载到HDFS或其他存储系统。 4. **使用CLI读取Cassandra数据**:CLI(命令行接口)是操作Cassandra的一种方式,虽然在Spark中通常我们会使用Spark ...

    CQL3.0 for Cassandra 1.2

    CQL(Cassandra Query Language)是为Apache Cassandra数据库设计的查询语言,它是一种类似于SQL的声明式语言,用于在Cassandra数据库中进行数据查询和管理操作。CQL3.0是CQL的第三个主要版本,其包含了一些新增特性...

    cassandra cql 3.1

    Apache Cassandra是一个开源的、分布式的NoSQL数据库管理系统,特别适用于需要处理大量数据且要求高可用性的应用场景。...通过这个文档,开发者可以更高效地使用CQL来管理和查询Cassandra数据库中的数据。

    cassandra cql3

    以上是 Cassandra CQL3 的核心知识点概览,涵盖了数据模型的设计、CQL 的基本使用、数据的插入和更新、查询操作以及表和索引的管理等方面。掌握这些内容对于有效利用 Cassandra 来构建高性能的应用程序至关重要。

    Cassandra:Cassandra数据模型与CQL教程.docx

    Cassandra:Cassandra数据模型与CQL教程.docx

    java导出cassandra数据

    使用`session.execute()`方法执行CQL(Cassandra查询语言)语句,获取表中的数据。例如,导出整个表的数据: ```java String query = "SELECT * FROM your_table"; ResultSet resultSet = session.execute(query...

    spark-cassandra-connector:DataStax Spark Cassandra连接器

    该库使您可以将Cassandra表公开为Spark RDD和数据集/数据框架,将Spark RDD和数据集/数据框架写入Cassandra表,并在Spark应用程序中执行任意CQL查询。 与Apache Cassandra 2.1或更高版本兼容(请参见下表) 与...

    分布式存储系统:Cassandra:Cassandra数据模型与CQL语言.docx

    分布式存储系统:Cassandra:Cassandra数据模型与CQL语言.docx

    spark-cassandra-bulkloader:使用 spark 将数据上传到 cassandra 的测试应用程序

    使用 spark 将数据上传到 cassandra 的测试应用程序,例如使用或 。 此示例中的代码取自 Cassandra 2.0.11,但具有对该项目中输出格式和记录编写器副本的。 笔记 示例数据和 schema.cql 位于 src/main/resources/...

    2019云栖大会 Cassandra cql以及业务场景介绍.pdf

    CQL(Cassandra Query Language)是 Cassandra 的查询语言,提供了类 SQL 的查询方式,支持丰富的数据结构和操作。 CQL 组成 * DDL(Data Definition Language):用于定义数据库 schema,例如 CREATE TABLE、DROP...

    存储数据(cassandra)

    Cassandra查询语言(Cassandra Query Language,简称CQL)是面向用户的SQL-like接口,简化了数据操作。此外,Cassandra还提供了Java驱动、Python驱动等多语言的客户端库,方便开发者进行数据交互。 ### 6. 安全与...

    Cassandra数据模型

    通过理解Cassandra的数据模型,开发者可以更好地设计和优化数据存储,以适应大数据环境下的高性能、高可用和可扩展的需求。在实际应用中,根据业务场景选择合适的分区策略、复制策略以及一致性级别,是充分利用...

    connect-cassandra-cql:使用Cassandra CQL3二进制协议进行连接的会话存储

    连接cassandra-cql 使用官方Cassandra CQL3二进制协议进行连接的会话存储。 安装 npm install connect-cassandra-cql 用法 快递4 var express = require ( 'express' ) , cookieParser = require ( 'cookie-...

    Cassandra JDBC Driver

    Cassandra JDBC Driver支持CQL语法,开发者可以通过执行CQL语句来进行数据查询、插入、更新和删除等操作。 4. **性能优化**:Cassandra JDBC Driver在设计时考虑了性能因素,它能够高效地处理大量的数据请求,减少...

    Cassandra权威指南【中文版】

    最后,本书可能会涵盖Cassandra与其他技术的集成,如Hadoop、Spark等大数据处理框架,以及如何在微服务架构中使用Cassandra作为持久化存储。 总的来说,《Cassandra权威指南》中文版是一本全面覆盖Cassandra技术的...

    storm-cassandra-cql-0.1.2.zip

    2. **Cassandra CQL**:Cassandra Query Language是Apache Cassandra数据库的查询语言,类似于SQL,但为分布式NoSQL数据存储设计。CQL简化了对Cassandra的数据操作,提供了更接近传统关系型数据库的用户体验。 3. *...

    Learning_Apache_Cassandra

    比如,文档中提到“写入数据不产生反馈”、“部分插入”、“缺失行”和“分页检索结果”的概念。 4. 数据模型的发展,它强调了为Cassandra设计高效数据模型的重要性。 5. 组织相关数据的方式,例如创建带有复合...

    MariaDB Cassandra interoperability Cassandra Storage Engine in MariaDB

    Cassandra是一个分布式NoSQL数据库,设计用于处理大规模数据,特别是在高写入和读取负载下。它基于列族存储模型,类似于表格结构,但有其独特之处。Cassandra支持键值对存储、有限范围扫描、可选的灵活模式、预定义...

    cassandra-cql3-pagination:本教程代码显示如何仅使用CQL3在Cassandra上进行分页

    CQL(Cassandra Query Language)是Cassandra的一种查询接口,类似于SQL,但针对分布式数据进行了优化。CQL3 提供了更易于理解和使用的语法,支持现代数据库的许多功能,如表、主键和索引等。 在这个“cassandra-...

    cassandra-gocqltest:golang使用cql操作cassandra的例子

    gocqltest 自己写的golang一些使用cql操作cassandra的例子。 说明 cqllongtimewrite.go : 多线程长时间写入cassadra cqlbatchwrite.go : 使用batch提交数据到cassandra cqlread.go : cql读取数据

Global site tag (gtag.js) - Google Analytics