1.1 Join的实现原理
select u.name, o.orderid from order o join user u on o.uid = u.uid;
在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。MapReduce的过程如下:
对应map-reduce代码如下:
reduce :
1.2 Group By的实现原理
<!--EndFragment-->
select rank, isonline, count(*) from city group by rank, isonline;
将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key
实现代码如下:
package mapreduce; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.Reducer; /** * 实现hive如下语句的mr代码 * select rank, isonline, count(*) from city group by rank, isonline; * @author zm * */ public class GroupByApp { // 0 定义操作地址 static final String FILE_ROOT = "hdfs://master:9000/"; static final String INPUT_PATH = "hdfs://master:9000/files"; static final String OUT_PATH = "hdfs://master:9000/out"; public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf); Path outpath = new Path(OUT_PATH); if(fileSystem.exists(outpath)){ fileSystem.delete(outpath, true); } // 0 定义干活的人 Job job = new Job(conf); // 1.1 告诉干活的人 输入流位置 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数 FileInputFormat.setInputPaths(job, INPUT_PATH); // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //用户在启动MapReduce的时候需要指定一个InputFormat的implement //1.2 指定自定义的map类 job.setMapperClass(GroupMapper.class); job.setMapOutputKeyClass(GroupBy.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 目前按照默认方式执行 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(GroupReducer.class); job.setOutputKeyClass(GroupBy.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outpath); job.setOutputFormatClass(TextOutputFormat.class); // 让干活的人干活 job.waitForCompletion(true); } } class GroupMapper extends Mapper<LongWritable, Text, GroupBy, LongWritable> { @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { String v1 = value1.toString(); String[] splits = v1.split(","); //GroupBy groupBy = new GroupBy(Long.parseLong(splits[0]),Long.parseLong(splits[1])); GroupBy groupBy = new GroupBy(splits[0],Long.parseLong(splits[1])); context.write(groupBy, new LongWritable(1)); } } class GroupReducer extends Reducer<GroupBy, LongWritable, GroupBy, LongWritable>{ protected void reduce(GroupBy k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long count = 0; System.out.println("reduce----> k2: " + k2.toString()); for(LongWritable v2 : v2s){ System.out.println(v2.toString()); count += v2.get(); } context.write(k2, new LongWritable(count)); } } class GroupBy implements WritableComparable<GroupBy> { private String rank; private long isonline; public GroupBy(){} public GroupBy(String rank,long isonline){ this.rank = rank; this.isonline = isonline; } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, this.rank); // 使用Text 实现string类型读写入操作 //out.writeLong(this.rank); out.writeLong(this.isonline); } @Override public void readFields(DataInput in) throws IOException { this.rank = Text.readString(in); //this.rank = in.readLong(); this.isonline = in.readLong(); } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (isonline ^ (isonline >>> 32)); result = prime * result + ((rank == null) ? 0 : rank.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; GroupBy other = (GroupBy) obj; if (isonline != other.isonline) return false; if (rank == null) { if (other.rank != null) return false; } else if (!rank.equals(other.rank)) return false; return true; } @Override public String toString() { return this.rank + " " + this.isonline; } @Override public int compareTo(GroupBy other) { long result; result = this.rank.compareTo(other.rank); //result = this.rank - other.rank; if(result == 0){ result = this.isonline - other.isonline; } return (int)result; } }
结果:
[root@master ~]# hadoop fs -text /out/part-r-00000 Warning: $HADOOP_HOME is deprecated. A 1 3 B 0 1
<!--EndFragment-->
相关推荐
- `sql`:可能包含预定义的HQL语句或示例查询。 在部署Hive时,需要配置Hive的元数据存储(如设置Metastore的数据库连接),并确保Hadoop环境已经正确配置。然后,可以通过启动HiveServer2来接受客户端连接,并通过...
4. **编译与执行计划**:Hive将HQL语句转换为MapReduce任务,或者在更现代的Hadoop版本中,转换为Tez或Spark任务。这使得Hive能充分利用Hadoop集群的并行计算能力。 5. **优化器**:Hive的优化器(如CBO,Cost-...
2. **Hive CLI (Command Line Interface)**:Hive的命令行接口,让用户可以通过输入HQL语句来执行查询和管理数据仓库。 3. **Hive Server**:提供了远程访问Hive的接口,支持多种客户端连接方式,如Beeline(一个...
2. **HiveQL**:HQL是Hive的查询语言,与SQL高度兼容,允许用户编写复杂的查询语句,对HDFS上的数据进行处理。 3. **编译器和优化器**:Hive接收到HQL后,会将其转化为MapReduce任务。这个过程中,编译器会生成执行...
- **易于集成**:Hive 可以与多种数据源(如 HDFS、HBase、Cassandra)以及 ETL 工具(如 Pig、MapReduce、Spark)集成。 在实际应用中,使用 Hive 的步骤通常包括: 1. **创建表**:根据数据格式和需求定义表结构...
- **Hive服务器**:处理客户端请求,解析SQL语句,并生成MapReduce任务。 - **HDFS**:作为数据存储层,Hive将数据文件存储在HDFS上。 - **JobTracker/YARN**:负责调度和监控MapReduce任务的执行。 2. **Hive的...
3. **HQL(Hive Query Language)**:Hive 提供的 SQL 风格的查询语言,允许用户编写复杂的查询语句来处理大数据。 4. **编译器和优化器**:将 HQL 转换为 MapReduce 作业,同时进行查询优化,如消除冗余操作、选择...
在CDH5.9.3环境中使用Hive,用户可以通过Hue这样的图形界面工具进行数据探索和分析,或者直接使用Hive的命令行接口执行HQL语句。Hive的元数据存储在Metastore中,可以是MySQL或PostgreSQL等关系型数据库,负责存储表...
4. **MapReduce和Tez**:Hive默认使用MapReduce执行查询,但CDH 5.7.0中也支持更高效的Tez执行引擎,它减少了数据读写次数和作业间的开销,提高了整体性能。 5. **Hive的存储**:Hive数据可以存储在HDFS或其他...
通过Hive,用户可以编写SQL语句来操作HDFS(Hadoop Distributed File System)中的大量数据,无需了解底层的MapReduce或Pig等复杂大数据处理框架。 在"apache-hive-0.14.0-bin"这个压缩包中,包含了以下关键组件和...
其中,Hive Server负责接收客户端请求,Metastore存储表和分区的元数据,解析器将HQL转换为解析树,编译器再将其转化为MapReduce或Tez任务,优化器进行查询计划的优化,最后执行器负责调度和运行这些任务。...
4. **MapReduce支持**:Hive将HQL查询转换为一系列的MapReduce任务执行。虽然较新的版本开始支持Tez和Spark作为执行引擎,但在CDH5.7.0中,MapReduce仍然是主要的计算框架。 5. **容错性**:Hive能够处理MapReduce...
Apache Hive 是一个基于 Hadoop 的数据仓库工具,它允许用户使用 SQL 类似的查询语言(HQL)来查询、管理和处理存储在 Hadoop 分布式文件系统(HDFS)中的大规模数据集。`apache-hive-1.2.2-bin.tar.gz` 是 Apache ...
Hive也不是分布式计算框架,Hive的核心工作就是把sql语句翻译成MR程序去执行,不用我们再手动去写MapReduce了。 Hive也不提供资源调度系统,默认由Hadoop集群中的YARN集群来调度。 Hive可以将结构化的数据映射为...
Hive 提供了一种SQL-like(HQL,Hive Query Language)的接口,使得非编程背景的用户也能方便地处理大数据。在 `apache-hive-3.1.2-src.tar.gz` 压缩包中,包含了 Hive 的源代码,可供开发者进行定制化开发或深入...
Apache Hive 是一个基于 Hadoop 的数据仓库工具,它允许用SQL(HQL,Hive Query Language)语句进行数据查询、分析和汇总,极大地简化了大数据处理的复杂性。在Hive 0.13.0 版本中,它提供了更多增强的功能和性能...
4. **测试连接**:创建一个测试表并插入数据,然后通过HQL查询验证Hive是否正常工作。 **性能优化** 1. **查询优化**:合理使用分区和桶,避免全表扫描。使用合适的JOIN策略,如MapJOIN,减少数据交换。 2. **...
1. **Hive架构**:Hive将用户的SQL语句转换为MapReduce任务在Hadoop集群上执行,提供了一种将结构化数据文件映射为数据库表并提供简单的SQL查询功能的方式。 2. **元数据存储**:Hive使用MySQL或Derby等数据库存储...
4. **编译器**:HQL 查询会被编译成一系列的 MapReduce 作业,这些作业可以在 Hadoop 集群上并行执行。 5. **优化器**:Hive 内置了一套查询优化器,根据元数据和查询语句来优化执行计划,如选择最佳的 join 策略或...
4. **MapReduce 支持**:Hive 查询被编译成一系列 MapReduce 作业执行,提供了一层抽象,让用户无需直接编写 Java 代码。 5. **查询优化**:Hive 使用查询优化器(如 CBO,Cost-Based Optimization)来选择最优的...