Example 1:
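import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// DeviceConstant and MobileProfileUtil are project classes; import them from their own packages.
public class MergePcProfileMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().indexOf(DeviceConstant.TAB) > 0) {
            // Line contains the separator: treat it as a profile record.
            String[] splits = value.toString().split(DeviceConstant.TAB);
            MobileProfileUtil.writeProfile(splits[0], splits[1], context);
        } else {
            // No separator: treat it as a phone relation record.
            MobileProfileUtil.writePhoneRelation(value, context);
        }
    }
}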
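The branch in `map()` can be tried without Hadoop. The following is a minimal sketch, assuming `DeviceConstant.TAB` is the tab character `"\t"` (the real value is not shown in this post); the class and method names are hypothetical and only mirror the routing decision, not the actual `MobileProfileUtil` calls.

```java
import java.util.Arrays;

// Illustrative only: mirrors the branch in MergePcProfileMapper.map().
final class LineRoutingSketch {
    // Assumption: the real separator lives in DeviceConstant.TAB.
    static final String TAB = "\t";

    /** Returns "profile" when a separator occurs after position 0, otherwise "relation". */
    static String route(String line) {
        return line.indexOf(TAB) > 0 ? "profile" : "relation";
    }

    /** Splits a profile line into its separator-delimited fields. */
    static String[] fields(String line) {
        return line.split(TAB);
    }

    public static void main(String[] args) {
        System.out.println(route("user\t{\"k\":1}"));
        System.out.println(route("no-separator-here"));
        System.out.println(route("\tleading-separator"));
        System.out.println(Arrays.toString(fields("user\t{\"k\":1}")));
    }
}
```

Two edge cases are worth a test of their own under this assumption: a line that starts with the separator falls into the relation branch because `indexOf` returns 0, and a line that ends with the separator splits into a single field, so reading `splits[1]` would throw `ArrayIndexOutOfBoundsException`.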
The test program for MergePcProfileMapper is:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class MergePcProfileMapperTest {

    private MapDriver<LongWritable, Text, Text, Text> mapDriver;

    @Test
    public void testMapTextPro1() throws IOException {
        mapDriver.withInput(new LongWritable(1L), new Text("211636519815012420166"));
        mapDriver.withOutput(new Text("116365198"), new Text("215012420166"));
        mapDriver.runTest();
    }

    @Test
    public void testMapTextPro2() throws IOException {
        mapDriver.withInput(new LongWritable(1L), new Text("111636519815012420166"));
        mapDriver.withOutput(new Text("116365198"), new Text("115012420166"));
        mapDriver.runTest();
    }

    @Test
    public void testMapTextPro3() throws IOException {
        mapDriver.withInput(new LongWritable(1L), new Text("315012420166116365198"));
        mapDriver.withOutput(new Text("116365198"), new Text("315012420166"));
        mapDriver.runTest();
    }

    @Test
    public void testMapResultPro() throws IOException {
        // MockHTable table = MockHTable.with(new String[][] {
        // { "110", "cat:123", "{\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}" }});
        // Get get = new Get("110".getBytes());
        // table.get(get);
        mapDriver.withInput(new LongWritable(1L), new Text("user_profile_mobile110 {\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}"));
        mapDriver.withOutput(new Text("110"), new Text("user_profile_mobile{\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"itemValue\":0,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}"));
        mapDriver.runTest();
    }

    @Before
    public void setUp() {
        MergePcProfileMapper lsMapper = new MergePcProfileMapper();
        mapDriver = new MapDriver<LongWritable, Text, Text, Text>();
        mapDriver.setMapper(lsMapper);
    }
}
Example 2:
In most cases a test needs to simulate data read from HBase.
The first step is to mock the HTable:
package com.yhd.processor.mobile.util;

/**
 * This file is licensed to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Mock implementation of HTableInterface. Holds any supplied data in a
 * multi-dimensional NavigableMap which acts as a in-memory database. Useful for
 * testing classes that operate on data using an HTableInterface.
 * <p>
 * Instances should be get using <code>MockHTable.create()</code>. So while a
 * DAO with a saving operation like
 *
 * <pre>
 * public class MyDAO {
 *     private HTableInterface table;
 *
 *     public MyDAO(HTableInterface table) {
 *         this.table = table;
 *     }
 *
 *     public void saveData(byte[] id, byte[] data) throws IOException{
 *         Put put = new Put(id)
 *         put.add(family, qualifier, data);
 *         table.put(put);
 *     }
 * }
 * </pre>
 * <p>
 * is used in production like
 *
 * <pre>
 * MyDAO(new HTable(conf, tableName)).saveData(id, data);
 * </pre>
 * <p>
 * can be tested like
 *
 * <pre>
 * @Test
 * public void testSave() {
 *     MockHTable table = MockHTable.create();
 *     MyDAO(table).saveData(id, data);
 *     Get get = new Get(id);
 *     Result result = table.get(get);
 *     assertArrayEquals(data, result.getValue(family, qualifier));
 * }
 * </pre>
 * <p>
 * MockHTable instances can also be initialized with pre-loaded data using one
 * of the String[][] or Map<String, Map<String, String>> data formats. While
 * String[][] parameter lets directly loading data from source code, Map can be
 * generated from a YAML document, using a parser.
 *
 * <pre>
 * // String[][]
 * MockHTable table = MockHTable.with(new String[][] {
 *     { "<rowid>", "<column>", "<value>" },
 *     { "id", "family:qualifier1", "data1" },
 *     { "id", "family:qualifier2", "data2" }
 * });
 * // YAML
 * String database = "id:\n family:qualifier1: data1\n family:qualifier2: data2\n";
 * MockHTable table = MockHTable.with((Map<String, Map<String, String>) new Yaml().load(database));
 * </pre>
 * <p>
 * If value is not supposed to be a String, but an int, double or anything,
 * <code>MockHTable.toEString()</code> can be used to turn it into a String.
 *
 * <p>
 * In order to simplify assertions for tests that should put anything into
 * database, MockHTable.read() works with two parameters (id and column) and
 * returns anything written to that row/column. So, previous test can be reduced
 * to
 *
 * <pre>
 * @Test
 * public void testSave() {
 *     MockHTable table = MockHTable.create();
 *     MyDAO(table).saveData(id, data);
 *     assertArrayEquals(data, table.read(id, "family:qualifier"));
 * }
 * </pre>
 * <p>
 *
 * @author erdem
 *
 */
public class MockHTable implements HTableInterface {

    /**
     * This is all the data for a MockHTable instance
     */
    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data =
            new TreeMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>>(
                    Bytes.BYTES_COMPARATOR);

    /**
     * Helper method to convert some data into a list of KeyValue's
     *
     * @param row
     *            row value of the KeyValue's
     * @param rowdata
     *            data to decode
     * @param maxVersions
     *            number of versions to return
     * @return List of KeyValue's
     */
    private static List<KeyValue> toKeyValue(
            byte[] row,
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata,
            int maxVersions) {
        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
    }

    /**
     * Helper method to convert some data into a list of KeyValue's with
     * timestamp constraint
     *
     * @param row
     *            row value of the KeyValue's
     * @param rowdata
     *            data to decode
     * @param timestampStart
     *            start of the timestamp constraint
     * @param timestampEnd
     *            end of the timestamp constraint
     * @param maxVersions
     *            number of versions to return
     * @return List of KeyValue's
     */
    private static List<KeyValue> toKeyValue(
            byte[] row,
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata,
            long timestampStart, long timestampEnd, int maxVersions) {
        List<KeyValue> ret = new ArrayList<KeyValue>();
        for (byte[] family : rowdata.keySet())
            for (byte[] qualifier : rowdata.get(family).keySet()) {
                int versionsAdded = 0;
                for (Entry<Long, byte[]> tsToVal : rowdata.get(family)
                        .get(qualifier).descendingMap().entrySet()) {
                    if (versionsAdded++ == maxVersions)
                        break;
                    Long timestamp = tsToVal.getKey();
                    if (timestamp < timestampStart)
                        continue;
                    if (timestamp > timestampEnd)
                        continue;
                    byte[] value = tsToVal.getValue();
                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
                }
            }
        return ret;
    }

    /**
     * Clients should not rely on table names so this returns null.
     *
     * @return null
     */
    @Override
    public byte[] getTableName() {
        return null;
    }

    /**
     * No configuration needed to work so this returns null.
     *
     * @return null
     */
    @Override
    public Configuration getConfiguration() {
        return null;
    }

    /**
     * No table descriptor needed so this returns null.
     *
     * @return null
     */
    @Override
    public HTableDescriptor getTableDescriptor() {
        return null;
    }

    @Override
    public boolean exists(Get get) throws IOException {
        if (get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
            return data.containsKey(get.getRow());
        } else {
            byte[] row = get.getRow();
            if (!data.containsKey(row)) {
                return false;
            }
            for (byte[] family : get.getFamilyMap().keySet()) {
                if (!data.get(row).containsKey(family)) {
                    return false;
                } else {
                    for (byte[] qualifier : get.getFamilyMap().get(family)) {
                        if (!data.get(row).get(family).containsKey(qualifier)) {
                            return false;
                        }
                    }
                }
            }
            return true;
        }
    }

    @Override
    public Result get(Get get) throws IOException {
        if (!data.containsKey(get.getRow()))
            return new Result();
        byte[] row = get.getRow();
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        if (!get.hasFamilies()) {
            kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
        } else {
            for (byte[] family : get.getFamilyMap().keySet()) {
                if (data.get(row).get(family) == null)
                    continue;
                NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
                if (qualifiers == null || qualifiers.isEmpty())
                    qualifiers = data.get(row).get(family).navigableKeySet();
                for (byte[] qualifier : qualifiers) {
                    if (qualifier == null)
                        qualifier = "".getBytes();
                    if (!data.get(row).containsKey(family)
                            || !data.get(row).get(family).containsKey(qualifier)
                            || data.get(row).get(family).get(qualifier).isEmpty())
                        continue;
                    Entry<Long, byte[]> timestampAndValue = data.get(row)
                            .get(family).get(qualifier).lastEntry();
                    kvs.add(new KeyValue(row, family, qualifier,
                            timestampAndValue.getKey(), timestampAndValue.getValue()));
                }
            }
        }
        Filter filter = get.getFilter();
        if (filter != null) {
            filter.reset();
            List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
            for (KeyValue kv : kvs) {
                if (filter.filterAllRemaining()) {
                    break;
                }
                if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
                    continue;
                }
                if (filter.filterKeyValue(kv) == ReturnCode.INCLUDE) {
                    nkvs.add(kv);
                }
                // ignoring next key hint which is a optimization to reduce file
                // system IO
            }
            if (filter.hasFilterRow()) {
                filter.filterRow(nkvs);
            }
            kvs = nkvs;
        }
        return new Result(kvs);
    }

    @Override
    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
        // FIXME: implement
        return null;
    }

    @Override
    public ResultScanner getScanner(Scan scan) throws IOException {
        final List<Result> ret = new ArrayList<Result>();
        byte[] st = scan.getStartRow();
        byte[] sp = scan.getStopRow();
        Filter filter = scan.getFilter();
        for (byte[] row : data.keySet()) {
            // if row is equal to startRow emit it. When startRow (inclusive)
            // and stopRow (exclusive) is the same, it should not be excluded
            // which would happen w/o this control.
            if (st != null && st.length > 0
                    && Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
                // if row is before startRow do not emit, pass to next row
                if (st != null && st.length > 0
                        && Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
                    continue;
                // if row is equal to stopRow or after it do not emit, stop
                // iteration
                if (sp != null && sp.length > 0
                        && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
                    break;
            }
            List<KeyValue> kvs = null;
            if (!scan.hasFamilies()) {
                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(),
                        scan.getTimeRange().getMax(), scan.getMaxVersions());
            } else {
                kvs = new ArrayList<KeyValue>();
                for (byte[] family : scan.getFamilyMap().keySet()) {
                    if (data.get(row).get(family) == null)
                        continue;
                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
                    if (qualifiers == null || qualifiers.isEmpty())
                        qualifiers = data.get(row).get(family).navigableKeySet();
                    for (byte[] qualifier : qualifiers) {
                        if (data.get(row).get(family).get(qualifier) == null)
                            continue;
                        for (Long timestamp : data.get(row).get(family)
                                .get(qualifier).descendingKeySet()) {
                            if (timestamp < scan.getTimeRange().getMin())
                                continue;
                            if (timestamp > scan.getTimeRange().getMax())
                                continue;
                            byte[] value = data.get(row).get(family)
                                    .get(qualifier).get(timestamp);
                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
                            if (kvs.size() == scan.getMaxVersions()) {
                                break;
                            }
                        }
                    }
                }
            }
            if (filter != null) {
                filter.reset();
                List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
                for (KeyValue kv : kvs) {
                    if (filter.filterAllRemaining()) {
                        break;
                    }
                    if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
                        continue;
                    }
                    ReturnCode filterResult = filter.filterKeyValue(kv);
                    if (filterResult == ReturnCode.INCLUDE) {
                        nkvs.add(kv);
                    } else if (filterResult == ReturnCode.NEXT_ROW) {
                        break;
                    }
                    // ignoring next key hint which is a optimization to reduce
                    // file system IO
                }
                if (filter.hasFilterRow()) {
                    filter.filterRow(nkvs);
                }
                kvs = nkvs;
            }
            if (!kvs.isEmpty()) {
                ret.add(new Result(kvs));
            }
        }
        return new ResultScanner() {
            private final Iterator<Result> iterator = ret.iterator();

            public Iterator<Result> iterator() {
                return iterator;
            }

            public Result[] next(int nbRows) throws IOException {
                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
                for (int i = 0; i < nbRows; i++) {
                    Result next = next();
                    if (next != null) {
                        resultSets.add(next);
                    } else {
                        break;
                    }
                }
                return resultSets.toArray(new Result[resultSets.size()]);
            }

            public Result next() throws IOException {
                try {
                    return iterator().next();
                } catch (NoSuchElementException e) {
                    return null;
                }
            }

            public void close() {
            }
        };
    }

    @Override
    public ResultScanner getScanner(byte[] family) throws IOException {
        Scan scan = new Scan();
        scan.addFamily(family);
        return getScanner(scan);
    }

    @Override
    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        return getScanner(scan);
    }

    @Override
    public void put(Put put) throws IOException {
        byte[] row = put.getRow();
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(
                data, row,
                new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(
                        Bytes.BYTES_COMPARATOR));
        for (byte[] family : put.getFamilyMap().keySet()) {
            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(
                    rowData, family,
                    new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
            for (KeyValue kv : put.getFamilyMap().get(family)) {
                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
                byte[] qualifier = kv.getQualifier();
                NavigableMap<Long, byte[]> qualifierData = forceFind(
                        familyData, qualifier, new TreeMap<Long, byte[]>());
                qualifierData.put(kv.getTimestamp(), kv.getValue());
            }
        }
    }

    /**
     * Helper method to find a key in a map. If key is not found, newObject is
     * added to map and returned
     *
     * @param map
     *            map to extract value from
     * @param key
     *            key to look for
     * @param newObject
     *            set key to this if not found
     * @return found value or newObject if not found
     */
    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) {
        V data = map.get(key);
        if (data == null) {
            data = newObject;
            map.put(key, data);
        }
        return data;
    }

    @Override
    public void put(List<Put> puts) throws IOException {
        for (Put put : puts)
            put(put);
    }

    /**
     * Checks if the value with given details exists in database, or is
     * non-existent in the case of value being null
     *
     * @param row
     *            row
     * @param family
     *            family
     * @param qualifier
     *            qualifier
     * @param value
     *            value
     * @return true if value is not null and exists in db, or value is null and
     *         not exists in db, false otherwise
     */
    private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
        if (value == null || value.length == 0)
            return !data.containsKey(row)
                    || !data.get(row).containsKey(family)
                    || !data.get(row).get(family).containsKey(qualifier);
        else
            return data.containsKey(row)
                    && data.get(row).containsKey(family)
                    && data.get(row).get(family).containsKey(qualifier)
                    && !data.get(row).get(family).get(qualifier).isEmpty()
                    && Arrays.equals(data.get(row).get(family).get(qualifier)
                            .lastEntry().getValue(), value);
    }

    @Override
    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
            byte[] value, Put put) throws IOException {
        if (check(row, family, qualifier, value)) {
            put(put);
            return true;
        }
        return false;
    }

    @Override
    public void delete(Delete delete) throws IOException {
        byte[] row = delete.getRow();
        if (data.get(row) == null)
            return;
        if (delete.getFamilyMap().size() == 0) {
            data.remove(row);
            return;
        }
        for (byte[] family : delete.getFamilyMap().keySet()) {
            if (data.get(row).get(family) == null)
                continue;
            if (delete.getFamilyMap().get(family).isEmpty()) {
                data.get(row).remove(family);
                continue;
            }
            for (KeyValue kv : delete.getFamilyMap().get(family)) {
                data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
            }
            if (data.get(row).get(family).isEmpty()) {
                data.get(row).remove(family);
            }
        }
        if (data.get(row).isEmpty()) {
            data.remove(row);
        }
    }

    @Override
    public void delete(List<Delete> deletes) throws IOException {
        for (Delete delete : deletes)
            delete(delete);
    }

    @Override
    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
            byte[] value, Delete delete) throws IOException {
        if (check(row, family, qualifier, value)) {
            delete(delete);
            return true;
        }
        return false;
    }

    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount) throws IOException {
        return incrementColumnValue(row, family, qualifier, amount, true);
    }

    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
        if (check(row, family, qualifier, null)) {
            Put put = new Put(row);
            put.add(family, qualifier, Bytes.toBytes(amount));
            put(put);
            return amount;
        }
        long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier)
                .lastEntry().getValue()) + amount;
        data.get(row).get(family).get(qualifier)
                .put(System.currentTimeMillis(), Bytes.toBytes(newValue));
        return newValue;
    }

    @Override
    public boolean isAutoFlush() {
        return true;
    }

    @Override
    public void flushCommits() throws IOException {
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public RowLock lockRow(byte[] row) throws IOException {
        return null;
    }

    @Override
    public void unlockRow(RowLock rl) throws IOException {
    }

    public Object[] batch(List<? extends Row> actions)
            throws IOException, InterruptedException {
        List<Result> results = new ArrayList<Result>();
        for (Row r : actions) {
            if (r instanceof Delete) {
                delete((Delete) r);
                continue;
            }
            if (r instanceof Put) {
                put((Put) r);
                continue;
            }
            if (r instanceof Get) {
                results.add(get((Get) r));
            }
        }
        return results.toArray();
    }

    @Override
    public void batch(List<? extends Row> actions, Object[] results)
            throws IOException, InterruptedException {
        results = batch(actions);
    }

    @Override
    public Result[] get(List<Get> gets) throws IOException {
        List<Result> results = new ArrayList<Result>();
        for (Get g : gets) {
            results.add(get(g));
        }
        return results.toArray(new Result[results.size()]);
    }

    @Override
    public Result increment(Increment increment) throws IOException {
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        Map<byte[], NavigableMap<byte[], Long>> famToVal = increment.getFamilyMap();
        for (Entry<byte[], NavigableMap<byte[], Long>> ef : famToVal.entrySet()) {
            byte[] family = ef.getKey();
            NavigableMap<byte[], Long> qToVal = ef.getValue();
            for (Entry<byte[], Long> eq : qToVal.entrySet()) {
                incrementColumnValue(increment.getRow(), family, eq.getKey(), eq.getValue());
                kvs.add(new KeyValue(increment.getRow(), family, eq.getKey(),
                        Bytes.toBytes(eq.getValue())));
            }
        }
        return new Result(kvs);
    }

    private MockHTable() {
    }

    /**
     * Default way of constructing a MockHTable
     *
     * @return a new MockHTable
     */
    public static MockHTable create() {
        return new MockHTable();
    }

    /**
     * Create a MockHTable with some pre-loaded data. Parameter should be a map
     * of column-to-data mappings of rows. It can be created with a YAML like
     *
     * <pre>
     * rowid:
     *   family1:qualifier1: value1
     *   family2:qualifier2: value2
     * </pre>
     *
     * @param dump
     *            pre-loaded data
     * @return a new MockHTable loaded with given data
     */
    public static MockHTable with(Map<String, Map<String, String>> dump) {
        MockHTable ret = new MockHTable();
        for (String row : dump.keySet()) {
            for (String column : dump.get(row).keySet()) {
                String val = dump.get(row).get(column);
                put(ret, row, column, val);
            }
        }
        return ret;
    }

    /**
     * Helper method of pre-loaders, adds parameters to data.
     *
     * @param ret
     *            data to load into
     * @param row
     *            rowid
     * @param column
     *            family:qualifier encoded value
     * @param val
     *            value
     */
    private static void put(MockHTable ret, String row, String column, String val) {
        String[] fq = split(column);
        byte[] family = Bytes.toBytesBinary(fq[0]);
        byte[] qualifier = Bytes.toBytesBinary(fq[1]);
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = ret
                .forceFind(
                        ret.data,
                        Bytes.toBytesBinary(row),
                        new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(
                                Bytes.BYTES_COMPARATOR));
        NavigableMap<byte[], NavigableMap<Long, byte[]>> qualifiers = ret
                .forceFind(families, family,
                        new TreeMap<byte[], NavigableMap<Long, byte[]>>(
                                Bytes.BYTES_COMPARATOR));
        NavigableMap<Long, byte[]> values = ret.forceFind(qualifiers,
                qualifier, new TreeMap<Long, byte[]>());
        values.put(System.currentTimeMillis(), Bytes.toBytesBinary(val));
    }

    /**
     * Create a MockHTable with some pre-loaded data. Parameter should be an
     * array of string arrays which define every column value individually.
     *
     * <pre>
     * new String[][] {
     *     { "<rowid>", "<column>", "<value>" },
     *     { "id", "family:qualifier1", "data1" },
     *     { "id", "family:qualifier2", "data2" }
     * });
     * </pre>
     *
     * @param dump
     * @return
     */
    public static MockHTable with(String[][] dump) {
        MockHTable ret = new MockHTable();
        for (String[] row : dump) {
            put(ret, row[0], row[1], row[2]);
        }
        return ret;
    }

    /**
     * Column identification helper
     *
     * @param column
     *            column name in the format family:qualifier
     * @return <code>{"family", "qualifier"}</code>
     */
    private static String[] split(String column) {
        return new String[] {
                column.substring(0, column.indexOf(':')),
                column.substring(column.indexOf(':') + 1) };
    }

    /**
     * Read a value saved in the object. Useful for making assertions in tests.
     *
     * @param rowid
     *            rowid of the data to read
     * @param column
     *            family:qualifier of the data to read
     * @return value or null if row or column of the row does not exist
     */
    public byte[] read(String rowid, String column) {
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> row = data
                .get(Bytes.toBytesBinary(rowid));
        if (row == null)
            return null;
        String[] fq = split(column);
        byte[] family = Bytes.toBytesBinary(fq[0]);
        byte[] qualifier = Bytes.toBytesBinary(fq[1]);
        if (!row.containsKey(family))
            return null;
        if (!row.get(family).containsKey(qualifier))
            return null;
        return row.get(family).get(qualifier).lastEntry().getValue();
    }

    public static String toEString(boolean val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    public static String toEString(double val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    public static String toEString(float val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    public static String toEString(int val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    public static String toEString(long val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    public static String toEString(short val) {
        return Bytes.toStringBinary(Bytes.toBytes(val));
    }

    @Override
    public Result append(Append arg0) throws IOException {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
            Class<T> arg0, byte[] arg1, byte[] arg2, Call<T, R> arg3)
            throws IOException, Throwable {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <T extends CoprocessorProtocol, R> void coprocessorExec(
            Class<T> arg0, byte[] arg1, byte[] arg2, Call<T, R> arg3,
            Callback<R> arg4) throws IOException, Throwable {
        // TODO Auto-generated method stub
    }

    @Override
    public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> arg0, byte[] arg1) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void mutateRow(RowMutations arg0) throws IOException {
        // TODO Auto-generated method stub
    }
}
The concrete test code is:
@Test
public void testMapResultPro() throws IOException {
    // MockHTable table = MockHTable.with(new String[][] {
    // { "110", "cat:123", "{\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}" }});
    // Get get = new Get("110".getBytes());
    // table.get(get);
    mapDriver.withInput(new LongWritable(1L), new Text("user_profile_mobile110 {\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}"));
    mapDriver.withOutput(new Text("110"), new Text("user_profile_mobile{\"categoryId\":123,\"categoryLevel\":2,\"categoryWeight\":0.818,\"userAttriProfiles\":[{\"attributePref\":0.9585,\"attributeType\":0,\"attributeValue\":9223372036854775806,\"attributeWeight\":0.9585,\"itemValue\":0,\"items\":[{\"av\":0.9585,\"id\":931079}]}]}"));
    mapDriver.runTest();
}
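MockHTable keeps its rows in nested sorted maps: row, then family, then qualifier, then timestamp, with the newest timestamp read back first. The following is a minimal sketch of that layout using only `java.util`, so it runs without HBase on the classpath. The class `NestedStoreSketch` and its methods are hypothetical and are not part of the HBase API or of MockHTable; the comparator stands in for `Bytes.BYTES_COMPARATOR`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Stdlib-only sketch of a row -> family -> qualifier -> timestamp -> value store.
final class NestedStoreSketch {
    // byte[] uses identity equality, so sorted maps need an explicit comparator.
    // This one compares bytes lexicographically as unsigned values.
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    };

    private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data =
            new TreeMap<>(BYTES);

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Stores one version of a cell; column is written as "family:qualifier". */
    void put(String row, String column, long timestamp, String value) {
        int i = column.indexOf(':');
        data.computeIfAbsent(bytes(row), k -> new TreeMap<>(BYTES))
                .computeIfAbsent(bytes(column.substring(0, i)), k -> new TreeMap<>(BYTES))
                .computeIfAbsent(bytes(column.substring(i + 1)), k -> new TreeMap<>())
                .put(timestamp, bytes(value));
    }

    /** Returns the newest version of a cell, or null if the row or column is absent. */
    String read(String row, String column) {
        int i = column.indexOf(':');
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = data.get(bytes(row));
        if (families == null) return null;
        NavigableMap<byte[], NavigableMap<Long, byte[]>> qualifiers = families.get(bytes(column.substring(0, i)));
        if (qualifiers == null) return null;
        NavigableMap<Long, byte[]> versions = qualifiers.get(bytes(column.substring(i + 1)));
        if (versions == null || versions.isEmpty()) return null;
        return new String(versions.lastEntry().getValue(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        NestedStoreSketch store = new NestedStoreSketch();
        store.put("110", "cat:123", 1L, "old");
        store.put("110", "cat:123", 2L, "new");
        System.out.println(store.read("110", "cat:123")); // newest version of the cell
        System.out.println(store.read("110", "cat:999")); // column that was never written
    }
}
```

In a real test, the commented-out `MockHTable.with(...)` call plays the role of `put` here and `MockHTable.read(rowid, column)` plays the role of `read`, returning the raw bytes of the latest version.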