`
baobeituping
  • 浏览: 1068676 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

多线程并发处理数据的问题

阅读更多

在现在的项目中遇到的一个问题。

我所做的短信平台,原来只是单线程发送短信的,但是由于公司的应用范围的扩大,短信的发送量成倍的增多,一批插入的短信量达到5W数据,如果按照以前的方式,发送过程十分缓慢。因为我们所用的第三方短信提供商只提供给我们10个并发的限制,所以我们采用10条线程进行读取。一次发10条,等10条发送完成以后再发送另外10条。

以下是程序:

 

private  void sendSmsLoop() throws Exception {
   
  
  for (int i=0; i< threads.length; i++){
   threads[i] = null;
  }  
  String sql =""
   +"select top 10 "
   +"smsSend.id "
   +",smsSend.phoneNumber "
   +",smsSend.content "
   +",smsSend.subcode  "
   +",smsSend.clientID  "
   +",smsSend.SID  "
   +"from  "
   +"smsSend "
   +"where "
   +"(smsSend.handleFlag = 0 "
   +"and datediff(minute, smsSend.registerDate, getDate()) < 30 "
   +"and left(smsSend.phoneNumber,3)  in ('130','131','132','155','156','186','134','135','136','137','138','139','150','151','152','154','157','158','159','187','188'))  or (ClientID='1' and sendDate is NULL)";
   
   //System.out.println(sql);
   Stmt = DBConn.createStatement();
   Rs = Stmt.executeQuery(sql);
   int inc = 0;
   while(Rs.next())
   {
    String ID = Rs.getString("id");
    String phoneNumber = Rs.getString("phoneNumber");//手机号
    String content = StrUtil.replaceIllegalString(Rs.getString("content")); //信息
    String subcode = Rs.getString("subcode");
    String clientID = Rs.getString("clientID");
    String sid = Rs.getString("SID");
   //在这里开始开启线程,因为我所选的是TOP10,所以一次可以启动10条线程
    SmsThread st = new SmsThread(ID,phoneNumber,content,subcode,clientID,sid);
    threads[inc] = st;
    threads[inc].start();
    inc++;
   }
   
   //循环判断10个线程的FLAG信息,判断FLAG必须都为TRUE了。才可以进行下一轮扫描。【判断为TRUE的结果就是只要该线程执行完毕后FLAG就赋值为TRUE】
   while(true)
   {
    boolean _flag = false;
    for (int i=0; i< threads.length; i++){
     if (threads[i] != null){
      SmsThread st = (SmsThread) threads[i];
      if(!st.flag)
      {
       _flag = true;
        break;
      }
     }
    }
    if (_flag){
     Thread.sleep(1000);
    }else{
     break;
    }
   }
   
   
 }
 class SmsThread extends Thread
 {
  public boolean flag = false;
  public String ID;
  public String phoneNumber;
  public String content;
  public String subcode;
  public String clientID;
  public String sid;
  public SmsThread()
  {
   
  }
  public SmsThread(String ID,String phoneNumber,String content,String subcode,String clientID,String sid)
  {
   this.ID = ID;
   this.phoneNumber =phoneNumber;
   this.content=content;
   this.subcode = subcode;
   this.clientID = clientID;
   this.sid = sid;
  }
  public void run()
  {
   try {
    sendSmsByStrategy(ID, phoneNumber, content, subcode, clientID,
      sid);
   } catch (Exception e) {
    e.printStackTrace();
   }

//每个线程执行完成以后都会将自己的FLAG设置为TRUE,所以我们在循环取数据的时候要保证所有的线程方法都执行了,才开始下一次的数据读取。
   flag = true;   
  }
 }
 private void sendSmsByStrategy(String id, String phoneNumber, String content,String SubCode,String clientID,String sid) throws Exception
 {
    SmsSendBatch send = new SmsSendBatch(DBConn);
    send.sendSms(id, phoneNumber, content, SubCode);
   
 }

 

 

 

----------------------------------------------------

对以上方法的改良版本V1.0。上面程序存在的一个性能问题就是,每次发完10条信息都会要重新去数据库取出一次数据,这样对性能也会造成一定的影响,现在对齐进行改良,因为我们根据每次并发8条线程对于发送短信是最快的【主要是短信提供商那边提供给我们虽然有10 个并发,但是我们自己测试8个并发是最好的,10个并发会有连接异常发生】。我改变读取策略,一次从数据库中读取80条信息,放在集合中,然后用8个线程分别每天处理10条信息进行发送,等发送完毕这80条信息,然后再去取另外的80条信息。为什么不一次性取多点数据呢,因为我们还有其他的数据任务在操作短信表,如果数据量一次取太大,会对数据库造成死锁,所以取的够用就可以了。

具体代码实现:

private  void sendSmsLoop() throws Exception {
   
  
  threads.clear(); 
  smslist.clear();
  String sql =""
   +"select top 80 "
   +"smsSend.id "
   +",smsSend.phoneNumber "
   +",smsSend.content "
   +",smsSend.subcode  "
   +",smsSend.clientID  "
   +",smsSend.SID  "
   +"from  "
   +"smsSend_temp_tuping as smsSend "
   +"where "
   +"(smsSend.handleFlag = 0 "
   //+"and datediff(minute, smsSend.registerDate, getDate()) < 30 "
   //+"and left(smsSend.phoneNumber,3)  in ('130','131','132','155','156','186','134','135','136','137','138','139','150','151','152','154','157','158','159','187','188'))  or (ClientID='1' and sendDate is NULL) order by Priority desc,registerdate asc";
   +"and left(smsSend.phoneNumber,3)  in ('130','131','132','155','156','186','134','135','136','137','138','139','150','151','152','154','157','158','159','187','188'))  or (ClientID='1' and sendDate is NULL) order by registerdate asc";
  
     // System.out.println(sql);
   Stmt = DBConn.createStatement();
   Rs = Stmt.executeQuery(sql);
   int inc = 0;
   while(Rs.next())
   {
    String ID = Rs.getString("id");
    String phoneNumber = Rs.getString("phoneNumber");//手机号
    String content = StrUtil.replaceIllegalString(Rs.getString("content")); //信息
    String subcode = Rs.getString("subcode");
    String clientID = Rs.getString("clientID");
    String sid = Rs.getString("SID");
    
    SmsBean sb = new SmsBean();
    sb.setID(ID);
    sb.setPhoneNumber(phoneNumber);
    sb.setContent(content);
    sb.setSubcode(subcode);
    sb.setClientID(clientID);
    sb.setSid(sid);
    smslist.add(sb);
    inc++;
   }
   
   
   for(int i=0;i<smslist.size();i++)
   {
    SmsBean sb = (SmsBean)smslist.get(i);
    SmsThread ut = new SmsSendClientBatchNew().new SmsThread(sb.getID(),sb.getPhoneNumber(),sb.getContent(),sb.getSubcode(),sb.getClientID(),sb.getSid());
    threads.add(ut);
    ut.start();

 //判断每开启8个线程就开始等待,等待到该8个线程执行完毕,然后将该8个线程清空,然后再开启后面的8个。

   if(inc>=10)
   {
    System.out.println("信息发送量估计会大于10条信息,接着发!");
   }
   else
   {
    System.out.println("信息发送量小于10条信息,休息5秒!");
    Thread.sleep(5000);
   }
 }

 

该方法比第一种方法的效率要高点,同时也减少了对数据库的操作。

------------------------------------------------------

对以上方法的改良,上一种方式也存在缺点,比如如果有24条数据,第一条线程处理8条,第2条处理8条,第三条处理6条,如果第一条线程提比后面两条处理的都快很多,那么先处理完成的线程将被浪费,我们可以采用线程池的概念。

与数据库连接池类似的是,线程池会启动大量的空闲线程,程序将一个RUNNABLE对象的线程给线程池,线程池就会启动一挑线程来执行该对象的RUN方法,当RUN方法执行结束后,该线程不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个RUNABLE对象的RUN方法。

JDK1.5提供了一个EXECUTORS工厂类来产生线程池,该工厂类包含如下几个静态工厂方法来创建线程池:

1、newCachedThreadPool(): 创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。

2.newFixedThreadPool(int n):创建一个可重用的具有固定线程数的线程池。

3.newSingleThreadExecutor():创建一个只有单线程的线程池,他相当于newFixedThreadPool方法时传入参数为1

4.newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,他可以再制定延迟后执行线程任务,corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内。

5.newSingleThreadScheduledExecutor()创建一个只有一条线程的线程池,它可以再制定延迟后执行线程任务。

 

以上5个方法前三个返回一个 ExecutorService对象,该对象代表一个线程池,他可以执行RUNNABLE对象或CALLABLE对象所代表的线程。

EXECUTORSERVICE代表尽快执行线程的线程池(只要线程池中有空闲线程立即执行线程任务),程序只要将一个RUNNABLE或CALLABLE对象提交给线程池即可,该线程池就会尽快执行该任务。他提供了三个方法:

1. Future<?> submit(Runnable task):将一个RUNNABLE对象提交给指定的线程池,线程池将在有空闲线程时执行对象所代表的任务,其中FUTURE对象代表RUNNABLE人物的返回值,但RUN方法没有返回值,所以FUTURE对象将在RUN方法执行结束后返回NULL,但可以调用FUTURE的isDone(),isCancelled()方法获得 runnable对象的执行状态。

2.Future<T> submit(Runnable task,T result):将一个RUNNABLE对象提交给指定的线程池,线程池将在空闲线程时候执行对象的任务,RESULT显示指定线程执行结束后的返回值,所以FUTURE对象将在RUN方法执行结束后返回RESULT。

3.Future<T> submit(Callable<T> task):将一个Callable对象提交给指定的线程池,线程池将在空闲线程时候执行对象的任务,FUTURE代表将在RUN方法执行结束后返回值。

 

代码:

首先开启具有8个固定线程的线程池。

public ExecutorService pool = Executors.newFixedThreadPool(8);

 

private  void sendSmsLoop() throws Exception {
   
  
  threads.clear(); 
  smslist.clear();
  String sql =""
   +"select top 80 "
   +"smsSend.id "
   +",smsSend.phoneNumber "
   +",smsSend.content "
   +",smsSend.subcode  "
   +",smsSend.clientID  "
   +",smsSend.SID  "
   +"from  "
   +"smsSend_temp_tuping as smsSend "
   +"where "
   +"(smsSend.handleFlag = 0 "
   //+"and datediff(minute, smsSend.registerDate, getDate()) < 30 "
   +"and left(smsSend.phoneNumber,3)  in ('130','131','132','155','156','186','134','135','136','137','138','139','150','151','152','154','157','158','159','187','188'))  or (ClientID='1' and sendDate is NULL) order by registerdate asc";
   
     // System.out.println(sql);
   Stmt = DBConn.createStatement();
   Rs = Stmt.executeQuery(sql);
   int inc = 0;
   while(Rs.next())
   {
    String ID = Rs.getString("id");
    String phoneNumber = Rs.getString("phoneNumber");//手机号
    String content = StrUtil.replaceIllegalString(Rs.getString("content")); //信息
    String subcode = Rs.getString("subcode");
    String clientID = Rs.getString("clientID");
    String sid = Rs.getString("SID");
    
    SmsBean sb = new SmsBean();
    sb.setID(ID);
    sb.setPhoneNumber(phoneNumber);
    sb.setContent(content);
    sb.setSubcode(subcode);
    sb.setClientID(clientID);
    sb.setSid(sid);
    smslist.add(sb);
    inc++;
   }
   

   for(int i=0;i<smslist.size();i++)
   {
    SmsBean sb = (SmsBean)smslist.get(i);
    SmsThread ut = new SmsSendClientBatchPool().new SmsThread(sb.getID(),sb.getPhoneNumber(),sb.getContent(),sb.getSubcode(),sb.getClientID(),sb.getSid());

//开启一个线程,并将该线程放入到线程池中,然后将该线程的返回值放入到一个List<Future> 的结合中,用来保存所开启线程的返回结果。

   
   
   if(inc>=10)
   {
    System.out.println("信息发送量估计会大于10条信息,接着发!");
   }
   else
   {
    System.out.println("信息发送量小于10条信息,休息5秒!");
    Thread.sleep(5000);
   }
   
   
 }

 

用了线程池以后,如果一次取80条数据,原来每次要用8个线程来发,但是线程池每次也许只用5-6个就可以用来发送8条信息了。因为有可能第一个用完的线程在发送第7条信息的时候又被拿出来用。

分享到:
评论

相关推荐

    多线程并发处理的简单实现

    在编程领域,多线程并发处理是一种常见的优化技术,它能充分利用多核处理器的资源,提高程序的执行效率。在给定的标题“多线程并发处理的简单实现”中,我们可以深入探讨如何构建这样的系统。 首先,多线程并发处理...

    java 多线程并发实例

    在Java编程中,多线程并发是提升程序执行效率、充分利用多核处理器资源的重要手段。本文将基于"java 多线程并发实例"这个主题,深入探讨Java中的多线程并发概念及其应用。 首先,我们要了解Java中的线程。线程是...

    java多线程处理数据库数据

    通过以上方法,我们可以在Java中有效地利用多线程处理数据库数据,提高程序的并发能力和效率。记得在设计时充分考虑线程间的协作与同步,以及数据库连接的管理和优化,以确保程序的稳定性和性能。

    WEBAPI多线程并发测试工具

    标题"WEBAPI多线程并发测试工具"指出,这是一个专门针对Web API进行多线程并发测试的工具。Web API通常指的是应用程序接口,它们允许不同的服务之间进行通信,以实现数据交换和功能整合。多线程并发测试则是验证在多...

    C++ 并发多线程日志处理

    在C++编程中,多线程日志处理是一项重要的任务,尤其在高并发环境中,能够有效地记录、管理和分析系统运行时的信息。C++11引入了标准库中的`&lt;thread&gt;`,使得多线程编程变得更加方便,同时也为日志处理带来了新的挑战...

    基于Qt的多线程并发服务器

    "基于Qt的多线程并发服务器"是一个典型的解决方案,它利用了Qt库的强大功能,特别是其对多线程的支持,来处理来自多个客户端的并发请求。下面我们将深入探讨这个主题。 首先,Qt是一个跨平台的应用程序开发框架,...

    多线程处理数据(工具)样例

    本工具设计的多线程并发处理数据流,很可能是针对这样的场景,每个线程负责处理一部分数据,从而减少了等待I/O操作完成的时间。 实现多线程处理时,需要注意以下几个关键点: 1. **线程安全**:当多个线程访问共享...

    多线程并发技术

    在现代软件开发中,多线程并发技术是提高程序性能和响应速度的关键因素之一。随着计算机硬件的性能持续提升,程序也必须能够有效地利用多核处理器来实现真正的并行处理。JVM(Java虚拟机)作为Java应用程序的运行...

    qtconcurrent 多线程并发处理

    在编程领域,多线程并发处理是提升程序执行效率的关键技术之一。Qt库提供了一个强大的模块——qtconcurrent,使得开发者能够轻松地在Qt应用程序中实现多线程并行计算。本文将深入探讨qtconcurrent模块,以及如何在Qt...

    Tesseract OCR多线程并发识别案例

    在处理大量图像或需要快速响应时间的应用场景中,多线程并发识别可以显著提升效率。以下将详细介绍如何利用Tesseract OCR实现多线程并发识别,以及可能涉及的相关技术点。 首先,理解Tesseract OCR的基本工作原理是...

    多线程并发处理

    在计算机科学中,多线程并发处理是一种编程技术,它允许多个线程在同一时间执行,从而提升程序的效率和响应速度。特别是在现代处理器架构中,多线程并发处理是利用多核处理器资源的关键手段。以下是对给定文件中可能...

    模拟摄像头libuv支持多线程并发

    在IT行业中,模拟摄像头与多线程并发处理是两个重要的技术领域,特别是在视频通信和网络编程中。本文将深入探讨“模拟摄像头libuv支持多线程并发”这一主题,结合“模拟IPC”这一标签,以及文件名称“IpcSimulate”...

    多线程 高并发

    这里,我们主要探讨的是如何通过编写多线程并发程序来优化应用程序的性能,提高系统的处理能力。 首先,多线程是指在一个进程中同时执行多个线程。线程是操作系统调度的基本单位,它允许程序同时执行多个任务。多...

    c#IPCO多线程并发业务处理

    总的来说,"c# IPCO多线程并发业务处理"DEMO是一个展示C# Socket通信和多线程并发处理能力的实践案例,适用于学习和研究高并发网络服务的开发。通过深入分析和理解这个DEMO,开发者可以提升在并发处理和网络编程方面...

    并发服务器-多线程服务器详解

    一种典型的多线程并发服务器架构如下: - **主监听线程**:负责接收客户端连接请求。 - **工作线程池**:由多个工作线程组成,负责处理具体的客户端请求。 **3. 示例代码片段** 下面是一个简单的多线程服务器示例...

    多线程并发服务器(毕业设计)

    - **并发处理**:服务器需要能够同时处理多个客户端的连接请求,这就需要用到多线程并发处理技术。每个新连接都会启动一个新线程,使得服务器可以同时处理多个客户端的数据传输。 4. **缓冲区管理**: 服务器维护...

    JAVA多线程并发编程

    但同时,多线程并发也会引入一些问题,如数据竞争和同步问题。 为了解决这些问题,Java提供了多种同步机制。`synchronized`关键字用于控制对共享资源的访问,确保同一时间只有一个线程可以执行特定代码块,从而避免...

    Linux下基于socket多线程并发通信的实现.pdf

    在Linux下基于socket多线程并发通信的实现中,服务器端和客户端都是使用多线程技术来处理客户的请求。在服务器端,一旦出现客户发出的请求,服务器可以开启一个子线程来接受、处理客户的相关请求,从而获取来自客户...

    java多线程实现大批量数据导入源码

    此外,需要注意的是,多线程环境下可能存在竞态条件、死锁等问题,需要合理使用`synchronized`关键字或`Lock`接口来保证数据的一致性和安全性。另外,适当的错误处理和日志记录也是必不可少的,以确保程序的健壮性。...

Global site tag (gtag.js) - Google Analytics