Akka学习(五)消息传递的方式-创新互联
- 一 消息传递方式
- 1.1 消息不可变
- 1.2 ASK消息模式
- 1.3 Tell消息模式
- 1.4 Forward消息模式
- 1.4 Pipe消息模式
有4种核心的Actor消息模式:tell、ask、forward和pipe。我们已经了解过tell和ask,不过sender()都不是Actor。创新互联建站是专业的爱民网站建设公司,爱民接单;提供网站建设、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行爱民网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!一 消息传递方式
在这里,将从Actor之间发送消息的角度来介绍所有关于消息传递的概念。
● Ask:向Actor发送一条消息,返回一个Future。当Actor返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
● Tell。向Actor发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。● Forward:将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都会返回给原始消息的发送者。
● Pipe:用于将Future的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe可以正确地返回Future的结果。
- 前面提到过,消息应该是不可变的,由于Akka基于JVM,而Java和Scala都支持可变类型,所以是有可能发送可变消息的。
- 不过如果这么做,就可能会失去Akka在消除共享状态方面提供的诸多益处,一旦有了可变消息,就引入了一种风险:开发者可能会在某些时候开始修改消息,而他们修改消息的方式可能会破坏应用程序的运行。
- 当然,如果不修改消息的状态,那么要安全地使用可变消息也不是不可能,不过最好还是使用不可变消息,确保不会因为未来的变化而引入错误。
可变消息定义
// Java
public class Message {public StringBuffer mutableBuffer;
public Message(StringBuffer: mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
// scala
class Message(var mutableBuffer: StringBuffer = new StringBuffer);
Message message = new Message(new StringBuffer("original"));
message.mutableBuffer = new StringBuffer("new");
val message = new Message(new StringBuffer("original"))
message.mutableBuffer = new StringBuffer("new")
这个例子新建了一条消息,然后修改mutableBuffer引用,使之指向另一个新建的StringBuffer,消息创建时传入的是StringBuffer (“original”),后来被修改成了StringBuffer(“new”)。这就是通过修改引用来修改消息的方法。
消息不可变
public class Message {// final
public final StringBuffer mutableBuffer;
Message(StringBuffer mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
- 在Scala中,如果没有在声明中给出任何访问修饰符,成员变量的引用默认就是不可变的val。
- 现在我们Java中加入Final关键字,可以使引用设置为不可变,但是会出现问题,StringBuffer是可变的,因此可以修改修改StringBuffer对象本身。
public class ImmutableMessage{public final String immutableType;
public ImmutableMessage(String immutableType) {this.immutableType = immutableType;
}
}
class ImmutableMessage(immutableType: String)
- 由于String是不可变类型,因此可以通过使用String代替StringBuffer使得消息不可变。
总结
理解不可变性不仅仅对于Akka消息是至关重要的,对于如何进行安全的通用并发编程也是必不可少的,因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。
1.2 ASK消息模式![epub_22651331_27.jpg](https://img-blog.csdnimg.cn/img_convert/2a7e44802270d65d74f4b6588fff8522.jpeg
- 在调用ask向Actor发起请求时,Akka实际上会在Actor系统中创建一个临时Actor。
- 接收请求的Actor在返回响应时使用的sender()引用就是这个临时Actor。
- 当一个Actor接收到ask请求发来的消息并返回响应时,这个临时Actor会使用返回的响应来完成Future。
- 因为sender()引用就指向临时Actor的路径,所以Akka知道要用哪个消息来完成Future。
要求
Ask模式要求定义一个超时参数,如果对方没有在超时参数限定的时间内返回这个ask的响应,那么Future就会返回失败。ask/?方法要求提供的超时参数可以是长整型的毫秒数,也可以是akka.util.Timeout,这种类型提供了更丰富的时间表达方式。
![epub_22651331_29.jpg](https://img-blog.csdnimg.cn/img_convert/50dcf9f3caa3978e58df33c9ff7b4690.jpeg#averageHue=#f0f0f0&clientId=uac482df2-4a9a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=278&id=ue3790d29&margin=[object Object]&name=epub_22651331_29.jpg&originHeight=347&originWidth=1198&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44177&status=done&style=none&taskId=uc3ef683b-00ce-4632-86a0-8a7a71984e3&title=&width=958.4)
- Java模式
static import akka.pattern.Patterns.ask;
Timeout timeout = new akka.util.Timeout(
1,
java.util.concurrent.TimeUnit.SECONDS
);
Future future = ask(actor, message, timeout);
- Scala
import scala.concurrent.duration._
import akka.pattern.ask
// 设置超时时间
implicit val timeout = akka.util.Timeout(1 second)
val future = actorRef ? "message"
案例:请参考前面的案例3
缺点
Ask模式看上去很简单,不过它是有隐藏的额外性能开销的,首先,ask会导致Akka在/temp路径下新建一个临时Actor。
这个临时Actor会等待从接收ask消息的Actor返回的响应,其次,Future也有额外的性能开销。
Ask会创建Future,由临时Actor负责完成,这个开销并不大,但是如果需要非常高频地执行ask操作,那么还是要将这一开销考虑在内的。
Ask很简单,不过考虑到性能,使用tell是更高效的解决方案。
- Tell是ActorRef/ActorSelection类的一个方法,它也可以接受一个响应地址作为参数,接收消息的Actor中的sender()其实就是这个响应地址。
- 在Scala中,默认情况下,sender会被隐式定义为发送消息的Actor,如果没有sender(如在Actor外部发起请求),那么响应地址不会默认设置为任何邮箱(叫做DeadLetters)。
- Tell在语义上是用于将一条消息发送至另一个Actor,并将响应地址设置为当前的Actor。而Forward和邮件转发非常类似:初始发送者保持不变,只不过新增了一个收件人。
- 在使用tell时,我们指定了一个响应地址,或是将响应地址隐式设为发送消息的Actor。而使用forward传递消息时,响应地址就是原始消息的发送者
- 有时候我们需要将接受到的消息传递给另一个Actor来处理,而最终的处理结果需要传回给初始发起请求的一方。
//Java
actor.forward(result, getContext());
//Scala
actor forward message
1.4 Pipe消息模式很多情况下,需要将Actor中的某个Future返回给请求发送者。上文介绍过sender()是一个方法,所以要在Future的回调函数中访问sender(),我们必须存储一个指向sender()的引用:
//Java
final ActorRef senderRef = sender();
future.map(x ->{senderRef.tell(x, ActorRef.noSender())});
//Scala
val senderRef = sender();
future.map(x =>senderRef ! x);(原文为future.map(x =>senderRef ! ActorRef.noSender),有误。)
//Java
pipe(future, system.dispatcher()).to(sender());
//Scala
future pipeTo sender()
pipe(future) to sender()
pipe接受Future的结果作为参数,然后将其传递给所提供的Actor引用。在上面的例子中,因为sender()执行在当前线程上,所以我们可以直接调用sender(),而不用干一些奇怪的事情(比如把sender()引用存储在一个变量中),执行结果一切正确。这就好多了!
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
本文名称:Akka学习(五)消息传递的方式-创新互联
网站地址:http://ybzwz.com/article/dooois.html