Akka
1 Actors
import akka.actor.{ ActorRef, ActorSystem, Props, Actor, Inbox }
import scala.concurrent.duration._

// Three messages, one is object, two are case classes
case object Greet
case class WhoToGreet(who: String)
case class Greeting(message: String)

class Greeter extends Actor {
  var greeting = ""

  def receive = {
    case WhoToGreet(who) => greeting = s"hello, $who"
    case Greet           => sender ! Greeting(greeting) // Send the current greeting back to the sender
  }
}

object HelloAkkaScala extends App {

  // Create the 'helloakka' actor system
  val system = ActorSystem("helloakka")

  // Create the 'greeter' actor
  val greeter = system.actorOf(Props[Greeter], "greeter")

  // Create an "actor-in-a-box"
  val inbox = Inbox.create(system)

  // Tell the 'greeter' to change its 'greeting' message
  greeter.tell(WhoToGreet("akka"), ActorRef.noSender)

  // Ask the 'greeter' for the latest 'greeting'
  // Reply should go to the "actor-in-a-box"
  inbox.send(greeter, Greet)

  // Wait 5 seconds for the reply with the 'greeting' message
  val Greeting(message1) = inbox.receive(5.seconds)
  println(s"Greeting: $message1")

  // Change the greeting and ask for it again
  greeter.tell(WhoToGreet("typesafe"), ActorRef.noSender)
  inbox.send(greeter, Greet)
  val Greeting(message2) = inbox.receive(5.seconds)
  println(s"Greeting: $message2")

  val greetPrinter = system.actorOf(Props[GreetPrinter])
  // after zero seconds, send a Greet message every second to the greeter with a sender of the greetPrinter
  system.scheduler.schedule(0.seconds, 1.second, greeter, Greet)(system.dispatcher, greetPrinter)
}

// prints a greeting
class GreetPrinter extends Actor {
  def receive = {
    case Greeting(message) => println(message)
  }
}
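- Sending messages
Send with actor ! message or actor.tell(message, sender).
inbox.send(target, message) sends message to target with the inbox as its sender, so the reply can be read with inbox.receive.
system.scheduler.schedule(initialDelay, interval, receiver, message)(executor, sender) sends message to receiver after initialDelay and then every interval, with sender as its sender.
actor ? message means "ask": it returns a Future representing a possible reply. When you implement the actor that answers an ask, you have to catch exceptions and reply with a Failure; otherwise the asker's Future only fails when the ask timeout expires. Rethrowing the exception afterwards still lets the supervisor handle the failure:
try {
  val result = operation()
  sender() ! result
} catch {
  case e: Exception =>
    sender() ! akka.actor.Status.Failure(e)
    throw e
}
You can set a timeout period for the Future with an implicit value
implicit val timeout = Timeout(5 seconds)
or by passing it to the curried form
myActor.ask(msg)(5 seconds)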
- Receiving messages
The receive method defined in the actor is called when a message arrives. Attention: sender() returns the sender of the message being processed at the moment it is called. DO NOT call it in callbacks (for example on a Future), because when the callback is actually invoked sender() may return the wrong actor (deadLetters in many cases); store it in a val before registering the callback. Outside an actor, an Inbox receives messages with inbox.receive.
- Set a timeout period for receiving messages with
context.setReceiveTimeout(30 milliseconds)
Then you can catch this event by matching the message
case ReceiveTimeout
in receive.
- Forward message
target forward message
keeps the sender unchanged: if A sends msg to B and B forwards msg to C, the sender seen by C is still A instead of B.
- Stop actor
Sending the akka.actor.PoisonPill message stops the actor once it has processed the messages queued before it.
gracefulStop is useful to wait for termination or to compose an ordered termination of several actors. It is a pattern (akka.pattern.gracefulStop), not a built-in message: it sends a stop message (PoisonPill by default, or a user-defined message) to the actor and returns a Future[Boolean] that completes with true once the actor has terminated; you can Await this Future for a duration.
actor ! Kill
makes the actor throw an ActorKilledException. The actor is suspended and its supervisor decides how to handle the failure; the default supervisor strategy stops it.
- Become/Unbecome
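context.become replaces the actor's receive function, hence the behavior of the actor changes. context.unbecome reverts to the previous behavior; this works when the new behavior was pushed with context.become(newBehavior, discardOld = false), because the default discardOld = true replaces the current behavior instead of keeping it on the stack.
You can use stash() and unstashAll() to save messages into a queue while in one behavior and get them back in another behavior. You have to mix in the Stash trait to be able to use them.
- Extend actor and initialization patterns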
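Use PartialFunction#orElse to chain the receive function of the extended (parent) actor after the subclass's own handlers.
Three initialization patterns:
- constructor: runs every time an actor instance is created, including on restart
- preStart: runs when the actor is started; by default postRestart calls it again after a restart
- Message: the actor waits for an explicit initialization message and switches its behavior (become) once it is initialized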
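A minimal sketch of the ask pattern from the "Sending messages" item, assuming Akka 2.4 or later (for system.terminate(); on 2.3 use system.shutdown()). Parser and AskExample are hypothetical names. The actor replies with Status.Failure on bad input, so the second ask fails at once instead of timing out; unlike the try/catch snippet in that item it does not rethrow, so its supervisor is not involved.

```scala
import akka.actor.{Actor, ActorSystem, Props, Status}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical actor: replies with the parsed Int, or with Status.Failure
class Parser extends Actor {
  def receive = {
    case s: String =>
      try sender() ! s.toInt
      catch {
        // Status.Failure makes the asker's Future fail immediately
        // instead of waiting for the ask timeout
        case e: NumberFormatException => sender() ! Status.Failure(e)
      }
  }
}

object AskExample {
  // Asks the parser twice: once with valid input, once with invalid input
  def run(system: ActorSystem): (Int, Try[Int]) = {
    val parser = system.actorOf(Props[Parser], "parser")
    implicit val timeout: Timeout = Timeout(5.seconds) // picked up implicitly by `?`
    // `?` returns Future[Any]; mapTo narrows it to the expected type
    val ok = Await.result((parser ? "42").mapTo[Int], 5.seconds)
    val bad = Try(Await.result((parser ? "forty-two").mapTo[Int], 5.seconds))
    (ok, bad)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("askExample")
    val (ok, bad) = run(system)
    println(ok)            // 42
    println(bad.isFailure) // true
    system.terminate()
  }
}
```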
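For the sender() warning and the receive timeout in "Receiving messages", a sketch with a hypothetical Doubler actor: it captures the sender in a local val before the Future callback is registered, shows pipeTo as the alternative, and handles ReceiveTimeout.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, ReceiveTimeout}
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical actor: answers asynchronously with a computed value
class Doubler extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext for the Futures and pipeTo

  // A ReceiveTimeout message arrives if no message comes in for 30 seconds
  context.setReceiveTimeout(30.seconds)

  def receive = {
    case n: Int =>
      // Capture the sender now; inside the callback sender() could already
      // refer to another message's sender (or deadLetters)
      val replyTo: ActorRef = sender()
      Future(n * 2).foreach(result => replyTo ! result)

    case s: String =>
      // pipeTo alternative: sender() is evaluated here, not in a callback
      Future(s.length).pipeTo(sender())

    case ReceiveTimeout =>
      log.info("no message received for 30 seconds")
  }
}
```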
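The A -> B -> C case from "Forward message" as a minimal sketch with two hypothetical actors, Relay (B) and Echo (C). If Relay used target ! msg instead of forward, Echo's reply would go to Relay rather than to A.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical B: passes every message on to C without changing its sender
class Relay(target: ActorRef) extends Actor {
  def receive = {
    case msg => target forward msg // C sees A as the sender, not this actor
  }
}

// Hypothetical C: replies to whoever sender() is
class Echo extends Actor {
  def receive = {
    case msg => sender() ! s"echo: $msg"
  }
}
```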
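A sketch of gracefulStop from "Stop actor" with a hypothetical Idle actor, assuming Akka 2.4 or later for system.terminate(). The for-comprehension composes an ordered termination: second is asked to stop only after first has terminated.

```scala
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.gracefulStop
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical actor that just ignores its messages
class Idle extends Actor {
  def receive = { case _ => () }
}

object GracefulStopExample {
  // Stops two Idle actors in order and reports whether both terminated
  def stopInOrder(system: ActorSystem): Boolean = {
    import system.dispatcher // ExecutionContext for the for-comprehension
    val first = system.actorOf(Props[Idle], "first")
    val second = system.actorOf(Props[Idle], "second")

    // Each gracefulStop completes with true once the actor has terminated,
    // or fails with an AskTimeoutException after 5 seconds
    val bothStopped: Future[Boolean] = for {
      a <- gracefulStop(first, 5.seconds, PoisonPill)
      b <- gracefulStop(second, 5.seconds) // PoisonPill is the default stop message
    } yield a && b

    Await.result(bothStopped, 11.seconds)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("gracefulStopExample")
    println(stopInOrder(system)) // true
    system.terminate()
  }
}
```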
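Become/unbecome combined with Stash, as described under "Become/Unbecome", in a sketch with a hypothetical Door actor: Write messages that arrive while the door is closed are stashed and replayed once Open arrives. It assumes the Stash trait selects a deque-based mailbox automatically, as it does in Akka 2.3 and later.

```scala
import akka.actor.{Actor, Stash}

// Hypothetical messages for this sketch
case object Open
case object Close
case class Write(text: String)

class Door extends Actor with Stash {
  var written = Vector.empty[String]

  def receive = closed

  def closed: Receive = {
    case Open =>
      unstashAll()                               // put stashed messages back in the mailbox
      context.become(opened, discardOld = false) // push `opened` on the behavior stack
    case Write(_) =>
      stash()                                    // keep it until the door is open
  }

  def opened: Receive = {
    case Write(text) =>
      written :+= text
      sender() ! written
    case Close =>
      context.unbecome()                         // pop back to `closed`
  }
}
```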
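The orElse chaining from "Extend actor and initialization patterns", sketched with two hypothetical classes: ExtendedActor handles Int messages itself and falls back to BaseActor's behavior for everything else.

```scala
import akka.actor.Actor

// Hypothetical base actor with a reusable piece of behavior
class BaseActor extends Actor {
  def baseReceive: Receive = {
    case "ping" => sender() ! "pong"
  }
  def receive = baseReceive
}

// The extended actor tries its own handlers first and falls back to the parent's
class ExtendedActor extends BaseActor {
  def extendedReceive: Receive = {
    case n: Int => sender() ! (n + 1)
  }
  override def receive = extendedReceive orElse baseReceive
}
```

Putting extendedReceive first means the subclass wins when both functions are defined for the same message.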
1.1 Fault tolerance
Example code from the Akka documentation.
- Counter, increments the count, saves it to storage and returns the current count number.
- Storage, simulates a database and stores (String, Long) pairs.
- CounterService, overrides supervisorStrategy with a OneForOneStrategy:
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5.seconds) { case _: Storage.StorageException => Restart }
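In preStart it calls initStorage, which creates the Storage child and watches it (so it receives Terminated when the storage stops), tells the counter, if it already exists, to use the new storage, and asks the storage for the initial value; the Counter is created when that value arrives.
It forwards messages (Increment, GetCurrentCount) to the counter if it exists; otherwise it saves sender -> msg pairs to a backlog until the counter is available.
If the storage is terminated, it tells the counter that the storage is not available and schedules a task to run initStorage again (reconnect).
- Worker, creates the CounterService actor and supervises it, stopping it when it fails with ServiceUnavailable.
- Listener, listens for Progress (a case class in the Worker companion object) and shuts down the system when its receive timeout fires or when progress reaches 100%.
- The App object creates the worker and the listener and sends Start to the worker with the listener as sender; the worker then schedules a recurring task that uses the CounterService to increment the count and pipes the resulting Progress message to the listener.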
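A reduced sketch of the supervision idea used by CounterService, not the sample's actual code: the hypothetical Parent restarts its FlakyChild on FlakyException, which also resets the child's state because a restart creates a new instance.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import scala.concurrent.duration._

// Hypothetical exception and child for this sketch
class FlakyException(msg: String) extends RuntimeException(msg)

class FlakyChild extends Actor {
  var handled = 0 // back to 0 after a restart, since a new instance is created
  def receive = {
    case "fail"  => throw new FlakyException("simulated failure")
    case "count" =>
      handled += 1
      sender() ! handled
  }
}

class Parent extends Actor {
  // Restart the child on FlakyException; after more than 3 failures within
  // 5 seconds it is stopped instead. Any other Exception stops it directly.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5.seconds) {
      case _: FlakyException => Restart
      case _: Exception      => Stop
    }

  val child: ActorRef = context.actorOf(Props[FlakyChild], "child")

  def receive = {
    case msg => child forward msg // replies go straight back to the original sender
  }
}
```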
1.2 Persistent Actor
2 Cluster
2.1 Simple Cluster
2.1.1 Configuration
To run a cluster, you first have to create a configuration file that specifies the actor ref provider, the remote transport (host name and port), the seed nodes, etc.
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}
2.1.2 Code
package sample.cluster.simple

import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

object SimpleClusterApp {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552", "0"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system: one ActorSystem per port (several can live in one JVM)
      val system = ActorSystem("ClusterSystem", config)
      // Create an actor that handles cluster domain events
      system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
    }
  }
}

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}",
        member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}
2.1.3 Running
Log when starting the second seed node:
[info] [INFO] [03/10/2015 23:57:27.799] [ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
This code defines a cluster. Here each node runs as an independent process (a separate JVM), all on my local machine; when SimpleClusterApp is started without arguments, it instead starts all three nodes as separate ActorSystems inside one JVM. A cluster is identified by its system name (akka://ClusterSystem), while each node is identified by IP address and port number. The IP address is set by akka.remote.netty.tcp.hostname in application.conf.
- Seed nodes are configured contact points for the initial, automatic join of the cluster: a starting node contacts the seed-nodes to find out where to join. The seed nodes can also be configured with command line options when starting the JVM, using the following syntax:
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552 -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
Seed nodes can be started in any order and not all of them need to be running, but the first seed node in the list must be started when initially starting a cluster.
It is also possible to join the cluster manually via JMX or Command Line Management, or programmatically with Cluster.get(system).join(address) or Cluster(system).joinSeedNodes(seedNodes).
hostname in the remote section of the configuration specifies the IP of the current node. Question: what if I specify the IP of another machine? Does the application have to be deployed to that machine before running? Which protocol does it use? (The netty.tcp section and the akka.tcp:// addresses indicate Akka remoting over TCP.)
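Joining programmatically, as mentioned above, might look like this. It is a sketch assuming application.conf contains the cluster configuration from 2.1.1, with seed-nodes cleared here so that the node joins only through joinSeedNodes; ManualJoin is a hypothetical name.

```scala
import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object ManualJoin {
  // The seed addresses from the configuration file, built as Address values
  val seeds: List[Address] = List(
    Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
    Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552))

  def main(args: Array[String]): Unit = {
    // Assumption: application.conf holds the cluster configuration; clearing
    // seed-nodes here prevents the automatic join
    val config = ConfigFactory.parseString(
      "akka.remote.netty.tcp.port = 0\nakka.cluster.seed-nodes = []")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)

    // Join through the seed nodes, as the automatic join would do
    Cluster(system).joinSeedNodes(seeds)
    // or join one known member directly: Cluster(system).join(seeds.head)
  }
}
```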
2.2 Dial-in example
Frontend-backend pattern (not the one used by group routers):
- The backend subscribes to cluster membership events; when a new frontend node comes up, the backend registers itself with that frontend.
- The frontend watches each registered backend and handles Terminated(a) in its receive method to remove the backend.
- The frontend forwards received jobs to the registered backends.
- The roles of a node are defined in the configuration property akka.cluster.roles, or in the start script as a system property or environment variable.
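A simplified sketch of the frontend and backend sides of this pattern, modelled on the Akka worker dial-in sample; all class and message names are hypothetical. The cluster part is left out: in the full pattern the backend sends BackendRegistration to the frontend's actorSelection when it sees a MemberUp event for a node with the frontend role. Here the registration can be sent by hand, so the frontend logic runs locally.

```scala
import akka.actor.{Actor, ActorRef, Terminated}

// Hypothetical messages for this sketch
case class TransformJob(text: String)
case class TransformResult(text: String)
case class TransformFailed(reason: String, job: TransformJob)
case object BackendRegistration

class Frontend extends Actor {
  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformJob if backends.isEmpty =>
      sender() ! TransformFailed("Service unavailable, try again later", job)

    case job: TransformJob =>
      jobCounter += 1
      // Round-robin over the registered backends; forward keeps the client as sender
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context.watch(sender()) // deliver Terminated(backend) when it stops
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

class Backend extends Actor {
  def receive = {
    case TransformJob(text) => sender() ! TransformResult(text.toUpperCase)
  }
}
```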
Restricting the number of members: with
akka.cluster.min-nr-of-members = 3
the leader will not change the status of joining nodes to ‘Up’ until this number of members has been reached. Finer, per-role configuration:
#+begin_src conf
akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
#+end_src
Define a callback to be invoked when the current member's status is changed to ‘Up’:
Cluster(system) registerOnMemberUp {
  system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
    name = "factorialFrontend")
}
2.3 Cluster Aware Routers
A router receives tasks and sends them on to actors; the actors it routes to are called routees.
2.3.1 Group router
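In the sample code, every node has one StatsService and one StatsWorker. The StatsService contains a router that points to the StatsWorkers and is responsible for receiving and distributing tasks. Each time the client passes a task to the cluster, the StatsService on one of the nodes receives it and distributes it; since the distribution is done by the router, the work is spread over StatsWorkers on different nodes. In this example, the service and the worker on each node are created explicitly.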
Group example: Server side
akka.actor.deployment {
  /statsService/workerRouter {
    router = consistent-hashing-group
    // specify the router; other group routers include round-robin-group
    // (the *-pool variants create their routees instead of looking them up)
    nr-of-instances = 100
    routees.paths = ["/user/statsWorker"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = compute
    }
  }
}
With this configuration, the router sends messages to the actors found at the relative actor paths listed in routees.paths, on the cluster members that have the compute role (including the local node, since allow-local-routees = on).
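Include this configuration in the config used to create the ActorSystem, then create the router with actorOf(FromConfig.props(Props[YourClass])) at the path that matches the deployment key, i.e. as a child named workerRouter of the top-level actor statsService (for a group router, which only looks routees up, Props.empty is enough). Send tasks to the router actor; it automatically forwards them to the actors registered under "user" with the name "statsWorker" on the cluster nodes (not necessarily StatsWorker instances: you can register any type of actor under that name). For more configuration options, see the routing documentation.
The following code sets up the same configuration programmatically.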
val workerRouter = context.actorOf(
  ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
    totalInstances = 100, routeesPaths = List("/user/statsWorker"),
    allowLocalRoutees = true, useRole = Some("compute"))).props(),
  name = "workerRouter2")
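With a consistent-hashing group, each message needs a hash key. A sketch of a service that wraps each word in a ConsistentHashableEnvelope so that the same word always reaches the same routee; WordService is hypothetical and assumes it is started as /user/statsService with the deployment configuration above.

```scala
import akka.actor.{Actor, ActorRef, Props}
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.FromConfig

// Hypothetical service: routes every word of a text through the consistent-hashing router
class WordService extends Actor {
  // Child name "workerRouter" under /user/statsService matches the deployment key
  // /statsService/workerRouter; Props.empty suffices because a group router only
  // looks its routees up by path and never creates them
  val workerRouter: ActorRef =
    context.actorOf(FromConfig.props(Props.empty), name = "workerRouter")

  def receive = {
    case text: String =>
      text.split(" ").foreach { word =>
        // hashKey = word, so equal words always go to the same routee;
        // the router unwraps the envelope and delivers only `word`
        workerRouter.tell(ConsistentHashableEnvelope(message = word, hashKey = word), sender())
      }
  }
}
```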
Group example: Client side
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{ Actor, ActorSystem, Address, Props, RelativeActorPath, RootActorPath }
import akka.cluster.{ Cluster, MemberStatus }
import akka.cluster.ClusterEvent._
// StatsJob, StatsResult and JobFailed are the message classes of the stats sample

object StatsSampleClient {
  def main(args: Array[String]): Unit = {
    // note that client is not a compute node, role not defined
    val system = ActorSystem("ClusterSystem")
    system.actorOf(Props(classOf[StatsSampleClient], "/user/statsService"), "client")
  }
}

class StatsSampleClient(servicePath: String) extends Actor {
  // servicePath is "/user/statsService", which is the second argument of the Props.
  val cluster = Cluster(context.system)
  val servicePathElements = servicePath match {
    case RelativeActorPath(elements) => elements
    case _ => throw new IllegalArgumentException(
      "servicePath [%s] is not a valid relative actor path" format servicePath)
  }
  import context.dispatcher
  val tickTask = context.system.scheduler.schedule(2.seconds, 10.seconds, self, "tick")

  var nodes = Set.empty[Address]

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
  }
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    tickTask.cancel()
  }

  def receive = {
    case "tick" if nodes.nonEmpty =>
      // just pick any one
      val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size))
      val service = context.actorSelection(RootActorPath(address) / servicePathElements)
      service ! StatsJob("this is the text that will be analyzed")
    case result: StatsResult =>
      println(result)
    case failed: JobFailed =>
      println(failed)
    case state: CurrentClusterState =>
      nodes = state.members.collect {
        case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address
      }
    case MemberUp(m) if m.hasRole("compute")        => nodes += m.address
    case other: MemberEvent                         => nodes -= other.member.address
    case UnreachableMember(m)                       => nodes -= m.address
    case ReachableMember(m) if m.hasRole("compute") => nodes += m.address
  }
}
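In the client code, the client first subscribes to cluster member events to obtain all members, then filters the nodes by their role. When it actually sends a task, it picks one of these nodes at random and uses
context.actorSelection(RootActorPath(address) / servicePathElements)
This returns an ActorSelection object; messages sent to it are delivered to the matching actor on that particular cluster node. More precisely, the second part (servicePathElements) determines the actor's path on the node, like a URN, while the first part (RootActorPath(address)) is the node address, like a URL.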
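How RootActorPath(address) / servicePathElements builds the remote path can be checked in isolation; the address below is an example value, whereas the client obtains it from cluster membership events. ServicePathExample is a hypothetical name.

```scala
import akka.actor.{ActorPath, Address, RelativeActorPath, RootActorPath}

object ServicePathExample {
  // Example node address (the "URL" part)
  val address = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)

  // Split the relative path into its elements, as StatsSampleClient does
  val servicePathElements: Seq[String] = "/user/statsService" match {
    case RelativeActorPath(elements) => elements
    case other => throw new IllegalArgumentException(s"not a relative actor path: $other")
  }

  // Node address + path of the actor on that node (the "URN" part)
  val servicePath: ActorPath = RootActorPath(address) / servicePathElements
}
```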
2.3.2 Pool router
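ClusterSingletonManager is used as a wrapper around StatsService. Although every node runs the creation code, only one StatsService is actually created, which makes it a singleton. After a node starts, it contacts the other nodes and joins the cluster, then connects to the statsService under the singleton. If it is the first node, the router is created, which then creates 3 workers. When later nodes join, the workerRouter deploys 3 workers on each of them as well. The workers are created by workerRouter; the number of 3 workers per node comes from the setting
max-nr-of-instances-per-node = 3
in the configuration file. This setting only takes effect for pools; in a group, no workers are created even if it is set.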
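Singleton means the actor exists only once in the whole cluster. It runs on the oldest node among those with the configured role (here, the node that was started first). If that node goes down, the ClusterSingletonManager on the next oldest node starts a new instance once the old node has been removed from the cluster.
The singleton manager is mapped to /user/singleton as the container of StatsService, so StatsService itself is mapped under it at
/user/singleton/statsService
.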
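The singleton wiring described above, sketched with the Akka 2.3 akka-contrib API (akka.contrib.pattern); in Akka 2.4 these classes moved to akka.cluster.singleton and take settings objects instead of these parameters. SingletonSetup and its members are hypothetical names. The proxy lets every node send to the current singleton without knowing where it runs.

```scala
import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy}

object SingletonSetup {
  // Started on every node; only the oldest node with role "compute" actually
  // runs the singleton, at /user/singleton/statsService
  def managerProps(serviceProps: Props): Props =
    ClusterSingletonManager.props(
      singletonProps = serviceProps,
      singletonName = "statsService",
      terminationMessage = PoisonPill,
      role = Some("compute"))

  // Forwards messages to the current singleton instance, wherever it runs
  val proxyProps: Props =
    ClusterSingletonProxy.props(
      singletonPath = "/user/singleton/statsService",
      role = Some("compute"))

  def start(system: ActorSystem, serviceProps: Props): Unit = {
    system.actorOf(managerProps(serviceProps), name = "singleton")
    system.actorOf(proxyProps, name = "statsServiceProxy")
  }
}
```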
2.4 Adaptive Load Balancing
There are two implementations, AdaptiveLoadBalancingPool and AdaptiveLoadBalancingGroup. The core pieces are the cluster metrics and the configuration of the adaptive router.
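- Note: do not send messages in preStart. At that point the router's routees are not set up yet, so all or some of the messages sent become dead letters.
- Another pitfall: the router distributes the load over the nodes of the cluster according to its algorithm. In the example, roles distinguish frontend and backend nodes, so the router only sends messages to backend nodes. But a backend node does not necessarily run a backend actor, and the router cannot detect that. So if you also mark a node that only runs frontend actors as a backend, the router will still send messages to it, and those messages are lost. A node's identity depends only on the roles in its system config, not on which actors actually run on it.
- One more pitfall: do not call any state-dependent function inside a callback unless you are sure you need that side effect. Calling sender() inside a callback often returns deadLetters, because sender() is actually evaluated when the callback runs, not when the message was received.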
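The programmatic counterpart of an adaptive group router, analogous to the consistent-hashing example in 2.3.1, assuming the Akka 2.3 package akka.cluster.routing (from Akka 2.4 the adaptive routers live in akka.cluster.metrics). BackendRouter is a hypothetical name; useRole = Some("backend") is the role-based selection that the second pitfall warns about.

```scala
import akka.actor.Props
import akka.cluster.routing.{AdaptiveLoadBalancingGroup, ClusterRouterGroup, ClusterRouterGroupSettings, MixMetricsSelector}

object BackendRouter {
  // Routes to /user/factorialBackend on every node with role "backend";
  // routee weights come from cluster metrics combined by MixMetricsSelector
  val props: Props = ClusterRouterGroup(
    AdaptiveLoadBalancingGroup(MixMetricsSelector),
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List("/user/factorialBackend"),
      allowLocalRoutees = true,
      useRole = Some("backend"))).props()
}
```

Inside the frontend actor it would be created with context.actorOf(BackendRouter.props, "factorialBackendRouter"), and, per the first pitfall, used only once the routees have had time to register rather than in preStart.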