`
sunasheng
  • 浏览: 122907 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)

阅读更多

package cn.m15.ipj.job.usergroup;

Mapreduce处理结果向输出至mysql

1.写入mysql
	<1>job中输出的配置:
		DBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_OPEN_FIRST_FIELDS);
		(DBOutputFormat为hadoop自带API,将输入插入数据库)
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_OPEN_FIRST_FIELDS = {"app_id","version","imei","first_open","date"};
		 
	<2>reduce中写入:
		private FixOpenAppFirstRecord record = new FixOpenAppFirstRecord();
		 
		record.setApp_id(Integer.parseInt(app_id));
		record.setImei(imei);
		record.setVersion(version);
		record.setFirst_open(exactDate);
		record.setDate(date);
		context.write(record, NULL);
		 
	<3>FixOpenAppFirstRecord中字段的顺序配置(只列出一条):
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, app_id);
			statement.setString(2, version);
			statement.setString(3, imei);
			statement.setString(4, first_open);
			statement.setString(5, date);
		}
	<4>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_OPEN_FIRST_FIELDS的顺序一定要和类FixOpenAppFirstRecord中字段的配置顺序一致
2.更新mysql(改变值)
	<1>job中输出的配置:
		FixDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_IS_TAOBAO_FIELDS);
		(FixDBOutputFormat为自定义Format类,用于更新mysql)
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_IS_WEIBO_FIELDS = {"is_weibo","app_id","version","imei"};
	<2>reduce中写入:
		private FixIsMallUserRecord record = new FixIsMallUserRecord();
		record.setApp_id(Integer.parseInt(app_id));
		record.setVersion(version);
		record.setImei(imei);
		record.setIs_taobao(is_taobao);
		context.write(record, NULL);
	<3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):
		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.is_taobao = resultSet.getInt(1);
			this.app_id = resultSet.getInt(2);
			this.version = resultSet.getString(3);
			this.imei = resultSet.getString(4);
		}
	<4>FixDBOutputFormat中关键的拼接sql代码:
		public String constructQuery(String table, String[] fieldNames) {
		if (fieldNames == null) {
			throw new IllegalArgumentException(
			"Field names may not be null");
		}
		 
		StringBuilder query = new StringBuilder();
		query.append("UPDATE ").append(table);
		if (fieldNames.length > 0 && fieldNames[0] != null
		&& fieldNames[1] != null&& fieldNames[2] != null
		&& fieldNames[3] != null) {
			query.append(" SET ");
			query.append(fieldNames[0] + " =?");
			query.append(" WHERE ");
			query.append(fieldNames[1] + " =?");
			query.append(" AND ");
			query.append(fieldNames[2] + " =?");
			query.append(" AND ");
			query.append(fieldNames[3] + " =?");
			return query.toString();
			}
			return null;
		}
	<5>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_IS_WEIBO_FIELDS的顺序一定要和类
		3.2中的顺序也一定要和FixDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值,第二三四个参数分别为条件参数)
3.更新mysql(值的追加)
	<1>job中输出的配置:
		FixAppendDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
		MySQLConstant.MYSQL_FIX_MALL_LOGIN_FIELDS);
		(FixAppendDBOutputFormat自定义Format,用户更新mysql[追加])
		public final static String
		MYSQL_FIX_USER =
		"ipj_fix_user";
		public final static String[]
		MYSQL_FIX_MALL_LOGIN_FIELDS = {"login_taobao_count","app_id","version","imei"};
	<2>reduce中写入:
		private FixMallTotalLoginRecord record = new FixMallTotalLoginRecord();
		record.setApp_id(Integer.parseInt(app_id));
		record.setVersion(version);
		record.setImei(imei);
		record.setLogin_taobao_count(num);
		context.write(record, NULL);
	<3>FixIsMallUserRecord 中字段的顺序配置(只列出一条):
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, login_taobao_count);
			statement.setInt(2, app_id);
			statement.setString(3, version);
			statement.setString(4, imei);
		}
	<4>FixAppendDBOutputFormat中关键的拼接sql代码
		public String constructQuery(String table, String[] fieldNames) {
			if (fieldNames == null) {
				throw new IllegalArgumentException
				("Field names may not be null");
			}
			StringBuilder query = new StringBuilder();
			query.append("UPDATE ").append(table);
			if ( fieldNames.length > 0 &&
			fieldNames[0] != null &&
			fieldNames[1] != null &&
			fieldNames[2] != null &&
			fieldNames[3] != null) {
				query.append(" SET ");
				query.append(fieldNames[0] +
				" = "+fieldNames[0]+"+?");
				query.append(" WHERE ");
				query.append(fieldNames[1] + " =?");
				query.append(" AND ");
				query.append(fieldNames[2] + " =?");
				query.append(" AND ");
				query.append(fieldNames[3] + " =?");
				return query.toString();
				}
			return null;
		}
	<5>注意:
		1.reduce中record的set值的顺序无所谓,可以任意
		2.job的mysql字段MYSQL_FIX_MALL_LOGIN_FIELDS的顺序一定要和类
		3.2中的顺序也一定要和FixAppendDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值[在原有基础上增加],第二三四个参数分别为条件参数)

 

分享到:
评论

相关推荐

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    对于写入数据库,通常在Reducer类的reduce()方法或cleanup()方法中进行,将处理后的数据转换为适合数据库存储的格式,然后通过JDBC API执行插入、更新或删除等操作。需要注意的是,由于MapReduce作业可能涉及大量的...

    MapReduce多路径输入输出

    这是 MapReduce 的多路径输入输出示例代码。有关大数据的相关文章可以阅读我的专栏:《大数据之Hadoop》 http://blog.csdn.net/column/details/bumblebee-hadoop.html

    用mapreduce进行文本处理

    Reduce 函数负责将中间结果进行合并或聚合,最终产生输出结果。 ##### 2.2 MapReduce 的优势 - **可扩展性**:MapReduce 支持数据的水平扩展,可以通过增加更多的服务器来提高处理能力。 - **容错性**:该框架能够...

    MapReduce输出至hbase共16页.pdf.zip

    3. 实现TableOutputFormat:MapReduce的输出格式默认为文件,但要将结果直接写入HBase,需自定义TableOutputFormat类,使其能够将MapReduce的输出直接转化为HBase的Put操作。 4. 写入HBase:在Reduce阶段,每个...

    MapReduce海量数据处理

    - **绪论**:MapReduce的核心思想是将大规模数据的处理分解为两个主要操作:Map和Reduce,这两个操作都是以函数式编程中的概念为基础。Map阶段将原始数据分片并应用映射函数,生成中间键值对;Reduce阶段则对这些...

    MapReduce详解Shuffle过程

    在map端,map task将输出结果存储在内存缓冲区中,当缓冲区快满的时候将缓冲区的数据以一个临时文件的方式存放到磁盘,然后对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件。 在reduce端,...

    【MapReduce篇04】MapReduce之OutputFormat数据输出1

    在Hadoop MapReduce框架中,OutputFormat扮演着至关重要的角色,它是定义如何将Mapper和Reducer产生的中间结果转化为最终输出格式的规范。MapReduce之OutputFormat数据输出主要涉及到以下几个方面: 1. **...

    MapReduce基础.pdf

    - Reduce函数接收一组相同的键及其对应的值列表,然后对这些值进行聚合处理,生成最终的输出结果。 - Reduce阶段的目标是减少输出数据量,通常输出的数据量远小于输入数据量。 #### 三、MapReduce的设计原则 ...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    MapReduce:超大机群上的简单数据处理

    在词频统计的例子中,reduce函数接受一个词(key)和一个计数列表(values),通过迭代器遍历这个列表,将所有计数值相加,得到单词的总出现次数result,然后输出结果,如Emit(AsString(result))。 MapReduce库处理...

    MapReduce类型及格式

    MapReduce是一种编程模型,用于处理和生成大数据集,尤其适用于大规模数据集的分布式运算。它最初由Google开发,是Apache Hadoop项目的核心组件。MapReduce模型的核心思想是将任务分解为两个阶段:Map(映射)阶段和...

    MapReduce处理通过采集的气象数据分析每年的最高温度源代码

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的计算任务拆分成两个阶段:Map(映射)和Reduce(规约),并能在大规模集群上并行执行,极大地提高了数据...

    MapReduce.docx

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。Hadoop MapReduce是Apache Hadoop项目的一部分,它实现了这个模型,允许开发者编写能够处理PB级数据的程序,即使在由数千个...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。...MapReduce框架提供了强大的数据处理能力,能够处理大规模数据集,广泛应用于大数据处理领域。

    大数据、大数据处理模型及MapReduce

    在处理大规模数据集时,MapReduce模型发挥着至关重要的作用,它不仅是大数据处理的基石之一,也为后续的技术发展奠定了基础。通过这种模型,数据科学家和工程师们能够更高效地分析、处理大数据,从而在商业、科研等...

    mapreduce解析网络日志文件(或从mysql数据库获取记录)并计算相邻日志记录间隔时长

    在大数据处理领域,MapReduce是一种广泛使用的编程模型,尤其适合处理和存储海量数据。本话题主要探讨如何利用MapReduce解析网络日志文件,或者从MySQL数据库中获取记录,并计算相邻日志记录之间的间隔时长。这涉及...

    mapreduce mapreduce mapreduce

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大量数据。这个模型主要由两个主要阶段组成:Map...在云计算和大数据领域,MapReduce已经成为一个基础概念,对于理解和开发大规模数据处理系统至关重要。

Global site tag (gtag.js) - Google Analytics