`
xiefeifeihu
  • 浏览: 99255 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Akka2使用探索6(Futures)——实现并发和异步

阅读更多

 

Future用来获取某个并发操作的结果,这个结果可以同步(阻塞)或异步(非阻塞)的方式访问。

 

执行上下文

Future 需要一个ExecutionContext, 它与java.util.concurrent.Executor 很相像. 如果你在作用域内有一个 ActorSystem , 它可以用system.dispatcher()作 ExecutionContext。你也可以用ExecutionContext 伴生对象提供的工厂方法来将 ExecutorsExecutorServices 进行包裹, 或者甚至创建自己的实例.

   //执行上下文可以自己指定线程池类型
        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool())
Future的创建方法
        Future<String> f1 = Futures.successful("f1", ec);

        Future<String> f2 = Futures.future(new Callable() {
            @Override
            Object call() {
                return "f2"
            }
        }, ec)
        Future<String> f3 = Futures.successful("f3", ActorSystem.create("test").dispatcher());
    //使用actor的ask方法发送消息是也能创建一个Future
    Futuref4 = akka.pattern.Patterns.ask(actor, "msg", 1000 * 60)
函数式 Future
Akka 的 Future 有一些与Scala集合所使用的非常相似的 monadic 方法. 这使你可以构造出结果可以传递的‘管道’ 或 ‘数据流’.

map(对未来返回的结果进行处理)

Future以函数式风格工作的第一个方法是 map. 它需要一个函数来对Future的结果进行处理, 返回一个新的结果。map 方法的返回值是包含新结果的另一个 Future:

static void map() throws Exception {
        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.successful("fof1o", ec);
        //map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果
        Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
            public Integer apply(String s) {
                return s.length();
            }
        });  //这里对未来f1返回的字符串计算其长度

    对Future完成结果的处理方法
    //System.out.println(Await.result(f2, Duration.create(5, "s")));     //阻塞式,当前线程在此等待

        //下面是非阻塞式,异步返回
        f2.onComplete(new OnComplete<Integer>() {
            @Override
            public void onComplete(Throwable failure, Integer success) {
                System.out.println("f2返回结果:" + success + ",failure:" + failure);
            }
        });

        f2.onSuccess(new OnSuccess<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                System.out.println("f2返回结果:" + result);
            }
        });

        f2.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) {
                System.out.println("f2返回failure:" + failure);
            }
        });
    }

 

flatMap(对多个Future返回的结果进行处理)

static void flatMap() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.successful("hello", ec);

        //
        Future<Integer> fr = f1.flatMap(new Mapper<String, Future<Integer>>() {
            public Future<Integer> apply(final String s) {
                return Futures.future(new Callable<Integer>() {
                    public Integer call() {
                        return s.length();
                    }
                }, ec);
            }
        });  //
        System.out.println(Await.result(fr, Duration.create(5, "s")));     //阻塞式
    }

    //对两个Future的结果处理
    static void flatMap_concat2() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        final Future<String> f1 = Futures.successful("hello", ec);
        final Future<String> f2 = Futures.successful("world", ec);

        //如果要对多个Future的结果进行处理,需要用flatMap
        //本例中对f1和f2返回的结果用空格连接成“hello world”
        Future<String> fr = f1.flatMap(new Mapper<String, Future<String>>() {
            public Future<String> apply(final String s) {
                return f2.map(new Mapper<String, String>() {
                    @Override
                    public String apply(String v) {
                        return s + " " + v;
                    }
                });
            }
        });
        System.out.println(Await.result(fr, Duration.create(5, "s")));     //阻塞式
    }

    //对三个Future的结果处理
    static void flatMap_concat3() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        final Future<String> f1 = Futures.successful("How", ec);
        final Future<String> f2 = Futures.successful("are", ec);
        final Future<String> f3 = Futures.successful("you", ec);

        //如果要对多个Future的结果进行处理,需要用flatMap
        //本例中对f1、f2、f3返回的结果用空格连接成“How are you”
        Future<String> fr = f1.flatMap(new Mapper<String, Future<String>>() {
            public Future<String> apply(final String s) {
                return f2.flatMap(new Mapper<String, Future<String>>() {
                    @Override
                    public Future<String> apply(final String s2) {
                        return f3.map(new Mapper<String, String>() {
                            @Override
                            public String apply(String s3) {
                                return s + " " + s2 + " " + s3;
                            }
                        });
                    }
                });
            }
        });  /*用java写比较繁琐,用scala的话就简单多了
        val future1 = for {
            a: String <- actor ? "How" // returns How
            b: String <- actor ? "are" // returns "are"
            c: String <- actor ? "you" // returns "you"
        } yield a + " " + b + "" + c*/
        System.out.println(Await.result(fr, Duration.create(5, "s")));     //阻塞式
    }

filter(对Future进行条件筛选)

static void filter() throws Exception {
        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.successful("fof1o", ec);
        Future<String> f2 = Futures.successful("fo", ec);
        //map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果
        Future<String> fs = f1.filter(Filter.filterOf(new Function<String, Boolean>() {
            @Override
            public Boolean apply(String param) {
                return param.length() == 5;
            }
        }));
        System.out.println(Await.result(fs, Duration.create(5, "s")));

        Future<String> ff = f2.filter(Filter.filterOf(new Function<String, Boolean>() {
            @Override
            public Boolean apply(String param) {
                return param.length() == 5;
            }
        }));  //不匹配的话会抛scala.MatchError异常
        System.out.println(Await.result(ff, Duration.create(5, "s")));
    }

 


组合Futures

如果Future的数目较多,用flatMap组合的话代码就过于复杂。可以使用sequencetraverse。

sequence(将 T[Future[A]] 转换为 Future[T[A]])

public static void sequence() throws Exception {
        //将 T[Future[A]] 转换为 Future[T[A]]
        //简化了用flatMap来组合
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        final Future<String> f1 = Futures.successful("How", ec);
        final Future<String> f2 = Futures.successful("are", ec);
        final Future<String> f3 = Futures.successful("you", ec);

        List<Future<String>> futureList = new ArrayList<Future<String>>();
        futureList.add(f1);
        futureList.add(f2);
        futureList.add(f3);

        //这里将List<Future<String>> 组合成一个Future:Future<Iterable<String>>
        Future<Iterable<String>> future = Futures.sequence(futureList, ec);

        Future<String> fr = future.map(new Mapper<Iterable<String>, String>() {
            @Override
            public String apply(Iterable<String> parameter) {
                String result = "";
                for (String s : parameter) {
                    result += s + " ";
                }
                return result;
            }
        });
        System.out.println(Await.result(fr, Duration.create(5, "s")));
    }


traverse(将 T[A] 转换为 Future[T[A]])

public static void traverse() throws Exception {
        //将 T[A] 转换为 Future[T[A]]
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());


        Iterable<String> list = Arrays.asList("How", "are", "you");

        //这里将List<String> 组合成一个Future:Future<Iterable<String>> ,对list中的每个元素做加工处理
        Future<Iterable<String>> future = Futures.traverse(list, new Function<String, Future<String>>() {
            @Override
            public Future<String> apply(final String param) {
                return Futures.future(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return param.toUpperCase();
                    }
                }, ec);
            }
        }, ec);

        Future<String> fr = future.map(new Mapper<Iterable<String>, String>() {
            @Override
            public String apply(Iterable<String> parameter) {
                String result = "";
                for (String s : parameter) {
                    result += s + " ";
                }
                return result;
            }
        });

        System.out.println(Await.result(fr, Duration.create(5, "s")));
    }


fold(从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了))

public static void fold() throws Exception {
        //fold从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了)
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        final Future<String> f1 = Futures.successful("How", ec);
        final Future<String> f2 = Futures.successful("are", ec);
        final Future<String> f3 = Futures.successful("you", ec);

        List<Future<String>> futureList = new ArrayList<Future<String>>();
        futureList.add(f1);
        futureList.add(f2);
        futureList.add(f3);

        //本例从初始值“Init”开始,递归地对futureList的返回值用"_"连接,返回“Init_How_are_you”
        Future<String> fr = Futures.fold("Init", futureList, new Function2<String, String, String>() {
            @Override
            public String apply(String arg1, String arg2) {
                System.out.println("arg1----" + arg1);    //第一次为Init,第二次为Init_How ,第三次为Init_How_are
                System.out.println("arg2----" + arg2);    //第一次为How ,第二次为are        第三次为you
                return arg1 + "_" + arg2;
            }
        }, ec);

        //如果futureList为空列表,则返回初始值“Init”
        System.out.println(Await.result(fr, Duration.create(5, "s")));  //结果为Init_How_are_you
    }
reduce(如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了))
public static void reduce() throws Exception {
        //如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了)
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        final Future<String> f1 = Futures.successful("How", ec);
        final Future<String> f2 = Futures.successful("are", ec);
        final Future<String> f3 = Futures.successful("you", ec);

        List<Future<String>> futureList = new ArrayList<Future<String>>();
        futureList.add(f1);
        futureList.add(f2);
        futureList.add(f3);

        //本例从初始值“How”开始,递归地对futureList的返回值用"_"连接,返回“How_are_you”
        Future<String> fr = Futures.reduce(futureList, new Function2<String, String, String>() {
            @Override
            public String apply(String arg1, String arg2) {
                System.out.println("arg1----" + arg1);    //第一次为How ,第二次为How_are
                System.out.println("arg2----" + arg2);    //第一次为are ,第二次为you
                return arg1 + "_" + arg2;
            }
        }, ec);

        //如果futureList为空列表,则返回初始值“Init”
        System.out.println(Await.result(fr, Duration.create(5, "s")));  //结果为Init_How_are_you
    }

 

andThen(由于回调的执行是无序的,而且可能是并发执行的, 当你需要一组有序操作的时候需要一些技巧。)

public static void andThen() throws Exception {
        //如果要对Future的结果分多次依次处理,需要使用andThen
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());

        Future<String> future = Futures.successful("hello", ec).andThen(new OnComplete<String>() {
            @Override
            public void onComplete(Throwable failure, String success) {
                System.out.println("先收到:" + success);
            }
        }).andThen(new OnComplete<String>() {
            @Override
            public void onComplete(Throwable failure, String success) {
                System.out.println("又收到:" + success);
            }
        }).andThen(new OnSuccess<Either<Throwable, String>>() {
            @Override
            public void onSuccess(Either<Throwable, String> result) {
                System.out.println("收到onSuccess:" + result);
            }
        });
    }


fallbackTo(将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值)

public static void fallbackTo() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.failed(new RuntimeException("ex1"), ec);
        Future<String> f2 = Futures.failed(new RuntimeException("ex2"), ec);
        Future<String> f3 = Futures.successful("ok", ec);
        //fallbackTo 将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值
        Future<String> fr = f1.fallbackTo(f2).fallbackTo(f3);

        System.out.println(Await.result(fr, Duration.create(5, "s")));
    }

 

zip(操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果)

public static void zip() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.future(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("f1---" + Thread.currentThread().getName());
                Thread.sleep(1000 * 10);
                return "hello";
            }
        }, ec);
        Future<String> f2 = Futures.future(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("f2---" + Thread.currentThread().getName());
                Thread.sleep(1000 * 5);
                return "world";
            }
        }, ec);
        //zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果
        Future<String> fr = f1.zip(f2).map(new Mapper<Tuple2<String, String>, String>() {
            @Override
            public String apply(Tuple2<String, String> ziped) {
                System.out.println("zip---" + Thread.currentThread().getName());
                return ziped._1() + " " + ziped._2();   //f1和f2的返回结果包含在zipped对象中
            }
        });
        System.out.println("主线程----" + Thread.currentThread().getName());
        System.out.println(Await.result(fr, Duration.create(15, "s")));
    }

    public static void zip2() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        Future<String> f1 = Futures.successful("hello", ec);
        Future<String> f2 = Futures.future(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("f2---" + Thread.currentThread().getName());
                Thread.sleep(1000 * 10);
                return (1 / 0) + "";
            }
        }, ec);
        //zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果
        Future<String> fr = f1.zip(f2).map(new Mapper<Tuple2<String, String>, String>() {
            @Override
            public String apply(Tuple2<String, String> ziped) {
                System.out.println("zip----" + Thread.currentThread().getName());
                return ziped._1() + " " + ziped._2();   //f1和f2的返回结果包含在zipped对象中
            }
        });

        System.out.println("主线程----" + Thread.currentThread().getName());
        System.out.println(Await.result(fr, Duration.create(15, "s")));
    }

 

recover(对Future的异常进行处理,相当于try..catch中对捕获异常后的处理)

public static void recover() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        //recover对Future的异常进行处理,相当于try..catch中对捕获异常后的处理
        Future<Integer> future = Futures.future(new Callable<Integer>() {
            public Integer call() {
                return 1 / 0;
            }
        }, ec).recover(new Recover<Integer>() {
            public Integer recover(Throwable problem) throws Throwable {
                System.out.println("捕获到异常:" + problem);
//                if (problem instanceof RuntimeException) {
//                    return 0;
//                } else {
//                    throw problem;
//                }
                return -2;    //这里捕获到异常后直接返回新值了,并没有再抛出异常,所以后面的recover不会再收到异常
            }
        }).recover(new Recover<Integer>() {
            public Integer recover(Throwable problem) throws Throwable {
                System.out.println("捕获到异常:" + problem);
                if (problem instanceof ArithmeticException) {  //捕获异常并处理,捕获到后,后面得到的result将会是-1
                    return -1;
                } else {
                    throw problem;
                }
            }
        });
        int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
        System.out.println("result----" + result);
    }

 

recoverWith(和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理)

public static void recoverWith() throws Exception {
        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
        //recoverWith和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理
        Future<Integer> future = Futures.future(new Callable<Integer>() {
            public Integer call() {
                return 1 / 0;
            }
        }, ec).recoverWith(new Recover<Future<Integer>>() {
            @Override
            public Future<Integer> recover(Throwable failure) throws Throwable {
                if (failure instanceof ArithmeticException) {
                    return Futures.future(new Callable<Integer>() {
                        @Override
                        public Integer call() throws Exception {
                            return 0;
                        }
                    }, ec);
                } else throw failure;

            }
        });
        int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
        System.out.println("result----" + result);
    }


 

0
3
分享到:
评论

相关推荐

    Akka Scala 学习高清原版pdf

    Akka是一个用Scala和Java编写的库,用于构建并发、分布式以及容错的事件驱动应用。Akka框架借鉴了Erlang的并发模型,但它是建立在JVM之上,并且提供了丰富的抽象和工具,能够简化开发工作。 标题“Akka Scala 学习...

    akka的教学

    Futures 和 Agents 是 Akka 中用于处理异步计算和状态管理的重要概念。 ##### 5.1 Futures Futures 代表了一个可能在未来完成的计算结果。本节介绍了 Futures 的概念、创建方法以及如何处理 Futures 的结果。 ####...

    AkkaJava.pdf

    文档接下来会介绍 Futures 和 Agents,这些是Akka处理异步编程和并发操作的工具。Futures用于表示可能尚未完成的计算结果,而Agents提供了一种管理共享可变状态的方法。 在网络部分,文档将讲解Akka如何支持集群...

    AKKA (java) 精讲

    深入探讨 Actor 的概念和实现细节,包括: - Actor 的生命周期。 - 如何创建和销毁 Actor。 - 如何定义 Actor 的行为。 ##### 3.2 Typed Actors Typed Actors 是一种更高级的 Actor 实现方式,它使用类型安全的 API...

    Akka Java文档

    Akka 提供了一种易于理解和使用的模型来处理现代软件系统中的高并发需求。随着硬件的发展,多核处理器已经成为标准配置,因此如何有效地利用这些核心成为了一个重要的问题。Akka 通过 Actor 模型来实现这一点,Actor...

    akka 2.0 文档

    Futures 是 Akka 提供的一种异步编程模型,可以帮助开发者编写简洁高效的并发代码。这部分内容介绍了如何使用 Futures 来处理异步结果。 **4.7 Dataflow Concurrency (Scala)** Dataflow 模型是一种用于处理数据流...

    Scala并发编程程.rar

    在Scala中,并发主要依赖于两个核心概念:Actor模型和 Futures。Actor模型是Erlang语言引入的概念,Scala对其进行了进一步的发展。Actor是一个独立运行的单元,它可以接收消息、处理消息并发送新消息。每个Actor都有...

    akka java document

    ### Akka Java 文档知识点概览 #### 一、引言 **1.1 Akka 是什么?** Akka 是一个面向并发、分布式系统开发的...- **I/O 层设计实践:** 如何设计和实现 I/O 层。 **10.4 开发者指南** - **编码规范:** 介绍 Akka ...

    akka-2.3.14.doc

    这部分介绍了如何使用Futures和Agents来处理异步操作和状态更新。 #### 6. Networking **6.1 Cluster Specification** 这部分介绍了如何使用Akka Cluster来构建分布式系统。包括集群的配置、成员发现等。 **6.2 ...

    Akka官方开发文档

    这部分会讲解Akka中用于异步编程的工具,如Future和Agent,它们是处理并发任务和状态共享的有力工具。 5.1网络集群规范(Networking Cluster Specification) 集群规范定义了如何建立和管理分布式Akka集群,以及...

    Akka Scala Documentation Release 2.4

    此外,Akka还支持Futures和Promises,使得异步编程变得更加简单和直观。 其次,Akka的路由机制是其强大功能的一部分。它可以将消息智能地分发到一组子Actor中,例如广播、轮询或根据特定规则选择接收者。这种路由...

    Akka Concurrency Framework

    - **高并发性**:Akka利用非阻塞I/O和异步处理技术,能够支持大量的并发操作。 - **分布式计算**:Akka允许轻松地将应用程序部署到多台机器上,从而实现分布式计算。 - **容错性**:通过内置的监督和故障恢复机制,...

    Akka Scala文档

    Akka 是一个面向并发、分布式系统开发的工具包,它基于 Actor 模型并支持 Scala 和 Java 语言。Akka 的设计目标是简化并发编程,并通过 Actor 模型提供高度容错性和弹性。 ##### 1.2 为什么选择 Akka? - **高并发...

    Akka说明文档

    Akka提供的Actor模型能够很好地解决这些问题,它通过异步消息传递实现Actor间的通信,使得系统能够充分利用多核处理器的能力,同时避免了线程间的竞争条件和死锁问题。此外,Akka还提供了高度的容错性和弹性,能够在...

    Learning Concurrent Programming in Scala

    通过阅读本书,开发者将能够熟练地利用Scala的并发特性来设计和实现高效的并发应用程序,同时也能应对并发编程中可能出现的挑战,如死锁、竞态条件和资源争用等问题。这是一本对Scala并发感兴趣的开发者不可多得的...

    PDF-ScalaCookbook-英文版.rar

    书中会介绍如何使用Akka来处理高并发和容错问题。 通过阅读《Scala Cookbook》,开发者不仅可以掌握Scala的基本用法,还能学习到高级特性的应用,从而在实际开发中发挥Scala的最大潜力。这本书适合有一定编程经验,...

Global site tag (gtag.js) - Google Analytics