PageRank Algorithm in MapReduce

 

Chapter 5 of Data-Intensive Text Processing with MapReduce introduces how to implement the PageRank algorithm in a MapReduce way. I am not going to say much about PageRank itself here; please refer to Wikipedia or other papers for further explanation. What I am going to talk about is how to implement PageRank in MapReduce with Hadoop.

 

PageRank is defined recursively, which translates into an iterative algorithm. At the beginning of each iteration, a node passes its PageRank contributions to the other nodes it is connected to. Since PageRank is a probability distribution, we can think of this as spreading probability mass to neighbors via outgoing links. To conclude the iteration, each node sums up all the PageRank contributions that have been passed to it and computes an updated PageRank score. We can think of this as gathering probability mass passed to a node via its incoming links. The algorithm iterates until the PageRank values no longer change. You may want to check the pseudo-code for PageRank in the paper by Jimmy Lin and Michael Schatz, which also contains a detailed explanation of the code shown later.
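To make the spread-and-gather idea concrete, here is a minimal in-memory sketch (my own toy example, not part of Cloud9) that iterates plain PageRank on a hypothetical three-node graph. It works in ordinary probabilities and ignores dangling nodes and the random jump factor, which the MapReduce code below handles; note also that the Cloud9 code stores mass as log probabilities for numerical stability.

// Toy in-memory PageRank (hypothetical example, no random jump, no dangling nodes).
// Each iteration spreads a node's mass equally along its outgoing edges and
// gathers the contributions at the destination nodes.
public class ToyPageRank {
    public static void main(String[] args) {
        // Adjacency lists: node 0 -> {1, 2}, node 1 -> {2}, node 2 -> {0}.
        int[][] adjacency = {{1, 2}, {2}, {0}};
        int n = adjacency.length;

        // Start with a uniform distribution; the total mass stays at one.
        double[] pagerank = new double[n];
        java.util.Arrays.fill(pagerank, 1.0 / n);

        for (int iteration = 0; iteration < 10; iteration++) {
            double[] next = new double[n];
            for (int u = 0; u < n; u++) {
                double share = pagerank[u] / adjacency[u].length; // equal share per outgoing edge
                for (int v : adjacency[u]) {
                    next[v] += share; // gather mass via incoming links
                }
            }
            pagerank = next;
        }
        System.out.println(java.util.Arrays.toString(pagerank));
    }
}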

 

In the shuffle and sort phase, the MapReduce execution framework groups the values (pieces of PageRank mass) passed along the graph edges by destination node (i.e., all edges that point to the same node). In the reducer, the PageRank mass contributions from all incoming edges are summed to arrive at the updated PageRank value for each node. In order to pass the graph structure itself from iteration to iteration, each node's data structure is emitted in the mapper and written back out to disk in the reducer. All PageRank mass emitted by the mappers is accounted for in the reducer: since we begin with the PageRank values summing to one across all nodes, the sum of the updated PageRank values should remain a valid probability distribution.

 

If the random jump factor and dangling nodes are taken into account, we need to do a little more. Putting everything together, one iteration of PageRank requires two MapReduce jobs: the first distributes PageRank mass along the graph edges, and the second takes care of dangling nodes and the random jump factor. At the end of each iteration, we end up with exactly the same data structure as at the beginning, which is a requirement for the iterative algorithm to work. Also, the PageRank values of all nodes still sum to one, which ensures a valid probability distribution.
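In plain (non-log) probabilities, the per-node update performed by the second job works out to a simple formula. The sketch below is only an illustration of that formula; the actual MapPageRankMassDistributionClass mapper further down computes the same thing in log space. Here alpha is the random jump probability (0.15 in the code), m is the PageRank mass lost at dangling nodes during the first job, and n is the total number of nodes.

// Sketch of the second job's update in plain probabilities (illustrative only;
// the real code works with log probabilities).
//   p     - a node's PageRank mass after the first job
//   m     - total mass lost at dangling nodes in the first job
//   n     - total number of nodes in the graph
//   alpha - random jump probability
static float redistribute(float p, float m, int n, float alpha) {
    return alpha * (1.0f / n) + (1.0f - alpha) * (p + m / n);
}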

 

Typically, PageRank is iterated until convergence, i.e., until the PageRank values of the nodes no longer change (within some tolerance, to account for floating-point precision errors, for example). With that criterion, the driver program must check at the end of each iteration whether convergence has been reached. Alternative stopping criteria include running a fixed number of iterations (useful if one wishes to bound the algorithm's running time) or stopping when the ranking of pages by PageRank no longer changes. The latter is useful for applications that only care about comparing the PageRank of two arbitrary pages and do not need the actual PageRank values. The Cloud9 driver shown below takes the fixed-iteration approach: you specify the start and end iteration numbers on the command line.
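If you wanted to stop on convergence instead, the check itself is simple; a minimal sketch of a hypothetical helper (not part of Cloud9) that compares the PageRank values of two consecutive iterations against a tolerance might look like this:

// Hypothetical convergence check: returns true when no node's PageRank value
// changed by more than the given tolerance between two iterations.
// prev and curr hold plain (non-log) PageRank values indexed by node id.
static boolean hasConverged(float[] prev, float[] curr, float tolerance) {
    for (int i = 0; i < prev.length; i++) {
        if (Math.abs(prev[i] - curr[i]) > tolerance) {
            return false;
        }
    }
    return true;
}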

 

Now let's look at the code from Cloud9, which is distributed under the Apache License 2.0. It was implemented for study purposes by Jimmy Lin and Michael Schatz. For a production-quality implementation of PageRank in MapReduce, you may consider the implementation in Apache Mahout, which was added to Mahout by this ticket.

 

/*
 * Cloud9: A Hadoop toolkit for working with big data
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You may
 * obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package edu.umd.cloud9.example.pagerank;

import com.google.common.base.Preconditions;
import edu.umd.cloud9.io.array.ArrayListOfIntsWritable;
import edu.umd.cloud9.mapreduce.lib.input.NonSplitableSequenceFileInputFormat;
import edu.umd.cloud9.util.map.HMapIF;
import edu.umd.cloud9.util.map.MapIF;
import org.apache.commons.cli.*;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Iterator;

/**
 * <p>
 * Main driver program for running the basic (non-Schimmy) implementation of
 * PageRank.
 * </p>
 * <p/>
 * <p>
 * The starting and ending iterations will correspond to paths
 * <code>/base/path/iterXXXX</code> and <code>/base/path/iterYYYY</code>. As a
 * example, if you specify 0 and 10 as the starting and ending iterations, the
 * driver program will start with the graph structure stored at
 * <code>/base/path/iter0000</code>; final results will be stored at
 * <code>/base/path/iter0010</code>.
 * </p>
 *
 * @author Jimmy Lin
 * @author Michael Schatz
 * @see RunPageRankSchimmy
 */
public class RunPageRankBasic extends Configured implements Tool {
    private static final Logger LOG = Logger.getLogger(RunPageRankBasic.class);

    private static enum PageRank {
        nodes, edges, massMessages, massMessagesSaved, massMessagesReceived, missingStructure
    }

    // Mapper, no in-mapper combining.
    private static class MapClass extends
            Mapper<IntWritable, PageRankNode, IntWritable, PageRankNode> {

        // The neighbor to which we're sending messages.
        private static final IntWritable neighbor = new IntWritable();

        // Contents of the messages: partial PageRank mass.
        private static final PageRankNode intermediateMass = new PageRankNode();

        // For passing along node structure.
        private static final PageRankNode intermediateStructure = new PageRankNode();

        @Override
        public void map(IntWritable nid, PageRankNode node, Context context)
                throws IOException, InterruptedException {
            // Pass along node structure.
            intermediateStructure.setNodeId(node.getNodeId());
            intermediateStructure.setType(PageRankNode.Type.Structure);
            intermediateStructure.setAdjacencyList(node.getAdjacenyList());

            context.write(nid, intermediateStructure);

            int massMessages = 0;

            // Distribute PageRank mass to neighbors (along outgoing edges).
            if (node.getAdjacenyList().size() > 0) {
                // Each neighbor gets an equal share of PageRank mass.
                ArrayListOfIntsWritable list = node.getAdjacenyList();
                float mass = node.getPageRank() - (float) StrictMath.log(list.size());

                context.getCounter(PageRank.edges).increment(list.size());

                // Iterate over neighbors.
                for (int i = 0; i < list.size(); i++) {
                    neighbor.set(list.get(i));
                    intermediateMass.setNodeId(list.get(i));
                    intermediateMass.setType(PageRankNode.Type.Mass);
                    intermediateMass.setPageRank(mass);

                    // Emit messages with PageRank mass to neighbors.
                    context.write(neighbor, intermediateMass);
                    massMessages++;
                }
            }

            // Bookkeeping.
            context.getCounter(PageRank.nodes).increment(1);
            context.getCounter(PageRank.massMessages).increment(massMessages);
        }
    }

    // Mapper with in-mapper combiner optimization.
    private static class MapWithInMapperCombiningClass extends
            Mapper<IntWritable, PageRankNode, IntWritable, PageRankNode> {
        // For buffering PageRank mass contributions keyed by destination node.
        private static final HMapIF map = new HMapIF();

        // For passing along node structure.
        private static final PageRankNode intermediateStructure = new PageRankNode();

        @Override
        public void map(IntWritable nid, PageRankNode node, Context context)
                throws IOException, InterruptedException {
            // Pass along node structure.
            intermediateStructure.setNodeId(node.getNodeId());
            intermediateStructure.setType(PageRankNode.Type.Structure);
            intermediateStructure.setAdjacencyList(node.getAdjacenyList());

            context.write(nid, intermediateStructure);

            int massMessages = 0;
            int massMessagesSaved = 0;

            // Distribute PageRank mass to neighbors (along outgoing edges).
            if (node.getAdjacenyList().size() > 0) {
                // Each neighbor gets an equal share of PageRank mass.
                ArrayListOfIntsWritable list = node.getAdjacenyList();
                float mass = node.getPageRank() - (float) StrictMath.log(list.size());

                context.getCounter(PageRank.edges).increment(list.size());

                // Iterate over neighbors.
                for (int i = 0; i < list.size(); i++) {
                    int neighbor = list.get(i);

                    if (map.containsKey(neighbor)) {
                        // Already message destined for that node; add PageRank mass contribution.
                        massMessagesSaved++;
                        map.put(neighbor, sumLogProbs(map.get(neighbor), mass));
                    } else {
                        // New destination node; add new entry in map.
                        massMessages++;
                        map.put(neighbor, mass);
                    }
                }
            }

            // Bookkeeping.
            context.getCounter(PageRank.nodes).increment(1);
            context.getCounter(PageRank.massMessages).increment(massMessages);
            context.getCounter(PageRank.massMessagesSaved).increment(massMessagesSaved);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Now emit the messages all at once.
            IntWritable k = new IntWritable();
            PageRankNode mass = new PageRankNode();

            for (MapIF.Entry e : map.entrySet()) {
                k.set(e.getKey());

                mass.setNodeId(e.getKey());
                mass.setType(PageRankNode.Type.Mass);
                mass.setPageRank(e.getValue());

                context.write(k, mass);
            }
        }
    }

    // Combiner: sums partial PageRank contributions and passes node structure along.
    private static class CombineClass extends
            Reducer<IntWritable, PageRankNode, IntWritable, PageRankNode> {
        private static final PageRankNode intermediateMass = new PageRankNode();

        @Override
        public void reduce(IntWritable nid, Iterable<PageRankNode> values, Context context)
                throws IOException, InterruptedException {
            int massMessages = 0;

            // Remember, PageRank mass is stored as a log prob.
            float mass = Float.NEGATIVE_INFINITY;
            for (PageRankNode n : values) {
                if (n.getType() == PageRankNode.Type.Structure) {
                    // Simply pass along node structure.
                    context.write(nid, n);
                } else {
                    // Accumulate PageRank mass contributions.
                    mass = sumLogProbs(mass, n.getPageRank());
                    massMessages++;
                }
            }

            // Emit aggregated results.
            if (massMessages > 0) {
                intermediateMass.setNodeId(nid.get());
                intermediateMass.setType(PageRankNode.Type.Mass);
                intermediateMass.setPageRank(mass);

                context.write(nid, intermediateMass);
            }
        }
    }

    // Reducer: sums incoming PageRank contributions and rewrites the graph structure.
    private static class ReduceClass extends
            Reducer<IntWritable, PageRankNode, IntWritable, PageRankNode> {
        // For keeping track of PageRank mass encountered, so we can compute missing PageRank mass lost
        // through dangling nodes.
        private float totalMass = Float.NEGATIVE_INFINITY;

        @Override
        public void reduce(IntWritable nid, Iterable<PageRankNode> iterable, Context context)
                throws IOException, InterruptedException {
            Iterator<PageRankNode> values = iterable.iterator();

            // Create the node structure that we're going to assemble back together from shuffled pieces.
            PageRankNode node = new PageRankNode();

            node.setType(PageRankNode.Type.Complete);
            node.setNodeId(nid.get());

            int massMessagesReceived = 0;
            int structureReceived = 0;

            float mass = Float.NEGATIVE_INFINITY;
            while (values.hasNext()) {
                PageRankNode n = values.next();

                if (n.getType().equals(PageRankNode.Type.Structure)) {
                    // This is the structure; update accordingly.
                    ArrayListOfIntsWritable list = n.getAdjacenyList();
                    structureReceived++;

                    node.setAdjacencyList(list);
                } else {
                    // This is a message that contains PageRank mass; accumulate.
                    mass = sumLogProbs(mass, n.getPageRank());
                    massMessagesReceived++;
                }
            }

            // Update the final accumulated PageRank mass.
            node.setPageRank(mass);
            context.getCounter(PageRank.massMessagesReceived).increment(massMessagesReceived);

            // Error checking.
            if (structureReceived == 1) {
                // Everything checks out, emit final node structure with updated PageRank value.
                context.write(nid, node);

                // Keep track of total PageRank mass.
                totalMass = sumLogProbs(totalMass, mass);
            } else if (structureReceived == 0) {
                // We get into this situation if there exists an edge pointing to a node which has no
                // corresponding node structure (i.e., PageRank mass was passed to a non-existent node)...
                // log and count but move on.
                context.getCounter(PageRank.missingStructure).increment(1);
                LOG.warn("No structure received for nodeid: " + nid.get() + " mass: "
                        + massMessagesReceived);
                // It's important to note that we don't add the PageRank mass to total... if PageRank mass
                // was sent to a non-existent node, it should simply vanish.
            } else {
                // This shouldn't happen!
                throw new RuntimeException("Multiple structure received for nodeid: " + nid.get()
                        + " mass: " + massMessagesReceived + " struct: " + structureReceived);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            String taskId = conf.get("mapred.task.id");
            String path = conf.get("PageRankMassPath");

            Preconditions.checkNotNull(taskId);
            Preconditions.checkNotNull(path);

            // Write to a file the amount of PageRank mass we've seen in this reducer.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataOutputStream out = fs.create(new Path(path + "/" + taskId), false);
            out.writeFloat(totalMass);
            out.close();
        }
    }

    // Mapper that distributes the missing PageRank mass (lost at the dangling nodes) and takes care
    // of the random jump factor.
    private static class MapPageRankMassDistributionClass extends
            Mapper<IntWritable, PageRankNode, IntWritable, PageRankNode> {
        private float missingMass = 0.0f;
        private int nodeCnt = 0;

        @Override
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();

            missingMass = conf.getFloat("MissingMass", 0.0f);
            nodeCnt = conf.getInt("NodeCount", 0);
        }

        @Override
        public void map(IntWritable nid, PageRankNode node, Context context)
                throws IOException, InterruptedException {
            float p = node.getPageRank();

            float jump = (float) (Math.log(ALPHA) - Math.log(nodeCnt));
            float link = (float) Math.log(1.0f - ALPHA)
                    + sumLogProbs(p, (float) (Math.log(missingMass) - Math.log(nodeCnt)));

            p = sumLogProbs(jump, link);
            node.setPageRank(p);

            context.write(nid, node);
        }
    }

    // Random jump factor.
    private static float ALPHA = 0.15f;
    private static NumberFormat formatter = new DecimalFormat("0000");

    /**
     * Dispatches command-line arguments to the tool via the {@code ToolRunner}.
     */
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new RunPageRankBasic(), args);
    }

    public RunPageRankBasic() {
    }

    private static final String BASE = "base";
    private static final String NUM_NODES = "numNodes";
    private static final String START = "start";
    private static final String END = "end";
    private static final String COMBINER = "useCombiner";
    private static final String INMAPPER_COMBINER = "useInMapperCombiner";
    private static final String RANGE = "range";

    /**
     * Runs this tool.
     */
    @SuppressWarnings({"static-access"})
    public int run(String[] args) throws Exception {
        Options options = new Options();

        options.addOption(new Option(COMBINER, "use combiner"));
        options.addOption(new Option(INMAPPER_COMBINER, "use in-mapper combiner"));
        options.addOption(new Option(RANGE, "use range partitioner"));

        options.addOption(OptionBuilder.withArgName("path").hasArg()
                .withDescription("base path").create(BASE));
        options.addOption(OptionBuilder.withArgName("num").hasArg()
                .withDescription("start iteration").create(START));
        options.addOption(OptionBuilder.withArgName("num").hasArg()
                .withDescription("end iteration").create(END));
        options.addOption(OptionBuilder.withArgName("num").hasArg()
                .withDescription("number of nodes").create(NUM_NODES));

        CommandLine cmdline;
        CommandLineParser parser = new GnuParser();

        try {
            cmdline = parser.parse(options, args);
        } catch (ParseException exp) {
            System.err.println("Error parsing command line: " + exp.getMessage());
            return -1;
        }

        if (!cmdline.hasOption(BASE) || !cmdline.hasOption(START) ||
                !cmdline.hasOption(END) || !cmdline.hasOption(NUM_NODES)) {
            System.out.println("args: " + Arrays.toString(args));
            HelpFormatter formatter = new HelpFormatter();
            formatter.setWidth(120);
            formatter.printHelp(this.getClass().getName(), options);
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }

        String basePath = cmdline.getOptionValue(BASE);
        int n = Integer.parseInt(cmdline.getOptionValue(NUM_NODES));
        int s = Integer.parseInt(cmdline.getOptionValue(START));
        int e = Integer.parseInt(cmdline.getOptionValue(END));
        boolean useCombiner = cmdline.hasOption(COMBINER);
        boolean useInmapCombiner = cmdline.hasOption(INMAPPER_COMBINER);
        boolean useRange = cmdline.hasOption(RANGE);

        LOG.info("Tool name: RunPageRank");
        LOG.info(" - base path: " + basePath);
        LOG.info(" - num nodes: " + n);
        LOG.info(" - start iteration: " + s);
        LOG.info(" - end iteration: " + e);
        LOG.info(" - use combiner: " + useCombiner);
        LOG.info(" - use in-mapper combiner: " + useInmapCombiner);
        LOG.info(" - user range partitioner: " + useRange);

        // Iterate PageRank.
        for (int i = s; i < e; i++) {
            iteratePageRank(i, i + 1, basePath, n, useCombiner, useInmapCombiner);
        }

        return 0;
    }

    // Run each iteration.
    private void iteratePageRank(int i, int j, String basePath, int numNodes,
                                 boolean useCombiner, boolean useInMapperCombiner) throws Exception {
        // Each iteration consists of two phases (two MapReduce jobs).

        // Job 1: distribute PageRank mass along outgoing edges.
        float mass = phase1(i, j, basePath, numNodes, useCombiner, useInMapperCombiner);

        // Find out how much PageRank mass got lost at the dangling nodes.
        float missing = 1.0f - (float) StrictMath.exp(mass);

        // Job 2: distribute missing mass, take care of random jump factor.
        phase2(i, j, missing, basePath, numNodes);
    }

    private float phase1(int i, int j, String basePath, int numNodes,
                         boolean useCombiner, boolean useInMapperCombiner) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("PageRank:Basic:iteration" + j + ":Phase1");
        job.setJarByClass(RunPageRankBasic.class);

        String in = basePath + "/iter" + formatter.format(i);
        String out = basePath + "/iter" + formatter.format(j) + "t";
        String outm = out + "-mass";

        // We need to actually count the number of part files to get the number of partitions (because
        // the directory might contain _log).
        int numPartitions = 0;
        for (FileStatus s : FileSystem.get(getConf()).listStatus(new Path(in))) {
            if (s.getPath().getName().contains("part-"))
                numPartitions++;
        }

        LOG.info("PageRank: iteration " + j + ": Phase1");
        LOG.info(" - input: " + in);
        LOG.info(" - output: " + out);
        LOG.info(" - nodeCnt: " + numNodes);
        LOG.info(" - useCombiner: " + useCombiner);
        LOG.info(" - useInmapCombiner: " + useInMapperCombiner);
        LOG.info("computed number of partitions: " + numPartitions);

        int numReduceTasks = numPartitions;

        job.getConfiguration().setInt("NodeCount", numNodes);
        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
        //job.getConfiguration().set("mapred.child.java.opts", "-Xmx2048m");
        job.getConfiguration().set("PageRankMassPath", outm);

        job.setNumReduceTasks(numReduceTasks);

        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.setInputFormatClass(NonSplitableSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(PageRankNode.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(PageRankNode.class);

        job.setMapperClass(useInMapperCombiner ? MapWithInMapperCombiningClass.class : MapClass.class);

        if (useCombiner) {
            job.setCombinerClass(CombineClass.class);
        }

        job.setReducerClass(ReduceClass.class);

        FileSystem.get(getConf()).delete(new Path(out), true);
        FileSystem.get(getConf()).delete(new Path(outm), true);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

        float mass = Float.NEGATIVE_INFINITY;
        FileSystem fs = FileSystem.get(getConf());
        for (FileStatus f : fs.listStatus(new Path(outm))) {
            FSDataInputStream fin = fs.open(f.getPath());
            mass = sumLogProbs(mass, fin.readFloat());
            fin.close();
        }

        return mass;
    }

    private void phase2(int i, int j, float missing, String basePath, int numNodes) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("PageRank:Basic:iteration" + j + ":Phase2");
        job.setJarByClass(RunPageRankBasic.class);

        LOG.info("missing PageRank mass: " + missing);
        LOG.info("number of nodes: " + numNodes);

        String in = basePath + "/iter" + formatter.format(j) + "t";
        String out = basePath + "/iter" + formatter.format(j);

        LOG.info("PageRank: iteration " + j + ": Phase2");
        LOG.info(" - input: " + in);
        LOG.info(" - output: " + out);

        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
        job.getConfiguration().setFloat("MissingMass", (float) missing);
        job.getConfiguration().setInt("NodeCount", numNodes);

        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.setInputFormatClass(NonSplitableSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(PageRankNode.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(PageRankNode.class);

        job.setMapperClass(MapPageRankMassDistributionClass.class);

        FileSystem.get(getConf()).delete(new Path(out), true);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }

    // Adds two probabilities stored as log values, i.e., computes
    // log(exp(a) + exp(b)) in a numerically stable way.
    private static float sumLogProbs(float a, float b) {
        if (a == Float.NEGATIVE_INFINITY)
            return b;

        if (b == Float.NEGATIVE_INFINITY)
            return a;

        if (a < b) {
            return (float) (b + StrictMath.log1p(StrictMath.exp(a - b)));
        }

        return (float) (a + StrictMath.log1p(StrictMath.exp(b - a)));
    }
}

 

 

The code above is implemented without the Schimmy trick; you can find an explanation of that trick in the same paper. You can choose whether to use the combiner or the in-mapper combining optimization by specifying additional command-line options. It follows the patterns we discussed above. For further details on the code, refer to the paper by Jimmy Lin and Michael Schatz.
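Based on the options defined in run() above, an invocation would look roughly like the following (the jar name, base path, and node count are hypothetical; 0 and 10 are the start and end iterations, as in the Javadoc example):

hadoop jar cloud9.jar edu.umd.cloud9.example.pagerank.RunPageRankBasic \
    -base /base/path -numNodes 1000000 -start 0 -end 10 -useCombiner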
