package org.apache.hadoop.io;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public final class WritableUtils {

  /**
   * Compressed stream -> decompressed bytes.
   * Data flow: DataInput -> buffer (byte[]) -> ByteArrayInputStream -> GZIPInputStream
   *            -> outbuf (byte[], decompressed) -> ByteArrayOutputStream (in memory)
   *            -> toByteArray
   * Since the decompressed size is unknown in advance, the internal buffer of
   * ByteArrayOutputStream is used to accumulate the decompressed bytes.
   */
  public static byte[] readCompressedByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length == -1) return null;
    byte[] buffer = new byte[length];
    in.readFully(buffer); // could/should use readFully(buffer,0,length)?
    GZIPInputStream gzi =
        new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
    byte[] outbuf = new byte[length];
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    int len;
    while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) {
      bos.write(outbuf, 0, len);
    }
    byte[] decompressed = bos.toByteArray();
    bos.close();
    gzi.close();
    return decompressed;
  }

  public static void skipCompressedByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length != -1) {
      skipFully(in, length);
    }
  }

  /**
   * bytes -> compressed output to DataOutput.
   * Data flow: bytes -> GZIPOutputStream -> ByteArrayOutputStream (in memory)
   *            -> buffer (toByteArray) -> DataOutput
   *            (the compressed byte count is written first, then the buffer)
   */
  public static int writeCompressedByteArray(DataOutput out, byte[] bytes)
      throws IOException {
    if (bytes != null) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      GZIPOutputStream gzout = new GZIPOutputStream(bos);
      gzout.write(bytes, 0, bytes.length);
      gzout.close();
      byte[] buffer = bos.toByteArray();
      int len = buffer.length;
      out.writeInt(len);
      out.write(buffer, 0, len);
      /* debug only! Once we have confidence, can lose this. */
      return ((bytes.length != 0) ? (100 * buffer.length) / bytes.length : 0);
    } else {
      out.writeInt(-1);
      return -1;
    }
  }

  /*
   * Decompress the data read from DataInput in and decode it as a UTF-8 String.
   */
  /* Ugly utility, maybe someone else can do this better */
  public static String readCompressedString(DataInput in) throws IOException {
    byte[] bytes = readCompressedByteArray(in);
    if (bytes == null) return null;
    return new String(bytes, "UTF-8");
  }

  /*
   * Encode String s as UTF-8 bytes, then compress them and write to DataOutput.
   */
  public static int writeCompressedString(DataOutput out, String s) throws IOException {
    return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
  }

  /*
   * Write a String as a Network Int n, followed by n Bytes.
   * Alternative to 16 bit read/writeUTF.
   * Encoding standard is... ?
   */
  public static void writeString(DataOutput out, String s) throws IOException {
    if (s != null) {
      byte[] buffer = s.getBytes("UTF-8");
      int len = buffer.length;
      out.writeInt(len);
      out.write(buffer, 0, len);
    } else {
      out.writeInt(-1);
    }
  }

  /*
   * Read a String as a Network Int n, followed by n Bytes.
   * Alternative to 16 bit read/writeUTF.
   * Encoding standard is... ?
   */
  public static String readString(DataInput in) throws IOException {
    int length = in.readInt();
    if (length == -1) return null;
    byte[] buffer = new byte[length];
    in.readFully(buffer); // could/should use readFully(buffer,0,length)?
    return new String(buffer, "UTF-8");
  }

  /*
   * Write a String array as a Network Int N, followed by N Byte Array Strings.
   * Could be generalised using introspection.
   */
  public static void writeStringArray(DataOutput out, String[] s) throws IOException {
    out.writeInt(s.length);
    for (int i = 0; i < s.length; i++) {
      writeString(out, s[i]);
    }
  }

  /*
   * Write a String array as a Network Int N, followed by N compressed Strings.
   * Handles also null arrays and null values.
   * Could be generalised using introspection.
   */
  public static void writeCompressedStringArray(DataOutput out, String[] s)
      throws IOException {
    if (s == null) {
      out.writeInt(-1);
      return;
    }
    out.writeInt(s.length);
    for (int i = 0; i < s.length; i++) {
      writeCompressedString(out, s[i]);
    }
  }

  /*
   * Read a String array written as a Network Int N, followed by N Byte Array Strings.
   * Could be generalised using introspection. Actually this bit couldn't...
   */
  public static String[] readStringArray(DataInput in) throws IOException {
    int len = in.readInt();
    if (len == -1) return null;
    String[] s = new String[len];
    for (int i = 0; i < len; i++) {
      s[i] = readString(in);
    }
    return s;
  }

  /*
   * Read a String array written as a Network Int N, followed by N compressed Strings.
   * Handles null arrays and null values.
   */
  public static String[] readCompressedStringArray(DataInput in) throws IOException {
    int len = in.readInt();
    if (len == -1) return null;
    String[] s = new String[len];
    for (int i = 0; i < len; i++) {
      s[i] = readCompressedString(in);
    }
    return s;
  }

  /*
   * Test Utility Method: display a byte array as hex (assumes a non-empty array).
   */
  public static void displayByteArray(byte[] record) {
    int i;
    for (i = 0; i < record.length - 1; i++) {
      if (i % 16 == 0) {
        System.out.println();
      }
      System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
      System.out.print(Integer.toHexString(record[i] & 0x0F));
      System.out.print(",");
    }
    System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
    System.out.print(Integer.toHexString(record[i] & 0x0F));
    System.out.println();
  }

  /**
   * Make a copy of a writable object using serialization to a buffer.
   * @param orig The object to copy
   * @return The copied object
   */
  public static <T extends Writable> T clone(T orig, Configuration conf) {
    try {
      @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
      T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
      ReflectionUtils.copy(conf, orig, newInst);
      return newInst;
    } catch (IOException e) {
      throw new RuntimeException("Error writing/reading clone buffer", e);
    }
  }

  /**
   * Make a copy of the writable object using serialization to a buffer.
   * @param dst the object to copy from
   * @param src the object to copy into, which is destroyed
   * @throws IOException
   * @deprecated use ReflectionUtils.cloneInto instead.
   */
  @Deprecated
  public static void cloneInto(Writable dst, Writable src) throws IOException {
    ReflectionUtils.cloneWritableInto(dst, src);
  }

  /**
   * Serializes an integer to a binary stream with zero-compressed encoding.
   * Delegates to writeVLong, so the actual encoding is the one documented there.
   *
   * @param stream Binary output stream
   * @param i Integer to be serialized
   * @throws java.io.IOException
   */
  public static void writeVInt(DataOutput stream, int i) throws IOException {
    writeVLong(stream, i);
  }

  /**
   * Serializes a long to a binary stream with zero-compressed encoding.
   * For -112 <= i <= 127, only one byte is used with the actual value.
   * For other values of i, the first byte value indicates whether the
   * long is positive or negative, and the number of bytes that follow.
   * If the first byte value v is between -113 and -120, the following long
   * is positive, with number of bytes that follow are -(v+112).
   * If the first byte value v is between -121 and -128, the following long
   * is negative, with number of bytes that follow are -(v+120). Bytes are
   * stored in the high-non-zero-byte-first order.
   *
   * @param stream Binary output stream
   * @param i Long to be serialized
   * @throws java.io.IOException
   */
  /*
   * Writes a long i to the output stream DataOutput.
   * If -112 <= i <= 127, a single byte holding i itself is written.
   * Otherwise the first byte encodes both the sign of i and the number of
   * bytes that follow:
   *   if -120 <= v <= -113, i is positive and occupies -(v+112) bytes (1 to 8);
   *   if -128 <= v <= -121, i is negative and occupies -(v+120) bytes (1 to 8).
   * The high-order bytes of i are written first, then the low-order bytes.
   */
  public static void writeVLong(DataOutput stream, long i) throws IOException {
    if (i >= -112 && i <= 127) {
      stream.writeByte((byte)i);
      return;
    }
    int len = -112;
    if (i < 0) {
      i ^= -1L; // take one's complement
      len = -120;
    }
    long tmp = i;
    while (tmp != 0) {
      tmp = tmp >> 8;
      len--;
    }
    stream.writeByte((byte)len);
    len = (len < -120) ? -(len + 120) : -(len + 112);
    for (int idx = len; idx != 0; idx--) {
      int shiftbits = (idx - 1) * 8;
      long mask = 0xFFL << shiftbits;
      stream.writeByte((byte)((i & mask) >> shiftbits));
    }
  }

  /**
   * Reads a zero-compressed encoded long from input stream and returns it.
   * @param stream Binary input stream
   * @throws java.io.IOException
   * @return deserialized long from stream.
   */
  public static long readVLong(DataInput stream) throws IOException {
    byte firstByte = stream.readByte();
    int len = decodeVIntSize(firstByte);
    if (len == 1) {
      return firstByte;
    }
    long i = 0;
    for (int idx = 0; idx < len - 1; idx++) {
      byte b = stream.readByte();
      i = i << 8;
      i = i | (b & 0xFF);
    }
    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
  }

  /**
   * Reads a zero-compressed encoded integer from input stream and returns it.
   * @param stream Binary input stream
   * @throws java.io.IOException
   * @return deserialized integer from stream.
   */
  public static int readVInt(DataInput stream) throws IOException {
    return (int) readVLong(stream);
  }

  /**
   * Given the first byte of a vint/vlong, determine the sign
   * @param value the first byte
   * @return is the value negative
   */
  public static boolean isNegativeVInt(byte value) {
    return value < -120 || (value >= -112 && value < 0);
  }

  /**
   * Parse the first byte of a vint/vlong to determine the number of bytes
   * @param value the first byte of the vint/vlong
   * @return the total number of bytes (1 to 9)
   */
  public static int decodeVIntSize(byte value) {
    if (value >= -112) {
      return 1;
    } else if (value < -120) {
      return -119 - value;
    }
    return -111 - value;
  }

  /**
   * Get the encoded length if an integer is stored in a variable-length format
   * @return the encoded length
   */
  public static int getVIntSize(long i) {
    if (i >= -112 && i <= 127) {
      return 1;
    }
    if (i < 0) {
      i ^= -1L; // take one's complement
    }
    // find the number of bytes with non-leading zeros
    int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
    // find the number of data bytes + length byte
    return (dataBits + 7) / 8 + 1;
  }

  /**
   * Read an Enum value from DataInput, Enums are read and written
   * using String values.
   * @param <T> Enum type
   * @param in DataInput to read from
   * @param enumType Class type of Enum
   * @return Enum represented by String read from DataInput
   * @throws IOException
   */
  public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
      throws IOException {
    return T.valueOf(enumType, Text.readString(in));
  }

  /**
   * writes String value of enum to DataOutput.
   * @param out DataOutput stream
   * @param enumVal enum value
   * @throws IOException
   */
  public static void writeEnum(DataOutput out, Enum<?> enumVal) throws IOException {
    Text.writeString(out, enumVal.name());
  }

  /**
   * Skip <i>len</i> number of bytes in input stream <i>in</i>
   * @param in input stream
   * @param len number of bytes to skip
   * @throws IOException when skipped less number of bytes
   */
  public static void skipFully(DataInput in, int len) throws IOException {
    int total = 0;
    int cur = 0;
    while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) {
      total += cur;
    }
    if (total < len) {
      throw new IOException("Not able to skip " + len + " bytes, possibly " +
                            "due to end of input.");
    }
  }

  /** Convert writables to a byte array */
  public static byte[] toByteArray(Writable... writables) {
    final DataOutputBuffer out = new DataOutputBuffer();
    try {
      for (Writable w : writables) {
        w.write(out);
      }
      out.close();
    } catch (IOException e) {
      throw new RuntimeException("Fail to convert writables to a byte array", e);
    }
    return out.getData();
  }
}
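To see the zero-compressed VLong encoding in action, here is a minimal standalone sketch: it copies the `writeVLong`/`readVLong`/`decodeVIntSize`/`isNegativeVInt` logic from the listing above into a plain class (the class name `VLongDemo` and the `encode`/`decode` helpers are my own additions, not part of Hadoop), so it can be run without Hadoop on the classpath.

```java
import java.io.*;

// Standalone round-trip demo of the zero-compressed VLong encoding.
public class VLongDemo {

    // Same logic as WritableUtils.writeVLong in the listing above.
    static void writeVLong(DataOutput stream, long i) throws IOException {
        if (i >= -112 && i <= 127) {          // small values fit in one byte
            stream.writeByte((byte) i);
            return;
        }
        int len = -112;
        if (i < 0) {
            i ^= -1L;                          // take one's complement
            len = -120;
        }
        long tmp = i;
        while (tmp != 0) {                     // count the data bytes needed
            tmp = tmp >> 8;
            len--;
        }
        stream.writeByte((byte) len);          // marker byte: sign + length
        len = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = len; idx != 0; idx--) { // high-order byte first
            int shiftbits = (idx - 1) * 8;
            long mask = 0xFFL << shiftbits;
            stream.writeByte((byte) ((i & mask) >> shiftbits));
        }
    }

    // Same logic as WritableUtils.readVLong in the listing above.
    static long readVLong(DataInput stream) throws IOException {
        byte firstByte = stream.readByte();
        int len = decodeVIntSize(firstByte);
        if (len == 1) return firstByte;        // single-byte case
        long i = 0;
        for (int idx = 0; idx < len - 1; idx++) {
            byte b = stream.readByte();
            i = (i << 8) | (b & 0xFF);
        }
        return isNegativeVInt(firstByte) ? (i ^ -1L) : i;
    }

    static boolean isNegativeVInt(byte value) {
        return value < -120 || (value >= -112 && value < 0);
    }

    static int decodeVIntSize(byte value) {
        if (value >= -112) return 1;
        else if (value < -120) return -119 - value;
        return -111 - value;
    }

    // Convenience helpers (not in Hadoop) for round-tripping through byte[].
    public static byte[] encode(long v) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            writeVLong(new DataOutputStream(bos), v);
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static long decode(byte[] bytes) {
        try {
            return readVLong(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

For example, `encode(100)` takes a single byte, `encode(300)` takes three (one marker byte, two data bytes), and negative values round-trip through the one's-complement path.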
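The length-prefixed GZIP framing used by `writeCompressedByteArray`/`readCompressedByteArray` can likewise be exercised without Hadoop, since it only relies on `java.io` and `java.util.zip`. The sketch below mirrors that framing (an int length prefix, `-1` for null, then the GZIP bytes); the class name `CompressedArrayDemo` and the `writeFrame`/`readFrame` helpers are illustrative names of my own, not Hadoop APIs.

```java
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Standalone demo of the length-prefixed GZIP frame format.
public class CompressedArrayDemo {

    // Mirrors writeCompressedByteArray: int length prefix (-1 for null),
    // followed by the GZIP-compressed bytes.
    public static byte[] writeFrame(byte[] bytes) {
        try {
            ByteArrayOutputStream frame = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(frame);
            if (bytes == null) {
                out.writeInt(-1);
                return frame.toByteArray();
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            GZIPOutputStream gzout = new GZIPOutputStream(bos);
            gzout.write(bytes, 0, bytes.length);
            gzout.close();                      // flushes the GZIP trailer
            byte[] buffer = bos.toByteArray();
            out.writeInt(buffer.length);        // compressed size first
            out.write(buffer, 0, buffer.length);
            return frame.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Mirrors readCompressedByteArray: read the prefix, then decompress,
    // accumulating into a ByteArrayOutputStream since the final size is unknown.
    public static byte[] readFrame(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int length = in.readInt();
            if (length == -1) return null;
            byte[] buffer = new byte[length];
            in.readFully(buffer);
            GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer));
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] outbuf = new byte[1024];
            int len;
            while ((len = gzi.read(outbuf)) != -1) {
                bos.write(outbuf, 0, len);
            }
            gzi.close();
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

Note that, as in the original, the null case is encoded entirely by the `-1` length prefix, so `readFrame(writeFrame(null))` returns null without ever touching the GZIP streams.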