`
huangjinjin520
  • 浏览: 70815 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制)

阅读更多
MySQL到Redis数据复制方案

无论MySQL还是Redis,自身都带有数据同步的机制,像比较常用的 MySQL的Master/Slave模式 ,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。

那么理论上我们也可以用同样方式,分析MySQL的binlog文件并将数据插入Redis。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于 binlog存在Statement/Row/Mixedlevel多种形式 ,分析binlog实现同步的工作量是非常大的。

因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的MySQL UDF,将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP Gearman Worker,将数据同步到Redis。比分析binlog的方式增加了不少流程,但是实现成本更低,更容易操作。

Gearman的安装与使用

Gearman 是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的Gearman应用包括以下这些部分:






Gearman Job Server:Gearman核心程序,需要编译安装并以守护进程形式运行在后台
Gearman Client:可以理解为任务的收件员,比如我要在后台执行一个发送邮件的任务,可以在程序中调用一个Gearman Client并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。
Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker接收到Gearman Client传递的任务内容后,会按顺序处理。
以前曾经介绍过类似的 后台任务处理项目Resque 。两者的设计其实非常接近,简单可以类比为:

Gearman Job Server:对应Resque的Redis部分
Gearman Client:对应Resque的Queue操作
Gearman Worker:对应Resque的Worker和Job
这里之所以选择Gearman而不是Resque是因为Gearman提供了比较好用的MySQL UDF,工作量更小。


1、安装依赖
yum install -y boost-devel gperf libevent-devel libuuid-devel
yum install mysql-devel -y
2、下载gearman
wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz
3、编译安装,指定mysqlclient的链接路径
  tar -zxvf gearmand-1.1.12.tar.gz
  cd gearmand-1.1.12
   ./configure 
make && make install

4、启动gearmand服务端 (启动之时,在/var/log/下创建gearmand.log日志文件。-l 指定日志文件  -d后台运行 -L 0.0.0.0 绑定到IPV4
gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d
5、查看是否启动成功
ps -ef | grep gearman
6、查看是否安装成功,查看gearman版本信息
gearmand -V

7、MySQL UDF + Trigger同步数据到Gearman (https://github.com/mysqludf)
安装lib_mysqludf_json(lib_mysqludf_json可以把MySQL表的数据以json数据格式输出)
wget https://github.com/mysqludf/lib_mysqludf_json/archive/master.zip
unzip master.zip
cd lib_mysqludf_json-master/
rm -rf lib_mysqludf_json.so
8、编译 mysql_config 这是mysql的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置
gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c
9、拷贝lib_mysqludf_json.so到MySQL的plugin目录
(可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置)
cp lib_mysqludf_json.so /usr/local/mysql/lib/plugin/

演示lib_mysqludf_json功能
登录mysql
mysql -uroot -h127.0.0.1 -p
注册UDF函数
CREATE FUNCTION json_object RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_array RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_members RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_values RETURNS STRING SONAME "lib_mysqludf_json.so";
//json_array|json_members|json_values函数注册方式与json_object一样.
select json_object(id,file_save_type,base_dir) as sys_file_save_config from sys_file_save_config;
ERROR 1123 (HY000): Can't initialize function 'json_object'; Invalid json member name - name cannot be empty
以上错误这样解决,给每个成员名称使用别名即可:
select json_object(id as id ,file_save_type as fileSaveType,app_id as appID) as sys_file_save_config from sys_file_save_config;


10、安装gearman-mysql-udf (https://launchpad.net/gearman-mysql-udf)
wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz
  tar zxvf gearman-mysql-udf-0.6.tar.gz
   cd gearman-mysql-udf-0.6
11、安装libgearman-devel
   yum install libgearman-devel -y
   如果没有yum源,添加epel.repo yum源
  [epel]
name=Extra Packages for Enterprise Linux 6 - $basearch
#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-6&arch=$basearch
failovermethod=priority
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6

[epel-debuginfo]
name=Extra Packages for Enterprise Linux 6 - $basearch - Debug
#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch/debug
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-debug-6&arch=$basearch
failovermethod=priority
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
gpgcheck=1

[epel-source]
name=Extra Packages for Enterprise Linux 6 - $basearch - Source
#baseurl=http://download.fedoraproject.org/pub/epel/6/SRPMS
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-source-6&arch=$basearch
failovermethod=priority
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
gpgcheck=1
   
12、编译安装
(可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置, mysql_config的配置文件,以及插件库所在路径,编译之后会在此路径生成.so文件)
./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/
make && make install

演示gearman-mysql-udf功能
mysql -uroot -p
CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_do RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_do_high RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_do_low RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_do_high_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_do_low_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_sum RETURNS STRING SONAME "libgearman_mysql_udf.so";
//函数gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum注册方式类似,请参考gearman-mysql-udf-0.6/README
//指定gearman job server地址
SELECT gman_servers_set('127.0.0.1:4730');

如果出现异常信息:
ERROR 1126 (HY000): Can't open shared library 'libgearman_mysql_udf.so' (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)
表示系统找不到 libgearman.so 文件,一般so都在/usr/local/lib目录下,修改配置文件/etc/ld.so.conf,将/usr/local/lib目录加入进去即可:
$ cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
/usr/local/lib
$ /sbin/ldconfig -v | grep gearman*

13、MySQL Trigger调用Gearman UDF实现同步
创建触发器
DELIMITER $$
CREATE TRIGGER test_data_to_redis AFTER UPDATE ON test FOR EACH ROW BEGIN
    SET@ret=gman_do_background('syncToRedis', json_object(NEW.id AS `id`, NEW.phone AS`phone`));
END$$;

DELIMITER $$
CREATE TRIGGER test_data_to_redis2 AFTER INSERT ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background('syncToRedis2', json_object(NEW.id AS `id`, NEW.phone AS`phone`));
  END$$
DELIMITER ;

DELIMITER $$
CREATE TRIGGER test_data_to_redis3 BEFORE DELETE ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background('syncToRedis3', json_object(OLD.id AS `id`, OLD.phone AS`phone`));
  END$$
DELIMITER ;

  说明以及问题:此类采用了gearman官网的java-gearman-service(地址:https://launchpad.net/gearman-java),目前release版本是0.6.6。java-gearman-servic.jar包中,即包括gearman server,还包括client和work客户端API。
  问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server。
 
  jar包需要添加到本地jar仓库:
mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar

import java.util.concurrent.TimeUnit;

import org.gearman.Gearman;
import org.gearman.GearmanFunction;
import org.gearman.GearmanFunctionCallback;
import org.gearman.GearmanServer;
import org.gearman.GearmanWorker;

/**
* *ECHO_HOST = "192.168.125.131"为安装了Gearman并开启geramand服务的主机地址
*int ECHO_PORT = 4730默认端口为4730
*
* @author Administrator
*
*/
public class EchoWorker implements GearmanFunction {

// function name
public static final String ECHO_FUNCTION_NAME = "syncToRedis";

// job server地址
public static final String ECHO_HOST = "192.168.1.245";

// job server监听的端口
public static final int ECHO_PORT = 4730;

public static void main(String[] args) {
// 创建一个Gearman实例
Gearman gearman = Gearman.createGearman();
/*
* 创建一个jobserver
*
* Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
*
* job server收到client的job,并将其分发给注册worker
*
*/
GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
// 创建一个Gearman的worker
GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建work节点。
worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间
worker.setMaximumConcurrency(5); // 最大并发数
// 告诉工人如何执行工作(主要实现了GearmanFunction接口)
worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
// worker连接服务器
worker.addServer(server);
}

@Override
public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {
// work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写
if (data != null) {
String str = new String(data);
System.out.println(str);
StringBuffer sb = new StringBuffer(str);
return sb.reverse().toString().getBytes();
} else {
return "未接收到data".getBytes();
}
}
}


import org.gearman.Gearman; 
import org.gearman.GearmanClient; 
import org.gearman.GearmanJobEvent; 
import org.gearman.GearmanJobReturn; 
import org.gearman.GearmanServer; 
 
public class EchoClient { 
    public static void main(String... args) throws InterruptedException { 
            //创建一个Gearman实例 
            Gearman gearman = Gearman.createGearman(); 
            //创建一个Gearman client              
            GearmanClient client = gearman.createGearmanClient(); 
            /* 
             * 创建一个jobserver 
             *  
             * Parameter 1: job server的IP地址 
             * Parameter 2: job server监听的端口 
             *  
             *job server收到client的job,并将其分发给注册worker 
             * 
             */ 
            GearmanServer server = gearman.createGearmanServer( 
                            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT); 
             // 告诉客户端,提交工作时它可以连接到该服务器 
            client.addServer(server); 
            /* 
             * 向job server提交工作 
             *  
             * Parameter 1: gearman function名字 
             * Parameter 2: 传送给job server和worker的数据 
             *  
             * GearmanJobReturn返回job发热结果 
             */ 
            GearmanJobReturn jobReturn = client.submitJob( 
                            EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes()); 
            //遍历作业事件,直到我们打到最后文件              
            while (!jobReturn.isEOF()) { 
 
                    //下一个作业事件 
                    GearmanJobEvent event = jobReturn.poll(); 
 
                    switch (event.getEventType()) { 
 
                    case GEARMAN_JOB_SUCCESS:     //job执行成功 
                            System.out.println(new String(event.getData())); 
                            break; 
                    case GEARMAN_SUBMIT_FAIL:     //job提交失败 
                         
                    case GEARMAN_JOB_FAIL:        //job执行失败 
                            System.err.println(event.getEventType() + ": " 
                                            + new String(event.getData())); 
                    default: 
                    } 
            } 
            //关闭 
            gearman.shutdown(); 
    } 


http://gearman.org/download/
php方案:https://www.tuicool.com/articles/B7Jjaa






  • 大小: 39.9 KB
  • 大小: 25.7 KB
分享到:
评论

相关推荐

    通过Gearman来同步Mysql和Redis

    ### 通过Gearman实现MySQL与Redis的同步 在IT领域,数据的一致性和实时性是系统设计中的关键因素之一。为了提升系统的响应速度并减轻主数据库的压力,常常需要使用缓存技术。本篇将深入探讨如何利用Gearman、PHP、...

    gearman-mysql-udf-0.6.tar.gz

    这个 "gearman-mysql-udf-0.6" 压缩包是将 Gearman 强大的分布式处理能力引入到 MySQL 数据库的一个实例,通过这种方式,开发者可以构建更加灵活和高效的数据库应用。理解并熟练使用 Gearman 和 MySQL UDF,能帮助...

    java-gearman-service-0.6.6.zip

    java-gearman-service-0.6.6.zip 包,gearman分为3部分,client - server - worker,创建 java 版本的client和worker部分。 其实在gearman中,client和worker的编写不复杂,但是不同厂商提供的API是不大相同的,本...

    java-gearman-service(gearman-java-service)

    java实现gearman的job实现的jar包,包括gearman server,client和work客户端API

    MySQL数据复制部署与维护

    MySQL数据复制是数据库管理员必须掌握的关键技术之一,它可以在多个服务器之间同步数据,保证数据的一致性,提高数据的安全性,同时能够分担负载,提高系统的性能。接下来,我们将详细探讨MySQL数据复制的部署与维护...

    java-gearman-service-0.6.6.jar

    java实现gearman的job实现的jar包,包括gearman server,client和work客户端API

    java-gearman-service-0.6.6.jar gearman java调度工具包

    2. **工作者接口**:开发者可以通过实现特定接口来创建自己的工作者,这些工作者可以监听Gearman服务器的任务,并在接收到任务后进行处理。 3. **连接管理**:库自动处理与Gearman服务器的连接,包括建立、保持和...

    java-gearman-service jar

    gearman的java库有两个,一个是gearman service ,一个是gearman java,相比来说service版本更好用一些,并且网上的教程一般是用的这个版本。因此我打好了gearman service的包提供给需要的开发者使用。

    Gearman java APIs和一个小Demo

    本篇文章将深入探讨 Gearman 的 Java APIs,并通过一个小 Demo 展示其用法。 ### Gearman 的基本概念 1. **Worker**:工作进程,负责接收并执行 Gearman 服务器分发的任务。 2. **Client**:客户端,用于提交任务...

    gearman-java.zip_BadMagicException_Gearman java

    Gearman-Java是Java语言对Gearman的客户端库,它提供了与Gearman服务器通信的能力,用于分发工作负载或者执行异步任务。 在"gearman-java.zip_BadMagicException_Gearman java"这个问题中,`BadMagicException`是一...

    gearman下载gearman下载

    Gearman是一款开源的分布式任务队列系统,它允许应用程序在多台机器上分发工作负载,从而实现异步处理和负载均衡。这个系统的重点在于解耦任务的发起者和执行者,使得系统能够灵活地扩展并提高处理能力。在本讨论中...

    gearman + mysql方式实现持久化操作示例

    Gearman 是一个分布式任务队列系统,它允许应用程序将任务分发到多个处理者,以实现负载均衡和异步处理。然而,Gearman 的默认配置下,Job Server 的工作队列仅存储在内存中,这意味着当服务器重启或崩溃时,未处理...

    GearMAN讲解及所带来的变革

    GearMAN的主要特点包括多方式支持(支持PHP、Java、Python等多种编程语言)、持久化支持(异步任务支持扩展持久化)、多线程并发处理请求(提高任务处理效率)、TCP通信(相比HTTP请求节省系统资源)、简单的接入和...

    Gearman C# API和示例

    这展示了Gearman如何在C#环境中实现异步任务处理和负载均衡。 在实际应用中,Gearman可以用于各种场景,例如批量数据处理、图片缩略图生成、电子邮件发送等耗时或计算密集型任务。通过 Gearman C# API,你可以轻松...

    gearman 文档

    - **异步处理**:例如评论发布或用户状态更新等操作,可以通过Gearman异步地进行处理,以减轻主服务器的压力。 - **批量任务**:如定期备份数据、清理过期文件等任务,也可以通过Gearman进行高效地批量处理。 - *...

    Laravel开发-php-gearman

    在 Laravel 中集成 Gearman,你可以创建自定义的 Job 类来封装需要异步处理的任务,然后使用 Gearman 客户端将这些任务发送到 Gearman 服务器。下面是一些关键步骤: #### 安装扩展 首先,你需要通过 Composer ...

    php使用gearman进行任务分发操作实例详解

    Gearman是一个工作负载分发服务器和库,允许将工作分散到多个机器或者机器上的多个核心上进行异步处理。它在Web应用程序中处理耗时的后台任务时尤其有用,如文件上传、邮件发送、图片处理等。 2. PHP Gearman扩展...

    Gearman环境搭建资料

    Gearman是一种分布式任务队列系统,它允许应用程序在不同的服务器之间分发工作负载,从而实现负载均衡和异步处理。本篇文章将详细讲解如何在Linux环境中搭建Gearman,包括依赖库的安装和Gearman服务的配置。 首先,...

    Gearman Worker实例 C++ vs2008

    Gearman是一个分布式任务队列系统,它允许应用程序将工作分发到多个服务器或进程,以实现负载均衡和异步处理。在本实例中,我们关注的是如何在C++环境中,利用Visual Studio 2008(VS2008)在Windows平台上创建一个...

Global site tag (gtag.js) - Google Analytics