最近着手处理大批量数据的任务。从文本文件中导入数据。到搜索服务器存储。
为了提升性能,用的java并发包中的阻塞双端队列LinkedBlockingDeque。
生产者线程 读取数据。 消费者 从队列中取出数据 提交到搜索引擎
package com.lubanec.cache.model;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;
import com.lubanec.factory.muticore.SinglerSolrServerFactory;
/**
* @author 胡慧超
* 消费者队列线程
* 获取队列的数据 提交给搜索服务器
*/
public class LogConsumerThread extends Thread {
private static Logger log=Logger.getLogger(LogConsumerThread.class);
private SolrServer server = SinglerSolrServerFactory.getInstance().getHotwordSolrServer();
//控制队列是否继续
public boolean running = true;
// 生产者消费者内存队列
private LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque;
private static final AtomicInteger commitNum=new AtomicInteger();
private static final AtomicInteger memoryNum=new AtomicInteger();
private static final AtomicInteger tatolNum=new AtomicInteger();
public LogConsumerThread(LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque) {
this.linkedBlockingDeque = linkedBlockingDeque;
}
@Override
public void run() {
try {
LogDataLock lock = LogDataLock.getInstance();
SolrInputDocument doc = null;
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
while ((doc=linkedBlockingDeque.poll(5,TimeUnit.SECONDS))!= null && running) {
tatolNum.incrementAndGet();
//server.add(doc); --这是线程安全的
//操作同一个单例对象时,list是线程不安全的
synchronized (lock){
memoryNum.incrementAndGet();
lock.doclist.add(doc);
//设置批量提交数据量为20000
if(memoryNum.get()>20000){
server.add(lock.doclist);
//提交之后,重置list
lock.doclist.clear();
//重置开始时间
memoryNum.set(0);
commitNum.incrementAndGet();
}
}
}
synchronized (lock){
if(lock.doclist.size()>0){
server.add(lock.doclist);
//提交之后,重置list
lock.doclist.clear();
memoryNum.set(0);
commitNum.incrementAndGet();
}
}
log.info("consume"+Thread.currentThread().getName()+" 退出 doc: " + doc + ", running is " + running+",提交搜索引擎次数:"+commitNum.get()+",内存队列中数据:"+memoryNum.get()+"处理总数据量:"+tatolNum.get());
//杀死线程之前。。还原commitNum,memoryNum,tatolNum,以便在一次性web任务中重置统计。
commitNum.set(0);
memoryNum.set(0);
tatolNum.set(0);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.lubanec.cache.model;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;
import org.apache.solr.common.SolrInputDocument;
import com.lubanec.utils.Constants;
import com.lubanec.utils.JsonWs;
/**
* @author 胡慧超
* 生产者队列线程
* 生产组装队列的数据
*/
@SuppressWarnings("rawtypes")
public class LogProductThread extends Thread{
private static Logger log=Logger.getLogger(LogProductThread.class);
private final static Pattern FILTERS = Pattern.compile(Constants.USER_SEARCH_LOG_DEFAULT);
private static final AtomicInteger lognum=new AtomicInteger();
private static final AtomicInteger historyall=new AtomicInteger(); //历史所有日志数量
// 生产者消费者内存队列
private LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque;
// 文件夹
private File dir ;
private String logname;
public LogProductThread(LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque, File dir,String logname) {
this.linkedBlockingDeque = linkedBlockingDeque;
this.dir=dir;
this.logname=logname;
}
@Override
public void run() {
/*
* 读取该目录下面所有日志
*/
for (File file : dir.listFiles()) {
if (!file.isFile()) {
log.error("File not exists." + file.getName());
continue;
}
if(!"all".equals(logname)&&!file.getName().endsWith(logname)){
continue;
}
LineIterator lineIterator = null;
try {
lineIterator = FileUtils.lineIterator(file, "UTF-8");
while (lineIterator.hasNext()) {
String line = (String) lineIterator.next();
//匹配正则
if(!FILTERS.matcher(line).matches()){
continue;
}
String timetemp=line.split("\\|")[0];
String searchinfo=line.substring(timetemp.length()+1);
Map map=JsonWs.parseStringToMap(searchinfo);
SolrInputDocument doc=new SolrInputDocument();
doc.addField("timetemp", timetemp.replaceAll("-", "").replaceAll(" ", "").replaceAll(":", "").substring(0, 14));
doc.addField("keyword", map.get("keyword")!=null?map.get("keyword"):"");
doc.addField("cate_id", map.get("cate_id")!=null?map.get("cate_id"):"");
doc.addField("brand", map.get("brand")!=null?map.get("brand"):"");
lognum.incrementAndGet() ;
historyall.incrementAndGet();
linkedBlockingDeque.add(doc);
}
} catch (IOException e) {
log.error("File reading line error." + e.toString(), e);
} finally {
LineIterator.closeQuietly(lineIterator);
}
}
log.info("product 退出 ,读取目录"+dir.getName()+"完毕!该目录下,需读取的总日志数量:"+lognum.get()+";历史总导入日志数据量:"+historyall.incrementAndGet());
lognum.set(0);
}
}
分享到:
相关推荐
Elasticsearch是一种分布式、RESTful风格的搜索和分析引擎,适用于各种数据存储和检索场景。在Webmagic中,可以使用Elasticsearch的Java API来实现数据的存储。 首先,确保你的项目中已经添加了Elasticsearch的依赖...
Solr,作为一款流行的开源全文搜索引擎,经常被用于大规模数据的快速检索。增量导入更新索引包是Solr中的一项重要功能,它允许系统仅处理自上次完整索引以来发生改变的数据,从而大大提升了效率并降低了资源消耗。...
ElasticSearch(ES)是一款基于 Lucene 打造的分布式搜索引擎,广泛应用于搜索、日志、APM、IOT 等领域。为满足不同数据源与 ES 之间的数据导入导出需求,ES 提供了多种异构数据同步方式。 一、异构数据与 ES 同步 ...
随着大数据技术的发展与广泛应用,如何高效地将大量数据从消息队列(如Kafka)导入到搜索引擎(如Elasticsearch,简称ES)成为了一个重要的课题。本文将详细介绍如何利用Python来实现这一过程,并通过具体的代码示例...
数据接入ElasticSearch是现代大数据处理中的重要环节,它涉及到如何高效、稳定地将各种类型的数据导入到Elasticsearch这个强大的全文搜索引擎中。Elasticsearch因其高性能、分布式、可扩展性和实时性,常被用于日志...
商品分类和品牌管理、SEO优化、商品导入导出等功能则有助于商品信息的规范化和搜索引擎优化,增强商品的可见性。 二、订单管理 订单管理涉及订单的创建、状态跟踪、标签分类、筛选导出等,便于商家高效处理订单。...
- **批量导入技术**:研究如何使用Bulk API高效地将大量数据导入到Elasticsearch中。 - **数据导出方法**:介绍常用的导出工具和API,便于从Elasticsearch中快速获取数据。 - **实时数据流处理**:学习如何通过Kafka...
8. **Sqoop**:Sqoop是一个用于在关系型数据库和Hadoop之间迁移数据的工具,简化了批量数据导入导出。 9. **Flume**:Flume是Cloudera设计的用于收集、聚合和移动大量日志数据的工具,常用于数据流入Hadoop。 10. ...
- **导入数据**:从 CSV 文件批量导入数据到 H2 数据库。 - **导出数据**:将查询结果导出为 CSV 文件格式。 #### 升级、备份与恢复 - **升级数据库**:通过 SQL 脚本进行数据库的升级操作。 - **备份**:使用 H2 ...
4. 数据处理:利用SpringBoot的定时任务或者消息队列(如RabbitMQ)处理批量数据导入、实时数据分析等任务。 六、项目实战 该压缩包中的“ok_x”可能包含了项目的源代码、配置文件、数据库脚本等资源。通过这些资源...
2. **Elasticsearch 安装**:Elasticsearch 是一个分布式全文搜索引擎,常用于日志分析、实时数据分析等场景。`es_install` 的 playbook 可能会涵盖下载和安装 Elasticsearch,配置节点间通信、索引设置、内存和磁盘...
分布式搜索引擎 应用发布与监控 应用容灾及机房规划 系统动态扩容 分布式架构策略-分而治之 从简到难,从网络通信探究分布式通信原理 基于消息方式的系统间通信 理解通信协议传输过程中的序列化和反序列化...
10. **Sqoop**: 用于在Hadoop和关系数据库之间高效传输批量数据的工具,支持导入导出操作。 11. **Elasticsearch**: 分布式搜索和分析系统,常用于日志分析、信息检索和实时分析。 12. **Logstash**: 数据收集、...
以上内容覆盖了大数据领域中的关键技术知识点,包括数据存储技术、搜索引擎、分布式文件系统、分布式数据库、数据仓库、数据采集以及数据迁移等方面。这些知识点对于理解和掌握大数据领域的技术体系至关重要。