`
datamachine
  • 浏览: 164492 次
社区版块
存档分类
最新评论

Hadoop如何实现关联计算

阅读更多
    选择Hadoop,低成本和高扩展性是主要原因,但但它的开发效率实在无法让人满意。
    以关联计算为例。
    假设:HDFS上有2个文件,分别是客户信息和订单信息,customerID是它们之间的关联字段。如何进行关联计算,以便将客户名称添加到订单列表中?
    一般方法是:输入2个源文件。根据文件名在Map中处理每条数据,如果是Order,则在foreign key上加标记”O”,形成combined key;如果是Customer则做标记”C”。Map之后的数据按照key分区,再按照combined key分组排序。最后在reduce中合并结果再输出。
实现代码:
public static class JMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    //mark every row with "O" or "C" according to file name
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
    if (pathName.contains("order.txt")) {//identify order by file name
            String values[] = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[1]), new Text("O"));//mark with "O"
            context.write(tp, new Text(values[0] + "\t" + values[2]));
        }
   if (pathName.contains("customer.txt")) {//identify customer by file name
           String values[] = value.toString().split("\t");
           TextPair tp = new TextPair(new Text(values[0]), new Text("C"));//mark with "C"
           context.write(tp, new Text(values[1]));
        }
    }
}
public static class JPartitioner extends Partitioner<TextPair, Text> {
    //partition by key, i.e. customerID
    @Override
    public int getPartition(TextPair key, Text value, int numParititon) {
        return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
    }
}
public static class JComparator extends WritableComparator {
    //group by muti-key
    public JComparator() {
        super(TextPair.class, true);
    }
    @SuppressWarnings("unchecked")
    public int compare(WritableComparable a, WritableComparable b) {
        TextPair t1 = (TextPair) a;
        TextPair t2 = (TextPair) b;
        return t1.getFirst().compareTo(t2.getFirst());
    }
}
public static class JReduce extends Reducer<TextPair, Text, Text, Text> {
    //merge and output
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
    Text pid = key.getFirst();
    String desc = values.iterator().next().toString();
    while (values.iterator().hasNext()) {
        context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
   }
    }
}
public class TextPair implements WritableComparable<TextPair> {
    //make muti-key
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
  this.first = first;
  this.second = second;
    }
    public Text getFirst() {
  return first;
    }
    public Text getSecond() {
  return second;
    }
    public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
    }
    public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
    }
    public int compareTo(TextPair tp) {
  int cmp = first.compareTo(tp.first);
  if (cmp != 0) {
       return cmp;
  }
    return second.compareTo(tp.second);
    }
}
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
    //job entrance
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
    String[] otherArgs = parser.getRemainingArgs();
    if (agrs.length < 3) {
   System.err.println("Usage: J <in_path_one> <in_path_two> <output>");
   System.exit(2);
    }
    Job job = new Job(conf, "J");
    job.setJarByClass(J.class);//Join class
    job.setMapperClass(JMapper.class);//Map class
    job.setMapOutputKeyClass(TextPair.class);//Map output key class
    job.setMapOutputValueClass(Text.class);//Map output value class
    job.setPartitionerClass(JPartitioner.class);//partition class
    job.setGroupingComparatorClass(JComparator.class);//condition group class after partition
    job.setReducerClass(Example_Join_01_Reduce.class);//reduce class
    job.setOutputKeyClass(Text.class);//reduce output key class
    job.setOutputValueClass(Text.class);//reduce ouput value class
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//one of source files
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//another file
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);//run untill job ends
}

    不能直接使用原始数据,而是要搞一堆代码处理标记,并绕过MapReduce原本的架构,最后从底层设计并计算数据之间的关联关系。这还是最简单的关联计算,如果用MapReduce进行多表关联或逻辑更复杂的关联计算,复杂度会呈几何级数递增。
0
0
分享到:
评论

相关推荐

    基于hadoop实现的关联规则挖掘的图书数据分析推荐系统.zip

    《基于Hadoop实现的关联规则挖掘在图书数据分析推荐系统中的应用》 图书数据分析推荐系统是当前数字图书馆和在线书店中广泛应用的技术,它旨在通过分析用户的阅读习惯和偏好,为用户提供个性化的图书推荐。本项目...

    java使用hadoop实现关联商品统计

    在本文中,我们将探讨如何使用Java和Hadoop框架来实现关联商品的统计。关联商品统计是数据挖掘领域的一个常见任务,它用于发现哪些商品经常一起被购买,从而帮助商家制定促销策略或推荐系统。 首先,我们要理解...

    基于HADOOP的倒排索引实现

    总的来说,基于Hadoop的倒排索引实现是一个结合了分布式计算和高效数据结构的优秀实践,它展示了如何利用MapReduce模型解决大数据场景下的文本检索问题。通过理解这一过程,开发者可以更好地运用Hadoop来处理复杂的...

    基于hadoop实现的图书推荐系统。java web apriori.zip

    【标题】中的“基于hadoop实现的图书推荐系统”指的是使用Hadoop这一开源大数据处理框架来构建一个能够为用户推荐图书的系统。Hadoop是Apache软件基金会开发的分布式存储和计算平台,它允许高效地处理海量数据,尤其...

    词频统计(基于hadoop集群,python实现)

    本教程将深入探讨如何利用Hadoop集群和Python实现词频统计,这是一个经典的WordCount示例,适合初学者入门。在这个项目中,我们将通过Python编写MapReduce程序,并在Ubuntu系统上配置的Hadoop集群上运行它。 首先,...

    基于hadoop的apriori算法设计于实现

    ### 基于Hadoop的Apriori算法设计与实现 #### 一、背景与问题概述 随着信息技术的发展,海量数据的处理与分析变得日益重要。传统的数据挖掘算法和技术已经难以满足当前的需求,尤其是在处理多维度、含有大量噪声的...

    JAVA使用Apache Hadoop实现大规模数据处理.txt

    ### 使用Java与Apache Hadoop实现大规模数据处理 #### 一、引言 在大数据时代,高效处理海量数据已成为企业和组织的重要需求之一。Apache Hadoop是一个能够处理大量数据的开源软件框架,它支持分布式数据存储和...

    基于Hadoop的电子商务推荐系统的设计与实现_李文海.pdf

    该系统利用Hadoop的分布式计算框架,构建了一个能够处理海量数据的推荐系统。Hadoop是Apache开源项目,主要由HDFS(分布式文件系统)和MapReduce编程模型组成,适合大规模数据集的并行处理。 在描述中,提到了...

    基于hadoop实现的图书推荐系统。java web apriori .zip

    【标题】基于Hadoop实现的图书推荐系统是一个利用大数据处理技术进行个性化推荐的项目,它结合了Java Web和Apriori算法。Apriori是一种经典的关联规则挖掘算法,常用于发现用户购买行为中的隐藏模式,进而实现商品...

    hadoop软件1,和hive_3,sqoop_2搭配使用

    这种组合应用可以充分利用Hadoop的分布式计算能力,同时借助Hive简化数据分析工作,以及Sqoop实现数据的灵活迁移,为企业的大数据处理提供了一套完整的解决方案。 总结来说,Hadoop作为基础平台,提供分布式存储和...

    hadoop.dll winutils.exe

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,主要用于分布式存储和计算。本文将深入探讨与"Hadoop.dll"和"winutils.exe"相关的知识点,并解释它们在Hadoop生态系统中的作用。 1. Hadoop.dll:这是一个动态...

    基于hadoop的推荐系统简单实现

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式计算的能力,使得处理海量数据变得可能。本项目“基于Hadoop的推荐系统简单实现”旨在利用Hadoop的强大功能来构建一个基本的推荐系统,该系统能...

    基于Hadoop的ETL处理Shell架构

    在这个架构中,Hadoop作为底层分布式计算平台,提供强大的存储和计算能力,而Shell则作为上层的工具,用于编写脚本实现ETL过程。 在大数据时代,海量的数据需要被快速处理和分析,Hadoop因其分布式、容错性和可扩展...

    基于hadoop商品推荐系统课程设计.zip

    在这个项目中,我们将利用Hadoop的分布式计算能力,结合大数据分析,为用户提供个性化的商品推荐。 一、Hadoop基础 1. Hadoop架构:Hadoop由HDFS(Hadoop Distributed File System)和MapReduce两大部分组成。HDFS...

    基于Hadoop&Spark的关联规则实践.zip

    通过以上步骤,我们可以利用Hadoop的大规模存储和Spark的高速计算能力,有效地进行关联规则挖掘,发现数据中的隐藏模式,为企业决策提供有力支持。这不仅在电商推荐系统、市场篮子分析等领域有广泛应用,而且在其他...

Global site tag (gtag.js) - Google Analytics