`

Solr 学习(3) —-Solr 数据导入 <一>DIH简单使用

    博客分类:
  • Solr
 
阅读更多

使用DataImportHandler进行简单数据导入还是比较有效的,特别是DIH中针对简单的数据库表,可以把完全导入和增量导入合并成一个语句,非常方便。我的使用方式如下所示

1。配置schema

 

 

<requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
    <lst name="defaults">
      <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str>
    </lst>
  </requestHandler>
 

 

2.添加data-config文件

data-config.xml

 

 

<dataConfig>
  <dataSource type="JdbcDataSource" 
              driver="com.mysql.jdbc.Driver"
              url="jdbc:mysql://127.0.0.1/db" 
              user="root" 
              password="pass"
              batchSize="-1"/>
  <document>
	<entity name="id" pk="id"  
		    query="select id,username,text,cat  from hot where '${dataimporter.request.clean}' != 'false' OR timestamp > '${dataimporter.last_index_time}'">
		 <field column="id" name="id"/>
		 <field column="text" name="text"/>
		 <field column="username" name="username_s"/>
 		 <field column="cat" name="cat_t"/>
	</entity>
  </document>
</dataConfig>

 

3.让DIH周期性的运行

修改dataimport.properties文件,这个是自动生成的,同在solr/conf下,添加参数

interval 间隔时间 单位 分钟

syncEnabled=1 打开周期运行

params 其实就是具体调用的url,周期运行就是周期性的访问一个url

 

 

#Wed Dec 28 09:29:42 UTC 2011
port=8983
interval=5
last_index_time=2011-12-28 09\:29\:26
syncEnabled=1
webapp=solr
id.last_index_time=2011-12-28 09\:29\:26
server=127.0.0.1
params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false
 

 

到此还并不能周期运行,在solr的wiki中有一段实现这个功能的代码,但并没有加入到solr的发行包中,于是我们需要重新编译这段代码,打包放到webapp/solr/WEB-INF/lib中才行

 

<web-app>
   <listener>
       <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
  </listener>
  ...
</web-app>
 

 

 

以下是solr wiki上周期运行的代码,我已打好包,放在附件里。

 

 

package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrDataImportProperties {
        private Properties properties;

        public static final String SYNC_ENABLED         = "syncEnabled";
        public static final String SYNC_CORES           = "syncCores";
        public static final String SERVER               = "server";
        public static final String PORT                 = "port";
        public static final String WEBAPP               = "webapp";
        public static final String PARAMS               = "params";
        public static final String INTERVAL             = "interval";

        private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

        public SolrDataImportProperties(){
//              loadProperties(true);
        }

        public void loadProperties(boolean force){
                try{
                        SolrResourceLoader loader = new SolrResourceLoader(null);
                        logger.info("Instance dir = " + loader.getInstanceDir());

                        String configDir = loader.getConfigDir();
                        configDir = SolrResourceLoader.normalizeDir(configDir);
                        if(force || properties == null){
                                properties = new Properties();

                                String dataImportPropertiesPath = configDir + "\\dataimport.properties";

                                FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
                                properties.load(fis);
                        }
                }catch(FileNotFoundException fnfe){
                        logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
                }catch(IOException ioe){
                        logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
                }catch(Exception e){
                        logger.error("Error loading DataImportScheduler properties", e);
                }
        }

        public String getProperty(String key){
                return properties.getProperty(key);
        }
}
 

 

 

 

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

        private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

        @Override
        public void contextDestroyed(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();

                // get our timer from the context
                Timer timer = (Timer)servletContext.getAttribute("timer");

                // cancel all active tasks in the timers queue
                if (timer != null)
                        timer.cancel();

                // remove the timer from the context
                servletContext.removeAttribute("timer");

        }

        @Override
        public void contextInitialized(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();
                try{
                        // create the timer and timer task objects
                        Timer timer = new Timer();
                        HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);

                        // get our interval from HTTPPostScheduler
                        int interval = task.getIntervalInt();

                        // get a calendar to set the start time (first run)
                        Calendar calendar = Calendar.getInstance();

                        // set the first run to now + interval (to avoid fireing while the app/server is starting)
                        calendar.add(Calendar.MINUTE, interval);
                        Date startTime = calendar.getTime();

                        // schedule the task
                        timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);

                        // save the timer in context
                        servletContext.setAttribute("timer", timer);

                } catch (Exception e) {
                        if(e.getMessage().endsWith("disabled")){
                                logger.info("Schedule disabled");
                        }else{
                                logger.error("Problem initializing the scheduled task: ", e);
                        }
                }
        }

}
 

 

package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class HTTPPostScheduler extends TimerTask {
        private String syncEnabled;
        private String[] syncCores;
        private String server;
        private String port;
        private String webapp;
        private String params;
        private String interval;
        private String cores;
        private SolrDataImportProperties p;
        private boolean singleCore;

        private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);

        public HTTPPostScheduler(String webAppName, Timer t) throws Exception{
                //load properties from global dataimport.properties
                p = new SolrDataImportProperties();
                reloadParams();
                fixParams(webAppName);

                if(!syncEnabled.equals("1")) throw new Exception("Schedule disabled");

                if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){
                        singleCore = true;
                        logger.info("<index update process> Single core identified in dataimport.properties");
                }else{
                        singleCore = false;
                        logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);
                }
        }

        private void reloadParams(){
                p.loadProperties(true);
                syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
                cores           = p.getProperty(SolrDataImportProperties.SYNC_CORES);
                server          = p.getProperty(SolrDataImportProperties.SERVER);
                port            = p.getProperty(SolrDataImportProperties.PORT);
                webapp          = p.getProperty(SolrDataImportProperties.WEBAPP);
                params          = p.getProperty(SolrDataImportProperties.PARAMS);
                interval        = p.getProperty(SolrDataImportProperties.INTERVAL);
                syncCores       = cores != null ? cores.split(",") : null;
        }

        private void fixParams(String webAppName){
                if(server == null || server.isEmpty())  server = "localhost";
                if(port == null || port.isEmpty())              port = "8080";
                if(webapp == null || webapp.isEmpty())  webapp = webAppName;
                if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
        }

        public void run() {
                try{
                        // check mandatory params
                        if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){
                                logger.warn("<index update process> Insuficient info provided for data import");
                                logger.info("<index update process> Reloading global dataimport.properties");
                                reloadParams();

                        // single-core
                        }else if(singleCore){
                                prepUrlSendHttpPost();

                        // multi-core
                        }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){
                                logger.warn("<index update process> No cores scheduled for data import");
                                logger.info("<index update process> Reloading global dataimport.properties");
                                reloadParams();

                        }else{
                                for(String core : syncCores){
                                        prepUrlSendHttpPost(core);
                                }
                        }
                }catch(Exception e){
                        logger.error("Failed to prepare for sendHttpPost", e);
                        reloadParams();
                }
        }


        private void prepUrlSendHttpPost(){
                String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
                sendHttpPost(coreUrl, null);
        }

        private void prepUrlSendHttpPost(String coreName){
                String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
                sendHttpPost(coreUrl, coreName);
        }


        private void sendHttpPost(String completeUrl, String coreName){
                DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
                Date startTime = new Date();

                // prepare the core var
                String core = coreName == null ? "" : "[" + coreName + "] ";

                logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));

                try{

                    URL url = new URL(completeUrl);
                    HttpURLConnection conn = (HttpURLConnection)url.openConnection();

                    conn.setRequestMethod("POST");
                    conn.setRequestProperty("type", "submit");
                    conn.setDoOutput(true);

                        // Send HTTP POST
                    conn.connect();

                    logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
                    logger.info(core + "<index update process> Succesfully connected to server\t" + server);
                    logger.info(core + "<index update process> Using port\t\t\t" + port);
                    logger.info(core + "<index update process> Application name\t\t\t" + webapp);
                    logger.info(core + "<index update process> URL params\t\t\t" + params);
                    logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
                    logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
                    logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());

                    //listen for change in properties file if an error occurs
                    if(conn.getResponseCode() != 200){
                        reloadParams();
                    }

                    conn.disconnect();
                    logger.info(core + "<index update process> Disconnected from server\t\t" + server);
                    Date endTime = new Date();
                    logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));
                }catch(MalformedURLException mue){
                        logger.error("Failed to assemble URL for HTTP POST", mue);
                }catch(IOException ioe){
                        logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
                }catch(Exception e){
                        logger.error("Failed to send HTTP POST", e);
                }
        }

        public int getIntervalInt() {
                try{
                        return Integer.parseInt(interval);
                }catch(NumberFormatException e){
                        logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
                        return 30; //return default in case of error
                }
        }
}

 

 

 

分享到:
评论
5 楼 overflow_exception 2018-02-26  
mark.
4 楼 tfkd丶 2016-07-15  
mark.
3 楼 ethan_shan 2015-09-25  
Mark.
2 楼 sunny8zhou 2014-03-18  
requestHandler 是配置在solrconfig.xml里的吧
1 楼 GraysonHK 2012-12-11  
能否提供编译schduler的过程,非常感谢!

相关推荐

    apache-solr-dataimportscheduler-1.0.zip_official54l_solr 5.x定时生成

    在Solr中,数据导入通常通过DataImportHandler (DIH) 完成,DIH是一个插件,负责从外部数据源(如数据库或文件系统)提取数据并将其转化为Solr可以处理的格式。而DataImportScheduler则是在DIH的基础上,增加了一个...

    solr6--solr-dataimporthandler-scheduler-1.1

    在Solr6版本中,DataImportHandler(DIH)是一个非常重要的特性,它允许Solr从外部数据源导入数据并建立索引。在"solr6--solr-dataimporthandler-scheduler-1.1"这个项目中,我们关注的重点是DIH的调度功能,也就是...

    apache-solr-dataimporthandler-extras-1.4.0.jar.zip

    DataImportHandler是Apache Solr的一个特性,它允许Solr从外部数据源导入数据,如数据库、文件系统或其他Web服务。DIH的主要目的是简化索引过程,使Solr能够动态地、增量地更新其索引,而无需每次变更都完全重建索引...

    solr-4.9.0-安装部署文档

    完成以上步骤后,你将拥有一个能够从 MySQL 数据库导入数据的 Solr 4.9.0 集群。记得根据你的实际需求调整 Solr 配置,例如创建和配置新的核心(collections),以支持不同的索引和查询需求。同时,保持 Solr 服务器...

    solr-dataimport-scheduler.jar 可使用于solr7.x版本

    为了保持索引与源数据的一致性,Solr引入了DIH,这是一个内建的机制,用于从关系数据库、XML文件等外部数据源导入数据,并将其转化为Solr可以处理的索引格式。 数据导入调度器(Data Import Scheduler)是DIH的一个...

    solr-mongo-importer-1.1

    Solr MongoDB Importer 是一个非常有用的工具,它允许用户将MongoDB的数据导入到Apache Solr索引中,以便进行高效、快速的全文搜索和数据分析。这个工具的主要版本是"solr-mongo-importer-1.1",这表明它是1.1版,...

    solr增量更新架包apache-solr-dataimportscheduler.jar

    Solr使用DataImportHandler(DIH)来从关系型数据库、XML文件或其他数据源导入数据。DIH提供了一个全面的数据加载框架,支持全量导入和增量导入。 1. **全量导入**:全量导入是将所有数据从源数据库或文件一次性...

    solr-7.7.3配置详解,跟springboot整合 (二)

    - DIH允许你从外部数据源导入数据,例如数据库。配置`data-config.xml`文件定义数据源和映射规则。 **二、Spring Boot整合Solr** 1. **依赖管理** - 在Spring Boot项目中,添加Solr的Spring Data Solr依赖,如:...

    solr-dataimporthandler的jar包

    Solr 数据导入处理器(DataImportHandler,DIH)是 Apache Solr 的一个重要组件,它允许用户从各种数据源,如关系型数据库、CSV 文件等,批量导入数据到 Solr 索引中。这个功能极大地简化了数据同步和更新的过程,...

    Apache Solr(solr-8.11.1.tgz)

    它包括一个默认的配置集,以及一个简单的数据导入处理程序(DIH)示例,帮助开发者了解如何设置和使用Solr。 5. **docs 目录**:包含了Solr的文档,包括用户手册、API参考和教程,对于学习和调试Solr非常有用。 6....

    solr-dataimporthandler-scheduler-1.1

    Solr DataImportHandler Scheduler 是一个用于 Apache Solr 的插件,它允许用户自动化和调度 Solr 的 DataImportHandler(DIH)过程。这个插件的版本是 1.1,包含源码,意味着用户可以根据自己的需求对代码进行修改...

    支持solr6.1-solr-dataimport-scheduler-1.2.jar

    `Solr DataImportHandler`(DIH)是 Solr 内置的一个功能,用于从关系型数据库或其他数据源导入数据,并建立索引。DIH 提供了全量和增量数据导入的功能,使得 Solr 能够保持与源数据的同步。`solr-dataimport-...

    solr-dataimportscheduler-1.4.jar 增量定时同步数据到solr.rar

    3. **定时任务调度**: 这个rar文件中的"solr-dataimportscheduler-1.4.jar"显然包含了一个定时任务调度器,它可以定期执行DIH的数据导入任务。这通常通过集成Java的定时框架如Quartz或者Spring的TaskScheduler实现。...

    solr-dataimport-scheduler

    数据导入(DataImportHandler, DIH)是Solr的一项功能,允许从各种数据源(如关系型数据库、CSV文件等)导入数据,并创建或更新索引。DIH支持全量导入和增量导入,使得在数据发生变化时,Solr能够快速反映这些变化。...

    solr-dataimporthandler-extras-4.3.0.zip

    Solr是一个流行的开源全文搜索引擎,而Data Import Handler是Solr的一个重要特性,允许用户从关系型数据库或其他数据源导入数据进行索引。 在4.3.0版本中,DIH的“extras”部分可能包含了扩展和增强功能,比如支持...

    solr7.4数据库导入Demo(mysql数据库)

    总的来说,这个Demo涵盖了Solr 7.4与MySQL的集成、数据导入和IK分词器的使用,帮助你搭建一个能够处理中文数据的全文搜索引擎。通过实践,你可以更好地理解Solr的数据处理流程以及如何优化中文搜索体验。

    solr定时自动同步数据库需要用到的apache-solr-dataimportscheduler.jar包

    DIH允许Solr从各种数据源,如关系型数据库MySQL,导入数据。 在标题提到的"solr定时自动同步数据库需要用到的apache-solr-dataimportscheduler.jar包"中,`apache-solr-dataimportscheduler.jar`是用于实现Solr数据...

    solr6.5.1定时增量apache-solr-dataimportscheduler

    `solr-dataimporthandler-6.5.1.jar` 是Solr DataImportHandler的核心库,用于从外部数据源如数据库或文件系统导入数据。DataImportHandler(DIH)是一个强大的工具,允许Solr与各种数据源进行交互,将这些数据转换...

    solr-7.4.0.zip

    4. `example`目录:提供了一个简单的Solr示例,包括一个预配置的Solr实例,可以快速启动并开始使用Solr。 5. `docs`目录:包含了Solr的文档,包括用户手册、API参考等。 6. `contrib`目录:包含了一些社区贡献的模块...

Global site tag (gtag.js) - Google Analytics