- 浏览: 92270 次
- 性别:
- 来自: 长沙
文章分类
- 全部博客 (36)
- 开源框架应用 (5)
- java (2)
- Database (3)
- 杂聊 (0)
- Linux (8)
- Chrome (1)
- centos (6)
- svn (1)
- wiki (1)
- Elasticsearch (9)
- Facet (0)
- Bugzilla (1)
- tomcat集群 (0)
- apache项目 (0)
- GIT (1)
- mongodb集群 (1)
- Elasticsearch插件Mongodb River 安装 (0)
- Elasticsearch Mongodb River (1)
- Elasticsearch Suggest Plugin (0)
- drools (1)
- M9 (1)
- maven (0)
- 狼与兔子 (0)
- Tomcat (1)
- Enonic (0)
- elasticsearch jetty (0)
- nexus (0)
- 序列化传输 (1)
- 批量修改文件后缀 (0)
- BP神经网络 (0)
- Devops (1)
最新评论
-
maxrocray:
两种方式: 1. 配置indexmapping. 这样可以为每 ...
Elasticsearch 10版本插件安装 -
rmn190:
请问下, 用mongo river导数据时, 中文分词成功没? ...
Elasticsearch 10版本插件安装
Elasticsearch mongodb river同步大量数据内存溢出
插件安装见鸿智兄的博客
http://blog.csdn.net/laigood12345/article/details/7691068
安装目前最新的插件命令如下:
1.
./plugin -install elasticsearch/elasticsearch-mapper-attachments/1.6.0
2.
./plugin -install richardwilly98/elasticsearch-river-mongodb/1.4.0
ES mongodb river 的时候, 如果是Mongo中数据量过大,会造成内存溢出。
原因是river 从mong里抓取数据的时候是100M/s. 但是ES索引的速度为5000-10000docs/s.
说明见:https://github.com/richardwilly98/elasticsearch-river-mongodb/issues/30
这样就会导致抓取过快, 索引速度跟不上从而导致OOM.
解决方案:
修改mong river源码。
/* * Licensed to Elastic Search and Shay Banon under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. Elastic Search licenses this * file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.elasticsearch.river.mongodb; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.elasticsearch.client.Requests.indexRequest; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.regex.Pattern; import org.bson.types.BSONTimestamp; import org.bson.types.ObjectId; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.river.AbstractRiverComponent; import org.elasticsearch.river.River; import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.river.RiverName; import org.elasticsearch.river.RiverSettings; import org.elasticsearch.river.mongodb.util.GridFSHelper; import org.elasticsearch.script.ScriptService; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; import com.mongodb.CommandResult; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoException; import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.gridfs.GridFS; import com.mongodb.gridfs.GridFSDBFile; import com.mongodb.util.JSON; /** * @author richardwilly98 (Richard Louapre) * @author flaper87 (Flavio Percoco Premoli) * @author aparo (Alberto Paro) * @author kryptt (Rodolfo Hansen) */ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String RIVER_TYPE = "mongodb"; public final static String ROOT_NAME = RIVER_TYPE; public final static String DB_FIELD = "db"; public final static String SERVERS_FIELD = "servers"; public final static String HOST_FIELD = "host"; public final static String PORT_FIELD = "port"; public final static String OPTIONS_FIELD = "options"; public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference"; public final static String FILTER_FIELD = "filter"; public final static String CREDENTIALS_FIELD = "credentials"; public final static String USER_FIELD = "user"; public final static String PASSWORD_FIELD = "password"; public final static String SCRIPT_FIELD = "script"; public final static String COLLECTION_FIELD = "collection"; public final static String GRIDFS_FIELD = "gridfs"; public final static String INDEX_OBJECT = "index"; public final static String NAME_FIELD = "name"; public final static String TYPE_FIELD = "type"; public final static String DB_LOCAL = "local"; public final static String DB_ADMIN = "admin"; public final static String DEFAULT_DB_HOST = "localhost"; public final static int DEFAULT_DB_PORT = 27017; public final static String BULK_SIZE_FIELD = "bulk_size"; [color=red] public final static String THROTTLE_SIZE_FIELD = "throttle_size";[/color] public final static String BULK_TIMEOUT_FIELD = "bulk_timeout"; public final static String LAST_TIMESTAMP_FIELD = "_last_ts"; public final static String MONGODB_LOCAL = "local"; public final static String MONGODB_ADMIN = "admin"; public final static String OPLOG_COLLECTION = "oplog.rs"; public final static String OPLOG_NAMESPACE = "ns"; public final static String OPLOG_OBJECT = "o"; public final static String OPLOG_UPDATE = "o2"; public final static String OPLOG_OPERATION = "op"; public final static String OPLOG_UPDATE_OPERATION = "u"; public final static String OPLOG_INSERT_OPERATION = "i"; public final static String OPLOG_DELETE_OPERATION = "d"; public final static String OPLOG_TIMESTAMP = "ts"; protected final Client client; protected final String riverIndexName; protected final List<ServerAddress> mongoServers = new ArrayList<ServerAddress>(); protected final String mongoDb; protected final String mongoCollection; protected final boolean mongoGridFS; protected final String mongoAdminUser; protected final String mongoAdminPassword; protected final String mongoLocalUser; protected final String mongoLocalPassword; protected final String mongoDbUser; protected final String mongoDbPassword; protected final String mongoOplogNamespace; protected final boolean mongoSecondaryReadPreference; protected final String indexName; protected final String typeName; protected final int bulkSize; protected final int throttleSize; protected final TimeValue bulkTimeout; protected Thread tailerThread; protected Thread indexerThread; protected volatile boolean active = true; [color=red] private final BlockingQueue<Map<String, Object>> stream;[/color] @SuppressWarnings("unchecked") @Inject public MongoDBRiver(final RiverName riverName, final RiverSettings settings, @RiverIndexName final String riverIndexName, final Client client, final ScriptService scriptService) { super(riverName, settings); if (logger.isDebugEnabled()) { logger.debug("Prefix: " + logger.getPrefix() + " - name: " + logger.getName()); } this.riverIndexName = riverIndexName; this.client = client; String mongoHost; int mongoPort; if (settings.settings().containsKey(RIVER_TYPE)) { Map<String, Object> mongoSettings = (Map<String, Object>) settings .settings().get(RIVER_TYPE); if (mongoSettings.containsKey(SERVERS_FIELD)) { Object mongoServersSettings = mongoSettings.get(SERVERS_FIELD); logger.info("mongoServersSettings: " + mongoServersSettings); boolean array = XContentMapValues.isArray(mongoServersSettings); if (array) { ArrayList<Map<String, Object>> feeds = (ArrayList<Map<String, Object>>) mongoServersSettings; for (Map<String, Object> feed : feeds) { mongoHost = XContentMapValues.nodeStringValue(feed.get(HOST_FIELD), null); mongoPort = XContentMapValues.nodeIntegerValue(feed.get(PORT_FIELD), 0); logger.info("Server: " + mongoHost + " - " + mongoPort); try { mongoServers.add(new ServerAddress(mongoHost, mongoPort)); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } else { mongoHost = XContentMapValues.nodeStringValue( mongoSettings.get(HOST_FIELD), DEFAULT_DB_HOST); mongoPort = XContentMapValues.nodeIntegerValue( mongoSettings.get(PORT_FIELD), DEFAULT_DB_PORT); try { mongoServers.add(new ServerAddress(mongoHost, mongoPort)); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // MongoDB options if (mongoSettings.containsKey(OPTIONS_FIELD)) { Map<String, Object> mongoOptionsSettings = (Map<String, Object>) mongoSettings.get(OPTIONS_FIELD); mongoSecondaryReadPreference = XContentMapValues.nodeBooleanValue( mongoOptionsSettings.get(SECONDARY_READ_PREFERENCE_FIELD), false); } else { mongoSecondaryReadPreference = false; } // Credentials if (mongoSettings.containsKey(CREDENTIALS_FIELD)) { String dbCredential; String mau = ""; String map = ""; String mlu = ""; String mlp = ""; String mdu = ""; String mdp = ""; Object mongoCredentialsSettings = mongoSettings.get(CREDENTIALS_FIELD); boolean array = XContentMapValues.isArray(mongoCredentialsSettings); if (array) { ArrayList<Map<String, Object>> credentials = (ArrayList<Map<String, Object>>) mongoCredentialsSettings; for (Map<String, Object> credential : credentials) { dbCredential = XContentMapValues.nodeStringValue(credential.get(DB_FIELD), null); if (DB_ADMIN.equals(dbCredential)) { mau = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null); map = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null); } else if (DB_LOCAL.equals(dbCredential)) { mlu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null); mlp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null); } else { mdu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null); mdp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null); } } } mongoAdminUser = mau; mongoAdminPassword = map; mongoLocalUser = mlu; mongoLocalPassword = mlp; mongoDbUser = mdu; mongoDbPassword = mdp; } else { mongoAdminUser = ""; mongoAdminPassword = ""; mongoLocalUser = ""; mongoLocalPassword = ""; mongoDbUser = ""; mongoDbPassword = ""; } mongoDb = XContentMapValues.nodeStringValue( mongoSettings.get(DB_FIELD), riverName.name()); mongoCollection = XContentMapValues.nodeStringValue( mongoSettings.get(COLLECTION_FIELD), riverName.name()); mongoGridFS = XContentMapValues.nodeBooleanValue( mongoSettings.get(GRIDFS_FIELD), false); } else { mongoHost = DEFAULT_DB_HOST; mongoPort = DEFAULT_DB_PORT; try { mongoServers.add(new ServerAddress(mongoHost, mongoPort)); } catch (UnknownHostException e) { e.printStackTrace(); } mongoSecondaryReadPreference = false; mongoDb = riverName.name(); mongoCollection = riverName.name(); mongoGridFS = false; mongoAdminUser = ""; mongoAdminPassword = ""; mongoLocalUser = ""; mongoLocalPassword = ""; mongoDbUser = ""; mongoDbPassword = ""; } mongoOplogNamespace = mongoDb + "." + mongoCollection; if (settings.settings().containsKey(INDEX_OBJECT)) { Map<String, Object> indexSettings = (Map<String, Object>) settings .settings().get(INDEX_OBJECT); indexName = XContentMapValues.nodeStringValue( indexSettings.get(NAME_FIELD), mongoDb); typeName = XContentMapValues.nodeStringValue( indexSettings.get(TYPE_FIELD), mongoDb); bulkSize = XContentMapValues.nodeIntegerValue( indexSettings.get(BULK_SIZE_FIELD), 100); if (indexSettings.containsKey(BULK_TIMEOUT_FIELD)) { bulkTimeout = TimeValue.parseTimeValue( XContentMapValues.nodeStringValue( indexSettings.get(BULK_TIMEOUT_FIELD), "10ms"), TimeValue.timeValueMillis(10)); } else { bulkTimeout = TimeValue.timeValueMillis(10); } [color=red]throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), bulkSize * 5);[/color] } else { indexName = mongoDb; typeName = mongoDb; bulkSize = 100; bulkTimeout = TimeValue.timeValueMillis(10); [color=red]throttleSize = bulkSize * 5;[/color] } [color=red] if (throttleSize == -1) { stream = new LinkedTransferQueue<Map<String, Object>>(); } else { stream = new ArrayBlockingQueue<Map<String, Object>>(throttleSize); }[/color] //构造方法里初始化throttleSize及stream. } @Override public void start() { for (ServerAddress server : mongoServers) { logger.info( "Using mongodb server(s): host [{}], port [{}]", server.getHost(), server.getPort()); } logger.info( "starting mongodb stream: options: secondaryreadpreference [{}], gridfs [{}], filter [{}], db [{}], indexing to [{}]/[{}]", mongoSecondaryReadPreference, mongoGridFS, mongoDb, indexName, typeName); try { client.admin().indices().prepareCreate(indexName).execute() .actionGet(); } catch (Exception e) { if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { // that's fine } else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) { // ok, not recovered yet..., lets start indexing and hope we // recover by the first bulk // TODO: a smarter logic can be to register for cluster event // listener here, and only start sampling when the // block is removed... } else { logger.warn("failed to create index [{}], disabling river...", e, indexName); return; } } if (mongoGridFS) { try { client.admin().indices().preparePutMapping(indexName) .setType(typeName).setSource(getGridFSMapping()) .execute().actionGet(); } catch (Exception e) { logger.warn("Failed to set explicit mapping (attachment): {}", e); if (logger.isDebugEnabled()) { logger.debug("Set explicit attachment mapping.", e); } } } tailerThread = EsExecutors.daemonThreadFactory( settings.globalSettings(), "mongodb_river_slurper").newThread( new Slurper()); indexerThread = EsExecutors.daemonThreadFactory( settings.globalSettings(), "mongodb_river_indexer").newThread( new Indexer()); indexerThread.start(); tailerThread.start(); } @Override public void close() { if (active) { logger.info("closing mongodb stream river"); active = false; tailerThread.interrupt(); indexerThread.interrupt(); } } private class Indexer implements Runnable { @Override public void run() { while (active) { try { BSONTimestamp lastTimestamp = null; BulkRequestBuilder bulk = client.prepareBulk(); // 1. Attempt to fill as much of the bulk request as // possible Map<String, Object> data = stream.take(); lastTimestamp = updateBulkRequest(bulk, data); while ((data = stream.poll(bulkTimeout.millis(), MILLISECONDS)) != null) { lastTimestamp = updateBulkRequest(bulk, data); if (bulk.numberOfActions() >= bulkSize) { break; } } // 2. Update the timestamp if (lastTimestamp != null) { updateLastTimestamp(mongoOplogNamespace, lastTimestamp, bulk); } // 3. Execute the bulk requests try { BulkResponse response = bulk.execute().actionGet(); if (response.hasFailures()) { // TODO write to exception queue? logger.warn("failed to execute" + response.buildFailureMessage()); } } catch (Exception e) { logger.warn("failed to execute bulk", e); } } catch (InterruptedException e) { if (logger.isDebugEnabled()) { logger.debug("river-mongodb indexer interrupted"); } } } } private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk, final Map<String, Object> data) { if (data.get("_id") == null) { logger.warn( "Cannot get object id. Skip the current item: [{}]", data); return null; } BSONTimestamp lastTimestamp = (BSONTimestamp) data .get(OPLOG_TIMESTAMP); String operation = data.get(OPLOG_OPERATION).toString(); String objectId = data.get("_id").toString(); data.remove(OPLOG_TIMESTAMP); data.remove(OPLOG_OPERATION); try { if (OPLOG_INSERT_OPERATION.equals(operation)) { if (logger.isDebugEnabled()) { logger.debug( "Insert operation - id: {} - contains attachment: {}", operation, objectId, data.containsKey("attachment")); } bulk.add(indexRequest(indexName).type(typeName) .id(objectId).source(build(data, objectId))); } if (OPLOG_UPDATE_OPERATION.equals(operation)) { if (logger.isDebugEnabled()) { logger.debug( "Update operation - id: {} - contains attachment: {}", objectId, data.containsKey("attachment")); } bulk.add(new DeleteRequest(indexName, typeName, objectId)); bulk.add(indexRequest(indexName).type(typeName) .id(objectId).source(build(data, objectId))); // new UpdateRequest(indexName, typeName, objectId) } if (OPLOG_DELETE_OPERATION.equals(operation)) { logger.info("Delete request [{}], [{}], [{}]", indexName, typeName, objectId); bulk.add(new DeleteRequest(indexName, typeName, objectId)); } } catch (IOException e) { logger.warn("failed to parse {}", e, data); } return lastTimestamp; } private XContentBuilder build(final Map<String, Object> data, final String objectId) throws IOException { if (data.containsKey("attachment")) { logger.info("Add Attachment: {} to index {} / type {}", objectId, indexName, typeName); return GridFSHelper.serialize((GridFSDBFile) data .get("attachment")); } else { return XContentFactory.jsonBuilder().map(data); } } } private class Slurper implements Runnable { private Mongo mongo; private DB slurpedDb; private DBCollection slurpedCollection; private DB oplogDb; private DBCollection oplogCollection; private boolean assignCollections() { DB adminDb = mongo.getDB(MONGODB_ADMIN); oplogDb = mongo.getDB(MONGODB_LOCAL); if (!mongoAdminUser.isEmpty() && !mongoAdminPassword.isEmpty()) { logger.info("Authenticate {} with {}", MONGODB_ADMIN, mongoAdminUser); CommandResult cmd = adminDb.authenticateCommand(mongoAdminUser, mongoAdminPassword.toCharArray()); if (! cmd.ok()) { logger.error("Autenticatication failed for {}: {}", MONGODB_ADMIN, cmd.getErrorMessage()); // Can still try with mongoLocal credential if provided. // return false; } oplogDb = adminDb.getMongo().getDB(MONGODB_LOCAL); } if (!mongoLocalUser.isEmpty() && !mongoLocalPassword.isEmpty() && !oplogDb.isAuthenticated()) { logger.info("Authenticate {} with {}", MONGODB_LOCAL, mongoLocalUser); CommandResult cmd = oplogDb.authenticateCommand(mongoLocalUser, mongoLocalPassword.toCharArray()); if (! cmd.ok()) { logger.error("Autenticatication failed for {}: {}", MONGODB_LOCAL, cmd.getErrorMessage()); return false; } } Set<String> collections = oplogDb.getCollectionNames(); if (! collections.contains(OPLOG_COLLECTION)){ logger.error("Cannot find " + OPLOG_COLLECTION + " collection. Please use check this link: http://goo.gl/2x5IW"); return false; } oplogCollection = oplogDb.getCollection(OPLOG_COLLECTION); slurpedDb = mongo.getDB(mongoDb); if (!mongoAdminUser.isEmpty() && !mongoAdminUser.isEmpty() && adminDb.isAuthenticated()) { slurpedDb = adminDb.getMongo().getDB(mongoDb); } if (!mongoDbUser.isEmpty() && !mongoDbPassword.isEmpty() && !slurpedDb.isAuthenticated()) { logger.info("Authenticate {} with {}", mongoDb, mongoDbUser); CommandResult cmd = slurpedDb.authenticateCommand(mongoDbUser, mongoDbPassword.toCharArray()); if (! cmd.ok()) { logger.error("Autenticatication failed for {}: {}", mongoDb, cmd.getErrorMessage()); return false; } } slurpedCollection = slurpedDb.getCollection(mongoCollection); return true; } @Override public void run() { mongo = new Mongo(mongoServers); if (mongoSecondaryReadPreference) { mongo.setReadPreference(ReadPreference.SECONDARY); } while (active) { try { if (!assignCollections()) { break; // failed to assign oplogCollection or // slurpedCollection } DBCursor oplogCursor = oplogCursor(null); if (oplogCursor == null) { oplogCursor = processFullCollection(); } while (oplogCursor.hasNext()) { DBObject item = oplogCursor.next(); processOplogEntry(item); } Thread.sleep(5000); } catch (MongoException mEx) { logger.error("Mongo gave an exception", mEx); } catch (NoSuchElementException nEx) { logger.warn("A mongoDB cursor bug ?", nEx); } catch (InterruptedException e) { if (logger.isDebugEnabled()) { logger.debug("river-mongodb slurper interrupted"); } } } } /* * Remove fscynlock and unlock - https://github.com/richardwilly98/elasticsearch-river-mongodb/issues/17 */ private DBCursor processFullCollection() { // CommandResult lockResult = mongo.fsyncAndLock(); // if (lockResult.ok()) { try { BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection .find() .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1)) .limit(1).next().get(OPLOG_TIMESTAMP); addQueryToStream("i", currentTimestamp, null); return oplogCursor(currentTimestamp); } finally { // mongo.unlock(); } // } else { // throw new MongoException( // "Could not lock the database for FullCollection sync"); // } } @SuppressWarnings("unchecked") private void processOplogEntry(final DBObject entry) { String operation = entry.get(OPLOG_OPERATION).toString(); String namespace = entry.get(OPLOG_NAMESPACE).toString(); BSONTimestamp oplogTimestamp = (BSONTimestamp) entry .get(OPLOG_TIMESTAMP); DBObject object = (DBObject) entry.get(OPLOG_OBJECT); // Not interested by chunks - skip all if (namespace.endsWith(".chunks")) { return; } if (logger.isTraceEnabled()) { logger.trace("oplog processing item {}", entry); } if (mongoGridFS && namespace.endsWith(".files") && ("i".equals(operation) || "u".equals(operation))) { String objectId = object.get("_id").toString(); GridFS grid = new GridFS(mongo.getDB(mongoDb), mongoCollection); GridFSDBFile file = grid.findOne(new ObjectId(objectId)); if (file != null) { logger.info("Caught file: {} - {}", file.getId(), file.getFilename()); object = file; } else { logger.warn("Cannot find file from id: {}", objectId); } } if (object instanceof GridFSDBFile) { logger.info("Add attachment: {}", object.get("_id")); HashMap<String, Object> data = new HashMap<String, Object>(); data.put("attachment", object); data.put("_id", object.get("_id")); addToStream(operation, oplogTimestamp, data); } else { if ("u".equals(operation)) { DBObject update = (DBObject) entry.get(OPLOG_UPDATE); addQueryToStream(operation, oplogTimestamp, update); } else { addToStream(operation, oplogTimestamp, object.toMap()); } } } private DBObject getIndexFilter(final BSONTimestamp timestampOverride) { BSONTimestamp time = timestampOverride == null ? getLastTimestamp(mongoOplogNamespace) : timestampOverride; if (time == null) { logger.info("No known previous slurping time for this collection"); return null; } else { BasicDBObject filter = new BasicDBObject(); filter.put(OPLOG_TIMESTAMP, new BasicDBObject("$gt", time)); filter.put(OPLOG_NAMESPACE, Pattern.compile(mongoOplogNamespace)); if (logger.isDebugEnabled()) { logger.debug("Using filter: {}", filter); } return filter; } } private DBCursor oplogCursor(final BSONTimestamp timestampOverride) { DBObject indexFilter = getIndexFilter(timestampOverride); if (indexFilter == null) { return null; } return oplogCollection.find(indexFilter) .sort(new BasicDBObject("$natural", 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); } @SuppressWarnings("unchecked") private void addQueryToStream(final String operation, final BSONTimestamp currentTimestamp, final DBObject update) { for (DBObject item : slurpedCollection.find(update)) { addToStream(operation, currentTimestamp, item.toMap()); } } private void addToStream(final String operation, final BSONTimestamp currentTimestamp, final Map<String, Object> data) { data.put(OPLOG_TIMESTAMP, currentTimestamp); data.put(OPLOG_OPERATION, operation); try { [color=red] stream.put(data); //将add方法改为put. 这样,加入的时候,会等待直到有空间才加入。[/color] } catch (InterruptedException e) { e.printStackTrace(); } } } private XContentBuilder getGridFSMapping() throws IOException { XContentBuilder mapping = jsonBuilder().startObject() .startObject(typeName).startObject("properties") .startObject("content").field("type", "attachment").endObject() .startObject("filename").field("type", "string").endObject() .startObject("contentType").field("type", "string").endObject() .startObject("md5").field("type", "string").endObject() .startObject("length").field("type", "long").endObject() .startObject("chunkSize").field("type", "long").endObject() .endObject().endObject().endObject(); logger.info("Mapping: {}", mapping.string()); return mapping; } /** * Get the latest timestamp for a given namespace. */ @SuppressWarnings("unchecked") private BSONTimestamp getLastTimestamp(final String namespace) { GetResponse lastTimestampResponse = client .prepareGet(riverIndexName, riverName.getName(), namespace) .execute().actionGet(); if (lastTimestampResponse.exists()) { Map<String, Object> mongodbState = (Map<String, Object>) lastTimestampResponse .sourceAsMap().get(ROOT_NAME); if (mongodbState != null) { String lastTimestamp = mongodbState.get(LAST_TIMESTAMP_FIELD) .toString(); if (lastTimestamp != null) { if (logger.isDebugEnabled()) { logger.debug("{} last timestamp: {}", namespace, lastTimestamp); } return (BSONTimestamp) JSON.parse(lastTimestamp); } } } return null; } /** * Adds an index request operation to a bulk request, updating the last * timestamp for a given namespace (ie: host:dbName.collectionName) * * @param bulk */ private void updateLastTimestamp(final String namespace, final BSONTimestamp time, final BulkRequestBuilder bulk) { try { bulk.add(indexRequest(riverIndexName) .type(riverName.getName()) .id(namespace) .source(jsonBuilder().startObject().startObject(ROOT_NAME) .field(LAST_TIMESTAMP_FIELD, JSON.serialize(time)) .endObject().endObject())); } catch (IOException e) { logger.error("error updating last timestamp for namespace {}", namespace); } } }
关键改动如上。 其实, 对源码的改动, 只修改stream.add(data);这句应该也可以。
只不过加入throttle_size的话, 可以像设置bulksize一样在新建river的时候去初始化设置这个抓取值。
花了一个上午解决这个问题, 留个笔记。
如果大家有更好的方案请留言。附件是我重新编译的mongo river 插件jar包, 替换ES 目录下的plugins\river-mongodb下的相关Jar就可以了。
注: 目前mongo-river 1.6及以后版本, 上述问题,作者已经修复,可放心使用。
- elasticsearch-river-mongodb-1.4.0-SNAPSHOT.jar (21.8 KB)
- 下载次数: 37
相关推荐
### Linux安装ElasticSearch与MongoDB分布式集群环境下数据同步 #### 概述 在现代的大数据处理场景下,为了提高数据处理效率以及更好地利用资源,通常会采用多种数据库技术组合的方式来构建系统架构。Elastic...
本文将详细介绍如何使用Python来实现MongoDB数据到ElasticSearch的同步,并探讨全量同步、增量同步和实时同步的实现方法,以及如何处理中间数据。 **Python与数据库交互** Python作为一种强大且易用的编程语言,...
9. **性能优化**:大量数据同步可能会对系统性能产生影响。因此,需要考虑批处理、并发处理和错误重试等策略来提高同步效率。 10. **监控与调试**:为了确保同步过程的稳定,需要设置日志记录和监控,以便于追踪...
Windows环境下实现MongoDB与Elasticsearch数据自动同步是大数据和搜索领域经常需要进行的操作,可以帮助用户实时将数据库中的数据变化同步到搜索引擎中,以便快速检索。本文介绍如何使用Python 2.7.14,MongoDB ...
Mongodb-Elasticsearch 数据同步工具,支持Mongodb到elasticsearch、Mongodb到数据库、Mongodb到kafka、Mongodb到excel文件、Mongodb到文本文件以及Mongodb到ftp/sftp文件数据采集、上报;支持海量PB级数据同步导入...
Elasticsearch-HBase-River是一个社区开发的插件,它的主要功能是实现HBase与Elasticsearch之间的数据同步。River是ES早期版本中用来实现外部数据源同步的一个概念,它负责定期抓取数据并导入ES索引。这个插件就是...
该项目是一个基于Java和Shell的MongoDB到Elasticsearch数据同步工具,包含59个文件,主要语言为Java。文件类型包括19个Java源文件、11个Shell脚本、7个批处理文件、7个XML配置文件、5个属性文件、1个Git属性文件、1...
标题中的"es file river同步"指的是Elasticsearch (ES) 中的一种数据同步方式,File River。在大数据处理和搜索引擎领域,Elasticsearch是一款强大的开源分布式搜索引擎,它允许实时地存储、搜索和分析大量数据。而...
jar包,官方版本,自测可用
Flink实时同步ElasticSearch数据到Doris
MongoDB和Elasticsearch都是现代数据存储和搜索的流行选择,它们各自在不同的场景下有着出色的表现。然而,在实际的数据管理中,有时我们需要在这些系统之间进行数据迁移或同步,以便充分利用各自的优点。这就引出了...
mongoDB是一个基于分布式文件存储的数据库,由 C++ 语言编写,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。它介于关系数据库和非关系数据库之间,被认为是非关系数据库当中功能最丰富,最像关系数据库的...
Elasticsearch River Neo4j 是一个专为 Elasticsearch 设计的开源项目,其主要功能是作为两者之间的桥梁,将数据从 Neo4j 图形数据库实时同步到 Elasticsearch 搜索引擎,从而实现高效、灵活的数据检索。这个插件...
本设计源码提供了一个基于Java的Elasticsearch数据同步迁移工具。项目包含29个文件,主要使用Java和Shell编程语言。文件类型包括7个Java源代码文件、5个BAT批处理文件、5个Shell脚本文件、3个XML配置文件、2个...
MySQL作为一款广泛应用的关系型数据库管理系统,存储了大量结构化数据,而Elasticsearch则是一款实时分布式搜索和分析引擎,适用于非结构化数据的快速检索。将MySQL中的数据同步到Elasticsearch,可以实现更高效的...
- **标题**:“es file river同步文档” 这里提到的“river”可能是指早期Elasticsearch中的River插件机制,它允许数据源(如数据库、文件系统等)与Elasticsearch之间的实时同步。然而,随着Elasticsearch的发展,...
它是ELK(Elasticsearch, Logstash, Kibana)堆栈的核心,用于存储、搜索和分析大量数据。 3. **Logstash**: Logstash是ELK堆栈的一部分,负责数据收集、转换和分发。在这个场景中,Logstash作为中间件,从SQL ...
springboot整合elasticsearch7,进行数据同步。elasticsearch相关度查询、排序。高亮显示;自动补全等功能。代码仅供参考,代码中有具体的注释,可以根据代码及注释内容,对自己项目架构及业务进行修改、整合。
- **兼容性**:支持多种数据库系统(如MySQL、SQL Server、Oracle等)和数据源(如ElasticSearch、Hadoop),方便数据迁移。 - **实时性**:实现数据的实时同步,延迟在秒级,确保数据新鲜度。 - **自定义**:...
标题 "Go-go-mysql-elasticsearch-自动同步你的MySQL数据到Elasticsearch" 指的是一种使用 Go 语言编写的工具,它允许你实现实时或定期地将 MySQL 数据库中的数据自动同步到 Elasticsearch 搜索引擎。这个工具名为 `...