一种在线程池中执行目标代码,异步获取返回值的方式。
仅供参考,欢迎拍砖。
/**
* 异步返回结果
* @author benyi
* @date 2017-12-9
*
*/
public class AsyResult {
private String tid;
private Object result;
private boolean finished=false;
public AsyResult(String id){
tid=id;
finished=false;
}
public String getId(){
return tid;
}
public void setResult(Object r){
result=r;
finished=true;
}
public Object getResult(){
return result;
}
public boolean isFinished(){
return finished;
}
public Exception getException(){
if(result!=null){
if(haveException()){
return (Exception)result;
}
}
return null;
}
public boolean haveException(){
if(result!=null){
if(result instanceof Exception ){
return true;
}
}
return false;
}
}
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 一个并行调用远程接口的工具
* 基本思路:调用者 添加到要调用的方法到本对象,获取回执ID,本对象代理执行方法,将返回值通知给调用者
* 调用者同步等待返回值, 或者直到超时
* @author admin
*
*/
public class ParalRunner {
private static final Logger log=LoggerFactory.getLogger(ParalRunner.class);
private ExecutorService threadPool = Executors.newCachedThreadPool();
public ParalRunner(){
//
}
public AsyResult putTask(CallTask task){
if(task==null){
return null;
}
if( !task.passCheck() ){
return null;
}
AsyResult a=new AsyResult(task.getTaskId());
boolean b=putThreadTask(task,a);
if(!b){
return null;
}
return a;
}
public static Method getMethod(Object obj,String name){
Method[] ms=obj.getClass().getMethods();
for(int i=0;i<ms.length;i++){
if(ms[i].getName().equals(name)){
return ms[i];
}
}
return null;
}
private boolean putThreadTask(CallTask task,AsyResult ret){
threadPool.execute(new TaskRunner(task,ret));
return true;
}
class TaskRunner implements Runnable {
public CallTask task;
public AsyResult result;
public TaskRunner(CallTask t,AsyResult r){
this.task=t;
this.result=r;
}
public void run(){
Method m=task.getMethod();
Object obj=task.getObj();
Object []ps=task.getParameters();
if(ps==null){
ps=new Object[0];
}
try {
Object ret=m.invoke(obj, ps);
result.setResult(ret);
} catch (Exception e) {
result.setResult(e);
log.error("执行:"+obj.getClass()+"."+m.getName()+"异常:"+e.getMessage(),e);
}
}
}
}
import java.io.BufferedReader;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.net.URLConnection;
/** *测试*/
public class TestParal {
private static ParalRunner runner=new ParalRunner();
private PrintStream out;
public String test() throws Exception {
URL url = new URL("http://goods.scn.wl.cn/goodsRestApi/queryGoodsList?enterpriseid=157");
URLConnection connection = url.openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder sb=new StringBuilder();
String current;
while((current = in.readLine()) != null){
sb.append(current);
}
in.close();
connection=null;
return sb.toString();
}
public String testPrice(String goodsids) throws Exception {
URL url = new URL("http://test.goods.scn.wl.cn/goodsRestApi/getPriceYN?enterpriseid=138&goodstype=0&goodsid="+goodsids);
URLConnection connection = url.openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder sb=new StringBuilder();
String current;
while((current = in.readLine()) != null){
sb.append(current);
}
in.close();
connection=null;
return sb.toString();
}
public String testStorage(String goodsids) throws Exception {
URL url = new URL("http://test.storage.scn.wl.cn/queryReport/getStockYN?enterpriseid=138&goodsid="+goodsids);
URLConnection connection = url.openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder sb=new StringBuilder();
String current;
while((current = in.readLine()) != null){
sb.append(current);
}
in.close();
connection=null;
return sb.toString();
}
public AsyResult putTast(String method,Object []ps){
CallTask c=new CallTask(this,runner.getMethod(this, method),ps);
AsyResult r=runner.putTask(c);
out.println("Add-id=" + r.getId() );
return r;
}
public void getReturn(AsyResult []ids){
long t1=System.currentTimeMillis();
boolean quit=false;
while(!quit){
try{
Thread.sleep(32);
}catch(Exception e){
e.printStackTrace();
}
quit=true;
for(int i=0;i<ids.length;i++){
if(ids[i]!=null){
if(ids[i].isFinished()){
Object o=ids[i].getResult();
if(ids[i].haveException()){
Exception e=ids[i].getException();
e.printStackTrace(out);
}else{
out.println("ID="+ids[i].getId()+",Return="+o);
}
out.println("ID="+ids[i].getId()+",Time="+(System.currentTimeMillis()-t1));
ids[i]=null;
}else{
quit=false;
}
}
}
}
out.println("Total Time="+(System.currentTimeMillis()-t1));
}
public static void main(String []args) throws Exception {
TestParal t=new TestParal();
t.out = new PrintStream(new FileOutputStream("/d:/test.log"));
AsyResult a1=t.putTast("testPrice",new Object[]{"139949"});
AsyResult a2=t.putTast("testStorage",new Object[]{"139949"});
AsyResult a3=t.putTast("testPrice",new Object[]{"139949"});
AsyResult a4=t.putTast("testPrice",new Object[]{"139949"});
AsyResult []ids={a1,a2,a3,a4};
t.getReturn(ids);
t.out.close();
}
}
相关推荐
在IT领域,多线程并发服务器模型是一种常见的高性能网络服务实现方式。本项目展示了三种不同的实现方式,分别涉及多线程、select系统调用以及基于链表的管理策略。以下是对这些知识点的详细阐述: 一、多线程 多...
在Java编程领域,协同程序(Coroutines)是一种轻量级的并发执行模型,它提供了一种更为高效且灵活的方式来管理程序中的并发操作。与传统的线程模型不同,协程更像是一种用户级的线程,它们不需要操作系统级别的切换...
AQS是Java并发包中用于构建锁和同步组件的核心抽象类,它基于一种FIFO(先进先出)的等待队列机制。AQS维护了一个int类型的state字段,用于表示资源的状态。当线程试图获取资源时,如果资源不可用,线程会被添加到...
在IT行业中,完成端口(IOCP,Input/Output Completion Port)是一种用于高效并发处理I/O操作的技术,尤其适用于服务器应用程序。在DELPHI环境下,通过利用完成端口,开发者可以构建出能处理大量并发连接的服务,...
在C++编程中,线程池是一种有效地管理并发任务的技术,它可以提高系统资源的利用率,减少线程创建和销毁的开销。线程池的基本思想是预先创建一定数量的线程,然后将任务放入队列中,由这些线程来执行。这种方式避免...
例如,`epoll`提供了一种更高效的方式,因为它支持边缘触发和水平触发模式,可以处理大量并发连接。 接下来,我们讨论"TCP并发服务器"。TCP(传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议...
SELECT是一种I/O多路复用技术,它允许程序在一个或多个描述符上等待事件发生。在TCP服务器中,这些描述符通常代表客户端的套接字,服务器可以通过SELECT监控这些套接字,判断哪些已经准备好进行读写操作。 在给定的...
这是一种常见的限流(Rate Limiting)机制,旨在防止恶意的请求对服务器产生不良影响。 API 调用次数限制实现可以通过多种方法来实现,例如使用 Redis 的 INCR 命令来实现简单的限流机制。但是,这种方法存在一些...
另一种实现方式是通过并发调用`accept`系统函数来实现线程预创建。在这种方法中,服务器在启动时创建多个线程,每个线程都在循环中调用`accept`函数等待新的客户端连接。当`accept`函数返回一个新的套接字句柄时,...
Linux是一种开源、免费的操作系统,它支持多任务和多线程处理,使得在同一时间可以执行多个程序。对于服务器应用,Linux因其稳定性和安全性而备受青睐。 多进程并发是指在一个程序中创建并同时运行多个子进程,每个...
本文将深入探讨一个由C#语言编写的高并发通讯组件,旨在为开发者提供一种有效处理大规模并发连接的解决方案。这个组件在作者的测试中,在普通计算机上实现了上万级别的并发性能,展示出其强大的性能潜力。 首先,...
Java 并发工具类是 Java 并发编程中的一种重要工具,用于协调线程之间的并发执行。 Java 并发工具类中有很多种,今天我们主要介绍四种:CyclicBarrier、CountDownLatch、Semaphore 和 Exchanger。 一、...
线程池是一种高效的线程管理方式,它可以复用已存在的线程,避免频繁创建和销毁线程带来的开销。C#中的`ThreadPool`类提供线程池服务,通过`QueueUserWorkItem`方法提交任务到线程池,适合执行大量短生命周期的任务...
一种是通过继承Thread类,重写其run()方法,然后实例化并调用start()方法启动线程。例如: ```java class MyThread extends Thread { @Override public void run() { // 业务逻辑 } } public class Main { ...
Java作为一种广泛使用的编程语言,其在实现高并发推送服务方面具有明显的优势。本文档探讨了使用Java实现高并发推送服务的技术和方法,重点分析了如何通过Java NIO(New I/O)技术、消息队列技术、缓存技术等多种...
系统调用是用户程序与操作系统之间交互的一种方式,用于请求操作系统提供服务。在C语言中,通常通过`#include <unistd.h>`等头文件来访问这些系统调用。 ### 3. 线程管理与同步 #### 线程创建与管理 在C语言中,...
另一种是使用协程,如gevent,它通过切换执行环境来实现类似并行的效果。此外,还可以使用异步请求框架如aiohttp。每种方法都有其优缺点,具体选择取决于具体需求和场景。 5. grequests的引入:grequests是基于...
Java并发容器CopyOnWriteArrayList实现原理及源码分析 Java并发容器...CopyOnWriteArrayList容器是一种高性能的并发容器,特别适用于读多写少的并发场景,但是需要注意其内存占用问题和实时性问题。
Asio提供了一种统一的API,用于处理I/O事件,包括网络通信。它的设计基于非阻塞I/O模型,可以实现异步操作,使得服务器能够同时处理多个客户端的请求,从而达到高并发的目的。在Asio中,主要涉及的概念有服务...
8. **原子操作类**:`java.util.concurrent.atomic`包下的原子类如`AtomicInteger`、`AtomicReference`等,提供了一种无锁编程的方式,通过CAS(Compare and Swap)操作来更新变量,保证了操作的原子性。 9. **死锁...