
Elasticsearch Data Migration


This article describes a data migration approach for Elasticsearch (ES):

 

Because ES evolves quickly, we often need to upgrade the version, the plugins, or even the analyzers. A plain in-place upgrade frequently cannot meet business requirements, and in the worst case the index has to be rebuilt. This article covers ES data migration (i.e. reindexing) from the Java API perspective. The implementation below has been tested on a migration of 200 million documents, so it can be used with confidence.

 

1. Obtain the client connections. This article uses TransportClient.

import java.lang.reflect.Constructor;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ClientUtil
{

    // If the number of nodes in your cluster is stable, it is best to turn sniffing off.
    // Raising the ping timeout above the default 5s also avoids most NoNodeAvailableException errors.
    static Settings defaultSettings = ImmutableSettings.settingsBuilder()
            .put("client.transport.sniff", false)
            .put("client.transport.ping_timeout", "10s")
            .build();

    // private client instances
    private static TransportClient targetClient;

    private static TransportClient sourceClient;

    static {
        try {
            // build the two TransportClient instances via reflection
            Class<?> clazz = Class.forName(TransportClient.class.getName());
            Constructor<?> constructor = clazz.getDeclaredConstructor(Settings.class);
            constructor.setAccessible(true);
            Settings finalSettings = ImmutableSettings.settingsBuilder()
                    .put(defaultSettings)
                    .build();
            targetClient = (TransportClient) constructor.newInstance(finalSettings);
            targetClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.100", 9300))
                        .addTransportAddress(new InetSocketTransportAddress("192.168.1.101", 9300));
            sourceClient = (TransportClient) constructor.newInstance(finalSettings);
            sourceClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.110", 9300))
                        .addTransportAddress(new InetSocketTransportAddress("192.168.1.111", 9300));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // get the client for the source cluster
    public static synchronized Client getSourceTransportClient() {
        return sourceClient;
    }

    // get the client for the target cluster
    public static synchronized Client getTargetTransportClient() {
        return targetClient;
    }

}

 

The code above obtains the clients for the source cluster and the target cluster.
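Before kicking off a migration it is worth confirming that both clients actually connected to their clusters, otherwise every request will fail with NoNodeAvailableException. Note also that the Settings above do not set cluster.name, so if either cluster uses a non-default name it has to be added there. The following lines are a small sanity check, a sketch assuming the ClientUtil class above and the connectedNodes() method of TransportClient:

// Sanity check (sketch): make sure both transport clients discovered at least one node.
// Assumes the ClientUtil class above; connectedNodes() is part of the TransportClient API.
TransportClient source = (TransportClient) ClientUtil.getSourceTransportClient();
TransportClient target = (TransportClient) ClientUtil.getTargetTransportClient();
if (source.connectedNodes().isEmpty() || target.connectedNodes().isEmpty())
{
    throw new IllegalStateException("A transport client has no connected nodes - check the addresses and cluster.name");
}
System.out.println("Source nodes: " + source.connectedNodes());
System.out.println("Target nodes: " + target.connectedNodes());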

 

2. The main migration method:

private static void doMigrate(Client sourceclient, Client targetclient, String sourceIndexName,
		String targetIndexName, String indexDocType, int pageSize)
	{
		int total = 0;
		// Open a scan+scroll over the source index. Keep the scroll keep-alive short so that
		// abandoned scroll contexts do not put unnecessary load on the cluster.
		// Note: with SearchType.SCAN the size is applied per shard, so each scroll fetch
		// can return up to pageSize * number_of_primary_shards documents.
		SearchResponse searchResponse = sourceclient.prepareSearch(sourceIndexName)
				.setSearchType(SearchType.SCAN)
				.setQuery(matchAllQuery())
				.setSize(pageSize)
				.setScroll(TimeValue.timeValueSeconds(20))
				.execute().actionGet();

		boolean exists = targetclient.admin().indices().prepareExists(targetIndexName).execute().actionGet().isExists();
		if (!exists)
		{
			// Create the target index with 0 replicas and refresh disabled to speed up indexing.
			targetclient.admin().indices().prepareCreate(targetIndexName)
					.setSettings(settingsBuilder()
							.put("index.number_of_replicas", 0)
							.put("index.refresh_interval", "-1"))
					.execute().actionGet();
		}
		try
		{
			Thread.sleep(200);
		} catch (InterruptedException e)
		{
			e.printStackTrace();
		}

		BulkProcessor bulkProcessor = BulkProcessor.builder(targetclient, new BulkProcessor.Listener()
		{

			@Override
			public void beforeBulk(long executionId, BulkRequest request)
			{

			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response)
			{
				if (response.hasFailures())
				{
					throw new RuntimeException("BulkResponse shows failures: " + response.buildFailureMessage());
				}
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure)
			{
				throw new RuntimeException("Caught exception in bulk: " + request + ", failure: " + failure, failure);
			}
		}).setConcurrentRequests(10).build(); // number of concurrent bulk requests; tune this for your hardware

		while (true)
		{
			searchResponse = sourceclient.prepareSearchScroll(searchResponse.getScrollId())
					.setScroll(TimeValue.timeValueSeconds(20)).execute().actionGet();
			for (SearchHit hit : searchResponse.getHits())
			{
				IndexRequestBuilder indexRequestBuilder = targetclient.prepareIndex(targetIndexName, indexDocType);
				indexRequestBuilder.setSource(hit.getSource());
				indexRequestBuilder.setId(hit.getId());
				indexRequestBuilder.setOpType(IndexRequest.OpType.INDEX);
				bulkProcessor.add(indexRequestBuilder.request());
				total++;
			}
			System.out.println("Already migrated: " + total + " records!");
			if (searchResponse.getHits().hits().length == 0)
			{
				break;
			}
		}

		try
		{
			Thread.sleep(10000); // give in-flight bulk requests time to finish before closing the processor
		} catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		bulkProcessor.close();

		// Restore replicas and refresh on the target index once the migration is done.
		targetclient.admin().indices().prepareUpdateSettings(targetIndexName)
				.setSettings(settingsBuilder()
						.put("index.number_of_replicas", 1)
						.put("index.refresh_interval", "1s"))
				.execute().actionGet();
	}
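The method above lets the 20-second scroll keep-alive expire on its own. As an optional refinement that is not in the original code, the scroll context on the source cluster can be released explicitly right after the while loop, assuming a client version that has the clear scroll API (0.90.4+):

// Optional (sketch): release the scroll context on the source cluster right after the while loop,
// instead of waiting for the 20s keep-alive to expire. scrollId comes from the last scroll response.
String scrollId = searchResponse.getScrollId();
if (scrollId != null)
{
    sourceclient.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
}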

 

3. Test:

public static void main(String[] args) throws ElasticSearchException, IOException, InterruptedException
	{
		int pageSize = 40; // page size per scroll fetch; keep it moderate, an oversized page hurts cluster performance and can trigger NoNodeAvailableException
		Client sourceclient = ClientUtil.getSourceTransportClient();
		Client targetclient = ClientUtil.getTargetTransportClient();
		// run the migration from index "test" on the source cluster into index "testnew" on the target cluster
		doMigrate(sourceclient, targetclient, "test", "testnew", "test", pageSize);
	}
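Once the migration returns, the transport clients can be closed so that the JVM exits cleanly. The two lines below could be appended at the end of main(); closing a TransportClient releases its connections and thread pools:

// Close both clients after the migration so the JVM can shut down cleanly.
sourceclient.close();
targetclient.close();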