akka 编程入门 - 同步请求

发表于2019-11-20,长度2579, 182个单词, 7分钟读完
Flag Counter

akka是专门为分布式环境而生的,所以基本上所有的请求都是无等待的(wait-free)的。不过场景所限,一定还需要同步等待的逻辑。这里讲一下akka中的同步处理能力。

本文代码存放在:https://gitee.com/somefuture/akka-java-101/tree/master/manxi-ask

一、主线程阻塞

actor提供给我们tell方法给其他actor(当然也可以是自己)发消息,消息到达对方信箱就返回,主线程继续执行。如果希望得到对方的回信该怎么办?

前面说过可以使用ask方法。其实ask方法并非actor提供的,我们来看一个例子:

class AnswerActor extends AbstractActor {
    public AnswerActor(String name) {
        // 仅提供带actor name参数的构造器
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Integer.class, i -> {
                    Thread.sleep(2000);
                    getSender().tell(i * i + i, getSelf());
                })
                .build();
    }

}

先创建一个actor,它接收一个整数,并经过“长时间”的运算后通知sender计算结果。

然后我们创建一个actor给他发消息:

import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;

public class AskTest extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("", m -> {
                    // 你也可以使用matchAny
                    ActorRef answer = getContext().system().actorOf(Props.create(AnswerActor.class, "answer009"));
                    Future<Object> ask = Patterns.ask(answer, 3, 2000);
                    Object result = Await.result(ask, Timeout.create(Duration.ofMillis(2100)).duration());
                    System.out.println(result);
                })
                .build();
    }
}

逻辑很简单,你能看到askActor给answerActor发送了消息3,并设置2秒超时。这个会立即结束,并执行下一句。下一句我们使用了Await来等待结果并打印出来。

二、回调

这种方式有点low,在结果返回前或超时前,我们会一直等待。但是即使使用Java,我们也可以选择completableFuture。所以akka也提供了类似的功能:当结果返回后通知另一个actor进行处理:

class WorkerActor extends AbstractActor {
    public WorkerActor(String s) {
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(i -> {
                    System.out.println(i);
                })
                .build();
    }

}

我们先创建了一个workerActor用于处理返回结果。在主线程中,我们将结果通知它:

@Override
public Receive createReceive() {
    return receiveBuilder()
            .matchEquals("", m -> {
                ActorRef answer = getContext().system().actorOf(Props.create(AnswerActor.class, "answer"));
                Future<Object> ask = Patterns.ask(answer, 3, 2000);

                ActorRef worker = getContext().system().actorOf(Props.create(WorkerActor.class, "worker"));
                Patterns.pipe(ask, getContext().dispatcher()).to(worker);

                System.out.println("这里会立即执行");
            })
            .build();
}

这样,answerActor返回的12会由workerActor打印出来。而主线程已经执行完毕。


更多例子代码:https://gitee.com/somefuture/akka-java-101

Written on November 20, 2019
分类: dev, 标签: java akka
如果你喜欢,请赞赏! davelet