sunwinner

Joins with Apache Crunch

 

Apache Crunch is a Java library for creating MapReduce pipelines that is based on Google's FlumeJava library. Like other high-level tools for creating MapReduce jobs, such as Apache Hive, Apache Pig, and Cascading, Crunch provides a library of patterns to implement common tasks like joining data, performing aggregations, and sorting records. Unlike those other tools, Crunch does not impose a single data type that all of its inputs must conform to. Instead, Crunch uses a customizable type system that is flexible enough to work directly with complex data such as time series, HDFS files, Apache HBase tables, and serialized objects like protocol buffers or Avro records.

 

In this blog post, we will look at how to use Crunch's built-in joining capabilities to join data sets. The built-in join support covers most of the scenarios you are likely to encounter in practice. The classes involved are shown in the following class diagram:

[Figure: class diagram of the Crunch join classes in org.apache.crunch.lib.join]

You do not need to instantiate each join class yourself: Crunch has a utility class for joining multiple PTable instances on a common key. Building on this built-in support for map-side and reduce-side joins, you can write semi-joins as well. Now let's go through the built-in joins one by one.
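To make the semi-join remark concrete, here is a minimal plain-Java sketch of what a left semi-join computes. It deliberately does not use Crunch; the class name `SemiJoinSketch` and the `{key, value}` array representation are assumptions made for illustration. In Crunch itself, one plausible route would be to reduce the right table to its keys and then apply one of the built-in inner joins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration of a left semi-join; it does not use Crunch.
class SemiJoinSketch {

  // Keeps each left row {key, value} whose key occurs on the right side.
  // Right-side values never appear in the output, and a left row is emitted
  // at most once regardless of how many right rows share its key.
  static List<String> semiJoin(List<String[]> left, List<String> rightKeys) {
    Set<String> keys = new HashSet<>(rightKeys);
    List<String> out = new ArrayList<>();
    for (String[] row : left) {
      if (keys.contains(row[0])) {
        out.add(row[0] + "=" + row[1]);
      }
    }
    return out;
  }

  static List<String> demo() {
    List<String[]> left = Arrays.asList(
        new String[] {"a", "u1"},
        new String[] {"b", "u2"},
        new String[] {"c", "u3"});
    List<String> rightKeys = Arrays.asList("a", "a", "c", "d");
    return semiJoin(left, rightKeys); // [a=u1, c=u3]
  }
}
```

Note that key "a" occurs twice on the right but the left row is still emitted only once, which is what separates a semi-join from an inner join.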

 

  • Join utility class

    From the class outline you can see that each type of join is exposed as a static method; the only exception is MapsideJoin, which we will talk about later. Another thing to note is the private method preJoin(...), which launches a number of map tasks that tag the data to be joined: 0 for records from the left table, 1 for records from the right table. After that, groupByKey(...) is called, which triggers a reduce phase.
      private static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
        PTypeFamily ptf = left.getTypeFamily();
        PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
            ptf.pairs(left.getValueType(), right.getValueType()));
    
        PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
            new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
              @Override
              public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
                return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
              }
            }, ptt);
        PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
            new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
              @Override
              public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
                return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
              }
            }, ptt);
    
        GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
        optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
    
        return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
      }
    We need to tag the data because the reducer sees the records from both sources that have the same key, but in no guaranteed order. To perform the join, however, the data from one source must arrive before the data from the other. Tagging imposes that order: the tag becomes part of the grouping key, Pair<K, Integer>, so for each key the records tagged 0 (left) are seen before the records tagged 1 (right). The partitioner obtained from JoinUtils.getPartitionerClass(...) is there so that both tags of a key still end up at the same reducer.
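The effect of tagging can be imitated outside of Hadoop. The following plain-Java sketch is not Crunch code; the `Tagged` class and the method names are hypothetical. It sorts tagged records by key first and tag second, which is roughly the order in which a reducer would encounter them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration (no Crunch classes): tag each record with 0 (left)
// or 1 (right), then sort by (key, tag) the way the shuffle would.
class TagAndSortSketch {

  // A tagged record: join key, source tag, and the value from that source.
  static final class Tagged {
    final String key;
    final int tag;
    final String value;

    Tagged(String key, int tag, String value) {
      this.key = key;
      this.tag = tag;
      this.value = value;
    }

    @Override
    public String toString() {
      return key + "/" + tag + "/" + value;
    }
  }

  // Sorts by key first and by tag second, so left records precede right ones.
  static List<String> sortedView(List<Tagged> records) {
    List<Tagged> copy = new ArrayList<>(records);
    copy.sort(Comparator.comparing((Tagged t) -> t.key).thenComparingInt(t -> t.tag));
    List<String> out = new ArrayList<>();
    for (Tagged t : copy) {
      out.add(t.toString());
    }
    return out;
  }

  // Records from both sources, deliberately in scrambled order.
  static List<Tagged> sample() {
    return Arrays.asList(
        new Tagged("b", 1, "right-b"),
        new Tagged("a", 1, "right-a"),
        new Tagged("b", 0, "left-b"),
        new Tagged("a", 0, "left-a"));
  }

  public static void main(String[] args) {
    // prints [a/0/left-a, a/1/right-a, b/0/left-b, b/1/right-b]
    System.out.println(sortedView(sample()));
  }
}
```

Whatever order the records start in, each key's left record comes out ahead of its right record, which is the property the join functions below rely on.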
  • Inner join. We are not going to explain what an inner join is here; please see Wikipedia for a definition.
      /**
       * Performs an inner join on the specified {@link PTable}s.
       * 
       * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner
       *      Join</a>
       * @param left
       *          A PTable to perform an inner join on.
       * @param right
       *          A PTable to perform an inner join on.
       * @param <K>
       *          Type of the keys.
       * @param <U>
       *          Type of the first {@link PTable}'s values
       * @param <V>
       *          Type of the second {@link PTable}'s values
       * @return The joined result.
       */
      public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
        return innerJoin(left, right);
      }
    
      /**
       * Performs an inner join on the specified {@link PTable}s.
       * 
       * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner
       *      Join</a>
       * @param left
       *          A PTable to perform an inner join on.
       * @param right
       *          A PTable to perform an inner join on.
       * @param <K>
       *          Type of the keys.
       * @param <U>
       *          Type of the first {@link PTable}'s values
       * @param <V>
       *          Type of the second {@link PTable}'s values
       * @return The joined result.
       */
      public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
        return join(left, right, new InnerJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
      }
     
    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.crunch.lib.join;
    
    import java.util.List;
    
    import org.apache.crunch.Emitter;
    import org.apache.crunch.Pair;
    import org.apache.crunch.types.PType;
    
    import com.google.common.collect.Lists;
    
    /**
     * Used to perform the last step of an inner join.
     * 
     * @param <K> Type of the keys.
     * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
     * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
     */
    public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
    
      private transient K lastKey;
      private transient List<U> leftValues;
    
      public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
        super(keyType, leftValueType);
      }
    
      /** {@inheritDoc} */
      @Override
      public void initialize() {
        super.initialize();
        lastKey = null;
        this.leftValues = Lists.newArrayList();
      }
    
      /** {@inheritDoc} */
      @Override
      public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (!key.equals(lastKey)) {
          lastKey = keyType.getDetachedValue(key);
          leftValues.clear();
        }
        if (id == 0) { // from left
          for (Pair<U, V> pair : pairs) {
            if (pair.first() != null)
              leftValues.add(leftValueType.getDetachedValue(pair.first()));
          }
        } else { // from right
          for (Pair<U, V> pair : pairs) {
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
            }
          }
        }
      }
    
      /** {@inheritDoc} */
      @Override
      public String getJoinType() {
        return "innerJoin";
      }
    }
    
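    The logic is pretty simple: in the join(...) method, the values tagged with 0 (left) for a key arrive first and are buffered in leftValues. The values tagged with 1 (right) for the same key follow, and each of them is paired with every buffered left value.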
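To see the buffering at work without a cluster, here is a plain-Java imitation of the same logic. It is only a sketch: `InnerJoinSketch` is a hypothetical class, values are plain strings, and the groups are fed in by hand in the order the reducer would deliver them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of the InnerJoinFn buffering logic; no Crunch classes.
class InnerJoinSketch {
  private String lastKey;
  private final List<String> leftValues = new ArrayList<>();
  private final List<String> output = new ArrayList<>();

  // Called once per (key, tag) group, in sorted order: tag 0 before tag 1.
  void join(String key, int id, List<String> values) {
    if (!key.equals(lastKey)) {
      lastKey = key;
      leftValues.clear(); // forget left values buffered for the previous key
    }
    if (id == 0) {
      leftValues.addAll(values); // buffer the left side
    } else {
      for (String v : values) {
        for (String u : leftValues) {
          output.add(lastKey + "=(" + u + "," + v + ")");
        }
      }
    }
  }

  static List<String> run() {
    InnerJoinSketch fn = new InnerJoinSketch();
    fn.join("a", 0, Arrays.asList("u1", "u2"));
    fn.join("a", 1, Arrays.asList("v1"));
    fn.join("b", 0, Arrays.asList("u3")); // no right side: dropped
    fn.join("c", 1, Arrays.asList("v2")); // no left side: dropped
    return fn.output; // [a=(u1,v1), a=(u2,v1)]
  }
}
```

Keys "b" and "c" each appear on one side only, so neither produces output.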
  • Left join. Please see Wikipedia for a definition of a left outer join.
      /**
       * Performs a left outer join on the specified {@link PTable}s.
       * 
       * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Left_outer_join">Left
       *      Join</a>
       * @param left
       *          A PTable to perform a left join on. All of this PTable's entries
       *          will appear in the resulting PTable.
       * @param right
       *          A PTable to perform a left join on.
       * @param <K>
       *          Type of the keys.
       * @param <U>
       *          Type of the first {@link PTable}'s values
       * @param <V>
       *          Type of the second {@link PTable}'s values
       * @return The joined result.
       */
      public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
        return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
      }
     
    package org.apache.crunch.lib.join;
    
    import java.util.List;
    
    import org.apache.crunch.Emitter;
    import org.apache.crunch.Pair;
    import org.apache.crunch.types.PType;
    
    import com.google.common.collect.Lists;
    
    /**
     * Used to perform the last step of a left outer join.
     * 
     * @param <K> Type of the keys.
     * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
     * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
     */
    public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
    
      private transient int lastId;
      private transient K lastKey;
      private transient List<U> leftValues;
    
      public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
        super(keyType, leftValueType);
      }
    
      /** {@inheritDoc} */
      @Override
      public void initialize() {
        super.initialize();
        lastId = 1;
        lastKey = null;
        this.leftValues = Lists.newArrayList();
      }
    
      /** {@inheritDoc} */
      @Override
      public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (!key.equals(lastKey)) {
          // Make sure that left side always gets emitted.
          if (0 == lastId) {
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
            }
          }
          lastKey = keyType.getDetachedValue(key);
          leftValues.clear();
        }
        if (id == 0) {
          for (Pair<U, V> pair : pairs) {
            if (pair.first() != null)
              leftValues.add(leftValueType.getDetachedValue(pair.first()));
          }
        } else {
          for (Pair<U, V> pair : pairs) {
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
            }
          }
        }
    
        lastId = id;
      }
    
      /** {@inheritDoc} */
      @Override
      public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (0 == lastId) {
          for (U u : leftValues) {
            emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
          }
        }
      }
    
      /** {@inheritDoc} */
      @Override
      public String getJoinType() {
        return "leftOuterJoin";
      }
    }
    
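    The only difference from the inner join is in the join(...) and cleanup(...) methods, which make sure values from the left side always get emitted. If the last group seen for a key was tagged with 0, no right-side values followed it, so the buffered left values are emitted with a null right value, either when the next key arrives or in cleanup(...) for the very last key.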
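The lastId bookkeeping is easier to follow in a stripped-down form. The sketch below is plain Java, not Crunch code; `LeftOuterJoinSketch` and `flushUnmatchedLeft` are hypothetical names, and the groups are again supplied by hand in sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of the LeftOuterJoinFn bookkeeping; no Crunch classes.
class LeftOuterJoinSketch {
  private int lastId = 1;
  private String lastKey;
  private final List<String> leftValues = new ArrayList<>();
  private final List<String> output = new ArrayList<>();

  private void emit(String key, String u, String v) {
    output.add(key + "=(" + u + "," + v + ")");
  }

  // Emits buffered left values with a null partner if no right group followed.
  private void flushUnmatchedLeft() {
    if (lastId == 0) {
      for (String u : leftValues) {
        emit(lastKey, u, null);
      }
    }
  }

  // Called once per (key, tag) group, in sorted order: tag 0 before tag 1.
  void join(String key, int id, List<String> values) {
    if (!key.equals(lastKey)) {
      flushUnmatchedLeft();
      lastKey = key;
      leftValues.clear();
    }
    if (id == 0) {
      leftValues.addAll(values);
    } else {
      for (String v : values) {
        for (String u : leftValues) {
          emit(lastKey, u, v);
        }
      }
    }
    lastId = id;
  }

  // Called once after the last group, mirroring cleanup(...) in the original.
  void cleanup() {
    flushUnmatchedLeft();
  }

  static List<String> run() {
    LeftOuterJoinSketch fn = new LeftOuterJoinSketch();
    fn.join("a", 0, Arrays.asList("u1"));
    fn.join("a", 1, Arrays.asList("v1"));
    fn.join("b", 0, Arrays.asList("u2")); // flushed when key "c" arrives
    fn.join("c", 1, Arrays.asList("v2")); // right-only key: dropped
    fn.join("d", 0, Arrays.asList("u3")); // flushed by cleanup()
    fn.cleanup();
    return fn.output; // [a=(u1,v1), b=(u2,null), d=(u3,null)]
  }
}
```

Key "d" shows why cleanup(...) is needed: there is no following key to trigger the flush, so without it the last unmatched left values would be lost.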
  • Right join. Please see Wikipedia for a definition of a right outer join.
      /**
       * Performs a right outer join on the specified {@link PTable}s.
       * 
       * @see <a
       *      href="http://en.wikipedia.org/wiki/Join_(SQL)#Right_outer_join">Right
       *      Join</a>
       * @param left
       *          A PTable to perform a right join on.
       * @param right
       *          A PTable to perform a right join on. All of this PTable's entries
       *          will appear in the resulting PTable.
       * @param <K>
       *          Type of the keys.
       * @param <U>
       *          Type of the first {@link PTable}'s values
       * @param <V>
       *          Type of the second {@link PTable}'s values
       * @return The joined result.
       */
      public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
        return join(left, right, new RightOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
      }
     
    package org.apache.crunch.lib.join;
    
    import java.util.List;
    
    import org.apache.crunch.Emitter;
    import org.apache.crunch.Pair;
    import org.apache.crunch.types.PType;
    
    import com.google.common.collect.Lists;
    
    /**
     * Used to perform the last step of a right outer join.
     * 
     * @param <K> Type of the keys.
     * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
     * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
     */
    public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
    
      private transient K lastKey;
      private transient List<U> leftValues;
    
      public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
        super(keyType, leftValueType);
      }
    
      /** {@inheritDoc} */
      @Override
      public void initialize() {
        super.initialize();
        lastKey = null;
        this.leftValues = Lists.newArrayList();
      }
    
      /** {@inheritDoc} */
      @Override
      public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (!key.equals(lastKey)) {
          lastKey = keyType.getDetachedValue(key);
          leftValues.clear();
        }
        if (id == 0) {
          for (Pair<U, V> pair : pairs) {
            if (pair.first() != null)
              leftValues.add(leftValueType.getDetachedValue(pair.first()));
          }
        } else {
          for (Pair<U, V> pair : pairs) {
            // Make sure that right side gets emitted.
            if (leftValues.isEmpty()) {
              leftValues.add(null);
            }
    
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
            }
          }
        }
      }
    
      /** {@inheritDoc} */
      @Override
      public String getJoinType() {
        return "rightOuterJoin";
      }
    }
    
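    This function makes sure values from the right side always get emitted: if no left values were buffered for the current key, a null is added as a placeholder, so each right value is still emitted, paired with a null left value.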
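The null placeholder trick can be shown in a few lines of plain Java. This is a sketch only; `RightOuterJoinSketch` is a hypothetical class that mirrors the structure of the function above without depending on Crunch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of the RightOuterJoinFn logic; no Crunch classes.
class RightOuterJoinSketch {
  private String lastKey;
  private final List<String> leftValues = new ArrayList<>();
  private final List<String> output = new ArrayList<>();

  // Called once per (key, tag) group, in sorted order: tag 0 before tag 1.
  void join(String key, int id, List<String> values) {
    if (!key.equals(lastKey)) {
      lastKey = key;
      leftValues.clear();
    }
    if (id == 0) {
      leftValues.addAll(values);
    } else {
      if (leftValues.isEmpty()) {
        // Placeholder so the inner loop runs once per right value.
        leftValues.add(null);
      }
      for (String v : values) {
        for (String u : leftValues) {
          output.add(lastKey + "=(" + u + "," + v + ")");
        }
      }
    }
  }

  static List<String> run() {
    RightOuterJoinSketch fn = new RightOuterJoinSketch();
    fn.join("a", 0, Arrays.asList("u1"));
    fn.join("a", 1, Arrays.asList("v1", "v2"));
    fn.join("b", 0, Arrays.asList("u2")); // left-only key: dropped
    fn.join("c", 1, Arrays.asList("v3")); // right-only key: kept with null
    return fn.output; // [a=(u1,v1), a=(u1,v2), c=(null,v3)]
  }
}
```

No cleanup step is needed here, because unmatched right values are emitted as soon as their group is processed.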
  • Full join. Please see Wikipedia for a definition of a full outer join.
      /**
       * Performs a full outer join on the specified {@link PTable}s.
       * 
       * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Full_outer_join">Full
       *      Join</a>
       * @param left
       *          A PTable to perform a full join on.
       * @param right
       *          A PTable to perform a full join on.
       * @param <K>
       *          Type of the keys.
       * @param <U>
       *          Type of the first {@link PTable}'s values
       * @param <V>
       *          Type of the second {@link PTable}'s values
       * @return The joined result.
       */
      public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
        return join(left, right, new FullOuterJoinFn<K, U, V>(left.getKeyType(), left.getValueType()));
      }
     
    package org.apache.crunch.lib.join;
    
    import java.util.List;
    
    import org.apache.crunch.Emitter;
    import org.apache.crunch.Pair;
    import org.apache.crunch.types.PType;
    
    import com.google.common.collect.Lists;
    
    /**
     * Used to perform the last step of a full outer join.
     * 
     * @param <K> Type of the keys.
     * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
     * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
     */
    public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
    
      private transient int lastId;
      private transient K lastKey;
      private transient List<U> leftValues;
    
      public FullOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
        super(keyType, leftValueType);
      }
    
      /** {@inheritDoc} */
      @Override
      public void initialize() {
        super.initialize();
        lastId = 1;
        lastKey = null;
        this.leftValues = Lists.newArrayList();
      }
    
      /** {@inheritDoc} */
      @Override
      public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (!key.equals(lastKey)) {
          // Make sure that left side gets emitted.
          if (0 == lastId) {
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
            }
          }
          lastKey = keyType.getDetachedValue(key);
          leftValues.clear();
        }
        if (id == 0) {
          for (Pair<U, V> pair : pairs) {
            if (pair.first() != null)
              leftValues.add(leftValueType.getDetachedValue(pair.first()));
          }
        } else {
          for (Pair<U, V> pair : pairs) {
            // Make sure that right side gets emitted.
            if (leftValues.isEmpty()) {
              leftValues.add(null);
            }
            for (U u : leftValues) {
              emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
            }
          }
        }
    
        lastId = id;
      }
    
      /** {@inheritDoc} */
      @Override
      public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
        if (0 == lastId) {
          for (U u : leftValues) {
            emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
          }
        }
      }
    
      /** {@inheritDoc} */
      @Override
      public String getJoinType() {
        return "fullOuterJoin";
      }
    }
    
    This function combines the two previous techniques, so values from both the left and the right side always get emitted. Values that share a key are, of course, still emitted together as joined records.
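As a quick reference for how the four reduce-side joins differ, here is a small plain-Java comparison. It is a simplified sketch under stated assumptions: `JoinKindsSketch` is a hypothetical class, it holds everything in memory, and it allows only one value per key on each side, which the real join functions do not require.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java comparison of the four join kinds; one value per key per side.
class JoinKindsSketch {
  enum Kind { INNER, LEFT, RIGHT, FULL }

  static List<String> join(Map<String, String> left, Map<String, String> right, Kind kind) {
    // Visit the union of both key sets in sorted order.
    TreeSet<String> keys = new TreeSet<>(left.keySet());
    keys.addAll(right.keySet());
    List<String> out = new ArrayList<>();
    for (String k : keys) {
      boolean inLeft = left.containsKey(k);
      boolean inRight = right.containsKey(k);
      boolean keep = (inLeft && inRight)
          || (inLeft && (kind == Kind.LEFT || kind == Kind.FULL))
          || (inRight && (kind == Kind.RIGHT || kind == Kind.FULL));
      if (keep) {
        // get(...) returns null for the missing side of an unmatched key.
        out.add(k + "=(" + left.get(k) + "," + right.get(k) + ")");
      }
    }
    return out;
  }

  static Map<String, String> sampleLeft() {
    Map<String, String> m = new TreeMap<>();
    m.put("a", "u1");
    m.put("b", "u2");
    return m;
  }

  static Map<String, String> sampleRight() {
    Map<String, String> m = new TreeMap<>();
    m.put("a", "v1");
    m.put("c", "v2");
    return m;
  }
}
```

With the sample data, INNER yields [a=(u1,v1)], LEFT adds b=(u2,null), RIGHT adds c=(null,v2) instead, and FULL yields all three records.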
  • Map-side join. In Crunch, a map-side join is an in-memory join: the right-side table is loaded fully into memory, so this method should only be used if the right-side table's contents fit in the memory allocated to the mappers. It is also always an inner join.
    package org.apache.crunch.lib.join;
    
    import java.io.IOException;
    
    import org.apache.crunch.CrunchRuntimeException;
    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.ParallelDoOptions;
    import org.apache.crunch.SourceTarget;
    import org.apache.crunch.io.ReadableSourceTarget;
    import org.apache.crunch.materialize.MaterializableIterable;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.PTypeFamily;
    import org.apache.crunch.util.DistCache;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    
    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;
    
    /**
     * Utility for doing map side joins on a common key between two {@link PTable}s.
     * <p>
     * A map side join is an optimized join which doesn't use a reducer; instead,
     * the right side of the join is loaded into memory and the join is performed in
     * a mapper. This style of join has the important implication that the output of
     * the join is not sorted, which is the case with a conventional (reducer-based)
     * join.
     * <p>
     * <b>Note:</b>This utility is only supported when running with a
     * {@link MRPipeline} as the pipeline.
     */
    public class MapsideJoin {
    
      /**
       * Join two tables using a map side join. The right-side table will be loaded
       * fully in memory, so this method should only be used if the right side
       * table's contents can fit in the memory allocated to mappers. The join
       * performed by this method is an inner join.
       * 
       * @param left
       *          The left-side table of the join
       * @param right
       *          The right-side table of the join, whose contents will be fully
       *          read into memory
       * @return A table keyed on the join key, containing pairs of joined values
       */
      public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
        PTypeFamily tf = left.getTypeFamily();
        Iterable<Pair<K, V>> iterable = right.materialize();
    
        if (iterable instanceof MaterializableIterable) {
          MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
          MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
              right.getPType());
          ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
          if (mi.isSourceTarget()) {
            optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
          }
          return left.parallelDo("mapjoin", mapJoinDoFn,
              tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
              optionsBuilder.build());
        } else { // in-memory pipeline
          return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
              tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
        }
      }
    
      static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
    
        private Multimap<K, V> joinMap;
        
        public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
          joinMap = HashMultimap.create();
          for (Pair<K, V> joinPair : iterable) {
            joinMap.put(joinPair.first(), joinPair.second());
          }
        }
        
        @Override
        public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
          K key = input.first();
          U value = input.second();
          for (V joinValue : joinMap.get(key)) {
            Pair<U, V> valuePair = Pair.of(value, joinValue);
            emitter.emit(Pair.of(key, valuePair));
          }
        }
      }
      
      static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
    
        private String inputPath;
        private PType<Pair<K, V>> ptype;
        private Multimap<K, V> joinMap;
    
        public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
          this.inputPath = inputPath;
          this.ptype = ptype;
        }
    
        private Path getCacheFilePath() {
          Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
          if (local == null) {
            throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
          }
          return local;
        }
    
        @Override
        public void configure(Configuration conf) {
          DistCache.addCacheFile(new Path(inputPath), conf);
        }
        
        @Override
        public void initialize() {
          super.initialize();
    
          ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
              getCacheFilePath());
          Iterable<Pair<K, V>> iterable = null;
          try {
            iterable = sourceTarget.read(getConfiguration());
          } catch (IOException e) {
            throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
          }
    
          joinMap = ArrayListMultimap.create();
          for (Pair<K, V> joinPair : iterable) {
            joinMap.put(joinPair.first(), joinPair.second());
          }
        }
    
        @Override
        public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
          K key = input.first();
          U value = input.second();
          for (V joinValue : joinMap.get(key)) {
            Pair<U, V> valuePair = Pair.of(value, joinValue);
            emitter.emit(Pair.of(key, valuePair));
          }
        }
      }
    }
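Stripped of the distributed cache handling, the map-side join boils down to a hash join: build a lookup table from the small side, then stream the large side past it. The following plain-Java sketch illustrates that idea; `MapSideJoinSketch` and the `{key, value}` array representation are assumptions for illustration, and a `HashMap` of lists stands in for the Guava multimap used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java hash join in the spirit of MapsideJoinDoFn; no Crunch classes.
class MapSideJoinSketch {

  // Builds the in-memory lookup table from the (small) right side.
  static Map<String, List<String>> buildJoinMap(List<String[]> right) {
    Map<String, List<String>> joinMap = new HashMap<>();
    for (String[] kv : right) {
      joinMap.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
    }
    return joinMap;
  }

  // Streams the left side past the lookup table; unmatched left rows are
  // dropped, so the result is an inner join.
  static List<String> join(List<String[]> left, List<String[]> right) {
    Map<String, List<String>> joinMap = buildJoinMap(right);
    List<String> out = new ArrayList<>();
    for (String[] kv : left) {
      List<String> matches = joinMap.get(kv[0]);
      if (matches == null) {
        continue;
      }
      for (String v : matches) {
        out.add(kv[0] + "=(" + kv[1] + "," + v + ")");
      }
    }
    return out;
  }

  static List<String> demo() {
    List<String[]> left = Arrays.asList(
        new String[] {"a", "u1"},
        new String[] {"b", "u2"},
        new String[] {"a", "u3"});
    List<String[]> right = Arrays.asList(
        new String[] {"a", "v1"},
        new String[] {"a", "v2"},
        new String[] {"c", "v3"});
    return join(left, right); // [a=(u1,v1), a=(u1,v2), a=(u3,v1), a=(u3,v2)]
  }
}
```

The output follows the order of the left input rather than key order, which matches the remark in the Javadoc that the result of a map-side join is not sorted. One detail worth noticing in the original listing: InMemoryJoinFn builds a HashMultimap, which collapses duplicate key/value pairs, while MapsideJoinDoFn builds an ArrayListMultimap, which keeps them, so the two paths may differ when the right side contains exact duplicates.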
    
     

 
