转载自: http://mogu.io/search-instance-pre-treatment-24
搜索平台化中预热的需求描述为:当线上集群需要水平扩容时,新部署实例的机器需要用已存在的日志进行预查询,其目的是将用户的常用查询加载到扩容机内存中,最终对外开放时以提高缓存命中率,减少搜索RT
比如现在要用A机器上的日志对B机器进行预热,实现很简单:
步骤一、拿到A机器上的日志,%TOMCAT_HOME%/logs/catalina.2014-06-21.log
默认取昨天生成好的日志文件,不存在就向前推,直到找不到报错。然后对日志文件过滤
awk -F'{|}' '{/param/; if (count>100000) exit; if ($2 != "") {count++; print $2 > "preload.log";};}' catalina.2014-06-21.log
其中用关键字“param”过滤行文本,再以分隔符“{”或“}”解析后将第二个字段重定向到preload.log文件下,取前10万行处理结果得到preload.log如下
步骤二、将A机器上生成的preload.log文件scp到平台机,通过平台机读取文件并向B机器查询,代码如
- 首先读取文件到到内存
private List readPreloadFile(File preloadLog) {
List ret = Lists.newArrayList();
BufferedReader br = null;
String s = null;
try {
br = new BufferedReader(new FileReader(preloadLog));
while((s = br.readLine()) != null) {
ret.add(s);
}
} catch (IOException e) {
} finally {
if (br != null) {
IOUtils.closeQuietly(br);
}
}
return ret;
}
考虑到通过NIO中的MappedByteBuffer直接将整个文件映射到内存可以避免一条条read的io开销,尝试如下
private List readPreloadFile(File preloadLog) {
List logs = Lists.newArrayList();
FileChannel fc = null;
MappedByteBuffer fout = null;
byte[] ret = null;
try {
fc = new RandomAccessFile(preloadLog, "r").getChannel();
fout = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
ret = new byte[(int)fc.size()];
fout.get(ret);
} catch (FileNotFoundException e) {
} catch (IOException e) {
} finally {
fout = null;
if (fc != null) {
IOUtils.closeQuietly(fc);
}
}
logs = Arrays.asList(StringUtils.split(new String(ret), '\n'));
return logs;
}
实际对10万数据测试结果显示后者452ms,比前者305ms还差,发现主要还是耗在将byte数组转化为字符串(243ms)和split动作(164)ms
注意,以上两种方式都是一次性将日志文件的内容读到内存,现在10万条线上日志大约40M,考虑到后续会加大预热量以及同时预热多台机器,这样可直接导致OOM,所以更佳的方式应该从文件中读一条预热一条
-
将第一步返回的结果分解给多个线程去查询
1)定义callable类型的线程,返回该线程总共处理的查询次数class PreloadHandler implements Callable { private String name; private String solrPath; private List subQueryLogs;
private ThreadLocal<Integer> localDoneNum = new ThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 0; } }; private ThreadLocal<Integer> localErrNum = new ThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 0; } }; PreloadHandler(String name, String solrPath, List<String> subQueryLogs) { this.name = name; this.solrPath = solrPath; this.subQueryLogs = subQueryLogs; } public Integer call() throws Exception { for (String queryLog : subQueryLogs) { if (!isRunning) { break; } String url = solrPath + queryLog; try { HttpUtil.httpGet(url); //真正查询处理 } catch (Exception e) { errNum.getAndIncrement(); int _errNum = localErrNum.get(); localErrNum.set(++_errNum); } doneNum.getAndIncrement(); int _doneNum = localDoneNum.get(); localDoneNum.set(++_doneNum); } return localDoneNum.get(); }
构造函数中传入三个参数,分别是线程名称,如“搜索实例名_机器B_handler_线程序号”;查询路径solrPath,如“http://机器B的ip:端口号port/solr/搜索实例名/select?”;分解后的子日志队列
localErrNum和localDoneNum是线程处理失败和完成的查询计数;errNum和doneNum是多个线程总的处理失败和完成的查询计数。前者定义为threadlocal,后者必须是atomic类型的
2)主线程中进行分解和子任务提交
3)因为异步的,页面触发action后就直接返回了。所以主线程中需要初始化一些状态信息并保存下来以供ajax查询public void run() { isRunning = true; String solrPath = String.format("http://%s:%s/solr/%s/select?",host.getInnerIp(), port, instance.getInstanceName()); List<list> subList = Lists.partition(queryLogs, queryLogs.size() / handlers); futures = Lists.newArrayList(); for (int i = 0; i < handlers; i++) { futures.add(threadPool.submit( new PreloadHandler(Joiner.on("_").join(instance.getInstanceName(), host.getHostName(), "handler", (i + 1)), solrPath, subList.get(i)))); } }
private static Map<string, preload=""> statCache = Maps.newConcurrentMap();
public static void registor(String key, Preload preload) {
statCache.put(key, preload);
}public static void unRegistor(String key) {
statCache.remove(key);
}public void init() throws IOException {
queryLogs = readPreloadFile(preloadLog);
this.threadPool = Executors.newFixedThreadPool(handlers);
totalNum = new AtomicInteger(queryLogs.size());
doneNum = new AtomicInteger(0);
errNum = new AtomicInteger(0);registor( Joiner.on("_").join(instance.getInstanceName(), host.getHostName()), this);
}
并提供取消正在执行任务的能力public void cancel() { isRunning = false; for (Future f : futures) { f.cancel(true); } }
public void close() {
isRunning = false;
if (threadPool != null) {
threadPool.shutdown();
}
}
这里的isRunning变量必须是volatile的,因为当停止主线程时可以保证对子任务可见从而退出正在执行的子任务 -
主线程类和“搜索实例+预热机器”进行绑定,外界通过如下查询预热状态
public static Preload getPreload(String instanceName, String hostName) { return statCache.get(Joiner.on("_").join(instanceName, hostName)); }
综上所述便是整个Preload过程,下面给出预热一台机器的演示结果:
总共50万条日志,5个线程并发预热,结果显示了每个线程处理的日志和错误数以及总的处理进度
相关推荐
通过本教程,用户可以系统地学习如何使用Solr搭建企业搜索引擎,并通过一系列实例和方法对Solr搜索引擎进行调优和问题排查。Apache Solr不仅可以帮助企业快速实现搜索功能,还能够通过定制和优化,满足不同企业的...
- **多核(MultiCore)配置**:Solr的多核配置允许在一个Solr实例中运行多个独立的核心,每个核心可以有自己的索引和配置,适合处理不同类型的搜索需求。 #### 配置文件说明 - **schema.xml**:定义了索引的字段...
10. **性能优化**:性能优化方面,可以通过调整索引设置、合理分配节点资源、使用倒排索引、预热缓存等方式提升搜索效率。 11. **安全与监控**:Elasticsearch提供X-Pack插件(现已被Kibana的Security和Monitoring...
使用SolrJ时,可调用搜索接口实例,用SolrJ添加数据和读取数据,创建查询,以及使用SolrJ创建索引。同时也涉及SolrJ包的结构说明。 在实际应用测试报告中,包含了线下压力测试报告和线上环境运行报告。 性能调优...
- 搜索接口的调用实例:实际演示如何调用Solr的搜索接口。 - Solrj的使用说明:介绍了如何使用SolrJ添加数据、直接添加POJO到Solr和从Solr读取数据。 - 创建查询:说明了如何创建查询并使用SolrJ进行索引的创建。 5...
**4.1 搜索接口的调用实例** 示例代码展示如何使用SolrJ API执行搜索操作。 **4.2 Solrj的使用说明** - **4.2.1 AddingDatatoSolr** 通过SolrJ API添加数据到Solr。 - **4.2.2 DirectlyaddingPOJOstoSolr** 将...
- **预热机制**:提前将热门商品加入缓存中减少实际秒杀时的数据加载时间; - **验证码验证**:增加图形验证码或滑动验证码环节过滤恶意刷单程序; - **快速失败响应**:当库存不足时立即返回失败结果而不是让用户...
8. **数据结构与算法**:熟悉Python内置的数据结构,如列表、字典和集合的操作,并学习基本的排序和搜索算法,如冒泡排序、选择排序、快速排序、二分查找等。 9. **标准库的使用**:如Numpy用于数值计算,Pandas...
10. 预热干预机制:预先处理和优化商情信息,提高B2B平台的友好度,使得信息推送更为有效,同时提升搜索引擎的友好性。 11. 智能识别功能:具备先进的验证码识别技术,能自动识别大部分验证码和智能提示问题,自动...
在企业级应用中,为了提升用户体验并更好地处理大量数据的检索需求,选择合适的搜索引擎解决方案至关重要。通常有几种常见的方案: 1. **基于Lucene自封装实现站内搜索**:这种方式需要较大的开发投入,并且在后续...
##### 4.1 搜索接口的调用实例 - **4.2 Solrj的使用说明**:通过示例代码展示如何使用Solrj。 - **4.2.1 AddingData to Solr**:如何使用Solrj添加数据到索引。 - **4.2.2 Directly adding POJOs to Solr**:如何...
在IT领域,Elasticsearch(ES)是一种广泛使用的开源全文搜索引擎,它以其高效、可扩展性和实时性而闻名。在处理大数据时,批量写入数据是优化性能的关键操作。本篇文章将深入探讨如何批量写入数据到Elasticsearch,...
8. **多字段类型**:Solr允许为不同的字段定义不同的类型,如text字段用于全文搜索,date字段用于时间戳处理,提高搜索精度和灵活性。 9. **复制和恢复**:SolrCloud支持数据的自动复制和恢复机制,确保在节点故障...
3. **Search搜索服务**: 提供高效搜索功能,支持海量数据检索。 4. **Network Selector网络选择器**: 用于优化网络通信,确保数据传输效率。 5. **News Feed新闻动态**: Facebook的核心功能之一,基于用户兴趣推荐...
1. **节点(Node)**: Elasticsearch集群中的单个实例,可以是分布式系统中的任意一台服务器。 2. **集群(Cluster)**: 由一个或多个节点组成,共享相同的集群名称,共同存储数据并处理请求。 3. **索引(Index)**: 类似...