如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可。
关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,
piggybank的地址是
https://cwiki.apache.org/confluence/display/PIG/PiggyBank
,感兴趣的朋友们,可以看一看。
将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:
(1)piggybank.jar的jar包
(2)依赖数据库的对应的驱动jar
有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!
散仙在存储到远程的MySQL上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:
- Access denied for user 'root'@'localhost'
Access denied for user 'root'@'localhost'
当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:
(1)允许所有的机器ip访问
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
(2)允许指定的机器ip访问:
1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:
- 1,2,3
- 1,2,4
- 2,2,4
- 3,4,2
- 8,2,4
1,2,3 1,2,4 2,2,4 3,4,2 8,2,4
提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:
最后,在来看下我们的pig脚本是如何定义和使用的:
- --注册数据库驱动包和piggybank的jar
- register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;
- register ./dependfiles/piggybank.jar
- --为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名
- a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;
- --过滤出id大于2的数据
- a = filter a by id > 2;
- --存储结果到数据库里
- STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd',
- 'INSERT into pig(id,name,count) values (?,?,?)');
- ~
--注册数据库驱动包和piggybank的jar register ./dependfiles/mysql-connector-java-5.1.23-bin.jar; register ./dependfiles/piggybank.jar --为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名 a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ; --过滤出id大于2的数据 a = filter a by id > 2; --存储结果到数据库里 STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd', 'INSERT into pig(id,name,count) values (?,?,?)'); ~
执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:
最后,附上DBStore类的源码:
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.pig.piggybank.storage;
- import org.joda.time.DateTime;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.JobContext;
- import org.apache.hadoop.mapreduce.OutputCommitter;
- import org.apache.hadoop.mapreduce.OutputFormat;
- import org.apache.hadoop.mapreduce.RecordWriter;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.pig.StoreFunc;
- import org.apache.pig.backend.executionengine.ExecException;
- import org.apache.pig.data.DataByteArray;
- import org.apache.pig.data.DataType;
- import org.apache.pig.data.Tuple;
- import java.io.IOException;
- import java.sql.*;
- public class DBStorage extends StoreFunc {
- private final Log log = LogFactory.getLog(getClass());
- private PreparedStatement ps;
- private Connection con;
- private String jdbcURL;
- private String user;
- private String pass;
- private int batchSize;
- private int count = 0;
- private String insertQuery;
- public DBStorage(String driver, String jdbcURL, String insertQuery) {
- this(driver, jdbcURL, null, null, insertQuery, "100");
- }
- public DBStorage(String driver, String jdbcURL, String user, String pass,
- String insertQuery) throws SQLException {
- this(driver, jdbcURL, user, pass, insertQuery, "100");
- }
- public DBStorage(String driver, String jdbcURL, String user, String pass,
- String insertQuery, String batchSize) throws RuntimeException {
- log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
- + insertQuery + ")");
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- log.error("can't load DB driver:" + driver, e);
- throw new RuntimeException("Can't load DB Driver", e);
- }
- this.jdbcURL = jdbcURL;
- this.user = user;
- this.pass = pass;
- this.insertQuery = insertQuery;
- this.batchSize = Integer.parseInt(batchSize);
- }
- /**
- * Write the tuple to Database directly here.
- */
- public void putNext(Tuple tuple) throws IOException {
- int sqlPos = 1;
- try {
- int size = tuple.size();
- for (int i = 0; i < size; i++) {
- try {
- Object field = tuple.get(i);
- switch (DataType.findType(field)) {
- case DataType.NULL:
- ps.setNull(sqlPos, java.sql.Types.VARCHAR);
- sqlPos++;
- break;
- case DataType.BOOLEAN:
- ps.setBoolean(sqlPos, (Boolean) field);
- sqlPos++;
- break;
- case DataType.INTEGER:
- ps.setInt(sqlPos, (Integer) field);
- sqlPos++;
- break;
- case DataType.LONG:
- ps.setLong(sqlPos, (Long) field);
- sqlPos++;
- break;
- case DataType.FLOAT:
- ps.setFloat(sqlPos, (Float) field);
- sqlPos++;
- break;
- case DataType.DOUBLE:
- ps.setDouble(sqlPos, (Double) field);
- sqlPos++;
- break;
- case DataType.DATETIME:
- ps.setDate(sqlPos, new Date(((DateTime) field).getMillis()));
- sqlPos++;
- break;
- case DataType.BYTEARRAY:
- byte[] b = ((DataByteArray) field).get();
- ps.setBytes(sqlPos, b);
- sqlPos++;
- break;
- case DataType.CHARARRAY:
- ps.setString(sqlPos, (String) field);
- sqlPos++;
- break;
- case DataType.BYTE:
- ps.setByte(sqlPos, (Byte) field);
- sqlPos++;
- break;
- case DataType.MAP:
- case DataType.TUPLE:
- case DataType.BAG:
- throw new RuntimeException("Cannot store a non-flat tuple "
- + "using DbStorage");
- default:
- throw new RuntimeException("Unknown datatype "
- + DataType.findType(field));
- }
- } catch (ExecException ee) {
- throw new RuntimeException(ee);
- }
- }
- ps.addBatch();
- count++;
- if (count > batchSize) {
- count = 0;
- ps.executeBatch();
- ps.clearBatch();
- ps.clearParameters();
- }
- } catch (SQLException e) {
- try {
- log
- .error("Unable to insert record:" + tuple.toDelimitedString("\t"),
- e);
- } catch (ExecException ee) {
- // do nothing
- }
- if (e.getErrorCode() == 1366) {
- // errors that come due to utf-8 character encoding
- // ignore these kind of errors TODO: Temporary fix - need to find a
- // better way of handling them in the argument statement itself
- } else {
- throw new RuntimeException("JDBC error", e);
- }
- }
- }
- class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {
- @Override
- public void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
- // IGNORE
- }
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new OutputCommitter() {
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- try {
- if (ps != null) {
- ps.close();
- }
- if (con != null) {
- con.rollback();
- con.close();
- }
- } catch (SQLException sqe) {
- throw new IOException(sqe);
- }
- }
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- if (ps != null) {
- try {
- ps.executeBatch();
- con.commit();
- ps.close();
- con.close();
- ps = null;
- con = null;
- } catch (SQLException e) {
- log.error("ps.close", e);
- throw new IOException("JDBC Error", e);
- }
- }
- }
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context)
- throws IOException {
- return true;
- }
- @Override
- public void cleanupJob(JobContext context) throws IOException {
- // IGNORE
- }
- @Override
- public void setupJob(JobContext context) throws IOException {
- // IGNORE
- }
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- // IGNORE
- }
- };
- }
- @Override
- public RecordWriter<NullWritable, NullWritable> getRecordWriter(
- TaskAttemptContext context) throws IOException, InterruptedException {
- // We don't use a record writer to write to database
- return new RecordWriter<NullWritable, NullWritable>() {
- @Override
- public void close(TaskAttemptContext context) {
- // Noop
- }
- @Override
- public void write(NullWritable k, NullWritable v) {
- // Noop
- }
- };
- }
- }
- @SuppressWarnings("unchecked")
- @Override
- public OutputFormat getOutputFormat()
- throws IOException {
- return new MyDBOutputFormat();
- }
- /**
- * Initialise the database connection and prepared statement here.
- */
- @SuppressWarnings("unchecked")
- @Override
- public void prepareToWrite(RecordWriter writer)
- throws IOException {
- ps = null;
- con = null;
- if (insertQuery == null) {
- throw new IOException("SQL Insert command not specified");
- }
- try {
- if (user == null || pass == null) {
- con = DriverManager.getConnection(jdbcURL);
- } else {
- con = DriverManager.getConnection(jdbcURL, user, pass);
- }
- con.setAutoCommit(false);
- ps = con.prepareStatement(insertQuery);
- } catch (SQLException e) {
- log.error("Unable to connect to JDBC @" + jdbcURL);
- throw new IOException("JDBC Error", e);
- }
- count = 0;
- }
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- // IGNORE since we are writing records to DB.
- }
- }
相关推荐
它是Hadoop项目下的一个子项目,广泛应用于Hadoop生态中的各个组件如HBase、Hive、Pig等。 ##### 2.2 Zookeeper的主要功能 1. **配置管理**:提供集中式的配置存储服务,确保配置的一致性和高可用性。例如,在...
《大数据与云计算教程课件》系列课程覆盖了大数据技术栈中的多个重要组件,从基础到高级,详细介绍了Hadoop、MapReduce、Hive、HBase、Pig等核心技术。其中,“Pig Latin(共36页).pptx”部分专注于Pig语言的学习,...
这个框架里面通过Pig的UDF函数封装了Store方法,只要Pig能读的任何数据源,我们都可以通过Store函数,将结果轻而易举的存储到我们的索引里面, ####使用步骤 (1)下载源码后,根据自己的业务情况,稍作修改,然后...
【大数据与云计算教程】本课程涵盖了大数据处理的关键...总的来说,这个课程提供了全面的大数据与云计算技术学习路径,涵盖了从数据存储、处理、分析到实时流处理的各个环节,适合希望深入理解和掌握大数据技术的学员。
本系列课程包括多个主题,如Hadoop、MapReduce、YARN、HDFS、序列化、Zookeeper、Sqoop、Flume、Kafka、Storm、Spark等,以及数据存储与分析工具Hive、HBase、Pig、Impala、Solr、Lily、Titan和Neo4j等。 【Hive...
Hadoop生态系统中的工具远不止HDFS和MapReduce,还包括了Hive、Pig、HBase、Spark、Storm、Kafka、Flume、Oozie、Zookeeper、Mahout、Flink、Cassandra、Solr、Nifi、Sqoop等。这些工具在不同的场景下发挥着重要作用...
教程还包含了如Zookeeper(分布式协调服务)、Pig(大数据分析工具)、Hive(数据仓库工具)、Hive操作、HBase、Pig Latin、Pig模式与函数、Sqoop(数据导入导出工具)、Flume(日志收集系统)、Kafka(消息队列)、...
如Hive(提供SQL接口进行数据查询)、HBase(分布式列式数据库)、Pig(数据分析平台)、Pig Latin(Pig的脚本语言)、Sqoop(数据导入导出工具)、Flume(日志收集系统)、Kafka(分布式流处理平台)、Spark(快速...
Hue的搜索功能可以让用户通过搜索引擎接口对存储在Hadoop中的数据进行全文搜索。 `pig` 文件夹表示Hue对Apache Pig的支持。Pig是一种高级数据处理语言,用于编写处理大数据集的脚本,通常称为Pig Latin。Hue的Pig...
- Channel:数据缓冲区,存储从Source接收到的数据,等待传输到Sink。 Flume内部的数据传输单位是Event,包含headers、body和事件信息,可以视为最小的完整数据单元。Event的流转保证了数据的安全传输。 1.2 Flume...
课程涵盖了从基础到高级的大数据处理技术,主要围绕Hadoop和相关生态系统的组件展开,包括Hadoop的安装与入门、MapReduce编程模型、HDFS分布式文件系统、Hive数据仓库工具、HBase分布式数据库、Pig数据处理语言、...
这些课程旨在帮助学习者掌握大数据处理的全貌,从数据存储、计算模型到实时处理和查询,涵盖了从基础到高级的全面技能。通过学习这些内容,开发者可以有效地处理海量数据,实现高效的数据分析和决策支持。
课程还涉及到了Pig,这是一种用于大数据分析的高级语言,Pig Latin是Pig的语言,提供了对MapReduce的抽象,简化了大规模数据处理。Zookeeper是Hadoop生态中的协调服务,用于管理分布式应用程序的配置信息、命名服务...
通过阅读关于YARN和ZooKeeper的章节,读者可以了解到如何配置和管理大数据存储系统。 此外,本书还详细讨论了如何收集数据,使用了Nutch和Solr这两个常用工具。Nutch主要用于网络数据的抓取,而Solr则是一个搜索...
课程还包括了Hive(数据仓库工具)、HBase(列式存储数据库)、Pig(数据流处理)、Sqoop(数据库导入导出工具)、Flume(日志收集系统)、Kafka(消息队列)、Strom(实时流处理)、Spark(快速大数据处理框架)、...
其中,数据采集层负责采集来自各种数据源的数据,数据存储层负责存储和管理大量的数据,数据处理层负责处理和分析数据,数据分析层负责分析和可视化数据,数据可视化层负责将分析结果可视化。 四、数据源与SQL引擎 ...
Hadoop是最著名的MapReduce框架之一,其生态中包括了Hive、Pig等工具,用于简化MapReduce编程。Cascading、Cascalog、mrjob是提供MapReduce作业编写的抽象层库。Caffeine、S4是流数据处理系统。MapR是Hadoop的商业...
- **案例简介**:未提供具体细节,但可以从名称推测该系统主要用于管理Hadoop集群中的各种资源。 **4. Pig相关——可视化用户自主查询** - **案例简介**:未提供具体细节,但可以从名称推测该应用旨在利用Pig工具...