参考文档: Hadoop:The Definitive Guide3E Chapter11.Pig
代码具体地址: tomwhite-hadoop-book-32dae01\ch11\src\main\java\com\hadoopbook\pig
工具类
public class Range {
private final int start;
private final int end;
public Range(int start, int end) {
this.start = start;
this.end = end;
}
public int getStart() {
return start;
}
public int getEnd() {
return end;
}
public String getSubstring(String line) { //abcdefghi ==>sample.txt的每一行
//rangeSpec: 1-2,5-6 ==>CutLoadFunc()的参数值
//1-2: start=1,end=2, "abcedfghi".substring(0,2)=>ab
//5-6: start=5,end=6, "abcedfghi".substring(4,6)=>df
return line.substring(start - 1, end);
}
@Override
public int hashCode() {
return start * 37 + end;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Range)) {
return false;
}
Range other = (Range) obj;
return this.start == other.start && this.end == other.end;
}
//1-2,5-6
public static List<Range> parse(String rangeSpec)
throws IllegalArgumentException {
if (rangeSpec.length() == 0) {
return Collections.emptyList();
}
List<Range> ranges = new ArrayList<Range>();
String[] specs = rangeSpec.split(","); //["1-2", "5-6"]
for (String spec : specs) {
String[] split = spec.split("-"); //["1", "2"]
try {
ranges.add(new Range(Integer.parseInt(split[0]), Integer
.parseInt(split[1]))); //start=1, end=2
} catch (NumberFormatException e) {
throw new IllegalArgumentException(e.getMessage());
}
}
return ranges;
}
}
自定义函数
public class CutLoadFunc extends LoadFunc {
private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
private final List<Range> ranges; //工具类,解析范围参数,范围针对的是文件里的每一行数据.进行截取操作
private final TupleFactory tupleFactory = TupleFactory.getInstance();
private RecordReader reader; //LOAD命令加载文件,reader会去读取文件里的每一行数据
public CutLoadFunc(String cutPattern) { //构造函数接收参数:范围->'16-19,88-92,93-93'
ranges = Range.parse(cutPattern);
}
@Override
public void setLocation(String location, Job job)
throws IOException {
FileInputFormat.setInputPaths(job, location);
}
@Override
public InputFormat getInputFormat() {
return new TextInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
this.reader = reader;
}
//读取文件里的每一行记录
@Override
public Tuple getNext() throws IOException {
try {
if (!reader.nextKeyValue()) {
return null;
}
Text value = (Text) reader.getCurrentValue(); //读取到当前行的数据
String line = value.toString(); //line为当前行的数据
Tuple tuple = tupleFactory.newTuple(ranges.size());
for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i); //调用构造函数时,通过范围参数,已经将List<Range>的数据填充完毕.即此时每一个Range对象的start.end都是有值的.
if (range.getEnd() > line.length()) {
LOG.warn(String.format(
"Range end (%s) is longer than line length (%s)",
range.getEnd(), line.length()));
continue;
}
tuple.set(i, new DataByteArray(range.getSubstring(line))); //传入当前行的数据,调用截取方法(start,end已经有值),截取当前行对应的范围的字符串.
}
return tuple; //tuple可以看做是一个上下文. 参数就是通过上下文传入的.
} catch (InterruptedException e) {
throw new ExecException(e);
}
}
}
分享到:
相关推荐
pig udf 示例
在大数据处理领域,Apache Pig 是一个非常重要的工具,它提供了一种高级的脚本语言(Pig ...通过理解并正确使用这些自定义 UDF,你可以更灵活地处理各种复杂的数据处理任务,提高 Pig 在大数据分析中的效率和实用性。
"大数据 Java Hive UDF 函数示例代码(手机号码脱敏)" 大数据 Java Hive UDF 函数示例代码(手机号码脱敏)是指使用 Java 语言开发的用户定义函数(User Defined Function,UDF),该函数可以在 Hive 中使用,实现...
6. **UDF的使用示例**:提供了实际的代码示例,展示了如何在查询语句中调用UDF,以及UDF在实际业务场景中的应用。 7. **安全性与权限管理**:讲解了UDF的安全性问题,包括权限设置,如何限制非授权用户的访问,以及...
udf提权PHP代码
在《Programming Pig》第二版中,读者可以期待更加详尽的示例、更深入的UDF开发指南以及多样化的数据集。这本书不仅适合初学者,也对有经验的Pig用户有所启发,帮助他们提高在大数据环境下的编程技能。通过阅读和...
- **test.c**:这可能是一个C语言编写的UDF示例代码,用于在Fluent中实现特定的波浪生成算法。 - **123.msh**:这通常是网格文件,网格是CFD模拟的基础,用于划分计算域并分配物理属性,如速度、压力等。 - **udf....
这个项目的目标是为 Pentaho Kettle 提供与 Pig 集成的 UDF 支持,从而扩展了 Kettle 在大数据分析场景下的能力。 **Pentaho Data Integration (Kettle) 知识点:** 1. **ETL 工具**:Kettle 是一个图形化的 ETL ...
在本场景中,“udf_udf_”可能是指一个关于UDF的项目或实验,目标是测试所编写的UDF代码是否正确,为后续步骤做好准备。 UDF分为两种类型:存储过程和自定义函数。存储过程是一组为了完成特定功能的SQL语句集合,...
虽然没有提供具体的源代码,但“pig-udf-master”项目很可能是包含了一些示例的Java UDF,演示了如何创建、编译和使用Pig UDF。这些示例可能覆盖了上述的三种UDF类型,帮助开发者理解如何将Java代码与Pig的处理流程...
描述中提到的“关于fluent中多相流模拟的udf的例子”,意味着这个压缩包可能包含了一些具体的UDF代码示例,用于指导用户如何在Fluent中编写和应用UDF进行多相流的模拟。这些示例可能涵盖了初始化多相流模型、定义交...
例如,以下是一个简单的Pig Latin脚本示例,用于分析`access_log.txt`中的数据: ``` pig LOGS = LOAD 'access_log.txt' USING PigStorage('\t') AS (timestamp:chararray, ip:chararray, agent:chararray, method:...
"新建文本文档.txt"的名称虽然不明确,但根据上下文推测,可能包含的是UDF的编写指南、示例代码或者关于如何在Fluent中编译和链接UDF的说明。 总的来说,这个压缩包中的文件为Fluent MHD模拟提供了一套完整的UDF...
教程内容非常丰富,通过示例代码和实际案例,让读者在实践中学习如何使用UDF来解决各种CFD模拟问题,从而提高解决问题的能力和模拟的准确性。对于希望深入学习和掌握ANSYS Fluent高级功能的工程师或研究人员来说,这...
描述中提到的"fluent修改动量源项UDF案例(fluent文件及UDF源代码)"意味着这个压缩包可能包含一个具体的示例,指导用户如何在Fluent中通过UDF来修改动量源项。动量源项在流体流动问题中至关重要,它可以用来模拟...
总的来说,这些文件系统规范和示例代码涵盖了从简单的光盘文件系统(如UDF和ISO9660)到更复杂的磁盘文件系统(如FAT和NTFS)的各种技术。通过研究这些代码,开发者可以深入理解文件系统的工作原理,如何进行读写...
UDF(User Defined Functions)是OpenFOAM等流体动力学软件中的一种用户自定义功能,允许用户根据特定需求编写源...通过分析和调试这些源代码,我们可以更好地理解和控制流体模拟过程,为工程问题提供精确的解决方案。
**第六章:UDF的应用示例** 在“第6章.doc”中,可能详细介绍了UDF在实际问题中的应用,例如如何创建一个自定义的湍流模型或者实现特定的源项。这部分内容可能涵盖了以下知识点: 1. **自定义源项**:UDF可用于...
手册也会提供一些示例代码,帮助用户更好地理解如何在实践中应用这些技术。 此外,手册还可能包含有关调试和优化UDF的章节,包括如何使用Fluent的内置诊断工具,以及如何通过调整算法和内存管理来提高性能。对于...