
MySQL Applier For Hadoop: Implementation

 
Read more: http://innovating-technology.blogspot.com/2013/04/mysql-hadoop-applier-part-2.html

This is a follow-up post describing the implementation details of the Hadoop Applier, and the steps to configure and install it. The Hadoop Applier integrates MySQL with Hadoop, providing real-time replication of INSERTs to HDFS, where they can be consumed by the data stores working on top of Hadoop. You can learn more about the design rationale and prerequisites in the previous post.

Design and Implementation:

Hadoop Applier replicates rows inserted into a table in MySQL to the Hadoop Distributed File System (HDFS). It uses an API provided by libhdfs, a C library for manipulating files in HDFS.

 
The library comes pre-compiled with Hadoop distributions.
The Applier connects to the MySQL master (or reads a binary log generated by MySQL) and:
  • fetches the row insert events occurring on the master
  • decodes these events, extracting the data inserted into each field of the row
  • uses content handlers to get the data into the required format and appends it to a text file in HDFS.

Schema equivalence is a simple mapping:

Databases are mapped as separate directories, with their tables as sub-directories. Data inserted into each table is written into text files (named datafile1.txt here) in HDFS. The data can be in comma-separated format, or any other delimiter can be used; this is configurable via command-line arguments.

 
The diagram explains the mapping between MySQL and HDFS schema.
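For example (illustrative names), rows inserted into table 'tbl' of database 'db' end up appended to a file laid out as:

<base_dir>/db.db/tbl/datafile1.txt   (the base directory is configurable; see below)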

The file in which the data is stored is named datafile1.txt here; you can name it anything you want. The working directory where this datafile goes is base_dir/db_name.db/tb_name.

The timestamp at which the event occurs is included as the first field in each row written to the text file.
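As a sketch of the resulting file contents (assuming the default ctrl-A field delimiter, shown here as '^A', and the default LINE FEED row delimiter), inserting the row (1, 'apple') at Unix timestamp 1366728312 would append a line such as:

1366728312^A1^Aapple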
The implementation follows these steps:

- Connect to the MySQL master using the interfaces to the binary log

#include "binlog_api.h"

Binary_log binlog(create_transport(mysql_uri.c_str()));
binlog.connect();
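The mysql_uri passed to create_transport above takes the form mysql://user[:password]@host[:port] in the Binlog API; for example (placeholder credentials for a local master):

std::string mysql_uri= "mysql://root:password@127.0.0.1:3306";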

- Register content handlers


/*
  Table_index is a subclass of the Content_handler class in the Binlog API.
*/
Table_index table_event_hdlr;
Applier replay_hndlr(&table_event_hdlr, &sqltohdfs_obj);
binlog.content_handler_pipeline()->push_back(&table_event_hdlr);
binlog.content_handler_pipeline()->push_back(&replay_hndlr);

- Start an event loop and wait for the events to occur on the master

 

while (true)
{
/*
Pull events from the master. This is the heartbeat of the event listener.
*/
Binary_log_event *event;
binlog.wait_for_next_event(&event);
}
 
 

- Decode the event using the content handler interfaces
 

class Applier : public mysql::Content_handler

{
public:
Applier(Table_index *index, HDFSSchema *mysqltohdfs_obj)
{
   m_table_index= index;
   m_hdfs_schema= mysqltohdfs_obj;
}
mysql::Binary_log_event *process_event(mysql::Row_event *rev)
{
   int table_id= rev->table_id;
   typedef std::map<long int, mysql::Table_map_event *> Int2event_map;
   Int2event_map::iterator ti_it= m_table_index->find(table_id);

- Each row event contains multiple rows and fields.
Iterate one row at a time using Row_iterator.


mysql::Row_event_set rows(rev, ti_it->second);
mysql::Row_event_set::iterator it= rows.begin();
do
{
mysql::Row_of_fields fields= *it;
long int timestamp= rev->header()->timestamp;
if (rev->get_event_type() == mysql::WRITE_ROWS_EVENT)
table_insert(db_name, table_name, fields, timestamp, m_hdfs_schema);
} while (++it != rows.end());
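The db_name and table_name passed to table_insert above come from the Table_map_event that the Table_index handler cached earlier; a minimal sketch of the lookup, assuming the db_name and table_name members exposed by the Binlog API's Table_map_event:

/* Sketch: resolve database and table names for this row event from the
   Table_map_event stored by the Table_index content handler. */
if (ti_it != m_table_index->end())
{
  std::string db_name= ti_it->second->db_name;
  std::string table_name= ti_it->second->table_name;
}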

- Get the field data separated by field delimiters and row delimiters.
 
Each row contains a vector of Value objects. The converter allows us to transform the value into another representation.

mysql::Row_of_fields::const_iterator field_it= fields.begin();
mysql::Converter converter;
std::ostringstream data;
data << timestamp;
do
{
  std::string str;
  converter.to(str, *field_it);       // convert the field value to its string representation
  data << sqltohdfs_obj->hdfs_field_delim;
  data << str;
} while (++field_it != fields.end());
data << sqltohdfs_obj->hdfs_row_delim;

- Connect to the HDFS file system.
If not provided on the command line, the connection information (user name, password, host, and port) is read from the XML configuration file, hadoop-site.xml.
 

hdfsFS m_fs= hdfsConnect(host.c_str(), port);
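If no host and port are supplied, a reasonable fallback (a sketch; libhdfs documents the host "default" as meaning the configured default file system) is:

/* Connect using the defaults from the Hadoop configuration (fs.default.name). */
hdfsFS m_fs= hdfsConnect("default", 0);
if (m_fs == NULL)
{
   // handle the connection failure
}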

- Create the directory structure in HDFS. 
Set the working directory and open the file in append mode.
 

hdfsSetWorkingDirectory(m_fs, (stream_dir_path.str()).c_str());
const char* write_path= "datafile1.txt";
hdfsFile writeFile;

 
- Append data at the end of the file.
 

writeFile= hdfsOpenFile(m_fs, write_path, O_WRONLY|O_APPEND, 0, 0, 0);
std::string row= data.str();
tSize num_written_bytes= hdfsWrite(m_fs, writeFile, (void *) row.c_str(), row.length());
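Putting the HDFS calls together, here is a minimal, self-contained sketch of the write path (directory creation, append, flush, close); error handling is simplified, first-time file creation is not shown (as in the excerpt above), and stream_dir stands for base_dir/db_name.db/tb_name as described earlier:

#include <fcntl.h>
#include <string>
#include "hdfs.h"

/* Sketch: append one serialized row to datafile1.txt under the table directory. */
int write_row(hdfsFS fs, const std::string &stream_dir, const std::string &row)
{
  if (hdfsCreateDirectory(fs, stream_dir.c_str()) != 0)       // creates the path if missing
    return -1;
  if (hdfsSetWorkingDirectory(fs, stream_dir.c_str()) != 0)
    return -1;
  hdfsFile f= hdfsOpenFile(fs, "datafile1.txt", O_WRONLY|O_APPEND, 0, 0, 0);
  if (f == NULL)
    return -1;
  tSize n= hdfsWrite(fs, f, (void *) row.c_str(), row.length());
  hdfsFlush(fs, f);                                           // push buffered data out
  hdfsCloseFile(fs, f);
  return (n == (tSize) row.length()) ? 0 : -1;
}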
Install and Configure:
Follow these steps to install and run the Applier:
 
 
1. Download a Hadoop release (I am using 1.0.4); configure and install it (for the purpose of this demo, install it in pseudo-distributed mode). The flag 'dfs.support.append' must be set to true while configuring HDFS (hdfs-site.xml). Since append is not supported in Hadoop 1.x, also set the flag 'dfs.support.broken.append' to true.
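For reference, the corresponding hdfs-site.xml entries on a 1.x pseudo-distributed setup would look roughly like this (dfs.replication=1 is just the usual single-node value):

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.support.append</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.support.broken.append</name>
    <value>true</value>
  </property>
</configuration>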
 
2. Set the environment variable $HADOOP_HOME to point to the Hadoop installation directory.
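For example (use whatever install path you chose in step 1):

         $export HADOOP_HOME=/usr/local/hadoop-1.0.4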
 
3. CMake does not come with a 'find' module for libhdfs. Ensure that 'FindHDFS.cmake' is in the CMAKE_MODULE_PATH. You can download a copy here.
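One way to do this (assuming you keep the module alongside the ones shipped in the Applier's cmake_modules directory, which the patch below also refers to) is simply:

         $cp FindHDFS.cmake <hadoop_applier_src>/cmake_modules/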
 
4. Edit the file 'FindHDFS.cmake', if necessary, so that HDFS_LIB_PATHS is set to the path containing libhdfs.so, and HDFS_INCLUDE_DIRS points to the location of hdfs.h.
For 1.x versions, the library path is $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib, and the header files are contained in $ENV{HADOOP_HOME}/src/c++/libhdfs. For 2.x releases, the libraries and header files can be found in $ENV{HADOOP_HOME}/lib/native and $ENV{HADOOP_HOME}/include respectively.
For 1.x versions, this patch will fix the paths:
  --- a/cmake_modules/FindHDFS.cmake
  +++ b/cmake_modules/FindHDFS.cmake
  @@ -11,6 +11,7 @@ exec_program(hadoop ARGS version OUTPUT_VARIABLE Hadoop_VERSION
   # currently only looking in HADOOP_HOME
   find_path(HDFS_INCLUDE_DIR hdfs.h PATHS
     $ENV{HADOOP_HOME}/include/
  +  $ENV{HADOOP_HOME}/src/c++/libhdfs/
     # make sure we don't accidentally pick up a different version
     NO_DEFAULT_PATH
   )
  @@ -26,9 +27,9 @@ endif()
   message(STATUS "Architecture: ${arch_hint}")
  
   if ("${arch_hint}" STREQUAL "x64")
  -  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
  +  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-amd64-64/lib)
   else ()
  -  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
  +  set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib)
   endif ()
  
   message(STATUS "HDFS_LIB_PATHS: ${HDFS_LIB_PATHS}")

5. Since libhdfs is a JNI-based API, it requires JNI header files and libraries to build. If a FindJNI.cmake module exists in the CMAKE_MODULE_PATH and JAVA_HOME is set, the headers will be included and the libraries linked automatically. If not, you will have to include the headers and load the libraries separately (modify LD_LIBRARY_PATH).
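If CMake cannot locate JNI on its own, exporting the JDK location and the directory containing libjvm.so usually suffices; the exact paths depend on your JDK version and architecture, for example:

         $export JAVA_HOME=/usr/lib/jvm/java-7-oracle
         $export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH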
 
6. Build and install the library 'libreplication', to be used by the Hadoop Applier, using CMake.
  • Download a copy of Hadoop Applier from http://labs.mysql.com.
     
  • The 'mysqlclient' library must be installed in the default library paths. You can either download and install it (you can get a copy here), or set the environment variable $MYSQL_DIR to point to the parent directory of the MySQL source code. Make sure to run cmake on the MySQL source directory.
                $export MYSQL_DIR=/usr/local/mysql-5.6
 
  • Run the 'cmake' command on the parent directory of the Hadoop Applier source. This will generate the necessary Makefiles. Make sure to set the cmake option ENABLE_DOWNLOADS=1, which will install Google Test, required to run the unit tests.
              $cmake . -DENABLE_DOWNLOADS=1

 

  • Run 'make' and 'make install' to build and install. This will install the library 'libreplication' which is to be used by Hadoop Applier.
     
7. Make sure to set the CLASSPATH to include all the Hadoop jars needed to run Hadoop itself.

         $export PATH=$HADOOP_HOME/bin:$PATH

         $export CLASSPATH=$(hadoop classpath)
 
 

8. The code for the Hadoop Applier can be found in /examples/mysql2hdfs in the Hadoop Applier repository. To compile, simply load the libraries (modify LD_LIBRARY_PATH if required) and run the command 'make happlier' in your terminal. This will create an executable file in the mysql2hdfs directory.
 
.. and then you are done!

Now run Hadoop DFS (namenode and datanode), start a MySQL server as master with row-based replication (you can use the mtr rpl suite for testing purposes: $MySQL-5.6/mysql-test$ ./mtr --start --suite=rpl --mysqld=--binlog_format='ROW' --mysqld=--binlog_checksum=NONE), start Hive (optional) and run the executable ./happlier, optionally providing the MySQL and HDFS URIs and other available command-line options (./happlier --help for more info).
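Condensed into commands, a typical test run looks roughly like this (ports, paths and URIs are examples; adjust them to your setup):

         $start-dfs.sh
         $cd mysql-5.6/mysql-test
         $./mtr --start --suite=rpl --mysqld=--binlog_format='ROW' --mysqld=--binlog_checksum=NONE
         $cd <hadoop_applier_src>/examples/mysql2hdfs
         $./happlier mysql://root@127.0.0.1:13000 hdfs://localhost:9000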

There are useful filters available as command-line options to the Hadoop Applier.

-r, --field-delimiter=DELIM
    Use DELIM instead of ctrl-A as the field delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed.
    This is the string by which the fields in a row are separated; by default, it is set to ctrl-A.

-w, --row-delimiter=DELIM
    Use DELIM instead of LINE FEED as the row delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed.
    This is the string by which the rows of a table are separated; by default, it is set to LINE FEED (\n).

-d, --databases=DB_LIST
    Import entries for the listed databases, optionally including only the specified tables.
    DB_LIST is made up of one database name, or many names separated by commas. Each database name can optionally be followed by table names; the table names must follow the database name, separated by hyphens.
    Example: -d=db_name1-table1-table2,dbname2-table1,dbname3

-f, --fields=LIST
    Import entries for only some fields of a table.
    Similar to the cut command, LIST is made up of one range, or many ranges separated by commas. Each range is one of:
      N     the N'th byte, character or field, counted from 1
      N-    from the N'th byte, character or field, to the end of the line
      N-M   from the N'th to the M'th (inclusive) byte, character or field
      -M    from the first to the M'th (inclusive) byte, character or field

-h, --help
    Display help.
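For example, to replicate only table t1 of database db1 with comma-separated fields (illustrative names and URIs):

         $./happlier --field-delimiter=',' -d=db1-t1 mysql://root@127.0.0.1:13000 hdfs://localhost:9000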
 
Integration with HIVE:
Hive runs on top of Hadoop. It is sufficient to install Hive only on the Hadoop master node.
Take note of the default data warehouse directory, set as a property in the hive-default.xml.template configuration file. It must be the same as the base directory into which the Hadoop Applier writes.
 
Since the Applier does not import DDL statements, you have to create a matching schema on both MySQL and Hive, i.e. set up a similar database in Hive using HiveQL (Hive Query Language). Since timestamps are inserted as the first field in the HDFS files, you must take this into account while creating tables in Hive.
 


SQL query:
  CREATE TABLE t (i INT);

Hive query:
  CREATE TABLE t (time_stamp INT, i INT)
  [ROW FORMAT DELIMITED]
  STORED AS TEXTFILE;
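Since the Applier's default field delimiter (ctrl-A, '\001') matches Hive's own default for delimited text files, the bracketed ROW FORMAT clause can usually be left out. If you changed the delimiters with -r/-w, spell them out explicitly, for example:

CREATE TABLE t (time_stamp INT, i INT)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE;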

Now, whenever a row is inserted into a table in the MySQL database, a corresponding entry is made in the Hive table. Watch the demo to get a better understanding.
The demo has no audio and is meant to be followed in conjunction with this blog. You can also create an external table in Hive and load data into it; it's your choice!
Watch the Hadoop Applier Demo >>
 
Limitations of the Applier:
In the first version we support only write-row events (WRITE_ROWS_EVENT), i.e. only INSERT statements are replicated.
We have considered adding support for deletes, updates and DDL's as well, but they are more complicated to handle and we are not sure how much interest there is in this.
We would very much appreciate your feedback on requirements - please use the comments section of this blog to let us know!
 
The Hadoop Applier is compatible with MySQL 5.6; however, it does not import events if binlog checksums are enabled. Make sure to set them to NONE on the master, and run the server in row-based replication mode.
 
This innovation includes dedicated contributions from Neha Kumari, Mats Kindahl and Narayanan Venkateswaran. Without them, this project would not be a success.
 
Give it a try! You can download a copy from http://labs.mysql.com and get started NOW!