`
xiamizy
  • 浏览: 88698 次
  • 性别: Icon_minigender_1
  • 来自: 南京
博客专栏
78437efc-ad8e-387c-847f-a092d52e81a6
spring framew...
浏览量:4858
社区版块
存档分类
最新评论

ElasticSearch大批量数据入库

阅读更多

最近着手处理大批量数据的任务。

现状是这样的,一个数据采集程序承载大批量数据的存储和检索。后期可能需要对大批量数据进行统计。

数据分布情况

13个点定时生成采集结果到4个文件(小文件生成周期是5分钟)

 

名称                                                 大小(b)
gather_1_2014-02-27-14-50-0.txt                      568497
gather_1_2014-02-27-14-50-1.txt                      568665
gather_1_2014-02-27-14-50-2.txt                      568172
gather_1_2014-02-27-14-50-3.txt                      568275

 

 

同步使用shell脚本对四个文件入到sybase_iq库的一张表tab_tmp_2014_2_27中.

每天数据量大概是3亿条,所以小文件的总量大概是3G。小文件数量大,单表容量大执行复合主键查询,由原来2s延时变成了,5~10分钟。

针对上述情况需要对目前的储存结构进行优化。

才是看了下相关系统 catior使用的是环状数据库,存储相关的数据优点方便生成MRTG图,缺点不利于数据统计。后来引入elasticsearch来对大数据检索进行优化。

测试平台

 

cpu: AMD Opteron(tm) Processor 6136 64bit 2.4GHz   * 32
内存: 64G
硬盘:1.5T
操作系统:Red Hat Enterprise Linux Server release 6.4 (Santiago)

读取文件的目录结构:

 

[test@test001 data]$ ls
0  1  2  3

 简单测试代码:

 

 

public class FileReader
{

	private File file;
	private String splitCharactor;
	private Map<String, Class<?>> colNames;
	private static final Logger LOG = Logger.getLogger(FileReader.class);

	/**
	 * @param path
	 *            文件路径
	 * @param fileName
	 *            文件名
	 * @param splitCharactor
	 *            拆分字符
	 * @param colNames
	 *            主键名称
	 */
	public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames)
	{
		this.file = file;
		this.splitCharactor = splitCharactor;
		this.colNames = colNames;
	}

	/**
	 * 读取文件
	 * 
	 * @return
	 * @throws Exception
	 */
	public List<Map<String, Object>> readFile() throws Exception
	{
		List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
		if (!file.isFile())
		{
			throw new Exception("File not exists." + file.getName());
		}
		LineIterator lineIterator = null;
		try
		{
			lineIterator = FileUtils.lineIterator(file, "UTF-8");
			while (lineIterator.hasNext())
			{
				String line = lineIterator.next();
				String[] values = line.split(splitCharactor);
				if (colNames.size() != values.length)
				{
					continue;
				}
				Map<String, Object> map = new HashMap<String, Object>();
				Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet()
						.iterator();
				int count = 0;
				while (iterator.hasNext())
				{
					Entry<String, Class<?>> entry = iterator.next();
					Object value = values[count];
					if (!String.class.equals(entry.getValue()))
					{
						value = entry.getValue().getMethod("valueOf", String.class)
								.invoke(null, value);
					}
					map.put(entry.getKey(), value);
					count++;
				}
				list.add(map);
			}
		}
		catch (IOException e)
		{
			LOG.error("File reading line error." + e.toString(), e);
		}
		finally
		{
			LineIterator.closeQuietly(lineIterator);
		}
		return list;
	}
}

 

public class StreamIntoEs
{

	public static class ChildThread extends Thread
	{

		int number;

		public ChildThread(int number)
		{
			this.number = number;
		}

		@Override
		public void run()
		{
			Settings settings = ImmutableSettings.settingsBuilder()
					.put("client.transport.sniff", true)
					.put("client.transport.ping_timeout", 100)
					.put("cluster.name", "elasticsearch").build();
			TransportClient client = new TransportClient(settings)
					.addTransportAddress(new InetSocketTransportAddress("192.168.32.228",
							9300));
			File dir = new File("/export/home/es/data/" + number);
			LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
			colNames.put("aa", Long.class);
			colNames.put("bb", String.class);
			colNames.put("cc", String.class);
			colNames.put("dd", Integer.class);
			colNames.put("ee", Long.class);
			colNames.put("ff", Long.class);
			colNames.put("hh", Long.class);
			int count = 0;
			long startTime = System.currentTimeMillis();
			for (File file : dir.listFiles())
			{
				int currentCount = 0;
				long startCurrentTime = System.currentTimeMillis();
				FileReader reader = new FileReader(file, "\\$", colNames);
				BulkResponse resp = null;
				BulkRequestBuilder bulkRequest = client.prepareBulk();
				try
				{
					List<Map<String, Object>> results = reader.readFile();
					for (Map<String, Object> col : results)
					{
						bulkRequest.add(client.prepareIndex("flux", "fluxdata")
								.setSource(JSON.toJSONString(col)).setId(col.get("getway")+"##"+col.get("port_info")+"##"+col.get("device_id")+"##"+col.get("collecttime")));
						count++;
						currentCount++;
					}
					resp = bulkRequest.execute().actionGet();
				}
				catch (Exception e)
				{
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				long endCurrentTime = System.currentTimeMillis();
				System.out.println("[thread-" + number + "-]per count:" + currentCount);
				System.out.println("[thread-" + number + "-]per time:"
						+ (endCurrentTime - startCurrentTime));
				System.out.println("[thread-" + number + "-]per count/s:"
						+ (float) currentCount / (endCurrentTime - startCurrentTime)
						* 1000);
				System.out.println("[thread-" + number + "-]per count/s:"
						+ resp.toString());
			}
			long endTime = System.currentTimeMillis();
			System.out.println("[thread-" + number + "-]total count:" + count);
			System.out.println("[thread-" + number + "-]total time:"
					+ (endTime - startTime));
			System.out.println("[thread-" + number + "-]total count/s:" + (float) count
					/ (endTime - startTime) * 1000);
			// IndexRequest request =
			// = client.index(request);
		}
	}

	public static void main(String args[])
	{
		for (int i = 0; i < 4; i++)
		{
			ChildThread childThread = new ChildThread(i);
			childThread.start();
		}
	}
}

 起了4个线程来做入库,每个文件解析完成进行一次批处理。

 

初始化脚本:

 

curl -XDELETE 'http://192.168.32.228:9200/twitter/'

 

curl -XPUT 'http://192.168.32.228:9200/twitter/' -d '
{
     "index" :{
          "number_of_shards" : 5,
          "number_of_replicas ": 0,
          "index.refresh_interval": "-1",
         "index.translog.flush_threshold_ops": "100000"
     }
}'

 

curl -XPUT 'http://192.168.32.228:9200/twiter/twiterdata/_mapping' -d '
{
             "twiterdata": {
                    "aa" : {"type" : "long", "index" : "not_analyzed"},
                    "bb" : {"type" : "String", "index" : "not_analyzed"},
                    "cc" : {"type" : "String", "index" : "not_analyzed"},
                    "dd" : {"type" : "integer", "index" : "not_analyzed"},
                    "ee" : {"type" : "long", "index" : "no"},
                    "ff" : {"type" : "long", "index" : "no"},
                    "gg" : {"type" : "long", "index" : "no"},
                    "hh" : {"type" : "long", "index" : "no"},
                    "ii" : {"type" : "long", "index" : "no"},
                    "jj" : {"type" : "long", "index" : "no"},
                    "kk" : {"type" : "long", "index" : "no"},
                }
}

 执行效率参考:

 

不开启refresh_interval
[test@test001 bin]$ more StreamIntoEs.out|grep total
[thread-2-]total count:1199411
[thread-2-]total time:1223718
[thread-2-]total count/s:980.1368
[thread-1-]total count:1447214
[thread-1-]total time:1393528
[thread-1-]total count/s:1038.5253
[thread-0-]total count:1508043
[thread-0-]total time:1430167
[thread-0-]total count/s:1054.4524
[thread-3-]total count:1650576
[thread-3-]total time:1471103
[thread-3-]total count/s:1121.9989
4195.1134

开启refresh_interval
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:996111
[thread-2-]total count/s:1204.0938
[thread-1-]total count:1447214
[thread-1-]total time:1163207
[thread-1-]total count/s:1244.1586
[thread-0-]total count:1508043
[thread-0-]total time:1202682
[thread-0-]total count/s:1253.9
[thread-3-]total count:1650576
[thread-3-]total time:1236239
[thread-3-]total count/s:1335.1593
5037.3117

开启refresh_interval  字段类型转换
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:1065229
[thread-2-]total count/s:1125.9653
[thread-1-]total count:1447214
[thread-1-]total time:1218342
[thread-1-]total count/s:1187.8552
[thread-0-]total count:1508043
[thread-0-]total time:1230474
[thread-0-]total count/s:1225.5789
[thread-3-]total count:1650576
[thread-3-]total time:1274027
[thread-3-]total count/s:1295.5581
4834.9575

开启refresh_interval  字段类型转换 设置id
[thread-2-]total count:1199411
[thread-2-]total time:912251
[thread-2-]total count/s:1314.7817
[thread-1-]total count:1447214
[thread-1-]total time:1067117
[thread-1-]total count/s:1356.1906
[thread-0-]total count:1508043
[thread-0-]total time:1090577
[thread-0-]total count/s:1382.7937
[thread-3-]total count:1650576
[thread-3-]total time:1128490
[thread-3-]total count/s:1462.6412
5516.4072

 580M的数据平均用时大概是20分钟。索引文件大约为1.76G

 

 相关测试结果可以参考这里:

 elasticsearch 性能测试

 

 

分享到:
评论
3 楼 silent1 2015-01-27  
明白了,这两个是独立的。
Fsync实际上与ElasticSearch(或者Lucene)无关,只是把磁盘缓冲区的translog同步到磁盘上。
Flush是ElasticSearch(或者Lucene)的操作,该操作把新的索引信息写入段中,同时清空内存缓冲区和translog。
2 楼 silent1 2015-01-27  
index.gateway.local.sync缺省是5秒,那么下面这个设定有意义吗?
"index.translog.flush_threshold_ops": "100000"

flush跟fsync是一回事吗?
1 楼 silent1 2015-01-23  
谢谢!准备抄一遍。

相关推荐

    Oracle数据批量导入elasticsearch脚本

    Linux环境下使用sqlplus工具将oracle中的数据导入到elasticsearch中。只需要在es_bulk_tool.properties配置sql即可实现数据的批量导入。在elasticsearch6中测试通过。shell脚本需要使用sqlplus。

    ElasticSearch官方测试数据

    Elasticsearch(ES)是一种流行的开源全文搜索引擎,它基于Lucene库构建,被广泛用于大数据分析、日志聚合、实时搜索和索引等场景。官方提供的测试数据集是检验Elasticsearch功能、性能和稳定性的关键资源,可以帮助...

    ElasticSearch数据导出

    ElasticSearch数据导出 elasticsearch单文档数据导出 支持自定义查询 导出数据Json文件

    Elasticsearch数据导出工具

    Elasticsearch数据导出工具是一种高效实用的解决方案,它允许用户方便地从Elasticsearch(ES)集群中抽取数据,并将其导出到不同的目标,如MySQL数据库或本地文件系统。这款工具尤其适用于需要进行数据迁移、备份或...

    项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)

    在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...

    java语言kafka数据批量导入到Elasticsearch实例

    消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 &lt;groupId&gt;org.elasticsearch &lt;artifactId&gt;elastic...

    Elasticsearch官方提供数据案例account.json

    **Elasticsearch官方提供数据案例account.json** Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎,它允许用户快速地存储、搜索和分析大量数据。在这个案例中,"account.json"是Elasticsearch官方提供的一...

    ArcGIS桌面工具--矢量数据导入elasticsearch

    在导入过程中,可能需要调整Elasticsearch的设置,如分片数量、副本数量、映射配置等,以适应大数据量的导入,并确保查询性能。 通过以上步骤,我们可以实现从ArcGIS Shapefile到Elasticsearch的有效迁移。这样的...

    elasticsearch数据结构设计文档

    Elasticsearch(以下简称 ES)是一种基于 Lucene 的开源搜索引擎,广泛应用于大数据时代的搜索、日志分析和数据集成等领域。下面是基于给定的文件信息,生成的相关知识点。 一、ES 索引结构设计 ES 索引结构设计是...

    项目实战——钉钉报警校验ElasticSearch和Hive数据仓库内的数据质量(Java版本)

    因为你不知道将Hive的数据导入到了ElasticSearch后,数据量是否准确,所以需要钉钉报警校验ElasticSearch和Hive数据仓库内的数据质量,注意,这个项目打包后,最好另起一个进程调用,并且开始时间为文章1或者2最大...

    ElasticSearch 官方示例测试数据

    ES 官方示例数据

    elasticSearch测试数据

    Elasticsearch(ES)是一种基于Lucene的分布式、RESTful搜索和分析引擎,常用于实时大数据的检索和分析。在本场景中,我们有三个测试数据集,分别代表不同的应用场景和数据类型,它们是logstash-*,account,以及...

    elasticsearch 8.11.3 windows安装包

    随着您的数据和查询量的增长,Elasticsearch 的分布式特性使您的部署能够随之无缝增长。Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分...

    基于Java的Elasticsearch数据同步迁移工具设计源码

    本设计源码提供了一个基于Java的Elasticsearch数据同步迁移工具。项目包含29个文件,主要使用Java和Shell编程语言。文件类型包括7个Java源代码文件、5个BAT批处理文件、5个Shell脚本文件、3个XML配置文件、2个...

    ES-HIVE数据互通

    5. **安装elasticsearch-hadoop库**:为了实现Hive与Elasticsearch之间的数据互通,需要安装elasticsearch-hadoop库。可以使用Maven或直接将jar包放置在合适的位置。 #### Hive端操作 接下来介绍如何在Hive端创建...

    Spring Boot elasticsearch7.6.2基础操作:创建索引、新增数据、查询数据

    Elasticsearch是一个强大的分布式搜索引擎,而Spring Boot是Java开发中的轻量级框架,两者结合使得开发过程中对Elasticsearch的集成变得简单高效。 首先,我们需要在Spring Boot项目中引入Elasticsearch的相关依赖...

    ES查询客户端,elasticsearch可视化工具 elasticsearch查询客户端

    Elasticsearch(简称ES)是一款强大的开源搜索引擎,广泛应用于数据检索、分析和管理。作为分布式、RESTful风格的搜索和数据分析引擎,Elasticsearch能够提供实时、高可用性以及可扩展的搜索功能。在进行日常的数据...

    由 bboss 开源的数据采集&流批一体化工具,提供数据采集、数据清洗转换处理和数据入库以及数据指标统计计算流批一体化处理功能

    完成清洗和转换后,bboss-datatran 可以将处理后的数据加载到不同的存储系统,如Hadoop HDFS、Hive、HBase、Elasticsearch、Greenplum、Oracle等。这为用户提供了灵活的数据存储选择,并且支持数据分片、分区策略,...

    ElasticSearch测试数据

    ElasticSearch测试数据

    Elasticsearch 开发手册

    在数据抽取 ELT 领域,ES 全家桶 ELK(Elasticsearch+Logstash+Kibana)赫赫有名。 Elasticsearch 基本概念: * 倒排索引:Elasticsearch 为什么快,核心设计理念就是采用了倒排索引机制。倒排索引的方式是,根据 ...

Global site tag (gtag.js) - Google Analytics