`
BlackWing
  • 浏览: 199977 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

分拆TableSplit 让多个mapper同时读取

阅读更多
默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。

由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
mapred.min.split.size
mapred.max.split.size


所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。

HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,
					UserViewHisMapper2.class, Text.class, Text.class,
					genRecommendations);

而这个方法,最终是调用以下方法进行初始化设置的:
 public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
              outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }


所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:

public class TableInputFormat extends TableInputFormatBase
implements Configurable 


最终要修改的则是TableInputFormatBase这个类,修改其以下方法:

public List<InputSplit> getSplits(JobContext context) throws IOException {}


这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
  public List<InputSplit> getSplits(JobContext context) throws IOException {
	if (table == null) {
	    throw new IOException("No table was provided.");
	}
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      throw new IOException("Expecting at least one region.");
    }
    int count = 0;
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }
      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
        getHostname();
      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start an stop key fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
        byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;
        InputSplit split = new TableSplit(table.getTableName(),
          splitStart, splitStop, regionLocation);
        splits.add(split);
        if (LOG.isDebugEnabled())
          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
      }
    }
    return splits;
  }


这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:

	public List<InputSplit> getSplits(JobContext context) throws IOException {
		if (table == null) {
			throw new IOException("No table was provided.");
		}
		Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
		if (keys == null || keys.getFirst() == null
				|| keys.getFirst().length == 0) {
			throw new IOException("Expecting at least one region.");
		}
		int count = 0;
		List<InputSplit> splits = new ArrayList<InputSplit>(
				keys.getFirst().length);
		for (int i = 0; i < keys.getFirst().length; i++) {
			if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
				continue;
			}
			String regionLocation = table.getRegionLocation(keys.getFirst()[i],true)
					.getHostname();
			byte[] startRow = scan.getStartRow();
			byte[] stopRow = scan.getStopRow();
			// determine if the given start an stop key fall into the region
			if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes
					.compareTo(startRow, keys.getSecond()[i]) < 0)
					&& (stopRow.length == 0 || Bytes.compareTo(stopRow,
							keys.getFirst()[i]) > 0)) {
				byte[] splitStart = startRow.length == 0
						|| Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
						.getFirst()[i] : startRow;
				byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(
						keys.getSecond()[i], stopRow) <= 0)
						&& keys.getSecond()[i].length > 0 ? keys.getSecond()[i]
						: stopRow;

				Scan scan1 = new Scan();
				scan1.setStartRow(splitStart);
				scan1.setStopRow(splitStop);
				scan1.setFilter(new KeyOnlyFilter());
				scan1.setBatch(500);
				
				ResultScanner resultscanner = table.getScanner(scan1);
				
				//用来保存该region的所有key
				List<String> rows = new ArrayList<String>();
				//Iterator<Result>  it = resultscanner.iterator();
				
				for(Result rs : resultscanner)
				{
					if(rs.isEmpty())
						continue;
					rows.add(new String(rs.getRow()));
				}
				
				int splitSize = rows.size() / mappersPerSplit;
				
				for (int j = 0; j < mappersPerSplit; j++) {
					TableSplit tablesplit = null;
					if (j == mappersPerSplit - 1)
						tablesplit = new TableSplit(table.getTableName(),
								rows.get(j * splitSize).getBytes(),
								rows.get(rows.size() - 1).getBytes(),
								regionLocation);
					else
						tablesplit = new TableSplit(table.getTableName(),
								rows.get(j * splitSize).getBytes(),
								rows.get(j * splitSize + splitSize).getBytes(), regionLocation);
					splits.add(tablesplit);
					if (LOG.isDebugEnabled())
						LOG.debug((new StringBuilder())
								.append("getSplits: split -> ").append(i++)
								.append(" -> ").append(tablesplit).toString());
				}
				resultscanner.close();				
			}
		}
		return splits;
	}


通过配置设置需要拆分的split数。




分享到:
评论
2 楼 BlackWing 2013-03-26  
我的是可以拆分,表的数据只有一个region,多region情况下没有测试。

chenbaohua518 写道
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!

1 楼 chenbaohua518 2013-03-25  
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!

相关推荐

    SpringBoot多数据源配置(方式一:配置多个mapper扫描不同的包路径实现多数据源配置).docx

    - **数据分片:** 对于大型应用,可能需要将数据分布在多个数据库上以提高性能和可用性。 - **异构数据源整合:** 不同的数据源可能存在类型差异,如MySQL、Oracle等,多数据源配置可以方便地进行整合。 #### 三、...

    springboot + mybatis(通用mapper) + druid多数据源

    【标题】"SpringBoot + MyBatis(通用Mapper) + Druid多数据源"是一个常见的Java后端开发架构,用于构建高效、稳定且可扩展的Web应用程序。在这个框架中,SpringBoot简化了Spring应用的初始化和配置,MyBatis作为持久...

    idea工具中直接从mapper.Java文件中跳转到mapper.xml文件的插件,挺不错的

    在Java开发领域,IDEA(IntelliJ IDEA)是一款广泛使用的集成开发环境,以其高效、智能的特性深受程序员喜爱。...同时,了解并合理使用MyBatis-Plus这样的扩展库,也能让MyBatis的使用体验更上一层楼。

    继承Mapper实现的方法,无需编写太多mapper.xml文件,即可获得CRUD功能

    本话题将深入探讨如何通过继承Mapper接口来实现无需大量编写mapper.xml文件,就能轻松获取CRUD(创建、读取、更新、删除)功能的方法。 首先,MyBatis中的Mapper接口是一种设计模式,它允许开发者定义一组方法,...

    fc-mapper4_MAPPER4_fc-mapper4_

    标题中的"fc-mapper4_MAPPER4_fc-mapper4_"似乎是一个标识符,它可能代表一个与FC(Family Computer,即任天堂的红白机)游戏卡带映射器相关的项目或程序。在FC游戏开发中,映射器(Mapper)是关键组件,它的作用是...

    GlobalMapper17补丁

    首先,"GlobalMapper17补丁"是针对GlobalMapper17版本的一个关键更新,它可能包含了对软件的性能优化、错误修复或新功能的添加。补丁的使用通常是为了提升软件的稳定性和用户体验,确保用户能够顺利进行地图处理和...

    FC转mapper教程

    FC的ROM独有的mapper使得制作卡带比较麻烦, 通常mapper4时最常用的mapper, 此教程讲解如何将mapper0, mapper1, mapper2, mapper3,mapper23(VRC2)转换为mapper4

    mapper_src_VirtualNES_mapper_源码

    当VirtualNES在尝试运行某个nes游戏时提示缺少mapperXXX,这意味着该游戏依赖于特定的Mapper功能,而当前的模拟器配置中并未包含这个Mapper模块。 Mapper的种类繁多,例如:NROM、UNROM、CNROM、MMC1、INES-Mapper ...

    GlobalMapper14安装包

    总的来说,GlobalMapper14是一个功能强大的GIS工具,无论你是专业GIS工作者还是业余爱好者,都能从中受益。通过深入学习和实践,你将能够熟练运用它来处理各种地理信息问题,创作出精美的地图作品。

    Mybatis Mapper的使用

    Mybatis Mapper的主要目标是让我们能够仅定义接口,而无需手动编写接口的实现类,这极大地提高了开发效率和代码的可维护性。 在传统的Mybatis使用中,我们通常需要为每个SQL操作创建一个接口方法,并在对应的实现类...

    mapper52.zip

    标题"Mapper52.zip"指的是一个与FC(Family Computer,也称为任天堂红白机)游戏相关的资源包,其中特别涉及Mapper52这种特定的卡带映射器。Mapper是FC游戏卡带中的一个重要组成部分,它负责管理和控制游戏内存、I/O...

    GlobalMapper生成DEM.doc

    《全球映射器(GlobalMapper)生成DEM及数据处理详解》 全球映射器(GlobalMapper)是一款功能强大的地理信息系统(GIS)软件,它能够处理各种地理数据,并生成数字高程模型(Digital Elevation Model,简称DEM)。...

    mybatis自动生成mapper文件

    MBG会为每个表生成一个Mapper接口,包含CRUD(Create、Read、Update、Delete)等基本操作。同时,它还会生成对应的XML映射文件,定义SQL语句和结果映射。这样,开发者只需要在业务逻辑中调用Mapper接口,而无需手动...

    Java的MyBatis框架中Mapper映射配置的使用及原理解析

    Mapper的内置方法是MyBatis提供的一系列方便的CRUD(创建、读取、更新、删除)操作,它们直接映射到SQL语句上: 1. `countByExample`:这个方法用于根据指定的条件查询记录的数量。例如,`UserMapper`中的`...

    GLOBAL MAPPER数据转换

    本文档旨在提供一个详细的指南,指导用户如何使用 Global Mapper 对数据进行裁剪和转换。 数据裁剪是指从原始数据中提取出需要的部分,去掉无关的信息。Global Mapper 提供了多种裁剪方式,例如光栅图像剪裁和矢量...

    工具自动生成mapper(GeneratorSqlmap)

    总的来说,"工具自动生成mapper(GeneratorSqlmap)"是一个高效实用的开发辅助工具,它可以帮助开发者快速构建MyBatis的Mapper层,从而将更多精力集中在业务逻辑上。通过合理使用并对其进行适当的定制,可以极大地...

    Global Mapper软件操作教程.pdf

    Global Mapper能够支持多种数据类型和格式的读取与输出,无论是矢量数据还是栅格数据,都能轻松处理。特别是在LiDAR数据的质量检查方面,其直观的界面和便捷的操作使得工作效率显著提升。软件的编辑和管理功能使得...

    linkage_mapper3.0和对应Circuitscape

    同时,提供的相关说明书能够指导用户有效地使用这两个工具,实现对复杂生态环境的精确建模和分析。在当前全球生态环境变化的背景下,这类工具的应用显得尤为重要,能够帮助我们更好地理解和保护我们的自然遗产。

    FieldMapper用户手册

    FieldMapper主要用于现场数据采集,而FieldMapper Tools则提供了更多的数据处理和管理功能。 - **用户手册内容概览**:用户手册除了介绍系统总体特点外,还详细介绍了FieldMapper的功能,而FieldMapper Tools的相关...

Global site tag (gtag.js) - Google Analytics