- 浏览: 183828 次
- 性别:
- 来自: 武汉
最新评论
使用DataImportHandler进行简单数据导入还是比较有效的,特别是DIH中针对简单的数据库表,可以把完全导入和增量导入合并成一个语句,非常方便。我的使用方式如下所示
1。配置schema
<requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler"> <lst name="defaults"> <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str> </lst> </requestHandler>
2.添加data-config文件
data-config.xml
<dataConfig> <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" url="jdbc:mysql://127.0.0.1/db" user="root" password="pass" batchSize="-1"/> <document> <entity name="id" pk="id" query="select id,username,text,cat from hot where '${dataimporter.request.clean}' != 'false' OR timestamp > '${dataimporter.last_index_time}'"> <field column="id" name="id"/> <field column="text" name="text"/> <field column="username" name="username_s"/> <field column="cat" name="cat_t"/> </entity> </document> </dataConfig>
3.让DIH周期性的运行
修改dataimport.properties文件,这个是自动生成的,同在solr/conf下,添加参数
interval 间隔时间 单位 分钟
syncEnabled=1 打开周期运行
params 其实就是具体调用的url,周期运行就是周期性的访问一个url
#Wed Dec 28 09:29:42 UTC 2011 port=8983 interval=5 last_index_time=2011-12-28 09\:29\:26 syncEnabled=1 webapp=solr id.last_index_time=2011-12-28 09\:29\:26 server=127.0.0.1 params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false
到此还并不能周期运行,在solr的wiki中有一段实现这个功能的代码,但并没有加入到solr的发行包中,于是我们需要重新编译这段代码,打包放到webapp/solr/WEB-INF/lib中才行
<web-app> <listener> <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class> </listener> ... </web-app>
以下是solr wiki上周期运行的代码,我已打好包,放在附件里。
package org.apache.solr.handler.dataimport.scheduler; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Properties; import org.apache.solr.core.SolrResourceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SolrDataImportProperties { private Properties properties; public static final String SYNC_ENABLED = "syncEnabled"; public static final String SYNC_CORES = "syncCores"; public static final String SERVER = "server"; public static final String PORT = "port"; public static final String WEBAPP = "webapp"; public static final String PARAMS = "params"; public static final String INTERVAL = "interval"; private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class); public SolrDataImportProperties(){ // loadProperties(true); } public void loadProperties(boolean force){ try{ SolrResourceLoader loader = new SolrResourceLoader(null); logger.info("Instance dir = " + loader.getInstanceDir()); String configDir = loader.getConfigDir(); configDir = SolrResourceLoader.normalizeDir(configDir); if(force || properties == null){ properties = new Properties(); String dataImportPropertiesPath = configDir + "\\dataimport.properties"; FileInputStream fis = new FileInputStream(dataImportPropertiesPath); properties.load(fis); } }catch(FileNotFoundException fnfe){ logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe); }catch(IOException ioe){ logger.error("Error reading DataImportScheduler dataimport.properties file", ioe); }catch(Exception e){ logger.error("Error loading DataImportScheduler properties", e); } } public String getProperty(String key){ return properties.getProperty(key); } }
package org.apache.solr.handler.dataimport.scheduler; import java.util.Calendar; import java.util.Date; import java.util.Timer; import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ApplicationListener implements ServletContextListener { private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class); @Override public void contextDestroyed(ServletContextEvent servletContextEvent) { ServletContext servletContext = servletContextEvent.getServletContext(); // get our timer from the context Timer timer = (Timer)servletContext.getAttribute("timer"); // cancel all active tasks in the timers queue if (timer != null) timer.cancel(); // remove the timer from the context servletContext.removeAttribute("timer"); } @Override public void contextInitialized(ServletContextEvent servletContextEvent) { ServletContext servletContext = servletContextEvent.getServletContext(); try{ // create the timer and timer task objects Timer timer = new Timer(); HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer); // get our interval from HTTPPostScheduler int interval = task.getIntervalInt(); // get a calendar to set the start time (first run) Calendar calendar = Calendar.getInstance(); // set the first run to now + interval (to avoid fireing while the app/server is starting) calendar.add(Calendar.MINUTE, interval); Date startTime = calendar.getTime(); // schedule the task timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval); // save the timer in context servletContext.setAttribute("timer", timer); } catch (Exception e) { if(e.getMessage().endsWith("disabled")){ logger.info("Schedule disabled"); }else{ logger.error("Problem initializing the scheduled task: ", e); } } } }
package org.apache.solr.handler.dataimport.scheduler; import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Timer; import java.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HTTPPostScheduler extends TimerTask { private String syncEnabled; private String[] syncCores; private String server; private String port; private String webapp; private String params; private String interval; private String cores; private SolrDataImportProperties p; private boolean singleCore; private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class); public HTTPPostScheduler(String webAppName, Timer t) throws Exception{ //load properties from global dataimport.properties p = new SolrDataImportProperties(); reloadParams(); fixParams(webAppName); if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled"); if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){ singleCore = true; logger.info("<index update process> Single core identified in dataimport.properties"); }else{ singleCore = false; logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores); } } private void reloadParams(){ p.loadProperties(true); syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED); cores = p.getProperty(SolrDataImportProperties.SYNC_CORES); server = p.getProperty(SolrDataImportProperties.SERVER); port = p.getProperty(SolrDataImportProperties.PORT); webapp = p.getProperty(SolrDataImportProperties.WEBAPP); params = p.getProperty(SolrDataImportProperties.PARAMS); interval = p.getProperty(SolrDataImportProperties.INTERVAL); syncCores = cores != null ? cores.split(",") : null; } private void fixParams(String webAppName){ if(server == null || server.isEmpty()) server = "localhost"; if(port == null || port.isEmpty()) port = "8080"; if(webapp == null || webapp.isEmpty()) webapp = webAppName; if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30"; } public void run() { try{ // check mandatory params if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){ logger.warn("<index update process> Insuficient info provided for data import"); logger.info("<index update process> Reloading global dataimport.properties"); reloadParams(); // single-core }else if(singleCore){ prepUrlSendHttpPost(); // multi-core }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){ logger.warn("<index update process> No cores scheduled for data import"); logger.info("<index update process> Reloading global dataimport.properties"); reloadParams(); }else{ for(String core : syncCores){ prepUrlSendHttpPost(core); } } }catch(Exception e){ logger.error("Failed to prepare for sendHttpPost", e); reloadParams(); } } private void prepUrlSendHttpPost(){ String coreUrl = "http://" + server + ":" + port + "/" + webapp + params; sendHttpPost(coreUrl, null); } private void prepUrlSendHttpPost(String coreName){ String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params; sendHttpPost(coreUrl, coreName); } private void sendHttpPost(String completeUrl, String coreName){ DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS"); Date startTime = new Date(); // prepare the core var String core = coreName == null ? "" : "[" + coreName + "] "; logger.info(core + "<index update process> Process started at .............. " + df.format(startTime)); try{ URL url = new URL(completeUrl); HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestMethod("POST"); conn.setRequestProperty("type", "submit"); conn.setDoOutput(true); // Send HTTP POST conn.connect(); logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod()); logger.info(core + "<index update process> Succesfully connected to server\t" + server); logger.info(core + "<index update process> Using port\t\t\t" + port); logger.info(core + "<index update process> Application name\t\t\t" + webapp); logger.info(core + "<index update process> URL params\t\t\t" + params); logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL()); logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage()); logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode()); //listen for change in properties file if an error occurs if(conn.getResponseCode() != 200){ reloadParams(); } conn.disconnect(); logger.info(core + "<index update process> Disconnected from server\t\t" + server); Date endTime = new Date(); logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime)); }catch(MalformedURLException mue){ logger.error("Failed to assemble URL for HTTP POST", mue); }catch(IOException ioe){ logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe); }catch(Exception e){ logger.error("Failed to send HTTP POST", e); } } public int getIntervalInt() { try{ return Integer.parseInt(interval); }catch(NumberFormatException e){ logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e); return 30; //return default in case of error } } }
- apache-solr-dataimporthandler-scheduler.jar (7.5 KB)
- 下载次数: 229
评论
5 楼
overflow_exception
2018-02-26
mark.
4 楼
tfkd丶
2016-07-15
mark.
3 楼
ethan_shan
2015-09-25
Mark.
2 楼
sunny8zhou
2014-03-18
requestHandler 是配置在solrconfig.xml里的吧
1 楼
GraysonHK
2012-12-11
能否提供编译schduler的过程,非常感谢!
发表评论
-
Solr 学习(7) —- Solr Facet
2011-12-30 11:44 35289一、Facet介绍 solr facet 是solr ... -
Solr 学习(6) —- Solr的PHP客户端
2011-12-28 20:57 13888solr查询返回只是xml格式或是json格式,并不像我们平 ... -
Solr 学习(5) —- Solr查询语法和参数
2011-12-28 20:07 697241、查询地址 建立好solr的索引后,可以通过管理界 ... -
Solr 学习(4) —- Solr数据导入 <二>SolrJ
2011-12-28 17:43 8827DIH虽然有不写程序就 ... -
Solr 学习(2) ——Solr配置
2011-12-28 13:34 8619solr配置通过两个文件,一个是solrconfig.xml ... -
Solr 学习(1) —— 搭建环境
2010-11-10 13:11 13753写在前面 2010年参加了一个全文搜索的项目,开始 ...
相关推荐
在Solr中,数据导入通常通过DataImportHandler (DIH) 完成,DIH是一个插件,负责从外部数据源(如数据库或文件系统)提取数据并将其转化为Solr可以处理的格式。而DataImportScheduler则是在DIH的基础上,增加了一个...
在Solr6版本中,DataImportHandler(DIH)是一个非常重要的特性,它允许Solr从外部数据源导入数据并建立索引。在"solr6--solr-dataimporthandler-scheduler-1.1"这个项目中,我们关注的重点是DIH的调度功能,也就是...
DataImportHandler是Apache Solr的一个特性,它允许Solr从外部数据源导入数据,如数据库、文件系统或其他Web服务。DIH的主要目的是简化索引过程,使Solr能够动态地、增量地更新其索引,而无需每次变更都完全重建索引...
完成以上步骤后,你将拥有一个能够从 MySQL 数据库导入数据的 Solr 4.9.0 集群。记得根据你的实际需求调整 Solr 配置,例如创建和配置新的核心(collections),以支持不同的索引和查询需求。同时,保持 Solr 服务器...
为了保持索引与源数据的一致性,Solr引入了DIH,这是一个内建的机制,用于从关系数据库、XML文件等外部数据源导入数据,并将其转化为Solr可以处理的索引格式。 数据导入调度器(Data Import Scheduler)是DIH的一个...
Solr MongoDB Importer 是一个非常有用的工具,它允许用户将MongoDB的数据导入到Apache Solr索引中,以便进行高效、快速的全文搜索和数据分析。这个工具的主要版本是"solr-mongo-importer-1.1",这表明它是1.1版,...
Solr使用DataImportHandler(DIH)来从关系型数据库、XML文件或其他数据源导入数据。DIH提供了一个全面的数据加载框架,支持全量导入和增量导入。 1. **全量导入**:全量导入是将所有数据从源数据库或文件一次性...
- DIH允许你从外部数据源导入数据,例如数据库。配置`data-config.xml`文件定义数据源和映射规则。 **二、Spring Boot整合Solr** 1. **依赖管理** - 在Spring Boot项目中,添加Solr的Spring Data Solr依赖,如:...
Solr 数据导入处理器(DataImportHandler,DIH)是 Apache Solr 的一个重要组件,它允许用户从各种数据源,如关系型数据库、CSV 文件等,批量导入数据到 Solr 索引中。这个功能极大地简化了数据同步和更新的过程,...
它包括一个默认的配置集,以及一个简单的数据导入处理程序(DIH)示例,帮助开发者了解如何设置和使用Solr。 5. **docs 目录**:包含了Solr的文档,包括用户手册、API参考和教程,对于学习和调试Solr非常有用。 6....
Solr DataImportHandler Scheduler 是一个用于 Apache Solr 的插件,它允许用户自动化和调度 Solr 的 DataImportHandler(DIH)过程。这个插件的版本是 1.1,包含源码,意味着用户可以根据自己的需求对代码进行修改...
`Solr DataImportHandler`(DIH)是 Solr 内置的一个功能,用于从关系型数据库或其他数据源导入数据,并建立索引。DIH 提供了全量和增量数据导入的功能,使得 Solr 能够保持与源数据的同步。`solr-dataimport-...
3. **定时任务调度**: 这个rar文件中的"solr-dataimportscheduler-1.4.jar"显然包含了一个定时任务调度器,它可以定期执行DIH的数据导入任务。这通常通过集成Java的定时框架如Quartz或者Spring的TaskScheduler实现。...
数据导入(DataImportHandler, DIH)是Solr的一项功能,允许从各种数据源(如关系型数据库、CSV文件等)导入数据,并创建或更新索引。DIH支持全量导入和增量导入,使得在数据发生变化时,Solr能够快速反映这些变化。...
Solr是一个流行的开源全文搜索引擎,而Data Import Handler是Solr的一个重要特性,允许用户从关系型数据库或其他数据源导入数据进行索引。 在4.3.0版本中,DIH的“extras”部分可能包含了扩展和增强功能,比如支持...
总的来说,这个Demo涵盖了Solr 7.4与MySQL的集成、数据导入和IK分词器的使用,帮助你搭建一个能够处理中文数据的全文搜索引擎。通过实践,你可以更好地理解Solr的数据处理流程以及如何优化中文搜索体验。
DIH允许Solr从各种数据源,如关系型数据库MySQL,导入数据。 在标题提到的"solr定时自动同步数据库需要用到的apache-solr-dataimportscheduler.jar包"中,`apache-solr-dataimportscheduler.jar`是用于实现Solr数据...
`solr-dataimporthandler-6.5.1.jar` 是Solr DataImportHandler的核心库,用于从外部数据源如数据库或文件系统导入数据。DataImportHandler(DIH)是一个强大的工具,允许Solr与各种数据源进行交互,将这些数据转换...
4. `example`目录:提供了一个简单的Solr示例,包括一个预配置的Solr实例,可以快速启动并开始使用Solr。 5. `docs`目录:包含了Solr的文档,包括用户手册、API参考等。 6. `contrib`目录:包含了一些社区贡献的模块...