`
cjcrobin
  • 浏览: 12934 次
  • 性别: Icon_minigender_1
  • 来自: 厦门
社区版块
存档分类
最新评论

Spark通过CLI写入Cassandra

阅读更多

上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。

 

与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下

 

 

val sc = new SparkContext("local[3]", "casDemo")
val job = new Job()
job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")//casDemo是keyspace,和sc中的casDemo没有任何关系
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

 

这样子,最基本的配置就已经配置好了。接下来就可以通过RDD来进行写入了。在这里我们假设已经存在一个RDD[(String, Int)],变量名为counts。其中String代表的是一个单词,比如“China”,Int代表的是出现的次数,如1,2.....。然后可以进行如下操作

       counts.map{
            case (word, count) => {
                //store the StringType
                val colWord = new org.apache.cassandra.thrift.Column()//cli通过Thrift访问,所以我们也需要libthrift这个jar包
                colWord.setName(ByteBufferUtil.bytes("word"))//column名字为word
                colWord.setValue(ByteBufferUtil.bytes(word ))//此column值为word
                colWord.setTimestamp(System.currentTimeMillis())

                //store the LongType
                val colCount = new org.apache.cassandra.thrift.Column()
                colCount.setName(ByteBufferUtil.bytes("count"))
                colCount.setValue(ByteBufferUtil.bytes(count.toLong))
                colCount.setTimestamp(System.currentTimeMillis())

                //store the BooleanType
                val colmorethan = new org.apache.cassandra.thrift.Column()
                colmorethan.setName(ByteBufferUtil.bytes("larger5"))
                if(count>5) {
                    colmorethan.setValue(TRUE)
                } else {
                    colmorethan.setValue(FALSE)
                }
                colmorethan.setTimestamp(System.currentTimeMillis())
                
                //store the FloatType
                val colPercentage = new org.apache.cassandra.thrift.Column()
                colPercentage.setName(ByteBufferUtil.bytes("percentage"))
                colPercentage.setValue(ByteBufferUtil.bytes(1.22.toFloat))
                colPercentage.setTimestamp(System.currentTimeMillis())

                //store the DoubleType
                val colRate = new org.apache.cassandra.thrift.Column()
                colRate.setName(ByteBufferUtil.bytes("rate"))
                colRate.setValue(ByteBufferUtil.bytes(1.888888))
                colRate.setTimestamp(System.currentTimeMillis())

                val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
                val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() ::  Nil

                mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(0).column_or_supercolumn.setColumn(colWord)

                mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(1).column_or_supercolumn.setColumn(colCount)

                mutations.get(2).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(2).column_or_supercolumn.setColumn(colmorethan)

                mutations.get(3).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(3).column_or_supercolumn.setColumn(colPercentage)

                mutations.get(4).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(4).column_or_supercolumn.setColumn(colRate)

                (outputkey, mutations)
            }
        }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
            classOf[ColumnFamilyOutputFormat], job.getConfiguration)

 

这里使用了RDD的map函数以及scala的case class,这样word就对应着单词,ercount就对应着出现次数。在Cassandra中column name以及timestamp都是必须的,column name是可选的,但是这里我们全部都填上了,主要是将每个基本类型都是用了下。其中比较tricky的是bool类型,因为RDD会分出很多partition在各个节点上,每个节点也需要和Cassandra交流,所以需要将所有的数据都转化成Byte类型。在这里我们ByteBufferUitl工具包完成了这个操作。但是false和true到底怎么表达呢?这个问题我也是尝试了很多次才搞清楚的。如下

    val TRUE = new Array[Byte](1)
        TRUE(0) = 1.toByte
    val FALSE = new Array[Byte](1)//default to 0

 

0就是false,非0就是ture,这个还是比较无可厚非的。以为需要的是Byte类型所以将1变成了Byte类型也是可以理解的,唯一让我比较困惑的是为什么需要搞出一个长度为1的数组。因为时间过得比较久了,我也不记得我当初是怎么搞出来的了。Okay,回到上一段代码。所有这些都设置好了之后,根据Cassandra提供的ColumnFamilyOutputFormat接口我们需要提供给一个二元组,这个二元组中第一个参数是ByteBuffer类型的,outputkey正好是,第二个参数应该是List[Mutation]类型的,正好我们设置的mutations满足要求,所以就返回(outputkey, mutations)。接下来就是使用hadoop的newapi完成任务了,不过这里需要注意的就是,为了使用这个函数,需要导入SparkContext._中的implicit函数,将使用PairFunctionRDD中的功能。

 

这就是基本的通过CLI接口来写入Cassandra。在下一篇中,会讲解使用Cassandra新的CQL接口进行读写。

Have a nice weekend!

 

 

0
0
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    在Spark上使用CLI读取Cassandra数据

    通过这个连接器,用户可以直接读取和写入Cassandra表,无需将数据加载到HDFS或其他存储系统。 4. **使用CLI读取Cassandra数据**:CLI(命令行接口)是操作Cassandra的一种方式,虽然在Spark中通常我们会使用Spark ...

    infinispan-cli-interpreter-8.2.0.Beta2.zip

    这个连接器使得Spark能够无缝地读取、写入和操作Cassandra的数据,实现了大数据处理与NoSQL存储的完美融合。在实际应用中,这为用户提供了强大的实时分析和批量处理能力,特别是在处理大规模、高并发的数据场景下,...

    sparksqlCmd_Spark!_spark_

    SparkSQL集成了Hive,因此它可以读取和写入Hive表,同时支持多种数据源,如HDFS、Cassandra、HBase等。在SparkSQL中,SQL查询被转换为Spark的DAG(有向无环图)任务,然后由Spark执行引擎高效地并行执行。 标题中的...

    spark学习总结

    - **Thrift Server和CLI使用**:如何使用Thrift Server和CLI,以及如何通过JDBC访问Spark SQL数据。 - **Spark SQL与MLlib、GraphX结合使用**:探讨如何将Spark SQL与其他Spark组件(如MLlib和GraphX)结合使用。 - ...

    JEDEC SPEC 最新版 合集 DDR2/DDR3/DDR4/DDR5/LPDDR2/LPDDR3/LPDDR4(X)/LPDDR5(X)

    JESD79-2F DDR2 JESD79-3F DDR3 JESD79-4D DDR4 JESD79-5C DDR5 JESD209-2F LPDDR2 JESD209-3C LPDDR3 JESD209-4E LPDDR4 JESD209-4-1A LPDDR4X JESD209-5C LPDDR5(X)

    COMSOL二维光子晶体角态研究:单胞与超胞能带计算及边界态与角态特性分析,COMSOL二维光子晶体角态研究:单胞与超胞能带计算及边界态与角态特性分析,comsol二维光子晶体角态 单胞能带,超胞能

    COMSOL二维光子晶体角态研究:单胞与超胞能带计算及边界态与角态特性分析,COMSOL二维光子晶体角态研究:单胞与超胞能带计算及边界态与角态特性分析,comsol二维光子晶体角态。 单胞能带,超胞能带,边界态以及角态计算。 ,comsol;二维光子晶体;角态;单胞能带;超胞能带;边界态计算,基于Comsol的二维光子晶体角态及能带边界计算研究

    六自由度机械臂抓取动作仿真与代码解析:抓取动画、关节参数变化及轨迹图解详解,六自由度机械臂抓取动作仿真指南:掌握两套代码实现动画与轨迹图模拟学习攻略,六自由度机械臂抓取动作仿真-8 两套关于抓取动作的

    六自由度机械臂抓取动作仿真与代码解析:抓取动画、关节参数变化及轨迹图解详解,六自由度机械臂抓取动作仿真指南:掌握两套代码实现动画与轨迹图模拟学习攻略,六自由度机械臂抓取动作仿真-8 两套关于抓取动作的代码,包括抓取动画、关节角、角速度、角加速度的变化仿真、以及抓取轨迹图 简单易懂好上手~ ,六自由度机械臂;抓取动作仿真;抓取动画;关节角变化;角速度角加速度;抓取轨迹图;两套代码;简单易懂好上手,六自由度机械臂抓取动作仿真演示:代码与轨迹图解

    ITC网络广播工具软件

    ITC网络广播工具软件

    Multisim四位密码锁电路仿真设计:设定、开锁与声光报警功能演示资料包,Multisim四位密码锁电路仿真设计:设定、输入、开锁与报警功能详解,附源文件、原理说明书与演示视频,multisim四位

    Multisim四位密码锁电路仿真设计:设定、开锁与声光报警功能演示资料包,Multisim四位密码锁电路仿真设计:设定、输入、开锁与报警功能详解,附源文件、原理说明书与演示视频,multisim四位密码锁电路仿真设计 功能: 1.通过拨码开关1进行初始密码设定。 2.通过拨码开关2输入密码,实现开锁判断。 3.如果密码正确,LED绿灯亮,表示开锁。 4.如果密码不正确,LED红灯亮,蜂鸣器鸣叫,声光报警。 资料包含:仿真源文件+原理说明书+演示视频 ,四位密码锁电路、Multisim仿真设计、初始密码设定;拨码开关输入;开锁判断;LED灯显示;声光报警;仿真源文件;原理说明书;演示视频,Multisim四位密码锁电路仿真设计:初始密码设置与智能解锁功能的声光报警展示

    上班摸鱼打卡模拟器微信小程序源码.zip

    俗话说,摸鱼摸的好,上班没烦恼,毕竟谁能拒绝带薪拉屎呢(手动狗头) 这是一个云开发职场打工人专属上班摸鱼划水微信小程序源码,没有后台 直接导入微信开发者工具即可运行,UI简约大气漂亮,只需登录微信公众平台配置完合法域名即可轻松上线。 用户进入摸鱼小程序,可以自由设置薪资,上班时间、下班时间、发薪日、 月工作天数以提醒自己摸鱼,全民打酱油,让自己成为摸鱼冠军,《商鞅摸鱼哲学》 摸鱼不是自我放纵,而是个人实力的积蓄,我们的小目标是晚睡晚起 小程序中的今日待办会提醒用户带薪拉屎和闲逛,下方展示的是距离休息日的天数,距离下一次发工资的天数和节日的天数。

    【毕业设计】基于Java的开发的一个集合校园二手交易、拼车、失物招领等功能的app_pgj.zip

    【毕业设计】基于Java的开发的一个集合校园二手交易、拼车、失物招领等功能的app_pgj

    PICkit3离线烧录流程

    个人记录:PICkit3离线烧录流程 使用软件:MPLAB X IDE v5.30 记录时间:20250215

    基于Matlab代码的电力系统状态估计与实验仿真研究:扩展卡尔曼滤波和无迹卡尔曼滤波在电力系统动态状态估计中的应用及效果分析,Matlab仿真实验研究:基于扩展卡尔曼滤波器与无迹卡尔曼滤波器对电力系统

    基于Matlab代码的电力系统状态估计与实验仿真研究:扩展卡尔曼滤波和无迹卡尔曼滤波在电力系统动态状态估计中的应用及效果分析,Matlab仿真实验研究:基于扩展卡尔曼滤波器与无迹卡尔曼滤波器对电力系统状态估计的影响及验证,状态估计 电力系统状态估计 Matlab代码 实验仿真研究 电力系统由于测量值和传输误差,还有测量噪声的影响,会对状态估计产生影响。 因此,需要对嘈杂的测量进行滤波,以获得准确的电力系统运行动态。 本文使用扩展卡尔曼滤波器(EKF)和无迹卡尔曼滤波器(UKF)来估计电力系统的动态状态。 扩展卡尔曼滤波EKF、无迹卡尔曼滤波UKF 利用扩展的无迹卡尔曼滤波器估计了动力系统的动态状态。 对WECC 3机9总线系统和新英格兰10机39总线系统进行了案例研究。 结果表明EKF和UKF都能准确地估计电力系统的动态状态。 ,核心关键词:状态估计; 电力系统状态估计; Matlab代码; 实验仿真; 测量值误差; 测量噪声; 扩展卡尔曼滤波器(EKF); 无迹卡尔曼滤波器(UKF); 动力系统; 动态状态估计; WECC 3机9总线系统; 新英格兰10机39总线系统。,Matlab

    springboot在线考试--.zip

    springboot在线考试--

    台达DVP EH3与MS300 PLC&变频器通讯程序的全面解决方案,台达DVP EH3与MS300通讯程序:稳定可靠的频率控制与启停管理系统,台达DVP EH3与台达MS300通讯程序(TDEH-9

    台达DVP EH3与MS300 PLC&变频器通讯程序的全面解决方案,台达DVP EH3与MS300通讯程序:稳定可靠的频率控制与启停管理系统,台达DVP EH3与台达MS300通讯程序(TDEH-9) 可直接用于实际的程序,程序带注释,并附送触摸屏程序,有接线方式和设置,通讯地址说明等。 程序采用轮询,可靠稳定 器件:台达DVP EH3系列PLC,台达MS300系列变频器,昆仑通态7022Ni 功能:实现频率设定,启停控制,实际频率读取,加减速时间设定。 资料:带注释程序,触摸屏程序,接线和设置说明,后续有技术咨询。 ,核心关键词:台达DVP EH3; 台达MS300; 通讯程序(TDEH-9); 轮询; 稳定; 频率设定; 启停控制; 实际频率读取; 加减速时间设定; 触摸屏程序; 接线方式; 设置说明; 技术咨询。,台达PLC与变频器通讯程序(带注释、触摸屏控制)

    【python毕设】p100基于Pytorch+springboot+vue的声纹识别系统.zip

    项目资源包含:可运行源码+sql文件 适用人群:学习不同技术领域的小白或进阶学习者;可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。项目具有较高的学习借鉴价值,也可拿来修改、二次开发。 个人账户管理:支持用户注册、登录与个人信息编辑;提供密码找回及账号安全保护措施。 声纹采集:利用麦克风设备录制用户的声纹样本;支持多种录音格式和质量调整,确保采集到清晰、准确的声纹数据。 声纹模板库管理:建立和维护一个安全的声纹模板库;支持声纹模板的添加、删除、更新和查询操作。 声纹比对与识别:运用深度学习算法对输入的声纹数据进行特征提取和匹配;实现快速、准确的声纹身份验证。 多场景应用支持:适用于多种场景,如门禁系统、移动支付、远程登录等;可根据实际需求定制开发相应的应用场景。 实时监控与报警:实时监控系统运行状态,包括声纹识别成功率、处理速度等指标;当出现异常情况时,及时发出报警信息。 数据分析与报告生成:收集并分析声纹识别过程中的数据,如识别准确率、处理时间等;根据用户需求输出包含详细图表说明的专业级文档供下载打印保存。 社区互动交流:设立论坛版块鼓励用户分享心得体会讨论热点话题;定期邀请行业专家举办线上讲座传授实用技巧知识。 音乐筛选与推荐:集成音乐平台API,根据用户的浏览习惯和情绪状态推荐背景音乐,增强用户体验。 数据可视化:提供交互式的数据可视化面板,使非技术用户也能轻松理解复杂的数据集,从而做出更明智的决策。

    三相与多相开绕组永磁同步电机仿真模型的先进控制策略探讨与实现,三相与多相开绕组永磁同步电机的Simulink仿真模型与先进控制策略研究,开绕组电机,开绕组永磁同步电机仿真模型、simulink仿真 共

    三相与多相开绕组永磁同步电机仿真模型的先进控制策略探讨与实现,三相与多相开绕组永磁同步电机的Simulink仿真模型与先进控制策略研究,开绕组电机,开绕组永磁同步电机仿真模型、simulink仿真 共直流母线、独立直流母线,两相容错,三相容错控制,零序电流抑制,控制策略很多 三相开绕组永磁同步电机,六相开绕组永磁同步电机 五相开绕组永磁同步电机,五相开绕组电机 ,开绕组电机; 永磁同步电机仿真模型; simulink仿真; 共直流母线; 独立直流母线; 两相容错; 三相容错控制; 零序电流抑制; 控制策略; 六相开绕组永磁同步电机; 五相开绕组永磁同步电机,开绕组电机仿真研究:共直流母线与独立直流母线的容错控制策略

    【毕业设计】基于Java的开发的网上汽车租赁管理系统_pgj.zip

    【毕业设计】基于Java的开发的网上汽车租赁管理系统_pgj

    python打开csv文件

    csv 模块是 Python 的标准库,无需额外安装。 运行结果如下图: ['姓名', '年龄', '城市'] ['张三', '25', '北京'] ['李四', '30', '上海'] ['王五', '22', '广州']

    【毕业设计】基于Java+Springboot+Vue的宠物领养系统_pgj.zip

    【毕业设计】基于Java+Springboot+Vue的宠物领养系统_pgj

Global site tag (gtag.js) - Google Analytics