上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。
与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下
val sc = new SparkContext("local[3]", "casDemo") val job = new Job() job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat]) ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost") ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160") ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")//casDemo是keyspace,和sc中的casDemo没有任何关系 ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
这样子,最基本的配置就已经配置好了。接下来就可以通过RDD来进行写入了。在这里我们假设已经存在一个RDD[(String, Int)],变量名为counts。其中String代表的是一个单词,比如“China”,Int代表的是出现的次数,如1,2.....。然后可以进行如下操作
counts.map{ case (word, count) => { //store the StringType val colWord = new org.apache.cassandra.thrift.Column()//cli通过Thrift访问,所以我们也需要libthrift这个jar包 colWord.setName(ByteBufferUtil.bytes("word"))//column名字为word colWord.setValue(ByteBufferUtil.bytes(word ))//此column值为word colWord.setTimestamp(System.currentTimeMillis()) //store the LongType val colCount = new org.apache.cassandra.thrift.Column() colCount.setName(ByteBufferUtil.bytes("count")) colCount.setValue(ByteBufferUtil.bytes(count.toLong)) colCount.setTimestamp(System.currentTimeMillis()) //store the BooleanType val colmorethan = new org.apache.cassandra.thrift.Column() colmorethan.setName(ByteBufferUtil.bytes("larger5")) if(count>5) { colmorethan.setValue(TRUE) } else { colmorethan.setValue(FALSE) } colmorethan.setTimestamp(System.currentTimeMillis()) //store the FloatType val colPercentage = new org.apache.cassandra.thrift.Column() colPercentage.setName(ByteBufferUtil.bytes("percentage")) colPercentage.setValue(ByteBufferUtil.bytes(1.22.toFloat)) colPercentage.setTimestamp(System.currentTimeMillis()) //store the DoubleType val colRate = new org.apache.cassandra.thrift.Column() colRate.setName(ByteBufferUtil.bytes("rate")) colRate.setValue(ByteBufferUtil.bytes(1.888888)) colRate.setTimestamp(System.currentTimeMillis()) val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis) val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: Nil mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(0).column_or_supercolumn.setColumn(colWord) mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(1).column_or_supercolumn.setColumn(colCount) mutations.get(2).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(2).column_or_supercolumn.setColumn(colmorethan) mutations.get(3).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(3).column_or_supercolumn.setColumn(colPercentage) mutations.get(4).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(4).column_or_supercolumn.setColumn(colRate) (outputkey, mutations) } }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], classOf[ColumnFamilyOutputFormat], job.getConfiguration)
这里使用了RDD的map函数以及scala的case class,这样word就对应着单词,ercount就对应着出现次数。在Cassandra中column name以及timestamp都是必须的,column name是可选的,但是这里我们全部都填上了,主要是将每个基本类型都是用了下。其中比较tricky的是bool类型,因为RDD会分出很多partition在各个节点上,每个节点也需要和Cassandra交流,所以需要将所有的数据都转化成Byte类型。在这里我们ByteBufferUitl工具包完成了这个操作。但是false和true到底怎么表达呢?这个问题我也是尝试了很多次才搞清楚的。如下
val TRUE = new Array[Byte](1) TRUE(0) = 1.toByte val FALSE = new Array[Byte](1)//default to 0
0就是false,非0就是ture,这个还是比较无可厚非的。以为需要的是Byte类型所以将1变成了Byte类型也是可以理解的,唯一让我比较困惑的是为什么需要搞出一个长度为1的数组。因为时间过得比较久了,我也不记得我当初是怎么搞出来的了。Okay,回到上一段代码。所有这些都设置好了之后,根据Cassandra提供的ColumnFamilyOutputFormat接口我们需要提供给一个二元组,这个二元组中第一个参数是ByteBuffer类型的,outputkey正好是,第二个参数应该是List[Mutation]类型的,正好我们设置的mutations满足要求,所以就返回(outputkey, mutations)。接下来就是使用hadoop的newapi完成任务了,不过这里需要注意的就是,为了使用这个函数,需要导入SparkContext._中的implicit函数,将使用PairFunctionRDD中的功能。
这就是基本的通过CLI接口来写入Cassandra。在下一篇中,会讲解使用Cassandra新的CQL接口进行读写。
Have a nice weekend!
相关推荐
通过理解以上知识点,开发者可以有效地在Spark上利用CLI和Scala代码读取和处理Cassandra中的数据。这对于大数据分析和实时数据处理项目尤其有用。在实际操作中,还需要考虑性能优化、数据分区策略以及错误处理等复杂...
创建Gradle项目,引入依赖创建Spark Session连接写入Cassandra数据库读取Cassandra数据库Spark注册SQL 临时视图执行Dis
Cassandra CLI是Apache Cassandra数据库系统的一个命令行工具,它提供了与Cassandra集群交互的能力,包括连接到远程节点、创建或更新模式(schema)、设置和检索记录及列,以及查询节点和集群元数据。这个工具主要...
该库使您可以将Cassandra表公开为Spark RDD和数据集/数据框架,将Spark RDD和数据集/数据框架写入Cassandra表,并在Spark应用程序中执行任意CQL查询。 与Apache Cassandra 2.1或更高版本兼容(请参见下表) 与...
2014年Spark Summit于6月30日至7月2日在美国旧金山举行。Spark、Shark、Spark流媒体和相关项目及产品的主要用户聚集一地,共同探讨Spark项目开发的方向,以及Spark在各种各样应用程序中的实践情况。
Big Data SMACK: A Guide to Apache Spark, Mesos, Akka, Cassandra, and Kafka by Raul Estrada, Isaac Ruiz English | ISBN: 1484221745 | 2016 | EPUB | 264 pages | 2.35 MB This book is about how to ...
kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器 用于 Kafka Spark流的Docker 容器这里Dockerfile为实验 Kafka 。Spark流( PySpark ) 和Cassandra设置了完整的流环境。 安装Kafka 0.10.2.1用于 ...
5. **Data Source API**:Spark SQL引入了统一的数据源接口,使得它可以透明地读取和写入各种数据格式,如Parquet、JSON、CSV、JDBC等。 6. **性能优化**:Spark SQL采用了 Catalyst 编译器进行查询优化,包括代码...
Stratio高级架构师Luca Rosellini:在此次峰会上和其同事Oscar Méndez、Alvaro Agea重点介绍Stratio的主要客户、为什么使用Cassandra、为什么使用Spark以及举例说明。
2、基于spark streaming+Cassandra的实时分析和监控,包括性能分析、账号安全主动防御。 web部分采用spring boot开发,前端采用angularJS组织页面相关的各个部分,系统的技术和效果在2016~2017年在行业具有一定先进...
使用 spark 将数据上传到 cassandra 的测试应用程序,例如使用或 。 此示例中的代码取自 Cassandra 2.0.11,但具有对该项目中输出格式和记录编写器副本的。 笔记 示例数据和 schema.cql 位于 src/main/resources/...
如果要通过JDBC查询Cassandra数据,但要使用Spark SQL的功能进行数据处理,则需要此应用程序。 此应用程序(CSJB)是Spark应用程序,它将在Spark SQL中自动将所有Cassandra表注册为架构RDD,并启动嵌入式Apache ...
您正在构建的机器上的 jdk 和 sbt,某些机器上的 spark 和 cassandra cqlsh --file cassandra-example.cql 编辑 cassandra-example.conf,为您的环境设置 master 和 cassandra 连接信息。 要构建一个包含所有...
通过这样的集成,Spring Boot应用可以利用Spark的强大计算能力对Cassandra中的数据进行实时分析和处理,同时保持Spring Boot的灵活性和易用性。这样的组合在大数据应用中具有广泛的应用前景,特别是在需要实时分析、...
标题中的"Spark-Kafka-Cassandra-Airflow-Docker"揭示了这个项目涉及到多个重要的大数据处理和容器化技术。让我们逐一探讨这些技术的核心概念及其在实际应用中的重要性。 1. **Spark**: Apache Spark是一个分布式...
Scala 中的 Apache Spark/Apache Cassandra 应用程序 介绍 这是一个存储库,其中包含用于将 Apache Spark 与 Apache Cassandra 一起使用的“入门”类 Scala 代码。 此存储库中给出的所有代码库都在以下版本的软件上...
当这两者结合,通过datastax spark-cassandra-connector,我们可以构建出强大的数据处理系统。本篇将深入解析datastax spark-cassandra-connector,以及如何利用它来实现加利福尼亚州圣塔克拉拉的Cassandra Summit ...
Cassandra,优异的列式存储NoSQL,在写入操作上难逢敌手。自本期《问底》,许鹏将结合实际实践,带大家打造一个由Spark和Cassandra组成的大数据分析平台。笔者在源码阅读的过程中秉持着一种非常简单的思维模式,就是...