`
y806839048
  • 浏览: 1133932 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

多线程webservie处理大量数据

阅读更多

因工作原因,需要将一个表(tbA)中的所有数据,根据user_id,去请求webserive获取相关的数据,然后插入到另外的一张表(tbB)中,供他人使用。不过这个表中的数据不少有78万条左右,而这样的大批量数据操作,还不能白天执行。只能在夜里,等服务器负荷低的时候进行执行。考虑如果webservice的效率不高的时候,需要对数据进行分批执行操作。经过综合考虑,最后采用多线程技术(不过最后经过测试,效率还是不错78万条数据,使用10个线程操作,8个多小时就可以了)。

    首先在tbA表中,追加一个字段,deal_tag,这个字段为处理标志字段。该字段的数据默认为'0',如果需要分批处理,则可以设计部分数据中的该字段值为'1'。这个时候就可以读取了。考虑到服务器的负荷问题,采用的方案是读取一部分数据到缓冲区中,同时更改已经进入缓冲区中待处理的数据的deal_tag字段值为‘B’,标识该数据已经进入缓冲区中。对数据的处理,需要请求webservice。也就有可能是因为因为种种原因,webservice没有正常的返回数据,或者出现异常,而为了不让异常影响数据的处理,则每次处理完一条数据,将该数据放入另外的缓冲区中,如果请求WebService成功,标识数据的字段为'F',请求成功。如果因为WebService或者其他的原因导致失败,则标志该数据字段的deal_tag字段值为'E',等待下一次的调用调用处理。另外为了不增加数据库的负荷,等到缓冲区中的数据达到一定的数据,一次提交。而不采用处理一条数据,提交一条数据,这样效率太慢。核心代码如下(因不想透漏具体的表名,采用tbA,tbB,tbC代替):

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;



import com.epro.DBUtility.CommonHelper;

import com.epro.DBUtility.DBHelper;

import com.epro.DBUtility.BatchData;

public class FetchData implements Runnable {

   

    private int READ_COUNT = 500; //一次读取数据库多少条记录到缓冲区中


    private int FLASH_BUFFER_COUNT = 2000; //处理后的数据,多少条进行提交到数据库



    private List<UserInfo> userList = null;

    private List<String> userStateSqlList = null;

    private List<BatchData> commitDataList = null;

   

    public FetchData(){

        userList = Collections.synchronizedList(new ArrayList<UserInfo>());

        userStateSqlList = new ArrayList<String>();

        commitDataList = new ArrayList<BatchData>();

    }

   

    //读取数据



    private synchronized int readData(){

        int result = 0;

        String strSql = "SELECT user_id, serial_number, cust_name, eparchy_code, " +

            "detail_install_address, link_phone, service_code, cust_type, rate, " +

            "in_date, deal_tag " +

            "FROM tbA " +

            "WHERE deal_tag in ('1','E') AND rownum <= " + READ_COUNT;
      

        ResultSet rs = null;

        DBHelper db = new DBHelper();       
        try {
            rs = db.executeQuery(strSql);

            while(rs.next()){

                UserInfo user = new UserInfo();
                user.setUserId(rs.getString("user_id"));
                user.setSerialNumber(rs.getString("serial_number"));

                user.setCustName(rs.getString("cust_name"));

                user.setEparchyCode(rs.getString("eparchy_code"));

                user.setDetailInstallAddress(rs.getString("detail_install_address"));

                user.setLinkPhone(rs.getString("link_phone"));

                user.setServiceCode(rs.getString("service_code"));

                user.setCustType(rs.getString("cust_type"));

                user.setRate(rs.getString("rate"));

                user.setInDate(rs.getString("in_date"));
                user.setDealTag(rs.getString("deal_tag"));

               
                userList.add(user); //放入队列中               



                ++result;

            }

        } catch (SQLException e) {

            e.printStackTrace();

        }finally{

            try {

                rs.close();

            } catch (SQLException e) {

                // TODO Auto-generated catch block



                e.printStackTrace();
            }
            finally{
                rs = null;

            }

            db.Close();

        }

        userIntoBuffer(); //将查询出来的数据,更改标志位为'B'



        return result;

    }

   

    private void userIntoBuffer(){

        int buffersize = this.userList.size();
        String sql = "";
        String userID = "";

        List<String> sqlList = new ArrayList<String>();

        if(buffersize > 0){

            DBHelper db = new DBHelper();   
            for(int i=0; i<buffersize; i++){
                userID = userList.get(i).getUserId();
                sql = this.getUserStateSql(userID, "B");
                sqlList.add(sql);               

            }
            db.doBatch(sqlList); //批量进行数据处理

            db.Close();

        }
    }
  
    private String getUserStateSql(String userID, String dealTag){
        String result = "";

        result = "UPDATE tbA SET deal_tag = '" + dealTag +
            "' WHERE user_id = " + userID;

       
        return result;
    }

   

    public synchronized UserInfo getUserInfo(){

        UserInfo result = null;

        if(userList.size() > 0){
           result = userList.remove(0);
        }

        return result;

    }
   
    //开始进行数据的处理


    public void beginDealData(){

        ResInfo resInfo = null;

        while(true){

           UserInfo user = this.getUserInfo();
            if(user != null && !"".equals(user.getUserId().trim())){ //当没有数据时,跳出循环

               //进行WebService请求,获取生产环境的资源数据,对数据进行解析,返回资源对象

                resInfo = getResInfo(user.getUserId());
            }
            else if(user == null){
                break;
            }
            else if(user != null && "".equals(user.getUserId().trim())){
                resInfo = new ResInfo();
                resInfo.out_err_id = "-1";
               resInfo.ln_line_flag = "0";
                resInfo.out_err_msg = "beginDealData is null.";
                System.out.println("user id is null.");
            }

            else{
               ;
            }
            userFinish(user, resInfo); //将用户的数据和资源的数据进行合并操作


        }
        int datacount = readData(); //读取表,看是否还有要处理的数据


       if(datacount > 0){

           beginDealData(); //递归调用,对需要处理的数据,进行继续读取


        }
        this.flashBuffer(); //做最后一次的提交处理后的buffer操作

        System.out.println("finish at " + CommonHelper.getCurrentDateTime());
    }
   
    public ResInfo getResInfo(String productID){
        ResInfoReader myReader = new ResInfoReader();
        return myReader.beginRequest(productID);
    }
   
    //获取地区名称


    public String getAreaName(String areaCode){

        String result = "";

        String strSql = "SELECT tb1.area_name area_name from " +

       "tbC tb1 " +

        "where tb1.area_code = ?";
        String[] param = { areaCode };
        ResultSet rs = null;
        DBHelper db = new DBHelper();
        rs = db.executeQuery(strSql, param);
        try {
            while(rs.next()){
                result = rs.getString("area_name");
            }
        } catch (SQLException e) {

            // TODO Auto-generated catch block

           e.printStackTrace();

       } finally{
            try {
               rs.close();
            } catch (SQLException e) {
                // TODO Auto-generated catch block

                e.printStackTrace();
            }
            db.Close();

        }
       
        return result;       
    }
   
    private synchronized int userFinish(UserInfo user, ResInfo resInfo){
        int result = 0;
        String strSql = "";
        String strAreaName = ""; //地区名称

        String[] param = null;
        if("".equals(user.getServiceCode())){
            strAreaName = this.getAreaName(user.getEparchyCode());
        }
        else{
            strAreaName = this.getAreaName(user.getServiceCode());
        }
        strSql = "INSERT INTO tbB(USER_ID, SERIAL_NUMBER, " +
            "CUST_NAME, DEAL_TYPE, EPARCHY_CODE, DETAIL_INSTALL_ADDRESS, LINK_PHONE, " +
            "EXCH_ID, LINEBOX_SEQ, LINEBOX, LINEBOX_ADDR, ROW_ID, COL_ID, VCOL_SEQ, " +
            "PCABLE, PCABLE_SEQ, OCABLE, OCABLE_SEQ, FPCONNECT_ROW_ID, FPCONNECT_COL_ID, " +
            "FPCONNECT_SEQ, FCONNECT, FCONNECT_ADDR, FCONNECT_NAME, FOCONNECT_ROW_ID, " +
            "FOCONNECT_COL_ID, FOCONNECT_SEQ, SPCONNECT_ROW_ID, SPCONNECT_COL_ID, " +
            "SPCONNECT_SEQ, SCONNECT, SCONNECT_ADDR, SCONNECT_NAME, SOCONNECT_ROW_ID, " +
            "SOCONNECT_COL_ID, SOCONNECT_SEQ, LN_FAC_CODE, LS_SWITCH_ID, LN_SWITCH_NAME, " +
            "LS_SWITCH_MODE, LN_FAC_TYPE, LS_NODE_CODE, LS_NODE_ID, LS_MAC_CODE, LS_FRAME_CODE, " +
            "LN_DHCOL_CODE, LS_FRAME_ID, LS_SLOT_ID, LS_PORT_ID, LN_DHCOL_SEQ, LN_LINE_ID, " +
            "LN_LINE_FLAG, RELA_PRODUCT_NO, MEASURE_NAME, MEASURE_CODE, LINE_TYPE, REMOVE_FLAG, " +
            "IN_TIME, OUT_TIME, EXCH_NAME, TRADE_ID, CUST_TYPE, RATE) VALUES(?, ?, ?, ?, ?, ?, " +
            "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
            "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, sysdate, null, " +
            "?, ?, ?, ?)";
        param = new String[]{
                user.getUserId(),
                user.getSerialNumber(),
                user.getCustName(),
                "",
                user.getEparchyCode(),
                user.getDetailInstallAddress(),
                user.getLinkPhone(),
                user.getServiceCode(),
                resInfo.linebox_seq,
                resInfo.linebox,
                resInfo.linebox_addr,
                resInfo.row_id,
                resInfo.col_id,
                resInfo.vcol_seq,
                resInfo.pcable,
                resInfo.pcable_seq,
                resInfo.ocable,
                resInfo.ocable_seq,
                resInfo.fpconnect_row_id,
                resInfo.fpconnect_col_id,
                resInfo.fpconnect_seq,
                resInfo.fconnect,
                resInfo.fconnect_addr,
                resInfo.fconnect_name,
                resInfo.foconnect_row_id,
                resInfo.foconnect_col_id,
                resInfo.foconnect_seq,
                resInfo.fpconnect_row_id,
                resInfo.fpconnect_col_id,
                resInfo.fpconnect_seq,
                resInfo.sconnect,
                resInfo.sconnect_addr,
                resInfo.sconnect_name,
                resInfo.soconnect_row_id,
                resInfo.soconnect_col_id,
                resInfo.soconnect_seq,
                resInfo.ln_fac_code,
                resInfo.ls_switch_id,
                resInfo.ln_switch_name,
                resInfo.ls_switch_mode,
                resInfo.ln_fac_type,
                resInfo.ls_node_code,
                resInfo.ls_node_id,
                resInfo.ls_mac_code,
                resInfo.ls_frame_code,
                resInfo.ln_dhcol_code,
                resInfo.ls_frame_id,
                resInfo.ls_slot_id,
                resInfo.ls_port_id,
                resInfo.ln_dhcol_seq,
                resInfo.ln_line_id,
                resInfo.ln_line_flag,
                resInfo.rela_product_no,
                resInfo.measure_name,
                resInfo.measure_code,
                resInfo.line_type,
               "0",
                strAreaName,
                "",
                user.getCustType(),
                user.getRate()   
        };       

        if("0".equals(resInfo.out_err_id)){
            BatchData data = new BatchData(strSql, param);
            commitDataList.add(data); //提交处理后的数据,到缓冲区

           
            result = 1;
            strSql = this.getUserStateSql(user.getUserId(), "F");
        }
        else{
            //当调用webservice后,不能正常的获取数据,则修改标志位为 'E'

            strSql = this.getUserStateSql(user.getUserId(), "E");
       }
        userStateSqlList.add(strSql); //提交用户状态数据到缓冲区
        checkBuffer();
       //System.out.println("deal user id: " + user.getUserId() + " at " + CommonHelper.getCurrentDateTime());

      return result;
    }
   
    private void checkBuffer(){

        if(userStateSqlList.size() >= FLASH_BUFFER_COUNT){
            this.flashBuffer();
        }
    }
   
    private synchronized void flashBuffer(){
        List<String> stateList = new ArrayList<String>();
        List<BatchData> commitList = new ArrayList<BatchData>();
        DBHelper db = new DBHelper();
       
        while(commitDataList.size()>0){
            BatchData data112 = commitDataList.remove(0);
            commitList.add(data112);
        }
        if(commitList.size() > 0){
            db.doBatchData(commitList);
        }
       
        while(userStateSqlList.size() > 0){
            stateList.add(userStateSqlList.remove(0));
        }
        if(stateList.size() > 0){
            db.doBatch(stateList);
        }
       
        db.Close();
        System.out.println("flash buffer at " + CommonHelper.getCurrentDateTime());
    }

    public void run() {       
        this.beginDealData();       
    }
}



其中webservice的操作,其中需要jar包,请下载apache.axis,XStream包,并加入到库中。其中axis主要是用来进行webServie请求用,而XStream主要使用将返回的XML数据,翻译为java对象。

    因为是java project项目,因此少不了main函数了。代码如下:


public class Shell {   

public static void main(String[] args){

  boolean isthread = true;

       FetchData fetch = new FetchData();   

       if(isthread){           

           new Thread(fetch).start();
           new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
        }else{
            ResInfo resInfo = fetch.getResInfo("productID");
            System.out.println("out_err_id: " + resInfo.out_err_id);
            System.out.println("out_err_msg: " + resInfo.out_err_msg);

            System.out.println("linebox: " + resInfo.linebox);
            System.out.println("linebox_addr: " + resInfo.linebox_addr);
            System.out.println("pcable: " + resInfo.pcable);
            System.out.println("ocable: " + resInfo.ocable);
        }

25.    }

26.}

分享到:
评论

相关推荐

    WebServie 加减乘除

    Web服务(WebService)是一种基于互联网的标准方式,允许应用程序之间进行交互和数据交换。它使用XML(可扩展标记语言)作为数据格式,并...在实际项目中,还需要考虑更多的细节,如错误处理、事务管理、服务治理等。

    WebServie创建部署调用

    4. **异常处理**:Web服务调用可能会抛出异常,需要捕获并适当地处理,例如网络错误、服务不可用或数据验证失败等。 ### 四、源码与工具 在实际开发中,源码和工具的使用至关重要。源码可以帮助理解服务的内部工作...

    webservie开发技术

    2. 易于集成:通过XML进行数据交换,易于解析和处理。 3. 可扩展性:支持多种安全机制(如SSL/TLS,WS-Security)和事务处理(如WS-AtomicTransaction)。 五、现代WebService发展趋势 1. RESTful API:相比于...

    JAVA WebServie Client.docx

    1. **异常处理**:在调用WebService时可能会遇到网络问题、SOAP错误等异常情况,因此需要对这些情况进行适当的异常处理。 2. **安全性考虑**:在实际应用中,还需要考虑数据加密、身份验证等安全措施,以保护敏感...

    axis1.4 部署解析webservie

    1.TOMCAT+AXIS的安装配置 首先机子上应该安装JDK1.5版本以上(带有XML解析包)。我这里是1.5.0.6 ...下载版本要在4.0以上。这里用的是5.0版本。...以下用TOMCAT_HOME表示TOMCAT的安装主目录。 解压缩AXIS软件包(这里为1.4...

    webservice实例 CXF的JAXWS和JAXRS实现 及JAXB标准接口实现带jar包

    CXF集成了JAXB,使得在处理XML消息时,无需编写大量的转换代码。 在提供的压缩包“ws_test”中,很可能包含了使用CXF实现的Web服务示例,包括JAX-WS和JAX-RS的服务端点,以及使用JAXB进行数据交换的类。这个实例...

    测试调用webservice接口客户端所开发的接口示例

    - 处理响应:接收服务返回的数据,可能需要对结果进行解析或验证。 7. 常见问题与解决策略: - 连接问题:检查网络设置,确保能够访问到Web服务的URL。 - 编码问题:确认数据编码与服务端匹配,避免乱码。 - ...

    webservice入门学习代码笔记

    SOAP消息通常包含三部分:Header(头部,用于提供处理消息的附加信息)、Body(主体,包含实际的业务数据)和Envelope(封装整个消息)。 2. **WSDL(Web Services Description Language)**:WSDL是一种XML格式,...

    《论企业应用集成的主要技术》软考论文【文字可复制】

    应用集成,在业务逻辑上对应用系统进行集成,是黑盒集成,常用技术有WebServie、CSB、ESB;业务流程集成,由一系列基于标准的、统一数据格式的工作流组成,常见技术就是WFMS(工作流管理系统)。本文主要着重从界面...

    webservice的完整示例demo

    SpringBoot内置了Tomcat服务器,并自动配置了许多Spring组件,减少了大量配置工作。 CXF是Apache项目中的一个开源服务框架,它支持多种Web服务标准,包括SOAP和RESTful API。CXF使得在SpringBoot应用中实现和调用...

    WebService客户端调用WebService服务示例代码——java

    WebService客户端调用WebService服务示例代码,java代码,纯手工,包括直接httpClient直接发送Saop报文调用和利用wsimport -keep 生成客户端代码后调用两种方式的示例。因涉及商务账户密码隐私,因此代码中的有些账号...

    MyEclipse开发WebService教程

    Web Service是一种基于标准的、跨平台的、可互操作的服务,它允许不同系统之间的数据交换。MyEclipse作为Java EE开发工具,提供了方便的Web Service开发支持。 首先,我们从创建一个Web Service工程开始。在...

    WebServices基础培训视频与源码

    资源名称:WebServices基础培训视频与源码资源目录:【】1....使用xfire开发webServie操作不同的数据类型【】13.使用xfire与Spring整合开发webServie.part 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。

    kettle web service xml

    kettle读取web service 分析并插入数据库

    maximo6-JAVA调用WEBSERVICE

    WEBSERVICE是一种基于网络的、松散耦合的服务提供和消费方式,它允许不同平台的应用程序之间进行数据交换。在JAVA中,通常使用SOAP(Simple Object Access Protocol)协议和WSDL(Web Services Description Language...

    这是链接SOAP的所需要的jar包

    使用webservie所用到的jar包,方便使用webservice类

    mvn+spring+mybatis+java6ws

    综上所述,这个项目使用了现代化的Java Web开发技术,通过Maven构建和管理,Spring提供业务逻辑和架构,MyBatis处理数据访问,而Java 6的WebService接口则向外提供服务。"ui1"文件夹可能包含项目的前端部分,负责...

    NC65开发webservice

    UPM文件中包含了接口和实现类的路径,以及扩展类`nc.uap.ws.deploy.OxbWSExtensionProcessor`,该扩展类用于处理WebService的部署。 4. **生成WSDL文件**: 生成WSDL(Web Service Description Language)文件是...

    webServiceDemo.zip

    工作中应用的webservice服务,一直以来做webservie的客户端比较多,这次领导让做webservice的服务端,网上找了很多的方案,如axis,jws和cxf,比较了一下,觉得cxf的webservice的服务是最好的,所以搞了个demo,以供...

    jaxws ri 2.2.7包webservice所需要全部包

    webservie发布所需要的包 @WebService public class JwsServiceHello { public String greeting(String name){ return "Welcome " + name; }

Global site tag (gtag.js) - Google Analytics