- 浏览: 2188662 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
上篇介绍了如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可。
关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。
将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:
(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar
有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:
当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:
提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:
最后,在来看下我们的pig脚本是如何定义和使用的:
执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:
最后,附上DBStore类的源码:
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。
将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:
(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar
有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:
Access denied for user 'root'@'localhost'
当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:
1,2,3 1,2,4 2,2,4 3,4,2 8,2,4
提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:
最后,在来看下我们的pig脚本是如何定义和使用的:
--注册数据库驱动包和piggybank的jar register ./dependfiles/mysql-connector-java-5.1.23-bin.jar; register ./dependfiles/piggybank.jar --为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名 a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ; --过滤出id大于2的数据 a = filter a by id > 2; --存储结果到数据库里 STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd', 'INSERT into pig(id,name,count) values (?,?,?)'); ~
执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:
最后,附上DBStore类的源码:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.pig.piggybank.storage; import org.joda.time.DateTime; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import java.io.IOException; import java.sql.*; public class DBStorage extends StoreFunc { private final Log log = LogFactory.getLog(getClass()); private PreparedStatement ps; private Connection con; private String jdbcURL; private String user; private String pass; private int batchSize; private int count = 0; private String insertQuery; public DBStorage(String driver, String jdbcURL, String insertQuery) { this(driver, jdbcURL, null, null, insertQuery, "100"); } public DBStorage(String driver, String jdbcURL, String user, String pass, String insertQuery) throws SQLException { this(driver, jdbcURL, user, pass, insertQuery, "100"); } public DBStorage(String driver, String jdbcURL, String user, String pass, String insertQuery, String batchSize) throws RuntimeException { log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX," + insertQuery + ")"); try { Class.forName(driver); } catch (ClassNotFoundException e) { log.error("can't load DB driver:" + driver, e); throw new RuntimeException("Can't load DB Driver", e); } this.jdbcURL = jdbcURL; this.user = user; this.pass = pass; this.insertQuery = insertQuery; this.batchSize = Integer.parseInt(batchSize); } /** * Write the tuple to Database directly here. */ public void putNext(Tuple tuple) throws IOException { int sqlPos = 1; try { int size = tuple.size(); for (int i = 0; i < size; i++) { try { Object field = tuple.get(i); switch (DataType.findType(field)) { case DataType.NULL: ps.setNull(sqlPos, java.sql.Types.VARCHAR); sqlPos++; break; case DataType.BOOLEAN: ps.setBoolean(sqlPos, (Boolean) field); sqlPos++; break; case DataType.INTEGER: ps.setInt(sqlPos, (Integer) field); sqlPos++; break; case DataType.LONG: ps.setLong(sqlPos, (Long) field); sqlPos++; break; case DataType.FLOAT: ps.setFloat(sqlPos, (Float) field); sqlPos++; break; case DataType.DOUBLE: ps.setDouble(sqlPos, (Double) field); sqlPos++; break; case DataType.DATETIME: ps.setDate(sqlPos, new Date(((DateTime) field).getMillis())); sqlPos++; break; case DataType.BYTEARRAY: byte[] b = ((DataByteArray) field).get(); ps.setBytes(sqlPos, b); sqlPos++; break; case DataType.CHARARRAY: ps.setString(sqlPos, (String) field); sqlPos++; break; case DataType.BYTE: ps.setByte(sqlPos, (Byte) field); sqlPos++; break; case DataType.MAP: case DataType.TUPLE: case DataType.BAG: throw new RuntimeException("Cannot store a non-flat tuple " + "using DbStorage"); default: throw new RuntimeException("Unknown datatype " + DataType.findType(field)); } } catch (ExecException ee) { throw new RuntimeException(ee); } } ps.addBatch(); count++; if (count > batchSize) { count = 0; ps.executeBatch(); ps.clearBatch(); ps.clearParameters(); } } catch (SQLException e) { try { log .error("Unable to insert record:" + tuple.toDelimitedString("\t"), e); } catch (ExecException ee) { // do nothing } if (e.getErrorCode() == 1366) { // errors that come due to utf-8 character encoding // ignore these kind of errors TODO: Temporary fix - need to find a // better way of handling them in the argument statement itself } else { throw new RuntimeException("JDBC error", e); } } } class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> { @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { // IGNORE } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new OutputCommitter() { @Override public void abortTask(TaskAttemptContext context) throws IOException { try { if (ps != null) { ps.close(); } if (con != null) { con.rollback(); con.close(); } } catch (SQLException sqe) { throw new IOException(sqe); } } @Override public void commitTask(TaskAttemptContext context) throws IOException { if (ps != null) { try { ps.executeBatch(); con.commit(); ps.close(); con.close(); ps = null; con = null; } catch (SQLException e) { log.error("ps.close", e); throw new IOException("JDBC Error", e); } } } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { return true; } @Override public void cleanupJob(JobContext context) throws IOException { // IGNORE } @Override public void setupJob(JobContext context) throws IOException { // IGNORE } @Override public void setupTask(TaskAttemptContext context) throws IOException { // IGNORE } }; } @Override public RecordWriter<NullWritable, NullWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { // We don't use a record writer to write to database return new RecordWriter<NullWritable, NullWritable>() { @Override public void close(TaskAttemptContext context) { // Noop } @Override public void write(NullWritable k, NullWritable v) { // Noop } }; } } @SuppressWarnings("unchecked") @Override public OutputFormat getOutputFormat() throws IOException { return new MyDBOutputFormat(); } /** * Initialise the database connection and prepared statement here. */ @SuppressWarnings("unchecked") @Override public void prepareToWrite(RecordWriter writer) throws IOException { ps = null; con = null; if (insertQuery == null) { throw new IOException("SQL Insert command not specified"); } try { if (user == null || pass == null) { con = DriverManager.getConnection(jdbcURL); } else { con = DriverManager.getConnection(jdbcURL, user, pass); } con.setAutoCommit(false); ps = con.prepareStatement(insertQuery); } catch (SQLException e) { log.error("Unable to connect to JDBC @" + jdbcURL); throw new IOException("JDBC Error", e); } count = 0; } @Override public void setStoreLocation(String location, Job job) throws IOException { // IGNORE since we are writing records to DB. } }
欢迎扫码关注微信公众号:我是攻城师(woshigcs)
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!
发表评论
-
centos7安装mysql
2016-11-14 16:41 1294centos7的默认yum源已经 ... -
MySQL的InsertOrUpdate语法
2016-08-04 14:47 1747MySQL的插入语法提供了 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2536目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3844这两天,打算给现有的 ... -
unbutu+mysql的root密码重置方法
2016-01-11 14:16 1267MySQL密码重置策略: 1,停止mysql服务 s ... -
Apache Pig中如何使用Replace函数
2015-11-17 18:48 1528今天分享一个小案例, ... -
Apache Pig的UDF返回值问题
2015-11-11 16:34 1522今天写了关于Pig的EvalFunc UDF函数,结果一执行 ... -
使用shell分页读取600万+的MySQL数据脚本
2015-07-15 13:02 2735shell-mysql (1)脚本背景: 由于要在Linux ... -
Pig0.15集成Tez,让猪飞起来
2015-06-29 19:45 18301,Tez是什么? Tez是Hortonworks公司开源 ... -
CDH-Hadoop2.6+ Apache Pig0.15安装记录
2015-06-26 20:06 27371,使用CDH的hadoop里面有对应的组件Pig,但版本较低 ... -
Pig配置vim高亮
2015-05-01 17:14 1668(1) 下载文末上传的压缩包,上到对应的linux机器上,并 ... -
Hadoop2.2如何集成Apache Pig0.12.1?
2015-05-01 16:48 978散仙假设你的Hadoop环境已经安装完毕 (1)到ht ... -
Apache Pig和Solr问题笔记(一)
2015-04-02 13:35 2071记录下最近两天散仙在工作中遇到的有关Pig0.12.0和Sol ... -
Pig使用问题总结
2015-03-29 18:39 11171,如果是a::tags#'pic'作为参数,传递给另一个函 ... -
玩转大数据系列之Apache Pig高级技能之函数编程(六)
2015-03-18 21:57 2177原创不易,转载请务必注明,原创地址,谢谢配合! http:/ ... -
Apache Pig字符串截取实战小例子
2015-03-13 17:23 2355记录一个Pig字符串截取的实战小例子: 需求如下,从下面的字 ... -
玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)
2015-03-12 21:06 1933GMV(一定时间内的成交 ... -
玩转大数据系列之如何给Apache Pig自定义存储形式(四)
2015-03-07 20:35 1189Pig里面内置大量的工具函数,也开放了大量的接口,来给我们开发 ... -
玩转大数据系列之Apache Pig如何与Apache Solr集成(二)
2015-03-06 21:52 1534散仙,在上篇文章中介绍了,如何使用Apache Pig与Luc ... -
玩转大数据系列之Apache Pig如何与Apache Lucene集成(一)
2015-03-05 21:54 2920在文章开始之前,我们 ...
相关推荐
大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例...
综上所述,“Python玩转大数据”课程是一门结合理论与实践的课程,旨在培养具有实战能力的大数据处理人才。通过合肥工业大学的精心教学,学生将能够充分利用Python在大数据领域的强大功能,为未来的职业生涯打下坚实...
Apache DolphinScheduler 架构演进与 Roadmap Apache DolphinScheduler 是一个分布式、易扩展并带有强大的可视化界面的大数据工作流调度系统。自 2021 年 03 月 18 日正式成为 Apache 顶级项目以来,Dolphin...
06.集成Flink_环境准备 07.集成Flink_环境准备_解决依赖冲突 08.集成Flink_Catalog_文件系统 09.集成Flink_Catalog_Hive&初始化文件 10.集成Flink_DDL_创建管理表 11.集成Flink_DDL_管理表_CTAS和表属性 12.集成...
同时,MySQL也在增强其对大数据分析的支持,如支持复杂查询和大数据处理工具(如Apache Spark与MySQL的集成)。此外,MySQL还不断优化其事务处理能力,确保数据一致性,以满足企业级应用对数据安全的要求。 总结来...
### 大数据与Apache Spark实用详解 在当前的数据驱动时代,大数据已经成为企业成功的关键因素之一。随着数据量的不断增长,传统的数据处理方法已经无法满足需求。为了应对这些挑战,分布式计算框架应运而生,其中...
MySQL是世界上最受欢迎的关系型数据库管理系统之一,而Hive则是大数据处理领域的重要工具,主要用于结构化数据的查询、分析和管理。这两个技术在后端开发和大数据处理中扮演着至关重要的角色。下面,我们将深入探讨...
这份“Python玩转大数据的大作业”正是针对大学生学习Python进行大数据处理而设计的资源包,旨在帮助学生巩固所学知识,提升实战技能。 1. **Pandas库**:Pandas是Python中用于数据分析的核心库,其DataFrame结构...
xampp-win32-5.6.21-0-VC11(apache,tomcat,php,mysql统一集成) 一个整合apache,tomcat,php,mysql的软件,维护起服务器很方便
HBase与HDFS紧密集成,能够处理PB级别的数据,是大数据应用中的实时数据访问层。 Sqoop是一款用于在Hadoop和传统数据库之间导入导出数据的工具。它简化了批量数据迁移过程,允许用户在关系型数据库和Hadoop之间进行...
在现代大数据处理领域,Elasticsearch(简称ES)与大数据平台的集成变得越来越重要,尤其在结合Spark等处理引擎时,能实现高效的数据检索、分析和可视化。本资料集主要探讨了如何将Elasticsearch与大数据平台进行...
**大数据入门:HIVE与MySQL安装指南** 在大数据领域,Hive和MySQL是两种非常重要的数据存储和管理工具。Hive作为一个数据仓库工具,能够将结构化的数据文件映射为一张数据库表,并提供SQL(HQL)查询功能,适合处理...
1.1 大数据导论课程简介 2.1 什么是大数据 2.2 大数据的价值和作用 2.3 大数据时代的思维变革 3.1 大数据相关技术基础(1) ...6.7 ApacheSpark之三 7.1 数据描述性分析 7.2 回归分析 7.3 聚类分析 ····
2. 数据清洗与预处理:在大数据项目中,原始数据往往需要经过一系列清洗步骤,SQL可以用来去除重复值、处理缺失值和异常值,为后续的分析提供干净的数据。 3. 数据聚合:SQL的GROUP BY和AGGREGATE函数(如COUNT、...
大数据集成是大数据处理过程中的关键环节,它涉及将来自不同来源、格式的数据进行整合,以形成统一、准确的数据视图。这一过程对于大数据分析和决策制定至关重要,因为它消除了数据孤岛,使得组织能够获得全面的业务...
福建师范大学精品大数据导论课程系列 (6.7.1)--5.2 ApacheSpark之三.pdf 福建师范大学精品大数据导论课程系列 (7.1.1)--6.1 《数据描述性分析》课件PPT.pdf 福建师范大学精品大数据导论课程系列 (7.2.1)--6.2 《回归...
大数据技术分享 Apache Kylin-Hadoop上的大规模OLAP联机分析处理平台 共38页.pptx
Apache Pig是一种高级数据流语言和执行框架,用于处理和分析大数据,其运行在Hadoop上。Pig提供了一种名为Pig Latin的数据处理语言,它是一种类SQL语言,可以让用户编写更简洁的代码来处理数据,相对于传统的...
Apache大数据平台技术框架选型分析主要集中在如何利用开源技术构建一个稳定、易用、安全且高性能的数据处理系统,尤其以Apache Hadoop为基础的CDH(Cloudera Distribution Including Apache Hadoop)发行版为核心。...