Hadoop的序列化机制特征:
- 紧凑:带宽是hadoop集群中最稀缺的资源,一个紧凑的序列化机制可以充分利用带宽。
- 快速:mapreduce会大量的使用序列化机制。因此,要尽可能减少序列化开销。
- 可扩张:序列化机制需要可定制
- 互操作:可以支持不同开发语言间的通信。
java本身的序列化,将要序列化的类,类签名、类的所有非暂态和非静态成员的值,以及所有的父类都要写入,导致序列化的对象过于充实。可能比原来扩大了几十上百倍。
由上面的条件,hadoop自定义了序列化机制,引入org.apache.hadoop.io.Writable
/** * A serializable object which implements a simple, efficient, serialization * protocol, based on {@link DataInput} and {@link DataOutput}. * * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce * framework implements this interface.</p> * * <p>Implementations typically implement a static <code>read(DataInput)</code> * method which constructs a new instance, calls {@link #readFields(DataInput)} * and returns the instance.</p> * * <p>Example:</p> * <p><blockquote><pre> * public class MyWritable implements Writable { * // Some data * private int counter; * private long timestamp; * * public void write(DataOutput out) throws IOException { * out.writeInt(counter); * out.writeLong(timestamp); * } * * public void readFields(DataInput in) throws IOException { * counter = in.readInt(); * timestamp = in.readLong(); * } * * public static MyWritable read(DataInput in) throws IOException { * MyWritable w = new MyWritable(); * w.readFields(in); * return w; * } * } * </pre></blockquote></p> */ public interface Writable { /** * Serialize the fields of this object to <code>out</code>. * 输出(序列化)对象到流中 * @param out <code>DataOuput</code> to serialize this object into. * @throws IOException */ void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * 从流中读取(反序列化)对象,为了效率请尽可能服用现有的对象 * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param in <code>DataInput</code> to deseriablize this object from. * @throws IOException */ void readFields(DataInput in) throws IOException; }
hadoop序列化机制还包括几个重要的接口org.apache.hadoop.io.WritableComparable,org.apache.hadoop.io.WritableComparator,RawComparator
org.apache.hadoop.io.WritableComparable:继承java.lang.Comparable,来提供类型比较。
org.apache.hadoop.io.RawComparator:继承java.util.Comparator,允许去比较未被序列化为对象的记录,省去了创建对象的所有开销。
java.util.Comparator和的java.lang.Comparable区别可见 http://lavasoft.blog.51cto.com/62575/68380
每个java基本类型都对应Writable封装。
ObjectWritable可应用在需要序列化不同类型的对象到某一个字段,也可用在hadoop远程过程调用中参数的序列化和反序列化。
ObjectWritable的实现:
有三个成员变量,包括被封装的对象实例instance、该对象运行时类的Class对象和Configuration对象。
ObjectWritable的write方法调用的是静态方法ObjectWritable .writeObject()该方法可以往DataOutput接口中写入各种java对象。
/** Write a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. */ public static void writeObject(DataOutput out, Object instance, Class declaredClass, Configuration conf) throws IOException { if (instance == null) { // null 空 instance = new NullInstance(declaredClass, conf); declaredClass = Writable.class; } UTF8.writeString(out, declaredClass.getName()); // always write declared if (declaredClass.isArray()) { // array 数组 int length = Array.getLength(instance); out.writeInt(length); for (int i = 0; i < length; i++) { writeObject(out, Array.get(instance, i), declaredClass.getComponentType(), conf); } } else if (declaredClass == String.class) { // String 字符串 UTF8.writeString(out, (String)instance); } else if (declaredClass.isPrimitive()) { // primitive type 基本类型 if (declaredClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean)instance).booleanValue()); } else if (declaredClass == Character.TYPE) { // char out.writeChar(((Character)instance).charValue()); } else if (declaredClass == Byte.TYPE) { // byte out.writeByte(((Byte)instance).byteValue()); } else if (declaredClass == Short.TYPE) { // short out.writeShort(((Short)instance).shortValue()); } else if (declaredClass == Integer.TYPE) { // int out.writeInt(((Integer)instance).intValue()); } else if (declaredClass == Long.TYPE) { // long out.writeLong(((Long)instance).longValue()); } else if (declaredClass == Float.TYPE) { // float out.writeFloat(((Float)instance).floatValue()); } else if (declaredClass == Double.TYPE) { // double out.writeDouble(((Double)instance).doubleValue()); } else if (declaredClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } else if (declaredClass.isEnum()) { // enum 枚举 UTF8.writeString(out, ((Enum)instance).name()); } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable Writable的子类 UTF8.writeString(out, instance.getClass().getName()); ((Writable)instance).write(out); } else { throw new IOException("Can't write: "+instance+" as "+declaredClass); } }
和输出对应的调用的是org.apache.hadoop.io.ObjectWritable.readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
/** Read a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. */ @SuppressWarnings("unchecked") public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf) throws IOException { String className = UTF8.readString(in); Class<?> declaredClass = PRIMITIVE_NAMES.get(className); if (declaredClass == null) { try { declaredClass = conf.getClassByName(className); } catch (ClassNotFoundException e) { throw new RuntimeException("readObject can't find class " + className, e); } } Object instance; if (declaredClass.isPrimitive()) { // primitive types if (declaredClass == Boolean.TYPE) { // boolean instance = Boolean.valueOf(in.readBoolean()); } else if (declaredClass == Character.TYPE) { // char instance = Character.valueOf(in.readChar()); } else if (declaredClass == Byte.TYPE) { // byte instance = Byte.valueOf(in.readByte()); } else if (declaredClass == Short.TYPE) { // short instance = Short.valueOf(in.readShort()); } else if (declaredClass == Integer.TYPE) { // int instance = Integer.valueOf(in.readInt()); } else if (declaredClass == Long.TYPE) { // long instance = Long.valueOf(in.readLong()); } else if (declaredClass == Float.TYPE) { // float instance = Float.valueOf(in.readFloat()); } else if (declaredClass == Double.TYPE) { // double instance = Double.valueOf(in.readDouble()); } else if (declaredClass == Void.TYPE) { // void instance = null; } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } else if (declaredClass.isArray()) { // array int length = in.readInt(); instance = Array.newInstance(declaredClass.getComponentType(), length); for (int i = 0; i < length; i++) { Array.set(instance, i, readObject(in, conf)); } } else if (declaredClass == String.class) { // String instance = UTF8.readString(in); } else if (declaredClass.isEnum()) { // enum instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in)); } else { // Writable Class instanceClass = null; String str = ""; try { str = UTF8.readString(in); instanceClass = conf.getClassByName(str); } catch (ClassNotFoundException e) { throw new RuntimeException("readObject can't find class " + str, e); } //利用instanceClass创建WritableFactories Writable writable = WritableFactories.newInstance(instanceClass, conf); writable.readFields(in); instance = writable; if (instanceClass == NullInstance.class) { // null declaredClass = ((NullInstance)instance).declaredClass; instance = null; } } if (objectWritable != null) { // store values objectWritable.declaredClass = declaredClass; objectWritable.instance = instance; } return instance;
//保存了类型和WritableFactories工厂的对应关系 private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY = new HashMap<Class, WritableFactory>(); /** Create a new instance of a class with a defined factory. */ public static Writable newInstance(Class<? extends Writable> c, Configuration conf) { WritableFactory factory = WritableFactories.getFactory(c); if (factory != null) { Writable result = factory.newInstance(); if (result instanceof Configurable) { ((Configurable) result).setConf(conf); } return result; } else { //采用传统的反射工具,创建对象 return ReflectionUtils.newInstance(c, conf); } }
相关推荐
Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop ...
在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中对Hadoop集群进行操作,如创建、编辑和运行MapReduce任务,极大...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。
3. **HDFS初始化**:使用`winutils.exe`初始化HDFS文件系统,创建NameNode和DataNode的数据目录,这通常涉及到创建一些特定的目录结构并设置相应的权限。 4. **配置文件**:修改`conf/core-site.xml`和`conf/hdfs-...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
Hadoop-Eclipse-Plugin 3.1.1是该插件的一个特定版本,可能包含了一些针对Hadoop 3.x版本的优化和修复,以确保与Hadoop集群的兼容性和稳定性。 6. **使用场景**: 这个插件主要适用于大数据开发人员,特别是那些...
赠送jar包:hadoop-auth-2.5.1.jar; 赠送原API文档:hadoop-auth-2.5.1-javadoc.jar; 赠送源代码:hadoop-auth-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-auth-2.5.1.pom; 包含翻译后的API文档:hadoop...
hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。
3. `hadoop.exp`:这可能是一个导出文件,包含了Hadoop库对外公开的函数和符号信息。 4. `libwinutils.lib`:这是一个静态链接库文件,用于在Windows上编译和链接依赖于`winutils`的程序。 5. `hadoop.lib`:类似地...
flink-shaded-hadoop-3下载
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
3. `yarn`: 用于管理和调度YARN(Yet Another Resource Negotiator)资源的命令行工具,YARN是Hadoop的第二代资源管理系统。 4. `mapred`: MapReduce的命令行工具,MapReduce是Hadoop的分布式计算模型,用于处理和...
Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。
在这个"apache-hadoop-3.1.3-winutils-master.zip"压缩包中,包含了在Windows环境下配置Hadoop HDFS客户端所需的组件,特别是`hadoop-winutils`和`hadoop.dll`这两个关键文件,它们对于在Windows系统上运行Hadoop...