`
raymond.chen
  • 浏览: 1441347 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

CompletableFuture的使用

 
阅读更多

CompletableFuture是 java1.8 提供的一个新类,是对Future的增强,吸收了guava异步线程的特点,可以实现一系列的异步线程操作。CompletableFuture可以简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

 

CompletableFuture合适每个操作很复杂需要花费很长时间的的场景。

 

范例一:

/**
 * supplyAsync: 异步开始一个任务,并返回结果
 * thenApply: 处理上一步的执行结果
 * 
 * allOf: 所有任务完成后才会返回
 * anyOf: 任一任务完成就返回
 */
private static void test2() throws Exception {
	List<String> list = new ArrayList<>();
	ExecutorService executorService = Executors.newFixedThreadPool(10);
	List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
	
	Long start = System.currentTimeMillis();
	CompletableFuture[] arr = taskList.stream()
		.map(x -> CompletableFuture.supplyAsync(() -> x, executorService) //对每个元素异步做处理,并返回一个CompletableFuture对象
			.thenApply(y -> Integer.toString(y)) //处理结果进一步处理
			.whenComplete((r, e) -> { //异步处理完成后,获取结果
				 list.add(r);
			})
		)
		.toArray(CompletableFuture[]::new); //所有CompletableFuture对象转成一个数组

	boolean all = false;
	if(all){
		CompletableFuture.allOf(arr).join(); //等待所有任务完成后才会继续往下执行
	}else{
		Object result = CompletableFuture.anyOf(arr).get(); //只要有一个任务完成就继续往下执行
		System.out.println("result=" + result);
	}
	
	System.out.println("list="+list+", 耗时="+(System.currentTimeMillis()-start));
	executorService.shutdown();
}

 

范例二:

/**
 * thenCompose:对两个任务进行串行执行,第一个完成后,将其结果作为参数传递给第二个
 */
public static void test3() throws Exception {
	CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("task1 doing...");
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "result1";
	});

	CompletableFuture<String> completableFuture2 = completableFuture1.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("prvo result=" + result);
			System.out.println("task2 doing...");
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "result2";
	}));

	System.out.println(completableFuture2.get());
}

 

范例三:

/**
 * thenCombine:组合两个执行结果。两个任务是并行执行
 * thenAccept:链末消费:接收上一阶段的输出作为本阶段的输入,执行任务
 */
public static void test4() throws Exception {		
	CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("task1 doing start");
			Thread.sleep(1000);
			System.out.println("task1 doing end");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return 100;
	});
	completableFuture1.thenAccept(result -> System.out.println("result1=" + result));
	
	CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("task2 doing start");
			Thread.sleep(3000);
			System.out.println("task2 doing end");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return 300;
	});
	completableFuture2.thenAccept(result -> System.out.println("result2=" + result));
	
	CompletableFuture<Integer> completableFuture3 = completableFuture2.thenCombine(completableFuture1,
		//合并函数
		(result1, result2) -> result1 + result2
	);

	System.out.println(completableFuture3.get());
}

 

范例四:

/**
 * 递归做 thenCombine 处理
 */
public static void test5() throws Exception {
	long s = System.currentTimeMillis();
	
	CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
		try {
			System.out.print("开始两两合并处理:0 + ");
			Thread.sleep(10);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return 0;
	});
	
	for(int i=1; i<=7; i++){
		completableFuture1 = completableFuture1.thenCombine(
			CompletableFuture.supplyAsync(() -> {
				int x = 0;
				try {
					Thread.sleep(3000);
					x = count.incrementAndGet();
					System.out.print(x + " + ");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return x;
			})
			, (r1, r2) -> r1 + r2
		);
	}
	
	System.out.println("0 = " + completableFuture1.get());
	System.out.println(System.currentTimeMillis() - s);
}

 

范例五:

/**
 * runAsync: 异步执行,无返回
 * supplyAsync: 异步执行,有返回
 */
public static void test6(){
	System.out.println("start doing...");
	
	CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
		try {
			Thread.sleep(3000);
			int result = 100/0;
		} catch (Exception ex) {
			throw new RuntimeException(ex);
		}
	});
	
	try {
		future1.get();
	} catch (Exception ex) {
		System.out.println("future1 error: " + ex.toString());
	}
	
	CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
		try {
			Thread.sleep(3000);
			int result = 100/0;
			return result;
		} catch (Exception ex) {
			throw new RuntimeException(ex);
		}
	});
	
	try {
		System.out.println(future2.get());
	} catch (Exception ex) {
		System.out.println("future2 error: " + ex.toString());
	}
}

 

范例六:

/**
 * applyToEither: 应用两者中的任一,哪个先返回就用哪个
 * runAfterBoth: 两个任务都完成后执行函数
 * runAfterEither: 其中一个任务完成后执行函数,另一个任务忽略
 */
public static void test7(){
	String original = "Message";
	CompletableFuture<String> f1 = CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
		try {
			int sleep = ThreadLocalRandom.current().nextInt(5000);
			System.out.println("f1: " + sleep);
			TimeUnit.MILLISECONDS.sleep(sleep);
			System.out.println("f1 ok");
		} catch (Exception e) {
			e.printStackTrace();
		}
		return s.toUpperCase();
	});
	
	CompletableFuture<String> f2 = CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
		try {
			int sleep = ThreadLocalRandom.current().nextInt(5000);
			System.out.println("f2: " + sleep);
			TimeUnit.MILLISECONDS.sleep(sleep);
			System.out.println("f2 ok");
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		int i = 1/0;
		return s.toLowerCase();
	});
	
//	    CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s + " from applyToEither");
//	    System.out.println(f3.join());
	
//	    CompletableFuture<Void> f3 = f1.runAfterBoth(f2, () -> {
//	    	System.out.println("all ok");
//	    });
//	    f3.join();
	
//	    CompletableFuture<Void> f3 = f1.runAfterEither(f2, () -> {
//	    	System.out.println("one ok");
//	    });
//	    f3.join();
}

 

范例七:

/**
 * handle: 任务完成后或者抛出异常时,调用handle方法处理结果
 * exceptionally: 捕获异常
 */
public static void test8(){
	CompletableFuture<String> f = CompletableFuture.completedFuture("Massges").thenApplyAsync(s -> {
		try {
			int sleep = ThreadLocalRandom.current().nextInt(3000);
			System.out.println("f: " + sleep);
			TimeUnit.MILLISECONDS.sleep(sleep);
			System.out.println("f ok");
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		
		int i = 1/0;
		return s.toLowerCase();
	});
	
	CompletableFuture<String> f3 = f.handle((r, ex) -> {
		System.out.println(r + ", " + ex);
		int i = 1/0;
		return r;
	}).exceptionally(s -> {
		System.out.println("error: " + s);
		return null;
	});
	System.out.println(f3.join());
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics