看到这个标题,大家一定会问了。这个整合如何定义?
我个人认为,所谓的整合是指:我们可以编写MapReduce程序,从HDFS中读取数据然后插入到Cassandra中。也可以是直接从Cassandra中读取数据,然后进行相应的计算。
从HDFS中读取数据然后插入到Cassandra中
对于这种类型,我们可以按照以下几个步骤来操作。
1 将需要插入Cassandra的数据上传到HDFS中。
2 启动MapReduce程序。
这种类型的整合其实和Cassandra本身没有什么联系。我们只是运行普通的MapReduce程序,然后在Map或者Reduce端将计算好的数据插入到Cassandra中。仅此而已。
直接从Cassandra中读取数据,然后进行相应的计算
这个功能是在Cassandra0.6.x版本中添加上去的。其可以从Cassandra直接读取MapReduce需要的数据,实现对于Cassandra的全表扫描的功能。
操作步骤如下:
1 在MapReduce程序中指定使用的KeySpace,ColumnFamily,和SlicePredicate等和Cassandra相关的参数。(关于这些概念,可以参考《大话Cassandra数据模型》和《谈谈Cassandra的客户端》)
2 启动MapReduce程序。
这种类型的整合和从HDFS读取数据的整合相比,还是有许多不同的,主要有下面几点区别:
1 输入数据来源不同:前一种是从HDFS中读取输入数据,后一种是从Cassandra中直接读取数据。
2 Hadoop的版本不同:前一种可以使用任何版本的Hadoop,后一种只能使用Hadoop0.20.x
整合Hadoop0.19.x与Cassandra0.6.x
在Cassandra0.6.x中,默认实现的是与Hadoop0.20.x的整合,我们无法直接在Hadoop0.19.x中使用。
所以,要实现这个目标,我们第一步需要做的事情是,修改Cassandra的源代码,提供一个可以在Hadoop0.19.x中使用的功能。
想要进行这项测试,我们可以按照如下步骤来进行:
1 下载修改后的代码。
2 在MapReduce中指定如下内容(注意,这里的class使用的package都是com.alibaba.dw.cassandra.hadoop下面的):
ConfigHelper.setColumnFamily(conf, Keyspace, MemberCF,
<!--CRLF-->
"/home/admin/apache-cassandra-0.6.1/conf");
<!--CRLF-->
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList("CITY"
<!--CRLF-->
.getBytes(UTF8), "EMPLOYEES_COUNT".getBytes(UTF8)));
<!--CRLF-->
ConfigHelper.setSlicePredicate(conf, predicate);
<!--CRLF-->
ConfigHelper.setRangeBatchSize(conf, 512);
<!--CRLF-->
ConfigHelper.setSuperColumn(conf, "MemberInfo");
<!--CRLF-->
3 确保每一台运行MapReduce的机器的指定目录与MapReduce程序中设定的storage-conf.xml文件路径一致。
4 运行MapReduce程序。
存在的问题与改进
在实际的使用中,我们会发现Map端会出现这样的错误信息:
java.lang.RuntimeException: TimedOutException()
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.maybeInit(ColumnFamilyRecordReader.java:125)
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.computeNext(ColumnFamilyRecordReader.java:164)
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.computeNext(ColumnFamilyRecordReader.java:1)
<!--CRLF-->
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:135)
<!--CRLF-->
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:130)
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader.next(ColumnFamilyRecordReader.java:224)
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader.next(ColumnFamilyRecordReader.java:1)
<!--CRLF-->
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
<!--CRLF-->
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
<!--CRLF-->
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
<!--CRLF-->
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
<!--CRLF-->
at org.apache.hadoop.mapred.Child.main(Child.java:158)
<!--CRLF-->
Caused by: TimedOutException()
<!--CRLF-->
at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:11015)
<!--CRLF-->
at org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:623)
<!--CRLF-->
at org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:597)
<!--CRLF-->
at com.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.maybeInit(ColumnFamilyRecordReader.java:108)
<!--CRLF-->
... 11 more
<!--CRLF-->
引起这样的问题的原因就在于使用Thrift API从Cassandra读取数据失败了。
所以我们可以优化这段代码,提供想要的错误处理功能来提供程序的可用性。
另一个做法是修改Cassandra的配置,将RPCTimeout的时间调长。
更多关于Cassandra的文章:http://www.cnblogs.com/gpcuster/tag/Cassandra/
分享到:
相关推荐
Hadoop,另一方面,是Apache开源项目,主要由HDFS(Hadoop Distributed File System)和MapReduce两部分组成,为海量数据的存储和处理提供了基础架构。Hadoop2引入了YARN(Yet Another Resource Negotiator),作为...
在这个系统中,"Hadoop MapReduce WordCount 操作数据库"是指使用 MapReduce 模型对视频相关的文本数据(如字幕、描述、评论等)进行统计分析,同时将结果存储到数据库中,以便后续的数据查询和可视化。 MapReduce ...
Hadoop生态系统中的工具远不止HDFS和MapReduce,还包括了Hive、Pig、HBase、Spark、Storm、Kafka、Flume、Oozie、Zookeeper、Mahout、Flink、Cassandra、Solr、Nifi、Sqoop等。这些工具在不同的场景下发挥着重要作用...
### Spark与Hadoop MapReduce的比较 虽然Spark和Hadoop MapReduce都可以处理大规模数据,但它们之间存在一些显著的区别: 1. **处理方式**:Hadoop MapReduce采用磁盘I/O的方式处理数据,而Spark更倾向于内存计算...
7. **与其他大数据系统的集成**:了解如何与Hadoop、Hive、Cassandra等其他大数据工具集成,以实现更全面的数据处理流程。 8. **性能调优**:讨论如何监控和调优Spark集群,包括资源分配、任务调度和错误排查。 ...
7. **MapReduce与其他技术的结合**:如HDFS(Hadoop Distributed File System)的数据存储,HBase、Cassandra等NoSQL数据库的集成,以及Spark、Flink等新一代大数据处理框架与MapReduce的关系。 8. **案例分析**:...
这个合集涵盖了四个关键的技术:Hadoop、Cassandra、HBase和NoSQL,这些都是构建大规模分布式数据存储和处理系统的基础。 **Hadoop** 是一个开源的框架,主要用于处理和存储大量数据。《Hadoop权威指南》是了解...
但Spark并不局限于Hadoop生态系统,它可以与多种数据源集成,如Amazon S3、Cassandra、HBase等。在没有包含Hadoop的版本中,Spark需要用户自行配置HDFS客户端或者其他分布式文件系统以进行数据读写。 安装Spark-...
此外,Cassandra还支持与Hadoop的集成,可以通过Hadoop MapReduce对数据进行批量处理和分析。 性能基准测试显示,Cassandra在大规模集群中表现出色,能够处理大量数据和高并发请求。配置选项丰富,允许用户根据硬件...
Spark 2.0.0与Hadoop 2.7的集成意味着它可以充分利用Hadoop的存储和计算资源,同时通过Spark的优化执行引擎,提供比原生Hadoop MapReduce更快的计算速度。 在Spark 2.0.0中,SQL和DataFrame得到了显著增强。Spark ...
在实际应用中,可以采用如HBase、Cassandra等NoSQL数据库与Hadoop结合,实现数据的实时查询和分析。同时,通过使用Hadoop的Elastic MapReduce(EMR)或其他云服务,可以轻松地在云端实现计算资源的弹性扩展。 7. *...
9. **Chap 5 - 分布式存储与文件系统**:除了HDFS,还可能讨论其他的分布式存储系统,如HBase、Cassandra,以及它们在特定场景下的适用性和优缺点。 通过这些章节的源代码,读者不仅可以了解Hadoop的基本工作原理,...
由于其内存计算的特点,相比Hadoop MapReduce,Spark在迭代计算和交互式查询上有显著优势。此外,Spark还支持多种数据源,如HDFS、Amazon S3、Cassandra等,具有良好的生态系统兼容性。 总结来说,Spark 2.4.0-bin-...
5. 高级Hadoop主题:讨论了Hadoop的容错机制、性能优化、YARN(Yet Another Resource Negotiator)资源管理器,以及Hadoop与其他大数据技术(如HBase、Cassandra、Spark)的集成。 6. 实战案例分析:书中包含了一些...
9. **Chap 9:Hadoop数据存储与管理** - 这一章可能涉及到Hadoop与其他数据存储系统如HBase、Cassandra的集成,以及Hadoop的数据备份和恢复策略。 10. **Chap 10:Hadoop项目实战** - 最后一章通常会提供一个完整的...
Hadoop是一个生态圈,而不是单一产品,它包括Hive、Hbase、Zookeeper、Cassandra、Solr等众多组件。 **Hadoop的发展历史** Hadoop的发展与Lucene框架有着密切的联系,后者由Doug Cutting创建,最初是Apache基金会...
此外,书中还涉及了Hadoop生态系统的一些关键组件,如Pig(数据分析工具)、Hive(数据仓库工具)、HBase(分布式数据库)和Cassandra(分布式NoSQL数据库)等,这些都是Hadoop应用中的重要辅助工具。 到了第四版,...
Hadoop还可以与NoSQL数据库集成,如Cassandra和MongoDB,以实现更灵活的数据存储和检索。 5. 实战案例 “Hadoop高级应用实战四”可能涵盖了使用Hadoop进行特定领域的数据挖掘、大数据分析或预测模型构建等实际操作...
Cassandra可能会利用Hadoop进行大规模数据的处理和分析,比如通过Hadoop MapReduce进行批量加载或者数据迁移。 2. **ANTLR3** (antlr3-master.zip): ANTLR是一个强大的解析器生成器,用于读取、处理、执行或翻译...