Here is a simple example of a MapReduce job that turns a plain file into HFiles and then bulk-loads those HFiles into HBase, recorded here for future reference:
File2HFile2HBase.java:
package com.lyq.study.example;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;

import com.lyq.study.lib.HFileOutputFormatBase;
import com.lyq.study.util.HBaseConfigUtils;

public class File2HFile2HBase {

    private static final Log LOG = LogFactory.getLog(File2HFile2HBase.class);
    private String tableName = "testtable1";

    private static class MapperClass extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        public byte[] family = Bytes.toBytes("info");
        public String[] columns = { "card", "type", "amount", "time", "many" };

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] columnVals = value.toString().split(",");
            // Row key = card number + transaction time
            String rowkey = columnVals[0] + columnVals[3];
            Put put = new Put(Bytes.toBytes(rowkey));
            for (int i = 0; i < columnVals.length; i++) {
                put.add(family, Bytes.toBytes(columns[i]),
                        Bytes.toBytes(columnVals[i]));
            }
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
        }
    }

    public int run(final String[] args) throws Exception {
        if (args.length < 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            return -1;
        }
        LOG.info("Usage command args: " + Arrays.toString(args));
        final String hfile = args[1];
        UserGroupInformation ugi = UserGroupInformation
                .createRemoteUser("hadoop");
        ugi.doAs(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                try {
                    // 1. Get the Configuration
                    Configuration conf = HBaseConfigUtils.getHBaseConfig(1);
                    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
                    // 2. Build the Job
                    Job job = getJob(conf, args);
                    // 3. Run the Job
                    if (!job.waitForCompletion(true)) {
                        throw new IOException("Initial data load failed");
                    }
                    // 4. Bulk-load the generated HFiles into HBase
                    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                    HTable htable = new HTable(conf, tableName);
                    loader.doBulkLoad(new Path(hfile), htable);
                    // 5. Clean up the HFile output directory
                    deleteByDir(conf, new Path(hfile), true);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
        return 0;
    }

    private Job getJob(Configuration conf, String[] args) throws IOException {
        Job job = Job.getInstance(conf);
        job.setJobName("File2HFile");
        job.setJarByClass(File2HFile2HBase.class);
        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        HTable htable = new HTable(conf, tableName);
        HFileOutputFormatBase.configureIncrementalLoad(job, htable,
                HFileOutputFormatBase.class);
        return job;
    }

    public static boolean deleteByDir(Configuration conf, Path path,
            Boolean recursive) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        boolean success = fs.delete(path, recursive);
        LOG.info("Deleted [" + path + "] successfully? " + success);
        return success;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://master129:9000/test/input/data.txt",
                "hdfs://master129:9000/test/output" };
        File2HFile2HBase f2hf2hb = new File2HFile2HBase();
        System.exit(f2hf2hb.run(args));
    }
}
HFileOutputFormatBase.java:
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lyq.study.lib;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the
 * sequence id for the file. Sets the major compacted attribute on created
 * hfiles. Calling write(null,null) will forceably roll all HFiles being
 * written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, HTable)}.
 *
 * @see KeyValueSortReducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HFileOutputFormatBase extends
        FileOutputFormat<ImmutableBytesWritable, KeyValue> { // changed Cell to KeyValue

    static Log LOG = LogFactory.getLog(HFileOutputFormatBase.class);

    static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
    private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
    private static final String DATABLOCK_ENCODING_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
    private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";

    public static final String _deleteRowkey = "_deleteRowkey"; // added this field

    // Changed the value type from Cell to KeyValue; the original
    // static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter()
    // helper was removed and its body inlined into getRecordWriter().
    public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
            final TaskAttemptContext context) throws IOException,
            InterruptedException {
        // Get the path of the temporary output file
        final Path outputPath = FileOutputFormat.getOutputPath(context);
        final Path outputdir = new FileOutputCommitter(outputPath, context)
                .getWorkPath();
        final Path ignoreOutputPath = getDeleteRowKeyFile(outputPath); // added this line
        final Configuration conf = context.getConfiguration();
        final FileSystem fs = outputdir.getFileSystem(conf);
        // These configs. are from hbase-*.xml
        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
                HConstants.DEFAULT_MAX_FILE_SIZE);
        // Invented config. Add to hbase-*.xml if other than default compression.
        final String defaultCompression = conf.get("hfile.compression",
                Compression.Algorithm.NONE.getName());
        final boolean compactionExclude = conf.getBoolean(
                "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

        // create a map from column family to the compression algorithm
        final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
        final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
        final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);

        String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
        final HFileDataBlockEncoder encoder;
        if (dataBlockEncodingStr == null) {
            encoder = NoOpDataBlockEncoder.INSTANCE;
        } else {
            try {
                encoder = new HFileDataBlockEncoderImpl(
                        DataBlockEncoding.valueOf(dataBlockEncodingStr));
            } catch (IllegalArgumentException ex) {
                throw new RuntimeException(
                        "Invalid data block encoding type configured for the param "
                                + DATABLOCK_ENCODING_CONF_KEY + " : "
                                + dataBlockEncodingStr);
            }
        }

        return new RecordWriter<ImmutableBytesWritable, KeyValue>() { // changed V to KeyValue
            // Map of families to writers and how much has been output on the writer.
            private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(
                    Bytes.BYTES_COMPARATOR);
            private final FSDataOutputStream dos = fs.create(ignoreOutputPath);
            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
            private boolean rollRequested = false;

            public void write(ImmutableBytesWritable row, KeyValue kv) // changed V cell to KeyValue kv
                    throws IOException {
                // KeyValue kv = KeyValueUtil.ensureKeyValue(cell); // commented out

                // null input == user explicitly wants to flush
                if (row == null && kv == null) {
                    rollWriters();
                    return;
                }

                byte[] rowKey = kv.getRow();
                long length = kv.getLength();
                byte[] family = kv.getFamily();

                if (ignore(kv)) { // added this check
                    byte[] readBuf = rowKey;
                    dos.write(readBuf, 0, readBuf.length);
                    dos.write(Bytes.toBytes("\n"));
                    return;
                }

                WriterLength wl = this.writers.get(family);

                // If this is a new column family, verify that the directory exists
                if (wl == null) {
                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
                }

                // If any of the HFiles for the column families has reached
                // maxsize, we need to roll all the writers
                if (wl != null && wl.written + length >= maxsize) {
                    this.rollRequested = true;
                }

                // This can only happen once a row is finished though
                if (rollRequested
                        && Bytes.compareTo(this.previousRow, rowKey) != 0) {
                    rollWriters();
                }

                // create a new HLog writer, if necessary
                if (wl == null || wl.writer == null) {
                    wl = getNewWriter(family, conf);
                }

                // we now have the proper HLog writer. full steam ahead
                kv.updateLatestStamp(this.now);
                wl.writer.append(kv);
                wl.written += length;

                // Copy the row so we know when a row transition.
                this.previousRow = rowKey;
            }

            private void rollWriters() throws IOException {
                for (WriterLength wl : this.writers.values()) {
                    if (wl.writer != null) {
                        LOG.info("Writer=" + wl.writer.getPath()
                                + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
                        close(wl.writer);
                    }
                    wl.writer = null;
                    wl.written = 0;
                }
                this.rollRequested = false;
            }

            /*
             * Create a new StoreFile.Writer.
             *
             * @param family
             *
             * @return A WriterLength, containing a new StoreFile.Writer.
             *
             * @throws IOException
             */
            private WriterLength getNewWriter(byte[] family, Configuration conf)
                    throws IOException {
                WriterLength wl = new WriterLength();
                Path familydir = new Path(outputdir, Bytes.toString(family));
                String compression = compressionMap.get(family);
                compression = compression == null ? defaultCompression
                        : compression;
                String bloomTypeStr = bloomTypeMap.get(family);
                BloomType bloomType = BloomType.NONE;
                if (bloomTypeStr != null) {
                    bloomType = BloomType.valueOf(bloomTypeStr);
                }
                String blockSizeString = blockSizeMap.get(family);
                int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
                        : Integer.parseInt(blockSizeString);
                Configuration tempConf = new Configuration(conf);
                tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
                wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(
                        tempConf), fs, blockSize)
                        .withOutputDir(familydir)
                        .withCompression(
                                AbstractHFileWriter.compressionByName(compression))
                        .withBloomType(bloomType)
                        .withComparator(KeyValue.COMPARATOR)
                        .withDataBlockEncoder(encoder)
                        .withChecksumType(HStore.getChecksumType(conf))
                        .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
                        .build();
                this.writers.put(family, wl);
                return wl;
            }

            private void close(final StoreFile.Writer w) throws IOException {
                if (w != null) {
                    w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
                            Bytes.toBytes(System.currentTimeMillis()));
                    w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
                            Bytes.toBytes(context.getTaskAttemptID().toString()));
                    w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
                            Bytes.toBytes(true));
                    w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
                            Bytes.toBytes(compactionExclude));
                    w.appendTrackedTimestampsToMetadata();
                    w.close();
                }
            }

            public void close(TaskAttemptContext c) throws IOException,
                    InterruptedException {
                dos.flush(); // added this line
                dos.close(); // added this line
                for (WriterLength wl : this.writers.values()) {
                    close(wl.writer);
                }
            }
        };
    }

    /*
     * Data structure to hold a Writer and amount of data written on it.
     */
    static class WriterLength {
        long written = 0;
        StoreFile.Writer writer = null;
    }

    /**
     * Return the start keys of all of the regions in this table, as a list of
     * ImmutableBytesWritable.
     */
    private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
            throws IOException {
        byte[][] byteKeys = table.getStartKeys();
        ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(
                byteKeys.length);
        for (byte[] byteKey : byteKeys) {
            ret.add(new ImmutableBytesWritable(byteKey));
        }
        return ret;
    }

    /**
     * Write out a {@link SequenceFile} that can be read by
     * {@link TotalOrderPartitioner} that contains the split points in startKeys.
     */
    @SuppressWarnings("deprecation")
    private static void writePartitions(Configuration conf,
            Path partitionsPath, List<ImmutableBytesWritable> startKeys)
            throws IOException {
        LOG.info("Writing partition information to " + partitionsPath);
        if (startKeys.isEmpty()) {
            throw new IllegalArgumentException("No regions passed");
        }

        // We're generating a list of split points, and we don't ever
        // have keys < the first region (which has an empty start key)
        // so we need to remove it. Otherwise we would end up with an
        // empty reducer with index 0
        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(
                startKeys);

        ImmutableBytesWritable first = sorted.first();
        if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
            throw new IllegalArgumentException(
                    "First region of table should have empty start key. Instead has: "
                            + Bytes.toStringBinary(first.get()));
        }
        sorted.remove(first);

        // Write the actual file
        FileSystem fs = partitionsPath.getFileSystem(conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

        try {
            for (ImmutableBytesWritable startKey : sorted) {
                writer.append(startKey, NullWritable.get());
            }
        } finally {
            writer.close();
        }
    }

    /**
     * Configure a MapReduce Job to perform an incremental load into the given
     * table. This
     * <ul>
     * <li>Inspects the table to configure a total order partitioner</li>
     * <li>Uploads the partitions file to the cluster and adds it to the
     * DistributedCache</li>
     * <li>Sets the number of reduce tasks to match the current number of
     * regions</li>
     * <li>Sets the output key/value class to match HFileOutputFormat2's
     * requirements</li>
     * <li>Sets the reducer up to perform the appropriate sorting (either
     * KeyValueSortReducer or PutSortReducer)</li>
     * </ul>
     * The user should be sure to set the map output value class to either
     * KeyValue or Put before running this function.
     */
    public static void configureIncrementalLoad(Job job, HTable table)
            throws IOException {
        configureIncrementalLoad(job, table, HFileOutputFormatBase.class);
    }

    public static void configureIncrementalLoad(Job job, HTable table,
            Class<? extends OutputFormat<?, ?>> cls) throws IOException {
        Configuration conf = job.getConfiguration();

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(HFileOutputFormatBase.class);

        // Based on the configured map output class, set the correct reducer to
        // properly sort the incoming values.
        // TODO it would be nice to pick one or the other of these formats.
        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if (Put.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(PutSortReducer.class);
        } else if (Text.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(TextSortReducer.class);
        } else {
            LOG.warn("Unknown map output value type:"
                    + job.getMapOutputValueClass());
        }

        conf.setStrings("io.serializations", conf.get("io.serializations"),
                MutationSerialization.class.getName(),
                ResultSerialization.class.getName(),
                KeyValueSerialization.class.getName());

        // Use table's region boundaries for TOP split points.
        LOG.info("Looking up current regions for table "
                + Bytes.toString(table.getTableName()));
        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
        LOG.info("Configuring " + startKeys.size() + " reduce partitions "
                + "to match current region count");
        job.setNumReduceTasks(startKeys.size());

        configurePartitioner(job, startKeys);
        // Set compression algorithms based on column families
        configureCompression(table, conf);
        configureBloomType(table, conf);
        configureBlockSize(table, conf);

        // TableMapReduceUtil.addDependencyJars(job); // commented out this line
        TableMapReduceUtil.initCredentials(job);
        LOG.info("Incremental table " + Bytes.toString(table.getTableName())
                + " output configured.");
    }

    private static void configureBlockSize(HTable table, Configuration conf)
            throws IOException {
        StringBuilder blockSizeConfigValue = new StringBuilder();
        HTableDescriptor tableDescriptor = table.getTableDescriptor();
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                blockSizeConfigValue.append('&');
            }
            blockSizeConfigValue.append(URLEncoder.encode(
                    familyDescriptor.getNameAsString(), "UTF-8"));
            blockSizeConfigValue.append('=');
            blockSizeConfigValue.append(URLEncoder.encode(
                    String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
        }
        // Get rid of the last ampersand
        conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
    }

    /**
     * Run inside the task to deserialize column family to compression algorithm
     * map from the configuration.
     *
     * Package-private for unit tests only.
     *
     * @return a map from column family to the name of the configured
     *         compression algorithm
     */
    static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
        return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
    }

    private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
        return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
    }

    private static Map<byte[], String> createFamilyBlockSizeMap(
            Configuration conf) {
        return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
    }

    /**
     * Run inside the task to deserialize column family to given conf value map.
     *
     * @param conf
     * @param confName
     * @return a map of column family to the given configuration value
     */
    private static Map<byte[], String> createFamilyConfValueMap(
            Configuration conf, String confName) {
        Map<byte[], String> confValMap = new TreeMap<byte[], String>(
                Bytes.BYTES_COMPARATOR);
        String confVal = conf.get(confName, "");
        for (String familyConf : confVal.split("&")) {
            String[] familySplit = familyConf.split("=");
            if (familySplit.length != 2) {
                continue;
            }
            try {
                confValMap.put(
                        URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
                        URLDecoder.decode(familySplit[1], "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                // will not happen with UTF-8 encoding
                throw new AssertionError(e);
            }
        }
        return confValMap;
    }

    /**
     * Configure <code>job</code> with a TotalOrderPartitioner, partitioning
     * against <code>splitPoints</code>. Cleans up the partitions file after job
     * exits.
     */
    static void configurePartitioner(Job job,
            List<ImmutableBytesWritable> splitPoints) throws IOException {

        // create the partitions file
        FileSystem fs = FileSystem.get(job.getConfiguration());
        Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
        fs.makeQualified(partitionsPath);
        fs.deleteOnExit(partitionsPath);
        writePartitions(job.getConfiguration(), partitionsPath, splitPoints);

        // configure job to use it
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                partitionsPath);
    }

    /**
     * Serialize column family to compression algorithm map to configuration.
     * Invoked while configuring the MR job for incremental load.
     *
     * Package-private for unit tests only.
     *
     * @throws IOException
     *             on failure to read column family descriptors
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
    static void configureCompression(HTable table, Configuration conf)
            throws IOException {
        StringBuilder compressionConfigValue = new StringBuilder();
        HTableDescriptor tableDescriptor = table.getTableDescriptor();
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                compressionConfigValue.append('&');
            }
            compressionConfigValue.append(URLEncoder.encode(
                    familyDescriptor.getNameAsString(), "UTF-8"));
            compressionConfigValue.append('=');
            compressionConfigValue.append(URLEncoder.encode(familyDescriptor
                    .getCompression().getName(), "UTF-8"));
        }
        // Get rid of the last ampersand
        conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
    }

    /**
     * Serialize column family to bloom type map to configuration. Invoked while
     * configuring the MR job for incremental load.
     *
     * @throws IOException
     *             on failure to read column family descriptors
     */
    static void configureBloomType(HTable table, Configuration conf)
            throws IOException {
        HTableDescriptor tableDescriptor = table.getTableDescriptor();
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        StringBuilder bloomTypeConfigValue = new StringBuilder();
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                bloomTypeConfigValue.append('&');
            }
            bloomTypeConfigValue.append(URLEncoder.encode(
                    familyDescriptor.getNameAsString(), "UTF-8"));
            bloomTypeConfigValue.append('=');
            String bloomType = familyDescriptor.getBloomFilterType().toString();
            if (bloomType == null) {
                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
            }
            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
        }
        conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
    }

    // added: ignore()
    @SuppressWarnings("deprecation")
    public boolean ignore(KeyValue kv) {
        boolean ignore = Bytes.toString(kv.getValue()).indexOf("Del") >= 0;
        return ignore;
    }

    // added: getDeleteRowKeyPath()
    public static Path getDeleteRowKeyPath(Path outputPath) {
        return new Path(outputPath + HFileOutputFormatBase._deleteRowkey);
    }

    // added: getDeleteRowKeyFile()
    public static Path getDeleteRowKeyFile(Path outputPath) {
        return new Path(getDeleteRowKeyPath(outputPath) + "/"
                + UUID.randomUUID().toString());
    }
}
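Note that the class above only collects the row keys of ignored ("Del") records into a side file under <output>_deleteRowkey; it does not act on them. As an illustration only, here is a hedged sketch of how such a side file could be consumed afterwards, turning each collected row key into a Delete. The file layout and the table name come from the code above; the class name, the argument convention (pass the job's output path) and the assumption that row keys are plain-text lines are mine:

package com.lyq.study.example;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

import com.lyq.study.lib.HFileOutputFormatBase;

public class ApplyDeleteRowKeys {
    public static void main(String[] args) throws Exception {
        // args[0] = the job's output path, e.g. hdfs://master129:9000/test/output
        Configuration conf = HBaseConfiguration.create();
        Path deleteDir = HFileOutputFormatBase.getDeleteRowKeyPath(new Path(args[0]));
        FileSystem fs = deleteDir.getFileSystem(conf);
        HTable table = new HTable(conf, "testtable1");
        List<Delete> deletes = new ArrayList<Delete>();
        for (FileStatus status : fs.listStatus(deleteDir)) {
            // Each task attempt wrote one file of newline-separated row keys
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath())));
            String rowKey;
            while ((rowKey = reader.readLine()) != null) {
                deletes.add(new Delete(Bytes.toBytes(rowKey)));
            }
            reader.close();
        }
        table.delete(deletes);
        table.close();
    }
}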
The contents of data.txt are as follows:
6222020405006,typeA,100000,201408081225,2000
6222020405006,typeA,100000,201408112351,1000
6222020405006,typeA,100000,201408140739,4000
6222020405008,typeB,50000,201408150932,5000
6222020405009,typeC,30000,201408181212,10000
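The mapper splits each comma-separated record into card, type, amount, time and many, and builds the row key as card + time (for the first record: 6222020405006201408081225). Also note that configureIncrementalLoad() reads the region boundaries of the target table, so testtable1 with column family info has to exist before the job is launched. A minimal sketch of creating it with the 0.96 client API (table and family names are taken from the code above; everything else is just an assumption for illustration, e.g. HBaseConfiguration.create() stands in for the post's HBaseConfigUtils.getHBaseConfig(1)):

package com.lyq.study.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        // Assumes hbase-site.xml (ZooKeeper quorum etc.) is on the classpath
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("testtable1"));
        desc.addFamily(new HColumnDescriptor("info")); // family used by the mapper
        if (!admin.tableExists("testtable1")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}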
A few notes on the code: HFileOutputFormatBase.java is a rewrite of HFileOutputFormat2.java from hbase-server-0.96.2-hadoop2.jar. It fixes the value type to KeyValue, drops the TableMapReduceUtil.addDependencyJars(job) call, and adds an ignore() hook that diverts any KeyValue whose value contains "Del" into a side file under <output>_deleteRowkey instead of writing it to an HFile. In File2HFile2HBase.java, the line just before the return statement in getJob() is:
HFileOutputFormatBase.configureIncrementalLoad(job, htable, HFileOutputFormatBase.class);
At first I called the stock HFileOutputFormat2.configureIncrementalLoad(job, htable); but the job kept failing with the following error:
2014-08-26 22:31:47,183 INFO [main] example.File2HFile2HBase (File2HFile2HBase.java:run(61)) - Usage command args: [hdfs://master129:9000/test/input/data.txt, hdfs://master129:9000/test/output] SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 2014-08-26 22:31:51,256 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=LiuYQ-PC 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.6.0_45 2014-08-26 22:31:51,258 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Sun Microsystems Inc. 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=C:\Program Files\Java\jdk1.6.0_45\jre 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=E:\workspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoop-conf-2965312243669964400;E:\workspace\hadoop_study\bin;D:\app\lib\activation-1.1.jar;D:\app\lib\annotations-api.jar;D:\app\lib\aopalliance-1.0.jar;D:\app\lib\asm-3.1.jar;D:\app\lib\asm-3.2.jar;D:\app\lib\avro-1.7.4.jar;D:\app\lib\catalina.jar;D:\app\lib\catalina-ant.jar;D:\app\lib\catalina-ha.jar;D:\app\lib\catalina-tribes.jar;D:\app\lib\commons-beanutils-1.7.0.jar;D:\app\lib\commons-beanutils-core-1.8.0.jar;D:\app\lib\commons-cli-1.2.jar;D:\app\lib\commons-codec-1.4.jar;D:\app\lib\commons-codec-1.7.jar;D:\app\lib\commons-collections-3.2.1.jar;D:\app\lib\commons-compress-1.4.1.jar;D:\app\lib\commons-configuration-1.6.jar;D:\app\lib\commons-daemon-1.0.13.jar;D:\app\lib\commons-digester-1.8.jar;D:\app\lib\commons-el-1.0.jar;D:\app\lib\commons-httpclient-3.1.jar;D:\app\lib\commons-io-2.1.jar;D:\app\lib\commons-io-2.4.jar;D:\app\lib\commons-lang-2.5.jar;D:\app\lib\commons-lang-2.6.jar;D:\app\lib\commons-logging-1.1.1.jar;D:\app\lib\commons-math-2.1.jar;D:\app\lib\commons-net-3.1.jar;D:\app\lib\ecj-3.7.2.jar;D:\app\lib\el-api.jar;D:\app\lib\findbugs-annotations-1.3.9-1.jar;D:\app\lib\gmbal-api-only-3.0.0-b023.jar;D:\app\lib\grizzly-framework-2.1.2.jar;D:\app\lib\grizzly-http-2.1.2.jar;D:\app\lib\grizzly-http-server-2.1.2.jar;D:\app\lib\grizzly-http-servlet-2.1.2.jar;D:\app\lib\grizzly-rcm-2.1.2.jar;D:\app\lib\guava-11.0.2.jar;D:\app\lib\guava-12.0.1.jar;D:\app\lib\guice-3.0.jar;D:\app\lib\guice-servlet-3.0.jar;D:\app\lib\hadoop-annotations-2.2.0.jar;D:\app\lib\hadoop-archives-2.2.0.jar;D:\app\lib\hadoop-auth-2.2.0.jar;D:\app\lib\hadoop-client-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0-tests.jar;D:\app\lib\hadoop-datajoin-2.2.0.jar;D:\app\lib\hadoop-distcp-2.2.0.jar;D:\app\lib\hadoop-extras-2.2.0.jar;D:\app\lib\hadoop-gridmix-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0-tests.jar;D:\app\lib\hadoop-hdfs-nfs-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-app-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-common-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-core-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-h
s-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-hs-plugins-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0-tests.jar;D:\app\lib\hadoop-mapreduce-client-shuffle-2.2.0.jar;D:\app\lib\hadoop-mapreduce-examples-2.2.0.jar;D:\app\lib\hadoop-nfs-2.2.0.jar;D:\app\lib\hadoop-rumen-2.2.0.jar;D:\app\lib\hadoop-streaming-2.2.0.jar;D:\app\lib\hadoop-yarn-api-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-distributedshell-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-unmanaged-am-launcher-2.2.0.jar;D:\app\lib\hadoop-yarn-client-2.2.0.jar;D:\app\lib\hadoop-yarn-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-nodemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-resourcemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0-tests.jar;D:\app\lib\hadoop-yarn-server-web-proxy-2.2.0.jar;D:\app\lib\hadoop-yarn-site-2.2.0.jar;D:\app\lib\hamcrest-core-1.1.jar;D:\app\lib\hamcrest-core-1.3.jar;D:\app\lib\hbase-client-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-examples-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop2-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-prefix-tree-0.96.2-hadoop2.jar;D:\app\lib\hbase-protocol-0.96.2-hadoop2.jar;D:\app\lib\hbase-server-0.96.2-hadoop2.jar;D:\app\lib\hbase-server-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-shell-0.96.2-hadoop2.jar;D:\app\lib\hbase-testing-util-0.96.2-hadoop2.jar;D:\app\lib\hbase-thrift-0.96.2-hadoop2.jar;D:\app\lib\hsqldb-2.0.0.jar;D:\app\lib\htrace-core-2.04.jar;D:\app\lib\httpclient-4.1.3.jar;D:\app\lib\httpcore-4.1.3.jar;D:\app\lib\jackson-core-asl-1.8.8.jar;D:\app\lib\jackson-jaxrs-1.8.8.jar;D:\app\lib\jackson-mapper-asl-1.8.8.jar;D:\app\lib\jackson-xc-1.8.8.jar;D:\app\lib\jamon-runtime-2.3.1.jar;D:\app\lib\jasper.jar;D:\app\lib\jasper-compiler-5.5.23.jar;D:\app\lib\jasper-el.jar;D:\app\lib\jasper-runtime-5.5.23.jar;D:\app\lib\javax.inject-1.jar;D:\app\lib\javax.servlet-3.1.jar;D:\app\lib\javax.servlet-api-3.0.1.jar;D:\app\lib\jaxb-api-2.2.2.jar;D:\app\lib\jaxb-impl-2.2.3-1.jar;D:\app\lib\jersey-client-1.9.jar;D:\app\lib\jersey-core-1.8.jar;D:\app\lib\jersey-core-1.9.jar;D:\app\lib\jersey-grizzly2-1.9.jar;D:\app\lib\jersey-guice-1.9.jar;D:\app\lib\jersey-json-1.8.jar;D:\app\lib\jersey-json-1.9.jar;D:\app\lib\jersey-server-1.8.jar;D:\app\lib\jersey-server-1.9.jar;D:\app\lib\jersey-test-framework-core-1.9.jar;D:\app\lib\jersey-test-framework-grizzly2-1.9.jar;D:\app\lib\jets3t-0.6.1.jar;D:\app\lib\jettison-1.1.jar;D:\app\lib\jettison-1.3.1.jar;D:\app\lib\jetty-6.1.26.jar;D:\app\lib\jetty-sslengine-6.1.26.jar;D:\app\lib\jetty-util-6.1.26.jar;D:\app\lib\jruby-complete-1.6.8.jar;D:\app\lib\jsch-0.1.42.jar;D:\app\lib\jsp-2.1-6.1.14.jar;D:\app\lib\jsp-api.jar;D:\app\lib\jsp-api-2.1.jar;D:\app\lib\jsp-api-2.1-6.1.14.jar;D:\app\lib\jsr305-1.3.9.jar;D:\app\lib\junit-4.8.2.jar;D:\app\lib\junit-4.10.jar;D:\app\lib\junit-4.11.jar;D:\app\lib\libthrift-0.9.0.jar;D:\app\lib\log4j-1.2.17.jar;D:\app\lib\management-api-3.0.0-b012.jar;D:\app\lib\metrics-core-2.1.2.jar;D:\app\lib\mockito-all-1.8.5.jar;D:\app\lib\netty-3.6.2.Final.jar;D:\app\lib\netty-3.6.6.Final.jar;D:\app\lib\paranamer-2.3.jar;D:\app\lib\protobuf-java-2.5.0.jar;D:\app\lib\servlet-api.jar;D:\app\lib\servlet-api-2.5.jar;D:\app\lib\servlet-api-2.5-6.
1.14.jar;D:\app\lib\slf4j-api-1.6.4.jar;D:\app\lib\slf4j-api-1.7.5.jar;D:\app\lib\slf4j-log4j12-1.6.4.jar;D:\app\lib\slf4j-log4j12-1.7.5.jar;D:\app\lib\snappy-java-1.0.4.1.jar;D:\app\lib\stax-api-1.0.1.jar;D:\app\lib\tomcat-coyote.jar;D:\app\lib\tomcat-dbcp.jar;D:\app\lib\tomcat-i18n-es.jar;D:\app\lib\tomcat-i18n-fr.jar;D:\app\lib\tomcat-i18n-ja.jar;D:\app\lib\xmlenc-0.52.jar;D:\app\lib\xz-1.0.jar;D:\app\lib\zookeeper-3.4.5.jar 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=C:\Program Files\Java\jdk1.6.0_45\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin/client;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/lib/i386;D:\app\hadoop-2.2.0\bin;C:\Program Files (x86)\Java\jdk1.6.0_35\bin;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;D:\app\Oracle11g\product\11.1.0\db_1\bin;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;D:\Program Files\TortoiseSVN\bin;D:\Program Files\MySQL\MySQL Server 5.5\bin;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;D:\Program Files\SSH Communications Security\SSH Secure Shell;.;;D:\app\eclipse;;. 
2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=C:\Users\LiuYQ\AppData\Local\Temp\ 2014-08-26 22:31:51,261 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA> 2014-08-26 22:31:51,263 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Windows 7 2014-08-26 22:31:51,263 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=6.1 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=LiuYQ 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=C:\Users\LiuYQ 2014-08-26 22:31:51,264 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=E:\workspace\hadoop_study 2014-08-26 22:31:51,272 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=slave130:2181,master129:2181,slave132:2181,slave131:2181 sessionTimeout=180000 watcher=hconnection-0x76b20352, quorum=slave130:2181,master129:2181,slave132:2181,slave131:2181, baseZNode=/hbase 2014-08-26 22:31:51,389 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(120)) - Process identifier=hconnection-0x76b20352 connecting to ZooKeeper ensemble=slave130:2181,master129:2181,slave132:2181,slave131:2181 2014-08-26 22:31:51,404 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(966)) - Opening socket connection to server master129/192.168.24.129:2181. Will not attempt to authenticate using SASL (无法定位登录配置) 2014-08-26 22:31:51,419 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(849)) - Socket connection established to master129/192.168.24.129:2181, initiating session 2014-08-26 22:31:51,464 INFO [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1207)) - Session establishment complete on server master129/192.168.24.129:2181, sessionid = 0x48129e3b750006, negotiated timeout = 150000 2014-08-26 22:31:53,018 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(366)) - Looking up current regions for table testtable1 2014-08-26 22:31:53,046 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(368)) - Configuring 1 reduce partitions to match current region count 2014-08-26 22:31:53,164 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:writePartitions(287)) - Writing partition information to /tmp/partitions_8f008fe0-9170-48ac-940a-83c2813f1378 2014-08-26 22:31:53,356 WARN [main] zlib.ZlibFactory (ZlibFactory.java:<clinit>(50)) - Failed to load/initialize native-zlib library 2014-08-26 22:31:53,359 INFO [main] compress.CodecPool (CodecPool.java:getCompressor(150)) - Got brand-new compressor [.deflate] 2014-08-26 22:31:53,904 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available 2014-08-26 22:31:53,964 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. 
Instead, use io.native.lib.available
2014-08-26 22:31:54,006 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,059 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,110 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,310 INFO [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(380)) - Incremental table testtable1 output configured.
2014-08-26 22:31:54,362 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2014-08-26 22:31:54,365 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2014-08-26 22:31:54,998 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(149)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2014-08-26 22:31:55,083 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(439)) - Cleaning up the staging area file:/tmp/hadoop-LiuYQ/mapred/staging/hadoop5469487/.staging/job_local5469487_0001
java.lang.IllegalArgumentException: Pathname /D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar from hdfs://master129:9000/D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:184)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
    at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
    at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
    at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
    at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
    at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:264)
    at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:300)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:387)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
    at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:76)
    at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:1)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:337)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
    at com.lyq.study.example.File2HFile2HBase.run(File2HFile2HBase.java:65)
    at com.lyq.study.example.File2HFile2HBase.main(File2HFile2HBase.java:135)
In the end I rewrote HFileOutputFormat2 as HFileOutputFormatBase and switched the job over to it, after which everything ran successfully. The relevant difference is that HFileOutputFormatBase comments out TableMapReduceUtil.addDependencyJars(job): the stack trace above fails inside the distributed-cache setup for the dependency jars, because the local Windows classpath entries (D:\app\lib\...) get resolved against hdfs://master129:9000 and are not valid DFS filenames when the job is submitted from Windows to the remote cluster.
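To sanity-check the load after the job finishes, the table can be scanned with the plain client API. A minimal sketch (table and family names come from the code above; the class name and the choice to print the amount column are only for illustration):

package com.lyq.study.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable1");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result r : scanner) {
                // Print the row key and the "amount" column written by the mapper
                System.out.println(Bytes.toString(r.getRow()) + " amount="
                        + Bytes.toString(r.getValue(Bytes.toBytes("info"),
                                Bytes.toBytes("amount"))));
            }
        } finally {
            scanner.close();
            table.close();
        }
    }
}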
Attachments:
File2HFile2HBase.java
HFileOutputFormatBase.java
data.txt
and the original HFileOutputFormat2.java source