`

Hadoop Reduce Join及基于MRV2 API 重写

阅读更多

        编写不易,转载请注明(http://shihlei.iteye.com/blog/2263757)! 

 

        最近项目,需要对两个文件进行连接查询,从文件2中提取在文件1中选线id的记录。
主要问题:两个文件都很大【 文件1:1亿记录 ; 文件2:8亿记录 】 
方案:

  • 方案1:Map启动将文件1表示读取bloomfilter,map处理文件2,发现存在即输出。
    问题:文件1过大,读取时间长,task直接timeout被kill。

  • 方案2:使用Reduce端join,使用Hadoop data-join包的api进行连接

一 Hadoop Reduce Join

1思想

        根据输入标记数据源,根据提供的Group key 分组,在Reduce侧处理组内容,完成连接。

2 实现

(1)定义可标记的输出类型
	/**
	 * 
	 * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group
	 * by的分组key,作为依据
	 * 
	 * */
	public static class JoinMapper extends DataJoinMapperBase {

		/**
		 * 读取输入的文件路径
		 * 
		 * **/
		protected Text generateInputTag(String inputFile) {
			// 取文件名的A和B作为来源标记
			String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFile, ".", 2)[0];
			return new Text(datasource);
		}

		/***
		 * 分组的Key
		 * 
		 * **/
		protected Text generateGroupKey(TaggedMapOutput aRecord) {
			String line = ((Text) aRecord.getData()).toString();
			if (StringUtils.isBlank(line)) {
				return null;
			}
			// 去每个文件的第一个字段作为连接key
			String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0];
			return new Text(groupKey);
		}

		/**
		 * 对文件打上标记
		 */
		protected TaggedMapOutput generateTaggedMapOutput(Object value) {
			TaggedWritable retv = new TaggedWritable((Text) value);
			retv.setTag(this.inputTag);
			return retv;
		}
	}

	// 自定义输出类型=====================================================
	public static class TaggedWritable extends TaggedMapOutput {
		private Writable data;

		// 需要定义默认构造函数,否则报错
		public TaggedWritable() {
			this.tag = new Text();
		}

		public TaggedWritable(Writable data) {
			this.tag = new Text("");
			this.data = data;
		}

		public Writable getData() {
			return data;
		}

		public void setData(Writable data) {
			this.data = data;
		}

		public void write(DataOutput out) throws IOException {
			this.tag.write(out);
			// 由于定义类型为WriteAble 所以不好使
			out.writeUTF(this.data.getClass().getName());
			this.data.write(out);
		}

		public void readFields(DataInput in) throws IOException {
			this.tag.readFields(in);
			this.data.readFields(in);
			String dataClz = in.readUTF();
			try {
				if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
					this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
				}
				this.data.readFields(in);
			} catch (ClassNotFoundException cnfe) {
				System.out.println("Problem in TaggedWritable class, method readFields.");
			}
		}
	}

遇到的坑:——跟Hadoop实战的区别

 

a)没有默认构造函数


报错:

java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>()
  at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
  at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>()
  at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
  at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1421)
  at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1361)
  at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:220)
  at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:216)
  at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
  at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:129)
  at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
  at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
  at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodException: x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.<init>()
  at java.lang.Class.getConstructor0(Class.java:3082)
  at java.lang.Class.getDeclaredConstructor(Class.java:2178)
  at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)

解决:

添加默认构造器。

  

b)反序列化readFields空指针

 

报错:

java.lang.Exception: java.lang.NullPointerException
  at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
  at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
  at x.bd.hadoop.join.MR1ReduceJoinJob$TaggedWritable.readFields(MR1ReduceJoinJob.java:160)
  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
  at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1421)
  at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1361)
  at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:220)
  at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:216)
  at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
  at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:129)
  at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
  at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
  at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745) 

原因:

在反序列化时有如下(《Hadoop实战》中):

 

public void readFields(DataInput in) throws IOException {
	this.tag.readFields(in);
	this.data.readFields(in);
}
 这时两个成员变量还没有值

解决:
1 在构造是创建Text 类型 Tag对象。

2 由于Data 对象无法是Writeable类型,无法创建,所以只能在序列化时多记录类型,在readRields时反射出来。

 

详细见上代码。

 

(2)继承DataJoinMapperBase 实现记录标记

 

主要实现三个方法:

  /**
   * 根据文件名实现找打标记
   */
  protected abstract Text generateInputTag(String inputFile);

  /**
   * 对本行记录打上标记,生成TaggedMapOutput
   */
  protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);

  /**
   * 生成分组key,其实就是Reduce的key
   */
  protected abstract Text generateGroupKey(TaggedMapOutput aRecord);

代码: 

/**
   * 
   * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group
   * by的分组key,作为依据
   * 
   * */
  public static class JoinMapper extends DataJoinMapperBase {

    /**
     * 读取输入的文件路径
     * 
     * **/
    protected Text generateInputTag(String inputFile) {
      // 取文件名的A和B作为来源标记
      String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFile, ".", 2)[0];
      return new Text(datasource);
    }

    /***
     * 分组的Key
     * 
     * **/
    protected Text generateGroupKey(TaggedMapOutput aRecord) {
      String line = ((Text) aRecord.getData()).toString();
      if (StringUtils.isBlank(line)) {
        return null;
      }
      // 去每个文件的第一个字段作为连接key
      String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0];
      return new Text(groupKey);
    }

    /**
     * 对文件打上标记
     */
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
      TaggedWritable retv = new TaggedWritable((Text) value);
      retv.setTag(this.inputTag);
      return retv;
    }
  }
(3)继承DataJoinReducerBase 根据条件数据数据

 

主要实现combin()方法,完成连接操作

public static class JoinReducer extends DataJoinReducerBase {

		/**
		 * tags,标签集合,且有顺序通常按照文件读取顺序 values,标签值,
		 * 
		 * 本方法被调一次,会传递一组要连接的记录,文件1的一条,文件2的一条
		 */
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
			// 按照需求,非left join 或 right join所以要求
			if (tags.length < 2)
				return null;

			String joinedStr = "";
			for (int i = 0; i < values.length; i++) {
				// 设置拼接符
				if (i > 0)
					joinedStr += ",";
				TaggedWritable tw = (TaggedWritable) values[i];
				String line = ((Text) tw.getData()).toString();
				String[] tokens = line.split(",", 2);
				joinedStr += tokens[1];
			}
			// 写出
			TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
			retv.setTag((Text) tags[0]);
			return retv;
		}
	}

 

(4)整体调用代码
public class MR1ReduceJoinJob {
	public static void main(String[] args) throws Exception {
		String in = "/Test/demo/in";
		String out = "/Test/demo/out";

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		JobConf job = buildJob(new Path(in), new Path(out), fs, conf);
		RunningJob runningJob = JobClient.runJob(job);
		// 等待结束
		runningJob.waitForCompletion();
		if (runningJob.isSuccessful()) {
			System.out.println("success ! ");
		} else {
			System.out.println(runningJob.getFailureInfo());
		}
	}

	public static JobConf buildJob(Path in, Path out, FileSystem fs, Configuration conf) throws IOException {
		fs.delete(out, true);

		JobConf job = new JobConf(new Configuration(), MR1ReduceJoinJob.class);
		job.setJobName("MR1ReduceJoinJob");

		job.setJarByClass(MR1ReduceJoinJob.class);

		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		job.setNumReduceTasks(1);

		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TaggedWritable.class);

		job.set("mapred.textoutputformat.separator", ",");

		// 解决死循环问题
		job.setLong("datajoin.maxNumOfValuesPerGroup", Long.MAX_VALUE);

		FileInputFormat.addInputPath(job, in);
		FileOutputFormat.setOutputPath(job, out);
		return job;
}
}
 

 

输入文件:

A:

 

1,a
2,b
3,c

 

B:

 

1,11
1,111
2,22
2,222
4,44

 

输出文件:

1,a,111
1,a,11
2,b,222
2,b,22

 

其他的坑:

 

大文件执行是一直出这个:

2-13 17:54:45 -2924 [localfetcher#5] INFO    - closeInMemoryFile -> map-output of size: 215770, inMemoryMapOutputs.size() -> 2, commitMemory -> 829095, usedMemory ->1044865
2015-12-13 17:54:45 -2925 [localfetcher#5] INFO    - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000009_0 decomp: 205864 len: 205868 to MEMORY
2015-12-13 17:54:45 -2925 [localfetcher#5] INFO    - Read 205864 bytes from map-output for attempt_local647573184_0001_m_000009_0
2015-12-13 17:54:45 -2925 [localfetcher#5] INFO    - closeInMemoryFile -> map-output of size: 205864, inMemoryMapOutputs.size() -> 3, commitMemory -> 1044865, usedMemory ->1250729
2015-12-13 17:54:45 -2926 [localfetcher#5] INFO    - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000007_0 decomp: 211843 len: 211847 to MEMORY
2015-12-13 17:54:45 -2926 [localfetcher#5] INFO    - Read 211843 bytes from map-output for attempt_local647573184_0001_m_000007_0
2015-12-13 17:54:45 -2926 [localfetcher#5] INFO    - closeInMemoryFile -> map-output of size: 211843, inMemoryMapOutputs.size() -> 4, commitMemory -> 1250729, usedMemory ->1462572
2015-12-13 17:54:45 -2927 [localfetcher#5] INFO    - localfetcher#5 about to shuffle output of map attempt_local647573184_0001_m_000000_0 decomp: 851861 len: 851865 to MEMORY
2015-12-13 17:54:45 -2929 [localfetcher#5] INFO    - Read 851861 bytes from map-output for attempt_local647573184_0001_m_000000_0
2015-12-1

 

根据源码及别人方案,要改成

job.setLong("datajoin.maxNumOfValuesPerGroup", Long.MAX_VALUE);

后来问题又不出现了。

 

4 不足

  • 基于mrv1 实现,API 交旧
  • Map的问题:
    (1)只能基于文件名分组,要求连接的文件名必须能区分
    (2)generateTaggedMapOutput()还需要手动绑定Tag

  • Reduce 问题:
    (1)迭代输出时不能灵活控制,框架给分组,输出单个文件的字段会重,需要自己处理。

  • 排序相同Group key的记录,也不适合处理超大的记录,可以通过二次排序改进。

  • 我这种需要查找exist的需求不好实现

二 基于MR V2 重写并改进

1 TaggedValue

 

package x.bd.hadoop.join.base;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * 可标记的结果
 * 
 * @author shilei
 *
 */
public class TaggedValue implements Writable {
	private Text tag;
	private Writable data;

	public TaggedValue() {
		tag = new Text();
	}
	
	public TaggedValue(Writable data) {
		tag = new Text();
		this.data = data;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// 写出内容
		this.tag.write(out);
		out.writeUTF(this.data.getClass().getName());
		this.data.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.tag.readFields(in);
		String dataClz = in.readUTF();
		try {
			if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
				this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
			}
			this.data.readFields(in);
		} catch (ClassNotFoundException cnfe) {
			System.out.println("Problem in TaggedWritable class, method readFields.");
		}
	}

	public Text getTag() {
		return tag;
	}

	public void setTag(Text tag) {
		this.tag = tag;
	}

	public Writable getData() {
		return data;
	}

	public void setData(Writable data) {
		this.data = data;
	}

	/**
	 * clone克隆 一个 对象数据
	 * 
	 * @param conf
	 * @return
	 */
	public TaggedValue clone(Configuration conf) {
		return (TaggedValue) WritableUtils.clone(this, conf);
	}

	public static void main(String[] args) {
		System.out.println(TaggedValue.class.getName());
	}

}

 

2 DataJoinMapBase

 

package x.bd.hadoop.join.base;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 连接查询 Mapper
 * 
 * @author shilei
 *
 * @param <KEYIN>
 * @param <VALUEIN>
 */
public abstract class DataJoinMapperBase<KEYIN, VALUEIN> extends Mapper<KEYIN, VALUEIN, Text, TaggedValue> {
	// 类型标记
	protected Text inputTag;
	// 输入文件路径,文件名
	protected String inputFilePath, inputFileName;

	/**
	 * 根据数据的文件名确定输入标签
	 * 
	 */
	protected abstract Text generateInputTagByFile(String inputFilePath, String inputFileName);

	/**
	 * 根据行内容处理Tag
	 * 
	 * @param inputFilePath
	 * @param inputFileName
	 * @return
	 */
	protected Text generateInputTagByLine(Text tag, KEYIN key, VALUEIN value, Context context) {
		return inputTag;
	}

	/**
	 * 封装待标签的输出
	 */
	protected abstract TaggedValue generateTaggedMapValue(VALUEIN value);

	/**
	 * 生成group by的列
	 */
	protected abstract Text generateGroupKey(TaggedValue tagValue);

	@Override
	protected void setup(Mapper<KEYIN, VALUEIN, Text, TaggedValue>.Context context) throws IOException, InterruptedException {
		FileSplit inputSplit = (FileSplit)context.getInputSplit();
		
		this.inputFilePath = inputSplit.getPath().getParent().getName();
		this.inputFileName = inputSplit.getPath().getName();
		this.inputTag = generateInputTagByFile(this.inputFilePath, this.inputFileName);
	}

	@Override
	public void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
		// 根据本行情况,处理Tag
		this.inputTag = generateInputTagByLine(this.inputTag, key, value, context);
		
		// 生成带标签的value
		TaggedValue taggedValue = generateTaggedMapValue(value);
		if (taggedValue == null) {
			context.getCounter("DataJoinMapper", "discardedCount").increment(1);
			return;
		}

		// 生成分组健
		Text groupKey = generateGroupKey(taggedValue);
		if (groupKey == null) {
			context.getCounter("DataJoinMapper", "nullGroupKeyCount").increment(1);
			return;
		}

		// 输出内容绑定标签
		taggedValue.setTag(this.inputTag);
		// key : group key , value : taggedValue
		context.write(groupKey, taggedValue);
		context.getCounter("DataJoinMapper", "outCount").increment(1);
	}
}

 

3 DataJoinReduceJoin

 

package x.bd.hadoop.join.base;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 连接查询Reduce
 * 
 * @author shilei
 *
 * @param <KEYOUT>
 * @param <VALUEOUT>
 */
public abstract class DataJoinReducerBase<KEYOUT, VALUEOUT> extends Reducer<Text, TaggedValue, KEYOUT, VALUEOUT> {

	@Override
	protected void setup(Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
		super.setup(context);
	}

	/**
	 * 合并结果
	 */
	protected abstract void combine(SortedMap<Text, List<TaggedValue>> valueGroups, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;

	@Override
	protected void reduce(Text key, Iterable<TaggedValue> values, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
		// 获得一个sort map ,key 是tab ,value 是结果的集合
		SortedMap<Text, List<TaggedValue>> groups = regroup(key, values, context);
		combine(groups, context);
		context.getCounter("DataJoinReucer", "groupCount").increment(1);
	}

	/**
	 * 按照Tag 对value 进行充分组
	 * 
	 * @param key
	 * @param arg1
	 * @param reporter
	 * @return
	 * @throws IOException
	 */
	private SortedMap<Text, List<TaggedValue>> regroup(Text key, Iterable<TaggedValue> values, Reducer<Text, TaggedValue, KEYOUT, VALUEOUT>.Context context) throws IOException {
		/*
		 * key: tag; value : TaggedValue
		 */
		SortedMap<Text, List<TaggedValue>> valueGroup = new TreeMap<Text, List<TaggedValue>>();

		// 遍历Value
		Iterator<TaggedValue> iter = values.iterator();
		while (iter.hasNext()) {

			// TODO 为什么需要克隆?
			TaggedValue taggedValue = ((TaggedValue) iter.next()).clone(context.getConfiguration());
			// 获得记录的 tag
			Text tag = taggedValue.getTag();
			// 从map 中获取一个iterator,如果已经创建,就做一个情况

			List<TaggedValue> datas = valueGroup.get(tag);
			if (datas == null) {
				datas = new LinkedList<TaggedValue>();
				valueGroup.put(tag, datas);
			}
			datas.add(taggedValue);

			// System.out.println("reduce : " + taggedValue + "|" +
			// tag.toString() + "|" + taggedValue.getData().toString());
			taggedValue = null;
		}
		return valueGroup;
	}

} 

 

4 整体调用

package x.bd.hadoop.join;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import x.bd.hadoop.join.base.DataJoinMapperBase;
import x.bd.hadoop.join.base.DataJoinReducerBase;
import x.bd.hadoop.join.base.TaggedValue;

/**
 * 使用Hadoop API 对数据进行 Reduce 连接</br>
 * 
 * 文件1:A.txt 1,a</br> 2,b </br> 3,c
 * 
 * 文件2:B.txt 1,11</br> 1,111</br> 2,22</br> 2,222</br>4,44
 * 
 * 关联查询(要求inner join): 1,a,11</br> 1,a,111</br> 2,b,22 </br> 2,b,222</br>
 * 
 * 
 * @author shilei
 *
 */
public class MR2SelfReduceJoinJob extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int res = ToolRunner.run(conf, new MR2SelfReduceJoinJob(), args);
		if (res == 0) {
			System.out.println("MR2SelfReduceJoinJob  success !");
		} else {
			System.out.println("MR2SelfReduceJoinJob  error ! ");

		}
		System.exit(res);
	}

	@Override
	public int run(String[] args) throws Exception {
		String in = "/Test/demo/in";
		String out = "/Test/demo/out";

		Path inPath = new Path(in);
		Path outPath = new Path(out);

		Configuration conf = getConf();
		FileSystem fs = FileSystem.get(conf);

		fs.delete(outPath, true);

		Job job = Job.getInstance(conf, "MR2SelfReduceJoinJob");

		job.setJarByClass(MR2SelfReduceJoinJob.class);
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		job.setNumReduceTasks(1);

		// 处理map的输出
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TaggedValue.class);

		job.setOutputKeyClass(Writable.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);

		if (job.waitForCompletion(true)) {
			return 0;
		} else {
			return 1;
		}
	}

	// Map=======================================
	/**
	 * 
	 * 1 根据文件名作为tag,区分数据来源 2 将数据封装成TaggedMapOutput 对象,并打上必要的tag 3 生成group
	 * by的分组key,作为依据
	 * 
	 * */
	public static class JoinMapper extends DataJoinMapperBase<Object, Text> {
		/**
		 * 读取输入的文件路径
		 * 
		 */
		@Override
		protected Text generateInputTagByFile(String inputFilePath, String inputFileName) {
			// 取文件名的A和B作为来源标记
			String datasource = StringUtils.splitByWholeSeparatorPreserveAllTokens(inputFileName, ".", 2)[0];
			return new Text(datasource);
		}

		/**
		 * 按需峰值要处理的记录,这里只需要原样输出
		 */
		@Override
		protected TaggedValue generateTaggedMapValue(Text value) {
			return new TaggedValue(value);
		}

		/**
		 * 数据的第一个字段作为分组key
		 */
		@Override
		protected Text generateGroupKey(TaggedValue tagValue) {
			String line = ((Text) tagValue.getData()).toString();
			if (StringUtils.isBlank(line)) {
				return null;
			}
			// 去每个文件的第一个字段作为连接key
			String groupKey = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, ",", 2)[0];
			return new Text(groupKey);
		}
	}

	// Reduce============================================
	public static class JoinReducer extends DataJoinReducerBase<Writable, NullWritable> {
		private Text key = new Text("B");

		@Override
		protected void combine(SortedMap<Text, List<TaggedValue>> valueGroups, Reducer<Text, TaggedValue, Writable, NullWritable>.Context context) throws IOException, InterruptedException {
			// 必须能连接上
			if (valueGroups.size() < 2) {
				return;
			}

			// 这里只输出文件2的字段
			// 写出
			List<TaggedValue> cookieValues = valueGroups.get(key);
			Iterator<TaggedValue> iter = cookieValues.iterator();
			while (iter.hasNext()) {
				TaggedValue value = iter.next();
				if (value == null) {
					continue;
				}
				context.write(value.getData(), NullWritable.get());
			}
		}
	}
}

三 源码包

 

分享到:
评论

相关推荐

    呼伦贝尔市-鄂温克族自治旗-街道行政区划_150724_Shp数据-wgs84坐标系.rar

    呼伦贝尔市-鄂温克族自治旗-街道行政区划_150724_Shp数据-wgs84坐标系.rar

    Cruise纯电动汽车仿真输入模板详解:涵盖8大核心模块参数设置与代码实现

    内容概要:本文详细介绍了用于Cruise纯电动汽车仿真的输入模板,该模板由8个表单组成,覆盖了从整车参数到计算输出的各个方面。每个表单都包含了关键参数的设置方法及其背后的逻辑,如校核清单、整车参数、电池参数、电机参数、传动系参数、制动轮胎参数、能量回收参数以及最终的计算输出。文中不仅提供了具体的参数定义和计算公式,还附有Python代码示例,帮助用户更好地理解和应用这些参数。此外,作者还分享了一些实用技巧,如防止参数遗漏的校验函数、处理电池温度效应的实际容量计算函数等。 适合人群:从事纯电动汽车仿真工作的工程师和技术人员,尤其是那些需要频繁处理复杂输入参数的人群。 使用场景及目标:① 提高纯电动汽车仿真工作的效率和准确性;② 规范参数收集流程,减少因参数错误导致的仿真失败;③ 提供详细的参数设置指导和代码实现,帮助用户更好地理解和应用Cruise仿真平台。 其他说明:本文不仅提供了一个全面的输入模板,还分享了许多实践经验,旨在帮助用户在实际工作中少走弯路,提高工作效率。

    张家口市-桥西区--街道行政区划_130703_Shp-wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接下载使用。

    通辽市-通辽市-街道行政区划_150500_Shp数据-wgs84坐标系.rar

    街道级行政区划shp矢量数据,wgs84坐标系,下载直接使用

    【预编码】基于matlab大规模多用户MIMO系统低复杂度混合预编码(Rayleigh信道)【含Matlab源码 13197期】.zip

    Matlab领域上传的视频是由对应的完整代码运行得来的,完整代码皆可运行,亲测可用,适合小白; 1、从视频里可见完整代码的内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作

    CTF竞赛基于杂项题目的隐写术与编码挑战:涵盖LSB隐写、摩斯密码、进制转换及文件格式转换技巧了文档的核心内容

    内容概要:本文档是作者在bugku平台进行CTF(夺旗赛)杂项题目练习的解题思路总结,涵盖第25至33题。题目类型多样,包括但不限于隐写术、进制转换、音频分析、图像处理等。每道题都详细介绍了背景信息、解题步骤和所使用的工具,如Stegsolve用于图片隐写分析、Python脚本处理进制转换、Audacity解析音频中的摩尔斯电码等。通过这些实例,展示了如何运用各种技术手段解决实际问题,强调了理论与实践相结合的重要性。 适合人群:对信息安全、逆向工程感兴趣的读者,特别是有一定编程基础和技术积累的安全爱好者或初学者。 使用场景及目标:①学习隐写术的基本原理及其在CTF比赛中的应用;②掌握不同进制间的转换方法及其实现;③熟悉音频文件中提取摩尔斯电码的技术;④了解图像处理技巧,如调整尺寸、解析隐藏信息等;⑤掌握压缩文件的明文攻击技巧,以及如何利用已知信息破解加密文件。 阅读建议:由于每道题涉及的知识点较为独立且专业性强,建议读者根据自己的兴趣选择相关题目深入研究。同时,在学习过程中应注重动手实践,尝试复现文中提到的操作流程,并结合网络资源进一步拓展知识面。对于遇到的工具和概念,可以通过查阅官方文档或参考教程加深理解。

    Qt时间标尺控件:实现丝滑缩放与自适应刻度的高效可视化组件

    内容概要:本文详细介绍了如何在Qt中实现一个高效的时间标尺控件,重点讲解了时间标尺的缩放功能、刻度自动生成以及曲线绘制的技术细节。首先,通过重载wheelEvent方法,利用QGraphicsView框架实现了基于鼠标的缩放功能,确保缩放过程中鼠标位置对应的时间点不变。其次,针对不同的时间范围,采用对数分级算法自动调整刻度间隔,使刻度线既美观又实用。最后,在曲线绘制方面,使用QPainterPath进行路径构建,并通过预处理和分段绘制优化性能,确保即使面对大量数据点也能保持流畅的用户体验。 适合人群:具有一定Qt开发经验的程序员,尤其是从事数据可视化项目的开发者。 使用场景及目标:适用于需要展示时间序列数据的应用程序,如金融图表、监控系统、日志分析工具等。主要目标是提供一个响应迅速、视觉效果优秀的交互式时间标尺控件,帮助用户更好地理解和分析数据。 其他说明:文中还提到了一些性能优化的小技巧,如数据预处理、路径分段绘制等,有助于提高大型数据集的渲染速度。同时,作者强调了在时间转换函数中避免使用低效的方法,推荐自行实现高效的缓存机制。

    天津市-静海区-街道行政区_120118_Shapefile_wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接下载使用。

    石家庄市-石家庄市-石家庄市-赵县-街道行政区划_130133_Shp数据wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接下载使用。

    赤峰市-喀喇沁旗-街道行政区划_150428_Shp数据-wgs84坐标系.rar

    赤峰市-喀喇沁旗-街道行政区划_150428_Shp数据-wgs84坐标系.rar

    【时间序列预测】基于Python和LSTM的时间序列预测系统设计与实现:从环境搭建到案例实战

    内容概要:本文详细介绍了使用Python和LSTM(长短期记忆网络)进行时间序列预测的方法及其应用场景。首先阐述了时间序列预测的重要性,指出传统ARIMA模型在处理复杂模式和长期依赖关系时的局限性,进而引出LSTM的优势。LSTM通过引入门控机制(输入门、遗忘门、输出门)和记忆单元,有效解决了长期依赖问题,能更好地捕捉时间序列中的复杂模式。接着,文章详细讲解了LSTM的工作原理,包括各个门控机制的作用和计算流程。随后,通过股票价格预测和气温预测两个案例,逐步演示了从环境搭建、数据准备(包括数据读取、归一化处理)、模型构建(使用Keras搭建LSTM模型)、模型编译、训练与评估到预测结果可视化的全过程。最后,文章总结了LSTM的关键技术和实现要点,并展望了其在自然语言处理、计算机视觉、生物学等领域的应用前景及未来研究方向。 适合人群:具备一定编程基础,尤其是对深度学习和时间序列预测感兴趣的开发者、数据科学家和研究人员。 使用场景及目标:①帮助读者掌握LSTM的基本原理和工作流程;②提供详细的Python实现步骤,包括环境配置、数据处理、模型搭建与训练;③通过具体案例展示LSTM在时间序列预测中的应用,如股票价格预测和气温预测;④探讨LSTM在其他领域的潜在应用,如自然语言处理、计算机视觉和生物学等。 阅读建议:本文内容详尽,涵盖理论与实践两方面,建议读者在阅读过程中结合代码实践,逐步理解LSTM的工作原理和实现细节,特别是注意数据处理和模型参数的选择对预测效果的影响。

    三菱FX5U机床双轴定位控制系统解析与优化 - 结构化编程及应用实例

    内容概要:本文详细介绍了基于三菱FX5U PLC的机床X轴和Y轴工作台定位控制系统的开发与优化过程。主要内容涵盖:使用J4-A系列伺服驱动器进行绝对位置控制,通过ST语言和结构化梯形图实现复杂的20组直线插补工序;手动模式下的点动与长按操作逻辑;MODBUS通讯协议的应用;以及详细的报警诊断和统计功能。文中展示了如何利用结构体封装参数,提高代码可读性和维护性,并通过具体案例解释了关键技术和调试经验。 适合人群:从事工业自动化控制领域的工程师和技术人员,尤其是熟悉三菱PLC编程的从业者。 使用场景及目标:适用于需要深入了解三菱FX5U PLC编程技巧及其在实际工程项目中应用的人群。目标是掌握高级编程方法如结构化编程、ST语言特性、MODBUS通讯优化等,从而提升工作效率并减少调试时间。 其他说明:文章不仅提供了理论知识,还包括大量实用的编程技巧和实践经验分享,有助于读者更好地理解和应用于实际工作中。

    大同市-大同市-街道行政区划_140200_Shp数据-wgs84坐标系.rar

    大同市-大同市-街道行政区划_140200_Shp数据-wgs84坐标系.rar

    火电厂协调仿真机:提升PID参数调试效率与安全性

    内容概要:本文详细介绍了火电厂协调仿真机的应用及其优势,特别是在PID参数调试方面的高效性和安全性。文中通过具体的Python代码示例展示了如何构建锅炉和汽轮机的仿真模型,并解释了PID控制器的工作原理。重点讨论了PID参数调试的关键点,如响应延迟、采样时间设定以及前馈控制的叠加效果。此外,还提到了实时曲线对比、参数扫描、自整定算法等功能的实际应用,强调了仿真机在提高调试效率和降低现场调试风险方面的重要作用。 适合人群:从事火电厂自动化控制领域的工程师和技术人员,尤其是需要进行PID参数调试的专业人士。 使用场景及目标:① 提高PID参数调试效率,减少现场调试时间和成本;② 降低现场调试的安全风险;③ 实现更加精确和平稳的控制系统性能。 其他说明:文章不仅提供了理论指导,还结合了大量的实战经验和具体代码示例,帮助读者更好地理解和掌握协调仿真机的使用方法。

    邢台市-襄都区--街道行政区划_130502_Shp-wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接下载使用。

    保定市-博野县--街道行政区划_130637_Shp-wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接使用。

    学号-姓名-作业二编写程序.ipynb

    学号-姓名-作业二编写程序.ipynb

    正弦内插算法的FPGA实现.docx

    正弦内插算法的FPGA实现.docx

    呼和浩特市_回民区_街道级--街道行政区划_150103_Shp_wgs84坐标系.rar

    街道级行政区划shp数据,wgs84坐标系,直接使用。

Global site tag (gtag.js) - Google Analytics