`

用java代码来定时增量同步数据库表的实现代码

阅读更多
import java.io.Serializable;
import java.util.List;

/**
* Created by Administrator on 2017/4/19.
* 这是基于是单线程来执行同步   千万不允许多线程执行    多线程执行太难写了  放弃吧!!!!!
* 这里我们考虑有时间因素、以及mysql mvcc非锁定读的因素。
* 时间因素我们这样解决:1、以主服务器的时间为准。而非本地时间。本地时间快与慢不会影响同步功能
*                       2、主服务器可以调整它的时间,可以向前(改小时间)或向后(改大时间)调整,程序都是支持的。
*                              假设我们的同步时间是1小时同步一次,事物最大超时时间是5分钟(就是没有事物可以超过5分钟)。假设当前主服务器时间
*                              是: 08:00 , 那么上次同步时间是:06:55
*                              2.1、向前调整:假设向前调整5小时,那么是 03:00 ,   当我们同步的时候,获取主服务器的时间是 X ( 03:00 <= X <= 04:00) 因为我们的同步时间间隔是1小时
*                                   假设X=03:56 ,  那么min(06:55+01:00,03:56-01:00-00:05)=02:51   那么我们此次的同步时间将会是:02:51,那么是正确同步时间。
*                                   下次认为的上次同步时间将会是:02:51
*                              2.2、向后调整:假设向后调整5小时,那么是13:00,    当我们同步的时候,获取主服务器的时间是:Y (13:00 <= Y <= 14:00) 同理。
*                                   假设Y=13:56,那么min(06:55+01:00,13:56-01:00-00:05)=07:55   那么我们此次的同步时间将会是:07:55,那么是正确同步时间。
*                                   下次认为的上次同步时间将会是:13:56-01:00-00:05 = 12:51   这个时候时间就一下进步了很多了(跟上时间的步伐)。
*
* <T>是id类型
*/
public abstract class BaseSyncUserInfoTaskImpl<T extends Serializable> implements ISyncUserInfoTask{

    //同步时间间隔  可以稍微大点(比真实在定时任务的执行中的间隔大,但是千万别小于他,等于定时任务执行间隔最好)
    private int sync_time_interval_in_milsecond = 10*60*1000;

    //事物处理最长时间  建议同步时间间隔大于此时间
    private int tx_time_out_in_milsecond = 5*60*1000;

    //上次同步时间
    private Long up_sync_time = null;
    //本次同步时间
    private Long this_sync_time = null;

    /**
     * 执行总体架构
     */
    @Override
    public final void sync() {

        //获取同步时间  与主服务器商定同步时间
        long nowSyncTime = getSyncTime();

        //开始数据同步
        syncDatas(nowSyncTime);

        //同步数据仅仅解决更新与插入的问题   这里去解决删除的问题
        //有些表不会存在删除操作,这里对那些不需要删除的表直接跳过
        if(isNeedDel)
            syncDel();

        //这个放到最后   怕事物回滚   而时间没有被回滚  导致下次同步时,up_sync_time不正确
        updateUpSyncTime();
    }

    //加synchronized 仅仅是怕jvm优化  导致语句重排   一定要在最后来更新这个时间
    protected synchronized  void updateUpSyncTime(){
        up_sync_time = this_sync_time;
    }


    private  void syncDel() {

        //1、本地取全部id集合的摘要   MD5,以及记录数   拿去远程比较,相等则啥都不做
        CommonKVQuery<String,Integer> abstractAndCount = getLocalIdsAbstractAndCount();
        boolean isMach = isMachMasterServer(abstractAndCount);
        if(isMach)
            return ;
        //2、把本地的数据按照id进行分页拿到远程去对比,没有则拿回来进行删除 。
        DefaultPageRequest pageRequest = new DefaultPageRequest();
        pageRequest.setLimit(100);
        pageRequest.setPage(1);
        List<T> ids = null;
        List<T> delIds = null;
        do {
            ids = listLocalIds(pageRequest);
            delIds = getNeedDelIdsFromMasterServer(ids);
            deleteLocalByIds(delIds);
            pageRequest.setPage(pageRequest.getPage()+1);
        } while (ids.size() == pageRequest.getLimit());
    }

    /**
     * 删除本地的数据  通过id集合
     * @param delIds
     */
    protected abstract void deleteLocalByIds(List<T> delIds);

    /**
     * 去远处匹配  找出需要删除的id集合
     * @param ids
     * @return
     */
    protected abstract List<T> getNeedDelIdsFromMasterServer(List ids);

    //分页获取本地id集合
    protected abstract List<T> listLocalIds(DefaultPageRequest pageRequest);

    //去主服务器匹配摘要及记录数
    protected abstract boolean isMachMasterServer(CommonKVQuery<String,Integer> abstractAndCount);

    //获取本地id集合摘要及记录数量
    protected abstract  CommonKVQuery<String,Integer> getLocalIdsAbstractAndCount() ;

    //同步数据   更新时间在指定的时间之后的数据进行更新
    protected abstract void syncDatas(long nowSyncTime) ;

    private long getSyncTime(){

        final long masterServerTime = getMasterServiceTime();
        if(up_sync_time == null){

            up_sync_time = getLocalMaxUpdateTime() - sync_time_interval_in_milsecond;

            //若上次跟新时间为null,表示系统重启,进行第一次同步,那么来一次全量同步
//            this_sync_time = masterServerTime - sync_time_interval_in_milsecond-tx_time_out_in_milsecond;
//            return 0l;

        }

//        min(主服务器时间 - 同步时间间隔(1小时) - 最大事物超时时间(5分钟),上次商定的时间 + 同步时间间隔)
//        这里的5分钟我考虑的是最大事物的用时。就是假定所有事物的时间长度不可以超过5分钟。
//        因为我们在程序中经常是先设置更新时间,然后插入数据库,然后再做些别的(浪费了一些时间),
//        最后提交了事物。那么根据mvcc模式,非锁定读,是读快照。导致更新时间本应该在本次同步中被同步的,而并没有同步到
//(不可见),而下一次的同步时间又大于了这个更新时间。导致会丢失更新。所以每次同步,都多同步5分钟的数据。
//        就怕丢下这种间隙中的数据。
        this_sync_time =  Math.min(up_sync_time + sync_time_interval_in_milsecond,
                masterServerTime-sync_time_interval_in_milsecond-tx_time_out_in_milsecond) ;
        final long result = Math.max(0,this_sync_time);
        //这里的这一次同步时间取值是 主服务器时间-同步时间间隔-事物最大超时时间
        //而舍弃了up_sync_time + sync_time_interval_in_milsecond 这个取值,原因在于让下一次的更新跟上主服务器的时间,不要距离太远
        this_sync_time = masterServerTime - sync_time_interval_in_milsecond-tx_time_out_in_milsecond;
        return result;
    }

    /**
     * 获取本地记录中的最大更新时间
     * @return
     */
    protected abstract long getLocalMaxUpdateTime() ;

    //获取主服务器的当前时间
    protected abstract long getMasterServiceTime();

    /**
     * 表数据是否需要删除操作,不会删除,则可以减少去同步被删的数据
     */
    private boolean isNeedDel = false;

    public void setSync_time_interval_in_milsecond(int sync_time_interval_in_milsecond) {
        this.sync_time_interval_in_milsecond = sync_time_interval_in_milsecond;
    }

    public void setTx_time_out_in_milsecond(int tx_time_out_in_milsecond) {
        this.tx_time_out_in_milsecond = tx_time_out_in_milsecond;
    }

    public void setNeedDel(boolean needDel) {
        isNeedDel = needDel;
    }
}
分享到:
评论

相关推荐

    java使用datax增量同步代码

    Java 使用 DataX 进行增量同步是大数据领域中常见的数据迁移任务,DataX 是阿里开源的一个强大、高效的数据同步工具,它可以...通过以上步骤,你可以将这个功能直接集成到你的项目中,实现数据的实时或定时增量同步。

    JAVA将一个数据中数据定时自动复制(抽取)到另一个数据库

    7. **数据库同步策略**:除了简单的定时全量或增量抽取,还可以考虑使用触发器、中间件或其他工具(如ETL工具)来实现更复杂的同步策略,以满足不同业务需求。 最后,对于实际项目,我们还需要考虑资源管理,如关闭...

    Java实现数据库迁移同步

    总的来说,使用Java实现数据库迁移同步需要对数据库操作、JDBC以及ETL流程有深入的理解。合理的设计和实现可以确保迁移过程的高效、稳定和可靠。在实践中,你还需要考虑到数据安全、性能监控和异常处理等方面,确保...

    JAVA 线程实现数据库的主从同步更新

    在Java编程环境中,...总之,使用Java线程实现数据库主从同步更新是一种常见且实用的技术手段,它涉及到多线程编程、数据库操作、事务管理等多个方面。理解和掌握这些知识点对于开发高可用性的分布式系统至关重要。

    SpringBoot定时任务实现Oracle和mysql数据同步

    - **增量同步**:只同步自上次同步以来发生变化的数据,这需要在数据库中记录每个数据的最后修改时间。 - **全量同步**:每次同步都获取全部数据,适合于数据量不大或者初始同步的情况。 - **异步处理**:为了避免...

    solr同步数据库需要jar包

    使用这些jar包,开发者可以构建一个定制的数据同步程序,根据业务需求定时或实时地同步数据库和Solr索引。确保所有必要的jar包都包含在类路径中,是成功运行同步程序的关键。 总的来说,Solr与数据库的同步是一个...

    java数据实时同步系统

    - **增量同步**:仅同步自上次同步以来发生变化的数据,节省带宽和资源,提高效率。 - **实时/事务同步**:每当源数据库发生事务提交时,立即同步到目标数据库。 4. **技术框架**: 常用的Java数据同步框架有...

    springboot代码整合kettle实现数据增量同步:1、kettle同步文件编写说明;2、java整合代码

    实现数据库的增量同步,需要编写好kettle转换文件以配置文件的形式,整合到springboot,有改动只需要更换转换文件,不需要更新jar包,job用于灵活调整定时同步,再结合jenkins自动部署,十分方便。

    cloudin-datax是基于DataX开发的分布式数据同步工具,提供简单易用的操作界面,可视化定时任务配置监控和增量同步功能

    - **增量同步**:支持基于时间戳或唯一标识的增量数据同步,有效减少不必要的全量迁移,降低资源消耗。 **3. 数据库管理和监控** 在数据库管理领域,Cloudin-DataX提供了有效的解决方案,能够对多个数据库进行统一...

    solr-dataimportscheduler-1.4.jar 增量定时同步数据到solr.rar

    这样,Solr启动时会自动加载这个库,然后配置`data-config.xml`文件以指定数据库连接参数、表名以及增量同步的策略。一旦配置完成,定时同步功能就会生效。 5. **性能优化**: 在实际应用中,为了提高数据同步性能,...

    solr 定时增量更新jar包

    总的来说,"solr 定时增量更新jar包"是Solr实现高效、自动化的数据同步策略的重要工具,通过合理的配置和使用,可以确保Solr索引始终与数据源保持一致,提升系统的搜索性能和用户体验。在实际部署时,用户需要注意...

    oracle数据同步

    在这个项目中,可能使用了基于应用层的解决方案,即通过Java编程来定期或实时地抓取源数据库的更改并同步到目标数据库。Spring框架因其强大的IoC(Inversion of Control)和AOP(Aspect-Oriented Programming)特性...

    mysql基于springboot,vue数据定时同步

    本项目以"mysql基于springboot,vue数据定时同步"为主题,旨在实现MySQL数据库的数据自动、定时地与前端Vue应用程序进行同步。以下是这个项目的核心知识点及详细说明: 1. **MySQL数据库**:MySQL是一个广泛使用的...

    jdbc+webservice定时任务

    这个项目中,开发者利用Java的JDBC(Java Database Connectivity)接口来与Oracle数据库进行交互,执行数据库的备份操作,同时结合WebService技术,将增量备份的表数据发送到服务端,确保数据的一致性和完整性。...

    mysql-connector-java-5.1.7

    这个连接器是用于在Java应用程序中与MySQL数据库进行交互的关键组件,它实现了Java Database Connectivity (JDBC) API,使得Java开发者能够通过编写Java代码来执行SQL查询、操作数据库以及管理MySQL数据库中的数据。...

    基于Java的备份数据源.zip

    "基于Java的备份数据源.zip"这个压缩包文件显然包含了一些与使用Java语言实现数据备份相关的资源和代码示例。下面,我们将深入探讨Java在数据备份领域的应用,以及可能涉及到的关键知识点。 1. **Java连接数据库**:...

    HBase-Elasticsearch数据同步1

    - 示例应用程序包含了使用JDK Timer实现的定时全量同步(`HBase2ESFullDemo`)和增量同步(`HBase2ESIncrementalDemo`),这些工具可以帮助快速构建和发布数据导入工具。 4. **特性与优势** - Bboss的数据同步工具...

    jdbc.rar elasticsearch 与mysql 同步所需要jar包

    8. **实时同步与定时同步**:根据需求,可以选择实时同步(通过监听MySQL的binlog)或定时全量/增量同步。 9. **版本兼容性**:确保使用的是与Elasticsearch和MySQL版本兼容的JDBC驱动,避免因版本不匹配导致的兼容...

    多数据源数据同步,数据接口

    具体同步策略可以根据业务需求选择全量同步、增量同步或是基于版本号的差异同步。 在实际开发中,我们还需要关注数据安全和性能优化。比如,使用JWT进行身份验证和授权,保护数据接口不被非法访问;使用缓存技术...

    Platform_Server.rar_数据同步

    "Source"可能包含源代码文件,这将有助于我们深入理解数据同步的具体实现,包括可能使用的编程语言(如Java、Python或C#)、框架(如Spring Boot或Node.js)以及具体的同步算法(如基于事件驱动、定时任务或数据库...

Global site tag (gtag.js) - Google Analytics