`

并发调用的一种实现方式

阅读更多

一种在线程池中执行目标代码,异步获取返回值的方式。

仅供参考,欢迎拍砖。


/**
 * 异步返回结果
 * @author benyi
 * @date 2017-12-9
 *
 */
public class AsyResult {
    private String tid;
    private Object result;
    private boolean finished=false;
   
    public AsyResult(String id){
        tid=id;
        finished=false;
    }
   
    public String getId(){
        return tid;
    }
   
    public void setResult(Object r){
        result=r;
        finished=true;
    }
   
    public Object getResult(){
        return result;
    }
   
    public boolean isFinished(){
        return finished;
    }
   
    public Exception getException(){
        if(result!=null){
            if(haveException()){
                return (Exception)result;
            }
        }
        return null;
    }
   
    public boolean haveException(){
        if(result!=null){
            if(result instanceof Exception ){
                return true;
            }
        }
        return false;
    }
}


import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 一个并行调用远程接口的工具
 * 基本思路:调用者 添加到要调用的方法到本对象,获取回执ID,本对象代理执行方法,将返回值通知给调用者
 * 调用者同步等待返回值, 或者直到超时
 * @author admin
 *
 */
public class ParalRunner {
    private static final Logger log=LoggerFactory.getLogger(ParalRunner.class);
   
    private ExecutorService threadPool = Executors.newCachedThreadPool();
   
    public ParalRunner(){
        //
    }
   
    public AsyResult putTask(CallTask task){
        if(task==null){
            return null;
        }
        if( !task.passCheck() ){
            return null;
        }
        AsyResult a=new AsyResult(task.getTaskId());
        boolean b=putThreadTask(task,a);
        if(!b){
            return null;
        }
        return a;
    }
   
    public static Method getMethod(Object obj,String name){
        Method[] ms=obj.getClass().getMethods();
        for(int i=0;i<ms.length;i++){
            if(ms[i].getName().equals(name)){
                return ms[i];
            }
        }
        return null;
    }
   
    private boolean putThreadTask(CallTask task,AsyResult ret){
        threadPool.execute(new TaskRunner(task,ret));
        return true;
    }
     
   
    class TaskRunner implements Runnable {
       
        public CallTask task;
        public AsyResult result;
       
        public TaskRunner(CallTask t,AsyResult r){
            this.task=t;
            this.result=r;
        }
       
        public void run(){
            Method m=task.getMethod();
            Object obj=task.getObj();
            Object []ps=task.getParameters();
            if(ps==null){
                ps=new Object[0];
            }           
            try {
                Object ret=m.invoke(obj, ps);
                result.setResult(ret); 
            } catch (Exception e) {
                result.setResult(e);
                log.error("执行:"+obj.getClass()+"."+m.getName()+"异常:"+e.getMessage(),e);
            }
        }
    }
   
}

import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.net.URLConnection;
/** *测试*/
public class TestParal {
   
    private static ParalRunner runner=new ParalRunner();
   
    private PrintStream out;
             
    public String test() throws Exception {
        URL url = new URL("http://goods.scn.wl.cn/goodsRestApi/queryGoodsList?enterpriseid=157");
        URLConnection connection = url.openConnection();
        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
        StringBuilder sb=new StringBuilder();
        String current;
        while((current = in.readLine()) != null){
            sb.append(current);
        }
        in.close();
        connection=null;
        return sb.toString();
    }
    public String testPrice(String goodsids) throws Exception {
        URL url = new URL("http://test.goods.scn.wl.cn/goodsRestApi/getPriceYN?enterpriseid=138&goodstype=0&goodsid="+goodsids);
        URLConnection connection = url.openConnection();
        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
        StringBuilder sb=new StringBuilder();
        String current;
        while((current = in.readLine()) != null){
            sb.append(current);
        }
        in.close();
        connection=null;
        return sb.toString();
    }
    public String testStorage(String goodsids) throws Exception {
        URL url = new URL("http://test.storage.scn.wl.cn/queryReport/getStockYN?enterpriseid=138&goodsid="+goodsids);
        URLConnection connection = url.openConnection();
        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
        StringBuilder sb=new StringBuilder();
        String current;
        while((current = in.readLine()) != null){
            sb.append(current);
        }
        in.close();
        connection=null;
        return sb.toString();
    }
    public AsyResult putTast(String method,Object []ps){
        CallTask c=new CallTask(this,runner.getMethod(this, method),ps);
        AsyResult r=runner.putTask(c);
        out.println("Add-id=" + r.getId() );
        return r;
    }

    public void getReturn(AsyResult []ids){
        long t1=System.currentTimeMillis();
         
        boolean quit=false;
        while(!quit){
            try{
                Thread.sleep(32);
            }catch(Exception e){
                e.printStackTrace();
            }
            quit=true;
            for(int i=0;i<ids.length;i++){
                if(ids[i]!=null){
                    if(ids[i].isFinished()){
                        Object o=ids[i].getResult();
                        if(ids[i].haveException()){
                            Exception e=ids[i].getException();
                            e.printStackTrace(out);
                        }else{
                            out.println("ID="+ids[i].getId()+",Return="+o);
                        }
                        out.println("ID="+ids[i].getId()+",Time="+(System.currentTimeMillis()-t1));   
                        ids[i]=null;
                    }else{
                        quit=false;
                    }
                }
            }  
        }
        out.println("Total Time="+(System.currentTimeMillis()-t1));
    }
   
   
    public static void main(String []args) throws Exception {
        TestParal t=new TestParal();
       
        t.out = new PrintStream(new FileOutputStream("/d:/test.log"));       

        AsyResult a1=t.putTast("testPrice",new Object[]{"139949"});
        AsyResult a2=t.putTast("testStorage",new Object[]{"139949"});
        AsyResult a3=t.putTast("testPrice",new Object[]{"139949"});
        AsyResult a4=t.putTast("testPrice",new Object[]{"139949"});
        AsyResult []ids={a1,a2,a3,a4};

        t.getReturn(ids);

        t.out.close();
       
    }
   
}

0
0
分享到:
评论

相关推荐

    自己写的多线程并发服务器模型 3种实现方式

    在IT领域,多线程并发服务器模型是一种常见的高性能网络服务实现方式。本项目展示了三种不同的实现方式,分别涉及多线程、select系统调用以及基于链表的管理策略。以下是对这些知识点的详细阐述: 一、多线程 多...

    coroutines,协同并发的纯Java实现,AKA协同程序.zip

    在Java编程领域,协同程序(Coroutines)是一种轻量级的并发执行模型,它提供了一种更为高效且灵活的方式来管理程序中的并发操作。与传统的线程模型不同,协程更像是一种用户级的线程,它们不需要操作系统级别的切换...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    AQS是Java并发包中用于构建锁和同步组件的核心抽象类,它基于一种FIFO(先进先出)的等待队列机制。AQS维护了一个int类型的state字段,用于表示资源的状态。当线程试图获取资源时,如果资源不可用,线程会被添加到...

    IOCP DELPHI实现高性能并发

    在IT行业中,完成端口(IOCP,Input/Output Completion Port)是一种用于高效并发处理I/O操作的技术,尤其适用于服务器应用程序。在DELPHI环境下,通过利用完成端口,开发者可以构建出能处理大量并发连接的服务,...

    c++ 线程池实现,应用并发处理

    在C++编程中,线程池是一种有效地管理并发任务的技术,它可以提高系统资源的利用率,减少线程创建和销毁的开销。线程池的基本思想是预先创建一定数量的线程,然后将任务放入队列中,由这些线程来执行。这种方式避免...

    多路复用IO以及TCP并发服务器的实现(练习)

    例如,`epoll`提供了一种更高效的方式,因为它支持边缘触发和水平触发模式,可以处理大量并发连接。 接下来,我们讨论"TCP并发服务器"。TCP(传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议...

    SELECT实现并发服务器

    SELECT是一种I/O多路复用技术,它允许程序在一个或多个描述符上等待事件发生。在TCP服务器中,这些描述符通常代表客户端的套接字,服务器可以通过SELECT监控这些套接字,判断哪些已经准备好进行读写操作。 在给定的...

    API 调用次数限制实现

    这是一种常见的限流(Rate Limiting)机制,旨在防止恶意的请求对服务器产生不良影响。 API 调用次数限制实现可以通过多种方法来实现,例如使用 Redis 的 INCR 命令来实现简单的限流机制。但是,这种方法存在一些...

    Linux下预创建线程并发网络服务器的两种实现方法.pdf

    另一种实现方式是通过并发调用`accept`系统函数来实现线程预创建。在这种方法中,服务器在启动时创建多个线程,每个线程都在循环中调用`accept`函数等待新的客户端连接。当`accept`函数返回一个新的套接字句柄时,...

    多进程并发服务器实现

    Linux是一种开源、免费的操作系统,它支持多任务和多线程处理,使得在同一时间可以执行多个程序。对于服务器应用,Linux因其稳定性和安全性而备受青睐。 多进程并发是指在一个程序中创建并同时运行多个子进程,每个...

    一个C#高并发通讯组件

    本文将深入探讨一个由C#语言编写的高并发通讯组件,旨在为开发者提供一种有效处理大规模并发连接的解决方案。这个组件在作者的测试中,在普通计算机上实现了上万级别的并发性能,展示出其强大的性能潜力。 首先,...

    java并发之并发工具类

    Java 并发工具类是 Java 并发编程中的一种重要工具,用于协调线程之间的并发执行。 Java 并发工具类中有很多种,今天我们主要介绍四种:CyclicBarrier、CountDownLatch、Semaphore 和 Exchanger。 一、...

    CsGo并发流程控制框架

    线程池是一种高效的线程管理方式,它可以复用已存在的线程,避免频繁创建和销毁线程带来的开销。C#中的`ThreadPool`类提供线程池服务,通过`QueueUserWorkItem`方法提交任务到线程池,适合执行大量短生命周期的任务...

    Java 模拟线程并发

    一种是通过继承Thread类,重写其run()方法,然后实例化并调用start()方法启动线程。例如: ```java class MyThread extends Thread { @Override public void run() { // 业务逻辑 } } public class Main { ...

    使用Java实现高并发推送服务.pdf

    Java作为一种广泛使用的编程语言,其在实现高并发推送服务方面具有明显的优势。本文档探讨了使用Java实现高并发推送服务的技术和方法,重点分析了如何通过Java NIO(New I/O)技术、消息队列技术、缓存技术等多种...

    线程并发拷贝程序

    系统调用是用户程序与操作系统之间交互的一种方式,用于请求操作系统提供服务。在C语言中,通常通过`#include &lt;unistd.h&gt;`等头文件来访问这些系统调用。 ### 3. 线程管理与同步 #### 线程创建与管理 在C语言中,...

    Python使用grequests(gevent+requests)并发发送请求过程解析

    另一种是使用协程,如gevent,它通过切换执行环境来实现类似并行的效果。此外,还可以使用异步请求框架如aiohttp。每种方法都有其优缺点,具体选择取决于具体需求和场景。 5. grequests的引入:grequests是基于...

    java并发容器CopyOnWriteArrayList实现原理及源码分析

    Java并发容器CopyOnWriteArrayList实现原理及源码分析 Java并发容器...CopyOnWriteArrayList容器是一种高性能的并发容器,特别适用于读多写少的并发场景,但是需要注意其内存占用问题和实时性问题。

    高并发回应服务器

    Asio提供了一种统一的API,用于处理I/O事件,包括网络通信。它的设计基于非阻塞I/O模型,可以实现异步操作,使得服务器能够同时处理多个客户端的请求,从而达到高并发的目的。在Asio中,主要涉及的概念有服务...

    java并发编程书籍

    8. **原子操作类**:`java.util.concurrent.atomic`包下的原子类如`AtomicInteger`、`AtomicReference`等,提供了一种无锁编程的方式,通过CAS(Compare and Swap)操作来更新变量,保证了操作的原子性。 9. **死锁...

Global site tag (gtag.js) - Google Analytics