`
lemonhandsome
  • 浏览: 13259 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

线程池总结

 
阅读更多

# 高效的JDK多任务执行框架

 在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承Thread类还是实现Runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。

Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。

## 常规多任务执行方式

举个例子:互联网业务下,app获取一批订单的状态,使用多线程并行执行,并获取订单状态结果,需要用到线程池及Future;

```java
//自定义线程
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
        10,
        20,
        30,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100)
);
//一批订单号
List<Long> orderList = Stream.of(10000L, 10001L).collect(Collectors.toList());
//提交任务
List<Future<String>> orderFutureList = orderList.stream().map(orderId -> poolExecutor.submit(() -> {
    // rpc 调用订单服务
    return "订单号:" + orderId + "已完成";
})).collect(Collectors.toList());
// 获取订单状态
List<String> orderStatus = orderFutureList.stream().map(future -> {
    try {
        //获取执行结果 阻塞获取执行结果
        return future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
    return null;
}).collect(Collectors.toList());
```

### 问题

 - 阻塞获取执行结果

   future.get() 是阻塞调用,只有等RPC返回执行结果或RPC超时才能获取到执行结果

- 不能保证接口响应时间

- 线程池存在打满风险

  待执行任务过多,导致线程池线程都处在阻塞状态,导致线程池线程打满,直至拒绝服务

 

### 优化方案1-非阻塞获取结果

```java
List<String> orderStatus = orderFutureList.stream().map(future -> {
    try {
            // 非阻塞获取返回结果 最大阻塞100ms
        return future.get(100,TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    } catch (TimeoutException e) {
    }
    return null;
}).collect(Collectors.toList());
```

#### 解决问题

- 非阻塞获取获取执行结果,最长等待时间为100ms

#### 存在的问题

- 仍然不能保证接口响应时间

  如果为单一任务,最大阻塞时间为100ms,若为批量任务,最大阻塞时间为==任务数*100ms==

- 线程池存在打满风险

  虽然获取执行任务为非阻塞,但任务提交后,仍然在执行,并未取消,因此仍然占有线程池线程

### 优化方案2-主动取消超时任务

```java
List<String> orderStatus = orderFutureList.stream().map(future -> {
    try {
        // 非阻塞获取返回结果 最大阻塞100ms
        return future.get(100,TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    } catch (TimeoutException e) {
    }finally {
        //设成false话,不允许在线程运行时中断,设成true的话就允许运行中中断
        future.cancel(true);
    }
    return null;
}).collect(Collectors.toList());
```

#### 解决问题:

- 任务执行已超时(超过100ms),取消任务,释放线程

#### 存在的问题

- 仍然不能保证接口响应时间

  如果为单一任务,最大阻塞时间为100ms,若为批量任务,最大阻塞时间为任务数*100ms

### 优化方案3-解决多任务阻塞时间累加问题

```java
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
        10,
        20,
        30,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100)
);
List<Long> orderList = Stream.of(10000L, 10001L).collect(Collectors.toList());

List<OrderCallable> orderCallableList = orderList.stream().map(orderId -> new OrderCallable(orderId)).collect(Collectors.toList());

List<Future<String>> orderFutureList = null;
try {
    // 设置100ms超时时间
    orderFutureList = poolExecutor.invokeAll(orderCallableList,100,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {

}
if (orderFutureList == null) {
    return;
}
// 获取订单状态
List<String> orderStatus = orderFutureList.stream().map(future -> {
    try {
        // 此处不用增加超时时间处理 invokeAll内部处理
        return future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
//  此处不用取消任务,invokeAll内部处理
//  finally {
//        future.cancel(true);
//    }
    return null;
}).collect(Collectors.toList());
```

#### 解决问题

- 保证接口响应时间

  任务最大阻塞时间为100ms

- 线程池存在打满风险

  未完成的任务会执行取消,释放线程

#### invokeAll方法简述

- invokeAl存在有超时时间方法和无超时时间阻塞方法,可查看JDK源码

  - invokeAl带超时时间方法,内部会处理每次任务的执行时间,以此保证整体任务超时时间在设定的100ms内

## 非常规多任务执行方式

举个例子:互联网业务下,app获取商品数据,其中包含商品基本信息(商品名称、商品图片),商品价格,商品库存等信息,此处为提高接口响应能力,肯定是使用多线程并行处理,但每个业务都来自于不同的RPC,且入参及返回结果均为不同类型对象

### 问题

- invokeAll常规多任务执行方式将不适用

  invokeAll只能处理入参及返回结果类型相同数据

- invokeAll+充血模式

  将入参和出参为处理为相同类型,使用invokeAll处理,可以解决问题,但是代码结构会很混乱,增加代码维护负担,不利于需求迭代

### 方案1-submit+Callable+Future

使用submit+Callable+Future实现,future获取返回结果直接增加超时时间

```java
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
        10,
        20,
        30,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100)
);
// 获取商品名称
Future<String> skuNameFuture = poolExecutor.submit(() -> {
    // 商品RPC服务
    return "商品名称";
});
// 获取库存数量
Future<Integer> skuStockFuture =poolExecutor.submit(() -> {
    // 库存RPC服务
    return 100;
});
// 获取价格信息
Future<Long> skuPriceFuture = poolExecutor.submit(() -> {
    // 价格RPC服务
    return 9999L;
});
//逐个获取返回结果
try {
    // 商品名称
    String skuName = skuNameFuture.get(100,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (TimeoutException e){
}
try {
    // 库存数量
    Integer skuStockNum = skuStockFuture.get(100,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}catch (TimeoutException e){
}
try {
    // 商品价格
    Long skuPrice = skuPriceFuture.get(100,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}catch (TimeoutException e){
}
}
```

#### 存在的问题

- 不能保证整体接口响应时间

- 线程池存在打满风险

  待执行任务过多,导致线程池线程都处在阻塞状态,导致线程池线程打满,直至拒绝服务


### 方案2-JDK8的CompletableFuture

```java
public static Boolean allOf(Long timeout, TimeUnit unit, CompletableFuture... futures) {
        CompletableFuture<Void> allFinished = CompletableFuture.allOf(futures);
        Boolean done = false;
        try {
           // 控制整体任务超时时间
            allFinished.get(timeout, unit);
            done = true;
        } catch (InterruptedException ignore) {
        } catch (ExecutionException ignore) {
        } catch (TimeoutException e) {
        }
        return done;
    }
```

#### 解决问题

- 使用CompletableFuture的get方法增加超时时间,保证同一批任务最长等待时间为timeout

#### 存在的问题

- 线程池存在打满风险

    未完成的任务不会执行取消,不会释放线程

#### 测试代码

```java
public void allOf() {
        CompletableFuture<List<String>> task1 = MultiTaskExecute.buildCompletableFuture(() -> {
            try {
                Thread.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1执行完成");
            return new ArrayList<>();
        }, poolExecutor);
        // 可以执行并行的其他操作 比如组装参数,异构数据
        task1.thenAccept(list -> {
            System.out.println("task1-accept");
        });
        CompletableFuture<Boolean> task2 = MultiTaskExecute.buildCompletableFuture(() -> {
            try {
                Thread.sleep(7);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2执行完成");
            return false;
        }, poolExecutor);
        try {
            Boolean done =MultiTaskExecuteUtils.allOf(10L,TimeUnit.MILLISECONDS,task1,task2);
            System.out.println("总任务执行是否执行完整:"+done);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("开始获取结果");
        //阻塞方法获取
//        List<String> list = task1.get();
        //超时时间方式获取
//        List<String> list = task1.get(10,TimeUnit.MILLISECONDS);
        //立刻获取
        List<String> list = task1.getNow(null);
        System.out.println("task1结果:"+JSON.toJSONString(list));
        Boolean aBoolean = task2.getNow(null);
        System.out.println("task2结果:"+aBoolean);
        System.out.println("over");
    }
```



### 方案3-重写invokeAll

```java
public static Boolean invokeAll(ExecutorService executorService,
        Long timeout,
        TimeUnit unit,
        CallableTuple... callableTuples) {
        List<Future> futures = new ArrayList<>(callableTuples.length);
        for (CallableTuple t : callableTuples) {
            futures.add(executorService.submit(t.getCallable()));
        }
        boolean done = true;
        //控制超时时间
        long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();
        for (int i = 0; i < size; i++) {
            Future f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L) {
                    done = false;
                    continue;
                }
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException ignore) {
                    done = false;
                }
               // 超时时间
                nanos = deadline - System.nanoTime();
            }
        }
        if (!done) {
            for (int j = 0; j < size; j++) {
                // 任务取消
                futures.get(j).cancel(true);
            }
        }
        for (int i = 0; i < futures.size(); i++) {
            Future future = futures.get(i);
            CallableTuple callableTuple = callableTuples[i];
            if (!future.isCancelled()) {
                try {
                   // 直接获取结果
                    callableTuple.setResult(future.get());
                } catch (InterruptedException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        return done;
    }
```

#### 测试代码

```java
public void invokeAll() {
        MultiTaskExecute.CallableTuple<ArrayList<String>> task1 = MultiTaskExecute.CallableTuple.builder().callable(() -> {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
//                e.printStackTrace();
            }
            System.out.println("task1执行完成");
            return new ArrayList<String>();
        }).build();

        MultiTaskExecute.CallableTuple<Boolean> task2 = MultiTaskExecute.CallableTuple.builder().callable(() -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
//                e.printStackTrace();
            }
            System.out.println("task2执行完成");
            return false;
        }).build();


        MultiTaskExecute.CallableTuple<String> task3 = new MultiTaskExecute.CallableTuple<>(() -> {
            try {
                Thread.sleep(6);
            } catch (InterruptedException e) {
//                e.printStackTrace();
            }
            System.out.println("task3执行完成");
            return "task3";
        });

        try {
            Boolean done = MultiTaskExecute.invokeAll(poolExecutor, 10L, TimeUnit.MILLISECONDS, task1, task2, task3);
            System.out.println("总任务执行是否执行完整:"+done);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("开始获取结果");
        List<String> list = task1.get();
        System.out.println("task1结果:"+JSON.toJSONString(list));
        Boolean aBoolean = task2.get();
        System.out.println("task2结果:"+aBoolean);
        System.out.println("task3结果:"+task3.get());
        System.out.println("over");
    }
```

#### 解决问题

- 超时任务主动调用cancel方法,取消未执行及进行中任务,释放线程

#### 存在的问题

- 获取执行结果是按照构建task的顺序,可能存在已执行完成但由于未及时获取结果导致整体任务无结果返回

### 方案4-最完美的invokeAll

使用CompletionService的poll方法,实现任务执行完成立即返回的目的

```java
    public static Boolean perfectInvokeAll(ExecutorService executorService, Long timeout, TimeUnit unit, CallableTuple... callableTuples) {
        CompletionService completionService = new ExecutorCompletionService(executorService);
        Map<Future, CallableTuple> futureCallableTupleMap = new HashMap<>(callableTuples.length * 4 / 3 + 1);
        for (CallableTuple t : callableTuples) {
            futureCallableTupleMap.put(completionService.submit(t.getCallable()), t);
        }
        boolean done = true;
        long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        for (int i = 0; i < futureCallableTupleMap.size(); i++) {
            try {
                Future f = completionService.poll(nanos, TimeUnit.NANOSECONDS);
                if (null != f) {
                    futureCallableTupleMap.get(f).setResult(f.get());
                } else {
                    done = false;
                }
            } catch (InterruptedException ignore) {

            } catch (ExecutionException ignore) {

            }
            nanos = deadline - System.nanoTime();
        }
        if (!done) {
            futureCallableTupleMap.forEach((future, c) -> {
                future.cancel(true);
            });
        }
        return done;
    }
```

#### 解决问题

- 避免顺序获取结果导致整体任务无结果问题



## 工具类

[MultiTaskExecute.java](http://storage.360buyimg.com/o2o-cms-daojia.com/MultiTaskExecute.java)

## 总结

- 若存在任务间依赖可使用方案2,但需要使用单独线程池
- 若全部为并行调用且需要控制整体任务超时时间,建议使用方案4

分享到:
评论

相关推荐

    MYSQL线程池总结

    线程池是Mysql5.6的一个核心功能,对于服务器应用而言,无论是web应用服务还是DB服务,高并发请求始终是一个绕不开的话题。当有大量请求并发访问时,一定伴随着资源的不断创建和释放,导致资源利用率低,降低了服务...

    简单线程池与线程池检查的实现

    总结来说,这个主题涵盖了线程池的基本概念、实现原理以及线程池的检查和维护,对于理解和优化多线程环境下的程序性能至关重要。通过源码分析和定制工具,开发者可以更好地控制和监控线程池,提升系统的稳定性和效率...

    VC++ 线程池(ThreadPool)实现

    总结: 线程池在VC++中通过Windows API实现,提供了高效且灵活的多线程处理方式。它可以帮助开发者优化程序性能,同时降低资源消耗。理解线程池的工作原理以及如何在VC++中正确使用它,对于编写高性能的多线程应用至...

    Quartz 线程池

    总结,Quartz 线程池是实现高效、可靠任务调度的关键组件。理解其工作原理并合理配置,能够帮助我们更好地利用系统资源,提升应用的响应速度和稳定性。在开发过程中,对线程池的监控和优化是确保系统性能的重要环节...

    Java8并行流中自定义线程池操作示例

    Java8并行流中自定义线程池操作示例 Java8并行流中自定义线程池操作示例主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧。 1. 概览 Java8引入了...

    线程池管理线程demo

    总结来说,线程池是并发编程中不可或缺的一部分,通过合理管理和配置线程池,可以显著提高程序的并发性能,同时降低系统资源的消耗。在实际应用中,我们需要根据任务特性选择合适的线程池类型,并正确地关闭线程池,...

    windows线程池,使用Windows自带的线程池api功能,比你写的线程池性能好得多

    总结,Windows线程池API是开发高并发、高性能应用的重要工具,它提供了强大的并发控制和资源管理能力,使得开发者能够专注于任务逻辑,而无需关心底层线程的管理和调度细节。通过熟练掌握和恰当使用Windows线程池,...

    DELPHI的ThreadPool的线程池DEMO

    7. **总结** DELPHI的线程池是实现并发处理的有效工具,通过模仿.NET的ThreadPool,为开发者提供了一种灵活、高效的并发编程方式。正确使用线程池能够提升程序性能,降低系统资源消耗,同时简化并发编程的复杂性。...

    线程池原理及创建(C++实现)

    #### 六、总结 线程池是一种有效提高系统性能和资源利用率的技术。通过预创建一组线程并在这些线程之间分配任务,可以显著减少线程创建和销毁的开销,提高服务器的响应速度和吞吐量。在设计线程池时,需要考虑...

    vc++线程池的经典实例

    总结来说,VC++线程池的经典实例展示了如何利用线程池优化多线程程序的性能,通过合理调度线程,减少系统资源的浪费。学习和理解线程池的实现与使用,对于提升VC++程序的并发处理能力和系统效率至关重要。

    火山安卓编程线程池例子

    总结来说,"火山安卓编程线程池例子"是一个关于Android应用开发中利用火山编程框架进行多线程管理和优化的实践案例。通过对这个例子的学习,开发者能够深入理解线程池的工作原理和优势,提升其在Android并发编程中的...

    线程池管理多线程上传

    总结来说,线程池通过统一管理和复用线程,提高了多线程环境下的效率和稳定性。在文件上传场景中,线程池能有效利用系统资源,加速大文件的上传速度,同时通过合理的任务调度和等待机制,确保上传过程的可靠性和一致...

    TOMCAT的线程池源码

    总结来说,Tomcat的线程池源码设计精巧,兼顾了性能与可管理性。通过深入学习和理解这部分源码,开发者不仅可以掌握线程池的工作原理,还能更好地优化自己的Web应用,提升服务的并发处理能力和稳定性。对于希望提升...

    用线程池异步加载

    总结起来,这个项目是关于如何在Android应用中使用线程池进行图片的异步加载,以优化性能并提高用户体验。通过合理配置线程池,我们可以有效地控制并发任务的数量,同时避免了主线程被阻塞,保证了应用的流畅运行。...

    C++简单线程池设计

    总结起来,C++的线程池设计是一个涉及线程同步、任务调度和资源管理的复杂过程。结合pthread库提供的线程操作和curl库的网络功能,我们可以构建出一个能够高效处理并发任务的线程池,适应各种并发场景的需求。

    线程池的简单实现

    总结来说,线程池是一种优化多线程处理的方式,通过复用线程和控制并发量,提高了系统的效率和稳定性。在Java中,我们可以使用`ThreadPoolExecutor`来创建线程池,并通过`ExecutorService`接口进行管理和操作。通过...

    concurrent线程池的实现技术分析

    **总结** 线程池通过合理管理和调度线程,提高了系统的并发性能和资源利用率。`concurrent`包的线程池实现结合了线程池和通道技术,提供了灵活的阻塞行为策略,使其能适应不同场景的需求。深入理解线程池的工作原理...

    多线程编程线程池

    #### 十、总结 通过合理使用线程池,开发者可以显著提升应用程序的性能和稳定性。无论是处理大量短时任务还是实现后台任务的调度和监控,线程池都是一个强大的工具。然而,在使用线程池时也需要考虑其局限性和潜在的...

    线程池.zip,互斥锁+条件变量+队列,实现线程池,包括线程池的创建,塞任务,和销毁线程池

    总结起来,这个项目通过线程池优化了多线程的调度,使用互斥锁保证了线程安全,通过条件变量实现了线程间的同步,队列则作为任务调度的基础。这些技术的结合使得程序能够高效、稳定地处理并发任务,是多线程编程中的...

    Qt实现线程池开发实例

    **总结** 通过Qt实现线程池,我们可以更有效地管理并发任务,提升程序性能。结合QTcpSocket,我们可以构建一个能够处理网络请求的服务器,同时利用线程池执行复杂的后台任务。这个实例适合Qt初学者,有助于理解...

Global site tag (gtag.js) - Google Analytics