最近着手处理大批量数据的任务。
现状是这样的,一个数据采集程序承载大批量数据的存储和检索。后期可能需要对大批量数据进行统计。
数据分布情况
13个点定时生成采集结果到4个文件(小文件生成周期是5分钟)
名称 大小(b) gather_1_2014-02-27-14-50-0.txt 568497 gather_1_2014-02-27-14-50-1.txt 568665 gather_1_2014-02-27-14-50-2.txt 568172 gather_1_2014-02-27-14-50-3.txt 568275
同步使用shell脚本对四个文件入到sybase_iq库的一张表tab_tmp_2014_2_27中.
每天数据量大概是3亿条,所以小文件的总量大概是3G。小文件数量大,单表容量大执行复合主键查询,由原来2s延时变成了,5~10分钟。
针对上述情况需要对目前的储存结构进行优化。
才是看了下相关系统 catior使用的是环状数据库,存储相关的数据优点方便生成MRTG图,缺点不利于数据统计。后来引入elasticsearch来对大数据检索进行优化。
测试平台
cpu: AMD Opteron(tm) Processor 6136 64bit 2.4GHz * 32 内存: 64G 硬盘:1.5T 操作系统:Red Hat Enterprise Linux Server release 6.4 (Santiago)
读取文件的目录结构:
[test@test001 data]$ ls 0 1 2 3
简单测试代码:
public class FileReader { private File file; private String splitCharactor; private Map<String, Class<?>> colNames; private static final Logger LOG = Logger.getLogger(FileReader.class); /** * @param path * 文件路径 * @param fileName * 文件名 * @param splitCharactor * 拆分字符 * @param colNames * 主键名称 */ public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames) { this.file = file; this.splitCharactor = splitCharactor; this.colNames = colNames; } /** * 读取文件 * * @return * @throws Exception */ public List<Map<String, Object>> readFile() throws Exception { List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); if (!file.isFile()) { throw new Exception("File not exists." + file.getName()); } LineIterator lineIterator = null; try { lineIterator = FileUtils.lineIterator(file, "UTF-8"); while (lineIterator.hasNext()) { String line = lineIterator.next(); String[] values = line.split(splitCharactor); if (colNames.size() != values.length) { continue; } Map<String, Object> map = new HashMap<String, Object>(); Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet() .iterator(); int count = 0; while (iterator.hasNext()) { Entry<String, Class<?>> entry = iterator.next(); Object value = values[count]; if (!String.class.equals(entry.getValue())) { value = entry.getValue().getMethod("valueOf", String.class) .invoke(null, value); } map.put(entry.getKey(), value); count++; } list.add(map); } } catch (IOException e) { LOG.error("File reading line error." + e.toString(), e); } finally { LineIterator.closeQuietly(lineIterator); } return list; } }
public class StreamIntoEs { public static class ChildThread extends Thread { int number; public ChildThread(int number) { this.number = number; } @Override public void run() { Settings settings = ImmutableSettings.settingsBuilder() .put("client.transport.sniff", true) .put("client.transport.ping_timeout", 100) .put("cluster.name", "elasticsearch").build(); TransportClient client = new TransportClient(settings) .addTransportAddress(new InetSocketTransportAddress("192.168.32.228", 9300)); File dir = new File("/export/home/es/data/" + number); LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>(); colNames.put("aa", Long.class); colNames.put("bb", String.class); colNames.put("cc", String.class); colNames.put("dd", Integer.class); colNames.put("ee", Long.class); colNames.put("ff", Long.class); colNames.put("hh", Long.class); int count = 0; long startTime = System.currentTimeMillis(); for (File file : dir.listFiles()) { int currentCount = 0; long startCurrentTime = System.currentTimeMillis(); FileReader reader = new FileReader(file, "\\$", colNames); BulkResponse resp = null; BulkRequestBuilder bulkRequest = client.prepareBulk(); try { List<Map<String, Object>> results = reader.readFile(); for (Map<String, Object> col : results) { bulkRequest.add(client.prepareIndex("flux", "fluxdata") .setSource(JSON.toJSONString(col)).setId(col.get("getway")+"##"+col.get("port_info")+"##"+col.get("device_id")+"##"+col.get("collecttime"))); count++; currentCount++; } resp = bulkRequest.execute().actionGet(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } long endCurrentTime = System.currentTimeMillis(); System.out.println("[thread-" + number + "-]per count:" + currentCount); System.out.println("[thread-" + number + "-]per time:" + (endCurrentTime - startCurrentTime)); System.out.println("[thread-" + number + "-]per count/s:" + (float) currentCount / (endCurrentTime - startCurrentTime) * 1000); System.out.println("[thread-" + number + "-]per count/s:" + resp.toString()); } long endTime = System.currentTimeMillis(); System.out.println("[thread-" + number + "-]total count:" + count); System.out.println("[thread-" + number + "-]total time:" + (endTime - startTime)); System.out.println("[thread-" + number + "-]total count/s:" + (float) count / (endTime - startTime) * 1000); // IndexRequest request = // = client.index(request); } } public static void main(String args[]) { for (int i = 0; i < 4; i++) { ChildThread childThread = new ChildThread(i); childThread.start(); } } }
起了4个线程来做入库,每个文件解析完成进行一次批处理。
初始化脚本:
curl -XDELETE 'http://192.168.32.228:9200/twitter/'
curl -XPUT 'http://192.168.32.228:9200/twitter/' -d ' { "index" :{ "number_of_shards" : 5, "number_of_replicas ": 0, "index.refresh_interval": "-1", "index.translog.flush_threshold_ops": "100000" } }'
curl -XPUT 'http://192.168.32.228:9200/twiter/twiterdata/_mapping' -d ' { "twiterdata": { "aa" : {"type" : "long", "index" : "not_analyzed"}, "bb" : {"type" : "String", "index" : "not_analyzed"}, "cc" : {"type" : "String", "index" : "not_analyzed"}, "dd" : {"type" : "integer", "index" : "not_analyzed"}, "ee" : {"type" : "long", "index" : "no"}, "ff" : {"type" : "long", "index" : "no"}, "gg" : {"type" : "long", "index" : "no"}, "hh" : {"type" : "long", "index" : "no"}, "ii" : {"type" : "long", "index" : "no"}, "jj" : {"type" : "long", "index" : "no"}, "kk" : {"type" : "long", "index" : "no"}, } }
执行效率参考:
不开启refresh_interval [test@test001 bin]$ more StreamIntoEs.out|grep total [thread-2-]total count:1199411 [thread-2-]total time:1223718 [thread-2-]total count/s:980.1368 [thread-1-]total count:1447214 [thread-1-]total time:1393528 [thread-1-]total count/s:1038.5253 [thread-0-]total count:1508043 [thread-0-]total time:1430167 [thread-0-]total count/s:1054.4524 [thread-3-]total count:1650576 [thread-3-]total time:1471103 [thread-3-]total count/s:1121.9989 4195.1134 开启refresh_interval [test@test001 bin]$ more StreamIntoEs.out |grep total [thread-2-]total count:1199411 [thread-2-]total time:996111 [thread-2-]total count/s:1204.0938 [thread-1-]total count:1447214 [thread-1-]total time:1163207 [thread-1-]total count/s:1244.1586 [thread-0-]total count:1508043 [thread-0-]total time:1202682 [thread-0-]total count/s:1253.9 [thread-3-]total count:1650576 [thread-3-]total time:1236239 [thread-3-]total count/s:1335.1593 5037.3117 开启refresh_interval 字段类型转换 [test@test001 bin]$ more StreamIntoEs.out |grep total [thread-2-]total count:1199411 [thread-2-]total time:1065229 [thread-2-]total count/s:1125.9653 [thread-1-]total count:1447214 [thread-1-]total time:1218342 [thread-1-]total count/s:1187.8552 [thread-0-]total count:1508043 [thread-0-]total time:1230474 [thread-0-]total count/s:1225.5789 [thread-3-]total count:1650576 [thread-3-]total time:1274027 [thread-3-]total count/s:1295.5581 4834.9575 开启refresh_interval 字段类型转换 设置id [thread-2-]total count:1199411 [thread-2-]total time:912251 [thread-2-]total count/s:1314.7817 [thread-1-]total count:1447214 [thread-1-]total time:1067117 [thread-1-]total count/s:1356.1906 [thread-0-]total count:1508043 [thread-0-]total time:1090577 [thread-0-]total count/s:1382.7937 [thread-3-]total count:1650576 [thread-3-]total time:1128490 [thread-3-]total count/s:1462.6412 5516.4072
580M的数据平均用时大概是20分钟。索引文件大约为1.76G
相关测试结果可以参考这里:
elasticsearch 性能测试
相关推荐
Linux环境下使用sqlplus工具将oracle中的数据导入到elasticsearch中。只需要在es_bulk_tool.properties配置sql即可实现数据的批量导入。在elasticsearch6中测试通过。shell脚本需要使用sqlplus。
Elasticsearch(ES)是一种流行的开源全文搜索引擎,它基于Lucene库构建,被广泛用于大数据分析、日志聚合、实时搜索和索引等场景。官方提供的测试数据集是检验Elasticsearch功能、性能和稳定性的关键资源,可以帮助...
ElasticSearch数据导出 elasticsearch单文档数据导出 支持自定义查询 导出数据Json文件
Elasticsearch数据导出工具是一种高效实用的解决方案,它允许用户方便地从Elasticsearch(ES)集群中抽取数据,并将其导出到不同的目标,如MySQL数据库或本地文件系统。这款工具尤其适用于需要进行数据迁移、备份或...
**Elasticsearch官方提供数据案例account.json** Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎,它允许用户快速地存储、搜索和分析大量数据。在这个案例中,"account.json"是Elasticsearch官方提供的一...
在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
Elasticsearch(以下简称 ES)是一种基于 Lucene 的开源搜索引擎,广泛应用于大数据时代的搜索、日志分析和数据集成等领域。下面是基于给定的文件信息,生成的相关知识点。 一、ES 索引结构设计 ES 索引结构设计是...
因为你不知道将Hive的数据导入到了ElasticSearch后,数据量是否准确,所以需要钉钉报警校验ElasticSearch和Hive数据仓库内的数据质量,注意,这个项目打包后,最好另起一个进程调用,并且开始时间为文章1或者2最大...
Elasticsearch(简称ES)是一款强大的开源搜索引擎,广泛应用于数据检索、分析和管理。作为分布式、RESTful风格的搜索和数据分析引擎,Elasticsearch能够提供实时、高可用性以及可扩展的搜索功能。在进行日常的数据...
ES 官方示例数据
在导入过程中,可能需要调整Elasticsearch的设置,如分片数量、副本数量、映射配置等,以适应大数据量的导入,并确保查询性能。 通过以上步骤,我们可以实现从ArcGIS Shapefile到Elasticsearch的有效迁移。这样的...
Elasticsearch(ES)是一种基于Lucene的分布式、RESTful搜索和分析引擎,常用于实时大数据的检索和分析。在本场景中,我们有三个测试数据集,分别代表不同的应用场景和数据类型,它们是logstash-*,account,以及...
随着您的数据和查询量的增长,Elasticsearch 的分布式特性使您的部署能够随之无缝增长。Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分...
本设计源码提供了一个基于Java的Elasticsearch数据同步迁移工具。项目包含29个文件,主要使用Java和Shell编程语言。文件类型包括7个Java源代码文件、5个BAT批处理文件、5个Shell脚本文件、3个XML配置文件、2个...
5. **安装elasticsearch-hadoop库**:为了实现Hive与Elasticsearch之间的数据互通,需要安装elasticsearch-hadoop库。可以使用Maven或直接将jar包放置在合适的位置。 #### Hive端操作 接下来介绍如何在Hive端创建...
在描述中提到,“es+springboot+mysql 实现mysql数据同步es,然后查询es数据各种demo实现”,这表明项目旨在实现在MySQL数据库和Elasticsearch之间进行数据同步,同时提供了查询Elasticsearch数据的各种示例代码。...
Elasticsearch测试数据,3160条商品数据 数据导入命令: curl -XPOST 'localhost:9200/pditems/_bulk' -H 'Content-Type:application/json' --data-binary @pditems.json
Elasticsearch是一个强大的分布式搜索引擎,而Spring Boot是Java开发中的轻量级框架,两者结合使得开发过程中对Elasticsearch的集成变得简单高效。 首先,我们需要在Spring Boot项目中引入Elasticsearch的相关依赖...
ElasticSearch测试数据