This article describes a data migration approach for Elasticsearch (ES).
Because ES releases move quickly, we often need to upgrade the version or plugins, or even swap in a new analyzer. A plain in-place upgrade frequently cannot meet such business needs, and in the worst case the index has to be rebuilt. This article covers ES data migration (reindexing) from the Java API perspective. The implementation follows the logic below; I have personally tested it on a migration of 200 million documents, so it can be used with confidence.
1. Obtain the client connections. This article uses TransportClient.
import java.lang.reflect.Constructor;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ClientUtil {

    // If the number of nodes in your cluster is stable, it is best to disable
    // sniffing. Raising the ping timeout above the default 5s also goes a long
    // way toward avoiding "No node available" exceptions.
    static Settings defaultSettings = ImmutableSettings.settingsBuilder()
            .put("client.transport.sniff", false)
            .put("client.transport.ping_timeout", "10s")
            .build();

    // Private client instances
    private static TransportClient targetClient;
    private static TransportClient sourceClient;

    static {
        try {
            Class<?> clazz = Class.forName(TransportClient.class.getName());
            Constructor<?> constructor = clazz.getDeclaredConstructor(Settings.class);
            constructor.setAccessible(true);
            Settings finalSettings = ImmutableSettings.settingsBuilder()
                    .put(defaultSettings)
                    .build();

            targetClient = (TransportClient) constructor.newInstance(finalSettings);
            targetClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.100", 9300))
                    .addTransportAddress(new InetSocketTransportAddress("192.168.1.101", 9300));

            sourceClient = (TransportClient) constructor.newInstance(finalSettings);
            sourceClient.addTransportAddress(new InetSocketTransportAddress("192.168.1.110", 9300))
                    .addTransportAddress(new InetSocketTransportAddress("192.168.1.111", 9300));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Get the source-cluster client
    public static synchronized Client getSourceTransportClient() {
        return sourceClient;
    }

    // Get the target-cluster client
    public static synchronized Client getTargetTransportClient() {
        return targetClient;
    }
}
The code above obtains clients for the source cluster and the target cluster.
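Incidentally, the reflection in ClientUtil is not strictly required: in the client generation this article targets, the TransportClient(Settings) constructor is public, so the client can be built directly. A minimal alternative sketch (the class name SimpleClientFactory and the host/port parameters are mine, for illustration only):

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class SimpleClientFactory {
    // Builds a TransportClient directly, without reflection. The host and
    // port are placeholders; substitute your own cluster addresses.
    public static TransportClient newClient(String host, int port) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("client.transport.sniff", false)
                .put("client.transport.ping_timeout", "10s")
                .build();
        return new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }
}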
2. The main migration method:
// Static imports assumed:
// import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
// import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

private void doMigrate(Client sourceclient, Client targetclient, String sourceIndexName,
        String targetIndexName, String indexDocType, int pageSize) {
    int total = 0;

    // Open a scan-type scroll over the source index. Keep the scroll
    // keep-alive short (20s here) so idle contexts do not burden the cluster.
    SearchResponse searchResponse = sourceclient.prepareSearch(sourceIndexName)
            .setSearchType(SearchType.SCAN)
            .setQuery(matchAllQuery())
            .setSize(pageSize)
            .setScroll(TimeValue.timeValueSeconds(20))
            .execute().actionGet();

    // Create the target index if it does not exist. Replicas are set to 0 and
    // refresh is disabled to speed up indexing; both are restored afterwards.
    boolean exists = targetclient.admin().indices().prepareExists(targetIndexName)
            .execute().actionGet().isExists();
    if (!exists) {
        targetclient.admin().indices().prepareCreate(targetIndexName)
                .setSettings(settingsBuilder()
                        .put("index.number_of_replicas", 0)
                        .put("index.refresh_interval", "-1"))
                .execute().actionGet();
    }
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    BulkProcessor bulkProcessor = BulkProcessor.builder(targetclient, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                throw new RuntimeException("BulkResponse shows failures: " + response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            throw new RuntimeException("Caught exception in bulk: " + request + ", failure: " + failure, failure);
        }
    }).setConcurrentRequests(10).build(); // Number of concurrent bulk requests; tune to your hardware.

    while (true) {
        searchResponse = sourceclient.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(TimeValue.timeValueSeconds(20))
                .execute().actionGet();
        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequestBuilder indexRequestBuilder = targetclient.prepareIndex(targetIndexName, indexDocType);
            indexRequestBuilder.setSource(hit.getSource());
            indexRequestBuilder.setId(hit.getId());
            indexRequestBuilder.setOpType(IndexRequest.OpType.INDEX);
            bulkProcessor.add(indexRequestBuilder.request());
            total++;
        }
        System.out.println("Already migrated: " + total + " records!");
        if (searchResponse.getHits().hits().length == 0) {
            break;
        }
    }

    try {
        Thread.sleep(10000); // Sleep 10s to let the cluster catch up.
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    bulkProcessor.close();

    // Restore replicas and refresh on the target index.
    targetclient.admin().indices().prepareUpdateSettings(targetIndexName)
            .setSettings(settingsBuilder()
                    .put("index.number_of_replicas", 1)
                    .put("index.refresh_interval", "1s"))
            .execute().actionGet();
}
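After doMigrate returns, it is worth confirming that the source and target hold the same number of documents. A hedged verification sketch (the verifyCounts method is mine, not part of the original code; it assumes the same client generation, where prepareCount and prepareRefresh are available, and the same static import of matchAllQuery):

// Sketch only: compare document counts after migration. Refresh the target
// first, since refresh was disabled while the bulk load ran.
private void verifyCounts(Client sourceclient, Client targetclient,
        String sourceIndexName, String targetIndexName) {
    targetclient.admin().indices().prepareRefresh(targetIndexName).execute().actionGet();
    long sourceCount = sourceclient.prepareCount(sourceIndexName)
            .setQuery(matchAllQuery()).execute().actionGet().getCount();
    long targetCount = targetclient.prepareCount(targetIndexName)
            .setQuery(matchAllQuery()).execute().actionGet().getCount();
    System.out.println("source=" + sourceCount + ", target=" + targetCount);
    if (sourceCount != targetCount) {
        throw new IllegalStateException("Counts differ; investigate before switching traffic.");
    }
}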
3. Test:
public static void main(String[] args) throws ElasticSearchException, IOException, InterruptedException {
    // Page size for each scroll batch. Keep it moderate: too large a batch
    // hurts cluster performance and can trigger "no node" exceptions.
    int pageSize = 40;
    Client sourceclient = ClientUtil.getSourceTransportClient();
    Client targetclient = ClientUtil.getTargetTransportClient();
    // Run the migration.
    doMigrate(sourceclient, targetclient, "test", "testnew", "test", pageSize);
}
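One detail the test driver omits: TransportClient keeps network threads open, so both clients should be closed once the migration finishes, or the JVM may not exit cleanly. A minimal addition at the end of main (not in the original code):

// After doMigrate returns, release both clients.
sourceclient.close();
targetclient.close();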