1 Actors

import akka.actor.{ ActorRef, ActorSystem, Props, Actor, Inbox }
import scala.concurrent.duration._

// Three messages, one is object, two are case classes
case object Greet
case class WhoToGreet(who: String)
case class Greeting(message: String)

class Greeter extends Actor {
  var greeting = ""

  def receive = {
    case WhoToGreet(who) => greeting = s"hello, $who"
    case Greet           => sender ! Greeting(greeting)
    // Send the current greeting back to the sender
  }
}

object HelloAkkaScala extends App {

  // Create the 'helloakka' actor system
  val system = ActorSystem("helloakka")

  // Create the 'greeter' actor
  val greeter = system.actorOf(Props[Greeter], "greeter")

  // Create an "actor-in-a-box"
  val inbox = Inbox.create(system)

  // Tell the 'greeter' to change its 'greeting' message
  greeter.tell(WhoToGreet("akka"), ActorRef.noSender)
  // Ask the 'greeter for the latest 'greeting'
  // Reply should go to the "actor-in-a-box"
  inbox.send(greeter, Greet)

  // Wait 5 seconds for the reply with the 'greeting' message
  val Greeting(message1) = inbox.receive(5.seconds)
  println(s"Greeting: $message1")

  // Change the greeting and ask for it again
  greeter.tell(WhoToGreet("typesafe"), ActorRef.noSender)
  inbox.send(greeter, Greet)
  val Greeting(message2) = inbox.receive(5.seconds)
  println(s"Greeting: $message2")

  val greetPrinter = system.actorOf(Props[GreetPrinter])
  // after zero seconds, send a Greet message every second to the greeter with a sender of the greetPrinter
  system.scheduler.schedule(0.seconds, 1.second, greeter, Greet)(system.dispatcher, greetPrinter)

}

// prints a greeting
class GreetPrinter extends Actor {
  def receive = {
    case Greeting(message) => println(message)
  }
}
  • Sending messages
    1. actor ! message or actor.tell(message, sender).
    2. inbox.send(sender, message)
    3. system.scheduler.schedule(initDelay, interval, receiver, message)(executor, sender)
    4. actor ? message, it means “ask”, which resturns a Future representing a possible reply. When you implement ask function, you have to catch the exception and send a Failure to the sender
      try {
        val result = operation()
        sender() ! result
      } catch {
        case e: Exception =>
          sender() ! akka.actor.Status.Failure(e)
          throw e
      }
      

      You can set a timeout period for the Future by implicit value implicit val timeout = Timeout(5 seconds) or using curry function myActor.ask(msg)(5 seconds).

  • Receiving messages
    1. receive defined method in actor will be called when message comes. Attention:, the sender() method returns the current sender when the method is called, DO NOT use this function in callbacks since the sender method may returns wrong sender (deadletter in many cases) when the callback is actually invoked.
    2. inbox.receive
    3. Set timeout period for receiving method, context.setReceiveTimeout(30 milliseconds). Then you can catch this event by receiving message case ReceiveTimeout.
  • Forward message target forward message, it will keep the sender unchanged. A send msg to B, B forward msg to C, the sender for C is still A instead of B
  • Stop actor akka.actor.PoisonPill message will stop the actor. gracefulStop is useful to wait for termination or compose ordered termination of several actors. It is a pattern, not precoded, it sends a user defined message to the actor and returns a Future, you can Await this future for a duration.

    actor ! Kill will also stop this actor, but it causes the actor to throw a ActorKilledException. This actor will suspend and like the supervisor to decide how to handle the failure.

  • Become/Unbecome context.become method in Actor can setting the receive function, hence the behavior of Actor is changed. unbecome lets the actor returns to previews behavior.

    You can use stash and unstash to save mails into a queue in one behavior and get them back in another behavior. Of course, you have to mix the trait Stash to be able to use it.

  • Extend actor and intialization patterns Use PartialFunction#orElse to chain the receive function of the extended actor.

    Three init patterns

    1. constructor
    2. preStart
    3. Message

1.1 Falt toleration

Example code from document.

  1. Counter, increment and save count to storage, return current count number
  2. Storage, simulate database, store (String, Long) pairs.
  3. CounterService, override supervisorStrategy by OneForOneStrategy
    override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
      withinTimeRange = 5.seconds) {
      case _: Storage.StorageException => Restart
    }
    

    init with actor refererences to the wrapper (watcher) of storage, tell counter (when initialized?) to use the storage.

    forward message (increment, getCurrentCount) to counter if it exists; save sender->msg pair to log if counter is not available.

    if storage is terminated, tell counter that storage is not available, schedule task to initStorage again (reconnect).

  4. Worker, stop when receiving message from CounterService. init CounterService actor.
  5. Listener, shutdown the system while timeout. Listen Progress, which is a case class of Worker.
  6. App object init worker and listener. Tell worker to assign the listener, which will schedule task to use the CounterService to increment count and pipe the message to listener.

1.2 Persistent Actor

2 Cluster

2.1 Simple Cluster

2.1.1 Configuration

To run cluster, you have to first creat a configuration file that specify the URI, actors, etc.

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

2.1.2 Code

package sample.cluster.simple

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object SimpleClusterApp {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552", "0"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)
      // One system on one port?

      // Create an actor that handles cluster domain events
      system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
    }
  }

}

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
    //#subscribe
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}",
        member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}

2.1.3 Running

Log when starting the second seed node: [info] [INFO] [03/10/2015 23:57:27.799] [ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []

This code defines a cluster (集群), each node runs as dependennt processes (different JVMs) since they all run on my local machine. A cluster is indentified by akka://CllusterSystem, while each node is indentified by IP address and port number. The IP address seems like to be defined by application.config.

  • seed nodes are configured contact points for initial, automatic, join of the cluster. Hence, the seed-nodes figure out where to join the cluster to current node. The seed nodes can also be configured by command line options when starting the JVM using the following syntax:
    -Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
    -Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host1:2552
    

    Starting seed notes can be async and not all the seed nodes are necessary to run. But the first node must be started when starting a cluster.

    It is also possible to join the cluster manually via JMX or Command Line Management or via programatic method with Cluster.get(system).join(address) or Cluster(system).joinSeedNodes.

  • hostname in remote in configuration specify the IP of current node. Question: /what if I specify the IP to another machine? The application should be deploy to another machine before running? What protocole does it uses?/

2.2 Dial-in example

Frontend-backend pattern (not those in Group router):

  1. Backed subscribes to the cluster service to register new frontend
  2. Frontend watch the backend and define the receive method for the case of Terminated(a).
  3. Frontend redirect received jobs to backend.
  4. Roles of node is defined in the configuration property named akka.cluster.roles or in start script as a system property or environment variable.

Number of members restriction: akka.cluster.min-nr-of-members = 3, the leader will not change the status of nodes to ‘Up’ until this number of members reached. Finer configuration:

#+beign_src yaml akka.cluster.role { frontend.min-nr-of-members = 1 backend.min-nr-of-members = 2 }

#+end_src Define callback to be invoked when the current member status is changed to ‘Up’

Cluster(system) registerOnMemberUp {
  system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
    name = "factorialFrontend")
}

2.3 Cluster Aware Routers

Router is one that receive and send tasks to actors, while the ref to actors are called routees.

2.3.1 Group router

在示例代码中,每一个node上面都有一个StatsService,和一个 StatsWorker. StatsService里面包含了指向StatsWorker的router并且负 责接受和转发任务. 每次Client将一个任务传给Cluster时,Cluster中某 个一node上的StatsService会接收并分发任务,因为分发任务是依靠 router来实现的,所以任务会被分发到不同node上的StatsWorker上去.这 个例子中,每个node上的Service和Worker都是显式创建的.

Group example: Server side

akka.actor.deployment {
  /statsService/workerRouter {
      router = consistent-hashing-group
      // specify the router, you have also other options like round-robin-pool
      nr-of-instances = 100
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-role = compute
      }
    }
}

With this configuration, relative actor paths defined in routees.paths are used for selecting actors to send the message by the router. 在创建System的时候将这个配置文件包含到配置中去, 然后在这个system 下面任意地方用 actorOf(FromConfig.props(Props[YourClass])) 来 得到router actor. 发送任务时将任务发给router actor,它会自动转发 给cluster中的注册在 “user” 下面的注册名为 “statsWorker” 的actor (不一定是StatsWorker,你可以随意将不同类型的Actor注册在这个名字下 面). 更多的配置,查看routing.

下面的代码是用编程的方式实现配置文件里的配置.

val workerRouter = context.actorOf(
    ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
      totalInstances = 100, routeesPaths = List("/user/statsWorker"),
      allowLocalRoutees = true, useRole = Some("compute"))).props(),
    name = "workerRouter2")

Group example: Client side

object StatsSampleClient {
  def main(args: Array[String]): Unit = {
    // note that client is not a compute node, role not defined
    val system = ActorSystem("ClusterSystem")
    system.actorOf(Props(classOf[StatsSampleClient], "/user/statsService"), "client")
  }
}

class StatsSampleClient(servicePath: String) extends Actor {
  // servicePath is "/user/statsService", which is the second argument of the Props.
  val cluster = Cluster(context.system)
  val servicePathElements = servicePath match {
    case RelativeActorPath(elements) => {
//      println("servicePath is: "+servicePath) ;
      elements}
    case _ => throw new IllegalArgumentException(
      "servicePath [%s] is not a valid relative actor path" format servicePath)
  }
  import context.dispatcher
  val tickTask = context.system.scheduler.schedule(2.seconds, 10.seconds, self, "tick")

  var nodes = Set.empty[Address]

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
  }
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    tickTask.cancel()
  }

  def receive = {
    case "tick" if nodes.nonEmpty =>
      // just pick any one
      val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size))
      val service = context.actorSelection(RootActorPath(address) / servicePathElements)
      service ! StatsJob("this is the text that will be analyzed")
    case result: StatsResult =>
      println(result)
    case failed: JobFailed =>
      println(failed)
    case state: CurrentClusterState =>
      nodes = state.members.collect {
        case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address
      }
    case MemberUp(m) if m.hasRole("compute")        => nodes += m.address
    case other: MemberEvent                         => nodes -= other.member.address
    case UnreachableMember(m)                       => nodes -= m.address
    case ReachableMember(m) if m.hasRole("compute") => nodes += m.address
  }

}

而在客户端代码中,首先向cluster订阅成员事件来获取所有的成员. 然后 根据成员的role来筛选出相应的node. 真正开始发送任务的时候,随机排 列这些node,然后使用 context.actorSelection(RootActorPath(address) / servicePathElements) 这句话返回一个 ActorSelection 的对象,对它 发送信息能自动地发送到某个注册在cluster上的某个node上的某个actor 上面去.准确地说,后半段确定Actor的URN,前半段应该是URL.

2.3.2 Pool router

利用ClusterSingletonManager作为StatsService的外包,虽然每个node上 都运行了创建代码,但实际上只有一个StatsService被创建,从而实现单例. 在node被创建之后,node沟通其它node并加入cluster,随后连接上singleton 下的statsService,如果是第一个node则会创建router,然后创建3个 worker. 之后的node直接在连上statsService之后创建3个worker. worer是 由workerRouter创建的,3个worker的设定在配置文件里 max-nr-of-instances-per-node = 3. 这个设定只对pool有效,group里 面就算设定了也不会创建worker.

singlton意味着Actor在整个集群上只有一个,目前看来是先创建的node 上面载有这个单例,如果这个node挂了怎么办?

singletonManager 被映射到 /user/singleton 上作为 StatsService 的 容器, StatsService 自然就映射到其下 /user/singleton/statsService.

2.4 Adaptive Load Balancing

AdaptiveLoadBalanceing Pool 和 AdaptiveLoadBalancing Group 两种 实现方式.中心是cluster metrics和配置adaptive的router.

  1. 注意,不要在 preStart 里面发送信息,这个时候路由还没有设置好,发 送的message全部/部分都会变成 dead letter.
  2. 另外一个坑是:router会按照算法把负载分布到cluster的每个node上去.在 例子中,用role来区分前后端的node,所以router只会向后端node发送信 息.但是后端node不代表有后端actor,这一点router是无法探知的.所以 如果你把只运行有前端actor的node也标记为后端,router仍然会向其发 送信息,最后会导致信息送丢了. node的身份取只决于system config里 面的role,不取决于里面到底运行了什么actor.
  3. 追加一个坑:不要在callback里面调用任何状态相关的函数,除非你确定 你需要这个副作用.在callback里面调用 sender() 经常会返回 deadLetter, 因为 sender() 真正被调用的时候不是接收到信息的 时候.