`
BradyZhu
  • 浏览: 258399 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

内存队列批量导入日志到搜索引擎

 
阅读更多

最近着手处理大批量数据的任务。从文本文件中导入数据。到搜索服务器存储。

为了提升性能,用的java并发包中的阻塞双端队列LinkedBlockingDeque。

生产者线程 读取数据。 消费者 从队列中取出数据 提交到搜索引擎

package com.lubanec.cache.model;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;

import com.lubanec.factory.muticore.SinglerSolrServerFactory;

/**
 * @author 胡慧超
 * 消费者队列线程
 * 获取队列的数据  提交给搜索服务器
 */
public class LogConsumerThread extends Thread {
	private static Logger log=Logger.getLogger(LogConsumerThread.class);
	private SolrServer server = SinglerSolrServerFactory.getInstance().getHotwordSolrServer();
	//控制队列是否继续
	public boolean running = true;
	// 生产者消费者内存队列
	private LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque; 
	private static final AtomicInteger commitNum=new AtomicInteger();
	private static final AtomicInteger memoryNum=new AtomicInteger();
	private static final AtomicInteger tatolNum=new AtomicInteger();

	public LogConsumerThread(LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque) {
		this.linkedBlockingDeque = linkedBlockingDeque;
	}
	@Override
	public void run() {
		try {
			LogDataLock lock = LogDataLock.getInstance();
			SolrInputDocument doc = null;			
			// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
			while ((doc=linkedBlockingDeque.poll(5,TimeUnit.SECONDS))!= null && running) {
				tatolNum.incrementAndGet();
				//server.add(doc);  --这是线程安全的
				//操作同一个单例对象时,list是线程不安全的
				synchronized (lock){	
					memoryNum.incrementAndGet();
					lock.doclist.add(doc);		
					//设置批量提交数据量为20000
					if(memoryNum.get()>20000){
						
						server.add(lock.doclist);
						//提交之后,重置list
						lock.doclist.clear();
						//重置开始时间
						memoryNum.set(0);
						commitNum.incrementAndGet();
					}
				}
			}
			synchronized (lock){	
				if(lock.doclist.size()>0){
					server.add(lock.doclist);
					//提交之后,重置list
					lock.doclist.clear();
					memoryNum.set(0);
					commitNum.incrementAndGet();
				}
			}
			log.info("consume"+Thread.currentThread().getName()+" 退出  doc: " + doc + ", running  is " + running+",提交搜索引擎次数:"+commitNum.get()+",内存队列中数据:"+memoryNum.get()+"处理总数据量:"+tatolNum.get());
			//杀死线程之前。。还原commitNum,memoryNum,tatolNum,以便在一次性web任务中重置统计。
			commitNum.set(0);
			memoryNum.set(0);
			tatolNum.set(0);
		} catch (Exception e) {
            e.printStackTrace();
		}
	}

}

package com.lubanec.cache.model;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;
import org.apache.solr.common.SolrInputDocument;

import com.lubanec.utils.Constants;
import com.lubanec.utils.JsonWs;

/**
 * @author 胡慧超
 * 生产者队列线程
 * 生产组装队列的数据  
 */
@SuppressWarnings("rawtypes")
public class LogProductThread extends Thread{
	
	private static Logger log=Logger.getLogger(LogProductThread.class);
	private final static Pattern FILTERS = Pattern.compile(Constants.USER_SEARCH_LOG_DEFAULT);
	private static final AtomicInteger lognum=new AtomicInteger();
	private static final AtomicInteger historyall=new AtomicInteger();  //历史所有日志数量
	// 生产者消费者内存队列
	private LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque; 
	// 文件夹
	private File dir ;
	private String logname;


	public LogProductThread(LinkedBlockingDeque<SolrInputDocument> linkedBlockingDeque, File dir,String logname) {
		this.linkedBlockingDeque = linkedBlockingDeque;
		this.dir=dir;
		this.logname=logname;
	}

	@Override
	public void run() {
		/*
		 * 读取该目录下面所有日志
		 */
		for (File file : dir.listFiles()) {
			if (!file.isFile()) {
				log.error("File not exists." + file.getName());
				continue;
			}
			if(!"all".equals(logname)&&!file.getName().endsWith(logname)){
				continue;
			}
			LineIterator lineIterator = null;
			try {
				lineIterator = FileUtils.lineIterator(file, "UTF-8");
				while (lineIterator.hasNext()) {
					String line = (String) lineIterator.next();
					//匹配正则
					if(!FILTERS.matcher(line).matches()){
						continue;
					}
					String timetemp=line.split("\\|")[0];		
					String searchinfo=line.substring(timetemp.length()+1);
					Map map=JsonWs.parseStringToMap(searchinfo);
					SolrInputDocument doc=new SolrInputDocument();
					doc.addField("timetemp", timetemp.replaceAll("-", "").replaceAll(" ", "").replaceAll(":", "").substring(0, 14));	
					doc.addField("keyword", map.get("keyword")!=null?map.get("keyword"):"");
					doc.addField("cate_id", map.get("cate_id")!=null?map.get("cate_id"):"");
					doc.addField("brand", map.get("brand")!=null?map.get("brand"):"");
					lognum.incrementAndGet() ;
					historyall.incrementAndGet();
					linkedBlockingDeque.add(doc);
				}
			} catch (IOException e) {
				log.error("File reading line error." + e.toString(), e);
			} finally {
				LineIterator.closeQuietly(lineIterator);
			}
		}	
		log.info("product 退出 ,读取目录"+dir.getName()+"完毕!该目录下,需读取的总日志数量:"+lognum.get()+";历史总导入日志数据量:"+historyall.incrementAndGet());		
		lognum.set(0);
	}
	
}


分享到:
评论

相关推荐

    Webmagic爬取数据导入到ES

    Elasticsearch是一种分布式、RESTful风格的搜索和分析引擎,适用于各种数据存储和检索场景。在Webmagic中,可以使用Elasticsearch的Java API来实现数据的存储。 首先,确保你的项目中已经添加了Elasticsearch的依赖...

    solr增量导入更新索引包

    Solr,作为一款流行的开源全文搜索引擎,经常被用于大规模数据的快速检索。增量导入更新索引包是Solr中的一项重要功能,它允许系统仅处理自上次完整索引以来发生改变的数据,从而大大提升了效率并降低了资源消耗。...

    ElasticSearch数据迁移与容灾实践.docx

    ElasticSearch(ES)是一款基于 Lucene 打造的分布式搜索引擎,广泛应用于搜索、日志、APM、IOT 等领域。为满足不同数据源与 ES 之间的数据导入导出需求,ES 提供了多种异构数据同步方式。 一、异构数据与 ES 同步 ...

    python消费kafka数据批量插入到es的方法

    随着大数据技术的发展与广泛应用,如何高效地将大量数据从消息队列(如Kafka)导入到搜索引擎(如Elasticsearch,简称ES)成为了一个重要的课题。本文将详细介绍如何利用Python来实现这一过程,并通过具体的代码示例...

    数据接入ElasticSearch方式培训PPT

    数据接入ElasticSearch是现代大数据处理中的重要环节,它涉及到如何高效、稳定地将各种类型的数据导入到Elasticsearch这个强大的全文搜索引擎中。Elasticsearch因其高性能、分布式、可扩展性和实时性,常被用于日志...

    电子商务网站功能模块汇总.pdf

    商品分类和品牌管理、SEO优化、商品导入导出等功能则有助于商品信息的规范化和搜索引擎优化,增强商品的可见性。 二、订单管理 订单管理涉及订单的创建、状态跟踪、标签分类、筛选导出等,便于商家高效处理订单。...

    【54】2018年最新价值799元Elasticsearch顶尖高手系列:高手进阶篇视频教程 .txt

    - **批量导入技术**:研究如何使用Bulk API高效地将大量数据导入到Elasticsearch中。 - **数据导出方法**:介绍常用的导出工具和API,便于从Elasticsearch中快速获取数据。 - **实时数据流处理**:学习如何通过Kafka...

    CDHHDPMAPRDKH星环组件比较.pdf

    8. **Sqoop**:Sqoop是一个用于在关系型数据库和Hadoop之间迁移数据的工具,简化了批量数据导入导出。 9. **Flume**:Flume是Cloudera设计的用于收集、聚合和移动大量日志数据的工具,常用于数据流入Hadoop。 10. ...

    h2 database pdf

    - **导入数据**:从 CSV 文件批量导入数据到 H2 数据库。 - **导出数据**:将查询结果导出为 CSV 文件格式。 #### 升级、备份与恢复 - **升级数据库**:通过 SQL 脚本进行数据库的升级操作。 - **备份**:使用 H2 ...

    基于SpringBoot+ElasticSearch+vue.js开发的大数据营销系统.zip

    4. 数据处理:利用SpringBoot的定时任务或者消息队列(如RabbitMQ)处理批量数据导入、实时数据分析等任务。 六、项目实战 该压缩包中的“ok_x”可能包含了项目的源代码、配置文件、数据库脚本等资源。通过这些资源...

    ansible-playbooks.7z

    2. **Elasticsearch 安装**:Elasticsearch 是一个分布式全文搜索引擎,常用于日志分析、实时数据分析等场景。`es_install` 的 playbook 可能会涵盖下载和安装 Elasticsearch,配置节点间通信、索引设置、内存和磁盘...

    Java思维导图xmind文件+导出图片

    分布式搜索引擎 应用发布与监控 应用容灾及机房规划 系统动态扩容 分布式架构策略-分而治之 从简到难,从网络通信探究分布式通信原理 基于消息方式的系统间通信 理解通信协议传输过程中的序列化和反序列化...

    大数据图标大全.docx

    10. **Sqoop**: 用于在Hadoop和关系数据库之间高效传输批量数据的工具,支持导入导出操作。 11. **Elasticsearch**: 分布式搜索和分析系统,常用于日志分析、信息检索和实时分析。 12. **Logstash**: 数据收集、...

    大数据学习路线+知识点大梳理.pdf

    以上内容覆盖了大数据领域中的关键技术知识点,包括数据存储技术、搜索引擎、分布式文件系统、分布式数据库、数据仓库、数据采集以及数据迁移等方面。这些知识点对于理解和掌握大数据领域的技术体系至关重要。

Global site tag (gtag.js) - Google Analytics