之前两篇文章,简单的介绍了使用Spark通过CLI来进行读写Cassandra数据。在这一篇中,将介绍使用新的CQL来进行读取写入数据。
第一步,还是一样的去配置SparkContext,唯一的区别是使用的InputFormat不同。在Cli中使用的是ColumnFamilyInputFormat,而在这里将使用的是CqlPagingInputFormat。除了这两个类之外,还有CqlRagingRecordReader。所有的这些类都可以在apache-cassandra-<version>.jar中的org.apache.cassandra.haddop中找到。还有就是除了跟CLI一样有ConfigHelper可以使用,CQL还有自己的CqlConfigHelper。
具体如下:
val sc = new SparkContext("local[4]", "whitebox_test") val job = new Job() job.setInputFormatClass(classOf[CqlPagingInputFormat]) ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost") ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160") ConfigHelper.setInputColumnFamily(job.getConfiguration(), "whitebox_test", "words") ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner") CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),"paragraph=english")
上面这段代码跟之前的不同之处在于多了最后一行的CqlConfigHelper,CQL其实形式上面有点类似于SQL的,所以最后添加的一句就类似于SQL中的"where paragraph=english"。
然后我们需要获取从Cassandra中读入数据的RDD,这步也和之前说道的一样,只是outputFormat不同而已,如下:
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[Map[String, ByteBuffer]],//key classOf[Map[String, ByteBuffer]])//value
相关的信息可以在这里找到。这样这里casRdd的格式应该是RDD[(Map[String, ByteBuffer],Map[String, ByteBuffer])]。第一个map是key的对应关系,这里的key包括了partition key和cluster columns。Cassandra和关系型数据库一样在使用CQL的时候可以指定一个组为key。在关系型数据库中,可以指定如"CONSTRAINT pk_personid primary key (id, lastname)"。Cassandra中也可以做出类似的指定如"primary key(a, b, c, d)",这其中a就是partition key,bcd就是cluster columns。第二个map是非key部分的对应关系。然后就可以使用这个RDD了,如下
val paraRdd = casRdd flatMap { case (key, value) => { value.filter(v => { (v._1).compareTo("content") == 0 }).map(v => ByteBufferUtil.string(v._2)) } }.map(content => (content, 1)).reduceByKey(_+_)
以上的代码完成了最基本的字数统计的功能,和之前一样,就是计算了每个单词出现的次数。
接着如果需要写入数据到Cassandra,还得需要设定Output的config,跟之前的类似唯一不同的是需要指定输入语句。具体如下:
job.setOutputFormatClass(classOf[CqlOutputFormat]) ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160") ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost") ConfigHelper.setOutputColumnFamily(job.getConfiguration, KEYSPACE, "stats") ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner") val ps = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET num = ?, largerfive=?, content = ?, ip=?" CqlConfigHelper.setOutputCql(job.getConfiguration(), ps)
这里比较有意思的是,无法使用insert语句,只能使用update语句,而且也不需要指定key之类的信息,key的信息是在RDD中进行指定的。这里的?其实就是preparedStatement中的?一样的,只是占位符。在后面的RDD中才去指定值。如下
counts.map{ case (word, count) => { val partitionMap = new util.LinkedHashMap[String, ByteBuffer]{} if(word!="") { partitionMap.put("word", ByteBufferUtil.bytes(word)) } else { partitionMap.put("word", ByteBufferUtil.bytes("empty")) } val columnList = new ArrayList[ByteBuffer] columnList.add(ByteBufferUtil.bytes(count)) if(count>5){ columnList.add(ByteBuffer.wrap(TRUE)) } else { columnList.add(ByteBuffer.wrap(FALSE)) } columnList.add(ByteBufferUtil.bytes("Statistics for "+word)) val address = InetAddress.getByAddress(ip); columnList.add(ByteBufferUtil.bytes(address)) (partitionMap, columnList) } }.saveAsNewAPIHadoopFile(KEYSPACE,classOf[Map[String, ByteBuffer]], classOf[List[ByteBuffer]],classOf[CqlOutputFormat], job.getConfiguration()
这里也是需要注意的,partitionMap就是key的对应,这个还是比较好理解的。但是非key部分这里使用的是List[ByteBuffer]。这样子的话,这个list中的顺序就必须和之前那个update语句中声明的顺序一致了。否则将会抛出错误。还有值得说的就是,ByteBufferUtil里面有个function是给IP地址使用的,但是我们需要先将ip String转换成InetAddress的形式才能够使用。
到了这里,就基本到spark和cassandra通讯的部分基本说完了。还有一些复杂的设定,可以根据自己需求来设置。
周六晚上,enjoy
相关推荐
【作品名称】:基于MATLAB的肤色的人数统计系统,以地铁车厢为实际背景,通过预测的方式,结合肤色统计人脸得到车厢人数,从而估计拥挤度 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】:该课题为基于MATLAB的肤色的人数统计系统,以地铁车厢为实际背景,通过预测的方式,结合肤色统计人脸得到车厢人数,从而估计拥挤度,将结果反馈给车站里的视频,让乘客得知每节车厢的拥挤程度,从而合理分配车厢乘客,避免资源浪费和拥挤,本设计带有一个GUI交互界面。是一个人数统计类课题,该课题可以应用于教室人数统计,十字路口行人统计等等方面。 【资源声明】:本资源作为“参考资料”而不是“定制需求”,代码只能作为参考,不能完全复制照搬。需要有一定的基础能够看懂代码,能够自行调试代码并解决报错,能够自行添加功能修改代码。
亲爱的工程师和技术爱好者们, 您是否想学习一种强大的图形化编程语言,以便在自动化、数据采集、测试测量等领域大展身手?LabVIEW 是您的最佳选择!我们为您精心准备了一份全面的 LabVIEW 入门教程,帮助您从零开始,逐步掌握这门强大的工具。 为什么选择 LabVIEW? 直观易学:图形化编程界面,无需编写复杂的代码。 广泛应用:适用于工业自动化、数据采集、仪器控制、图像处理等多个领域。 强大功能:内置丰富的函数库和工具包,满足各种复杂需求。 社区支持:庞大的用户社区和丰富的在线资源,助您解决各种问题。 本教程适合谁? 初学者:从未接触过 LabVIEW 的新手。 工程师:希望提升技能,提高工作效率的专业人士。 学生:对自动化和测试测量感兴趣的在校学生。 研究人员:需要进行数据采集和分析的研究人员。 课程内容 LabVIEW 基础:安装与配置、界面介绍、基本概念。 图形化编程:创建 VI(虚拟仪器)、使用控件和指示器。 数据流编程:理解数据流的概念,编写简单的程序。 常用函数库:了解并使用常用
那些年,与你同分同位次的同学都去了哪里?全国各大学在辽宁2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
离网下三相不平衡负载,基于下垂控制的T型三电平逆变器,采用正负序分离四环控制,正序电压电流双闭环,负序电压电流双闭环,中点电位平衡控制,采用SPWM调制。 1.提供下垂控制原理,参数计算方法以及相关文献 2.电压电流双闭环控制 3.正负序分离控制以及相关资料 支持simulink2022以下版本(默认发2016b)。
微信小程序-新闻阅读器_ echatapp新闻阅读器
该项目是一个基于Python和HTML的TDXPystock股票交易自动化设计源码,总共有72个文件组成,包括47个Python源文件、6个XML配置文件、6个文本文件、3个用户界面文件、1个Git忽略文件、1个IDE配置文件、1个Jupyter Notebook文件、1个Markdown文件、1个SQL文件和1个图片文件。该项目旨在通过编程手段实现股票交易自动化,适用于对股票市场感兴趣的开发者和投资者。
c
微信OpenDevTool-微信小程序强制开发者工具打开-WiChatOpenDevTools Python
内容概要:本文提供了C++实现的一个基本汽车自动驾驶巡航系统。这个例子涵盖了系统的基础元素:模拟汽车自动巡航的定义、设定巡航的速度等功能,并在一个无限的回圈下演示了时间和速度的流动。 适合人群:适合熟悉 C++ 开发人员,尤其对车辆自动化感兴趣的研究者们。 使用场景及目标:旨在帮助开发者快速建立关于自动驾驶巡航系统的原型,加深对其基本特性的理解和认识。
ClioSoft SOS 8.1.1 2023最新版手册
那些年,与你同分同位次的同学都去了哪里?全国各大学在辽宁2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
那些年,与你同分同位次的同学都去了哪里?全国各大学在辽宁2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
STM32硬件液晶资料OLED资料STM32硬件液晶资料OLED资料
C# 矩阵运算类库 矩阵运算,求逆 。 欧拉角转类库 。 24种欧拉角、四元数互相转 数学运算100%正确无措
本文提供了一份采用C语言实现智能小车自动化移动的基本实例源码。通过对左旋/右旋、正向推进、逆向后移以及停止等一系列指令的具体阐述及注释指引,详述各命令的工作机理与实施程序。该文档为智能设备的研发与实践,特别是围绕小型自主驱动载具的设计与构建提供了有益启示,还介绍了怎样配合特定硬件来设置相应的端口标识符并引入传感器增强自动寻径效果,如防碰撞能力。 适用人群:掌握一定的嵌入式系统的编程能力的研究开发者或学生;适用于对Arduino平台有初步操作经验且有兴趣进一步探索其机器人领域的爱好者。 使用场景及目标:对于想要从基础开始动手做智能巡线车项目的电子爱好人士来说,这是一个很好的起点,旨在让小车模拟简单的自动驾驶行为(例如绕固定路径行驶),并为加入更多的智能化组件留有足够的扩展空间。 其它注意事项:注意,代码里运用到wiringPi库,请先行验证它已经顺利装妥和匹配系统。同时也要记得依照自己的物理接驳图更改代码里面的电机对应关系。
微信小程序反编译脚本备份WaxAppUnpacker
那些年,与你同分同位次的同学都去了哪里?全国各大学在辽宁2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
WelCropper微信小程序截图工具_ WelCropper
微信小程序反编译工具w_xappUnpacker
STM32软件学习资料USBSTM32软件学习资料USB