Protected
Protected

Actors

Module stability: SOLID

The Actor Model provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management, making it easier to write correct concurrent and parallel systems. Actors were defined in the 1973 paper by Carl Hewitt but have been popularized by the Erlang language, and used for example at Ericsson with great success to build highly concurrent and reliable telecom systems.

The API of Akka’s Actors is similar to Scala Actors which has borrowed some of its syntax from Erlang.

The Akka 0.9 release introduced a new concept; ActorRef, which requires some refactoring. If you are new to Akka just read along, but if you have used Akka 0.6.x, 0.7.x and 0.8.x then you might be helped by the 0.8.x => 0.9.x migration guide

Creating Actors


Actors can be created either by:
  • Extending the Actor class and implementing the receive method.
  • Create an anonymous actor using one of the actor methods.

Defining an Actor class


Actor classes are implemented by extending the Actor class and implementing the 'receive' method. In the 'receive' method should definie a series of case statements (which has the type 'PartialFunction[Any, Unit]') that defines which messages your Actor can handle, using standard Scala pattern matching, along with the implementation of how the messages should be processed.

Here is an example:

class MyActor extends Actor {
  def receive = {
    case "test" => log.info("received test")
    case _ => log.info("received unknown message")
  }
}

The 'Actor' trait mixes in the 'se.scalablesolutions.akka.util.Logging' trait which defines a logger in the 'log' field that you can use to log. This logger is configured in the 'akka.conf' configuration file (and is based on the Configgy library which is using Java Logging).

Creating Actors


val myActor = Actor.actorOf[MyActor]
myActor.start

Normally you would want to import the 'actorOf' method like this:

import se.scalablesolutions.akka.actor.Actor._
 
val myActor = actorOf[MyActor]

To avoid prefix it with 'Actor' every time you use it.

You can also start it in the same statement:

val myActor = actorOf[MyActor].start

The call to 'actorOf' returns an instance of 'ActorRef'. This is a handle to the 'Actor' instance which you can use to interact with the Actor, like send messages to it etc. more on this shortly. The 'ActorRef' is immutble and has a one to one relationship with the Actor it represents. The 'ActorRef' is also serializable and network-aware. This means that you can serialize it, send it over the wire and use it on a remote host and it will still be representing the same Actor on the original node, across the network.

Creating Actors with non-default constructor


If your Actor has a constructor that takes parameters then you can't create it using 'actorOf[TYPE]'. Instead you can use a variant of 'actorOf' that takes a call-by-name block in which you can create the Actor in any way you like.

Here is an example:

val a = actorOf(new MyActor(..)).start // allows passing in arguments into the MyActor constructor

Creating anonymous actors


Anonymous actors are created using the 'actor' method and its friends, who are defined in the 'Actor' object.

Some examples.

Here we create an actor that does not have an init body, only a message handler. The actor is defined with 'permanent' life-cycle which means that it will be restarted if it dies while being supervised.

import se.scalablesolutions.akka.actor.Actor._
 
val a = actor {
  case msg => ... // handle message
}

Here we create an actor with both an init body and a message handler. The actor is created with a 'permanent' life-cycle configuration, which means that if the actor is supervised and dies it will be restarted.

val a = Actor.init {
  ... // init stuff
} receive {
  case msg => ... // handle message
}

Here we create an actor with a 'temporary' life-cycle, which means that it will *not* be restarted if it dies while being supervised. This actor does not have an init body, only a message handler.

val a = temporaryActor {
  case msg => ... // handle message
}

Here we create a light-weight actor-based thread, that can be used to spawn off a task. Code blocks spawned up like this are always implicitly started, shut down and made eligible for garbage collection.

spawn {
  ... // do stuff
}

Messages and immutability


IMPORTANT: Messages can be any kind of object but have to be immutable. Scala can’t enforce immutability (yet) so this has to be by convention. Primitives like String, Int, Boolean are always immutable. Apart from these the recommended approach is to use Scala case classes which are immutable (if you don’t explicitly expose the state) and works great with pattern matching at the receiver side.

Here is an example:
// define the case class
case class Register(user: User)
 
// create a new case class message
val message = Register(user)

Other good messages types are 'scala.Tuple2', 'scala.List', 'scala.Map' which are all immutable and great for pattern matching.

Send messages


Messages are sent to an Actor through one of the “bang” methods.
  • ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately.
  • !! means “send-and-reply-eventually”, e.g. send a message asynchronously and wait for a reply through aFuture. Here you can specify a timeout. Using timeouts is very important. If no timeout is specified then the actor’s default timeout (set by the this.timeout variable in the actor) is used. This method returns an 'Option[Any]' which will be either 'Some(result)' if returning successfully or None if the call timed out.
  • !!! sends a message asynchronously and returns a 'FutureResult'.

Fire-forget


This is the preferred way of sending messages. No blocking waiting for a message. Give best concurrency and scalability characteristics.

actor ! "Hello"

If invoked from within an Actor, then the sending actor reference will be implicitly passed along with the message and available to the receiving Actor in its 'sender: Option[AnyRef]' member field. He can use this to reply to the original sender or use the 'reply(message: Any)' method.

If invoked from an instance that is not an Actor there will be no implicit sender passed along the message and you will get an IllegalStateException if you call 'self.reply(..)'.

Send-And-Receive-Eventually


Using '!!' will send a message to the receiving Actor asynchronously but it will wait for a reply on a 'FutureResult', blocking the sender Actor until either:

  • A reply is received, or
  • The FutureResult times out

You can pass an explicit time-out to the '!!' method and if none is specified then the default time-out defined in the sender Actor will be used.

The '!!' method returns an 'Option[Any]' which will be either 'Some(result)' if returning successfully, or None if the call timed out.
Here are some examples:

val resultOption = actor !! ("Hello", 1000)
if (resultOption.isDefined) ... // handle reply
else ... // handle timeout
 
val result: Option[String] = actor !! "Hello"
resultOption match {
  case Some(reply) => ... // handle reply
  case None =>        ... // handle timeout
}
 
val result = (actor !! "Hello").getOrElse(throw new RuntimeException("TIMEOUT"))
 
(actor !! "Hello").foreach(result => ...) // handle result

Send-And-Receive-Future


Using '!!!' will send a message to the receiving Actor asynchronously and will return a 'FutureResult':

val future = actor !!! "Hello"

The 'FutureResult' trait looks like this:

trait FutureResult {
  def await
  def awaitBlocking
  def isCompleted: Boolean
  def isExpired: Boolean
  def timeoutInNanos: Long
  def result: Option[Any]
  def exception: Option[Tuple2[AnyRef, Throwable]]
}

So the normal way of working with futures is something like this:

val future = actor !!! "Hello"
future.await
future.result.foreach(result => ...)

We also have a utility class 'Futures' that have a couple of convenience methods:

object Futures {
  def awaitAll(futures: List[FutureResult]): Unit
  def awaitOne(futures: List[FutureResult]): FutureResult

Forward message


You can forward a message from one actor to another. This means that the original sender address/reference is maintained even though the message is going through a 'mediator'. This can be useful when writing actors that work as routers, load-balancers, replicators etc.

actor.forward(message)

Receive messages


An Actor has to implement the ‘receive’ method to receive messages:
protected def receive: PartialFunction[Any, Unit]

Note: Akka has an alias to the 'PartialFunction[Any, Unit]' type called 'Receive', so you can use this type instead for clarity. But most often you don't need to spell it out.

This method should return a PartialFunction, e.g. a ‘match/case’ clause in which the message can be matched against the different case clauses using Scala pattern matching. Here is an example:
class MyActor extends Actor {
  def receive = {
    case "Hello" =>
      log,info("Received 'Hello'")
 
    case _ =>
      throw new RuntimeException("unknown message")
  }
}

Actor internal API


The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the:
  1. 'receive' message handler
  2. life-cycle callbacks:
    1. init
    2. shutdown
    3. preRestart
    4. postRestart

The 'Actor' trait has one single member field (apart from the 'log' field from the mixed in 'Logging' trait):

val self: ActorRef

This 'self' field holds a reference to its 'ActorRef' and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc.

However, for convenience you can import these functions and fields like below, which will allow you do drop the 'self' prefix:

class MyActor extends Actor {
  import self._
  id = ...
  dispatcher = ...
  start
  ...
}

But in this documentation we will always prefix the calls with 'self' for clarity.

Let's start by looking how we can reply to messages in a convenient way using this 'ActorRef' API.

Reply to messages


Reply using the reply and reply_? methods


If you want to send a message back to the original sender of the message you just received then you can use the 'reply(..)' method.

case request =>
  val result = process(request)
  self.reply(result)

In this case the 'result' will be send back to the Actor that send the 'request'.

The 'reply' method throws an 'IllegalStateException' if unable to determine what to reply to, e.g. the sender is not an actor. You can also use the more forgiving 'reply_?' method which returns 'true' if reply was sent, and 'false' if unable to determine what to reply to.

case request =>
  val result = process(request)
  if (self.reply_?(result)) ...// success
  else ... // handle failure

Reply using the sender reference


If the sender is an Actor then its reference will be implicitly passed along together with the message and will end up in the 'sender: Option[ActorRef]' member field in the 'ActorRef. This means that you can use this field to send a message back to the sender.

// receiver code
case request =>
  val result = process(request)
  self.sender.get ! result

It's important to know that 'sender.get' will throw an exception if the 'sender' is not defined, e.g. the 'Option' is 'None'. You can check if it is defined by invoking the 'sender.isDefined' method, but a more elegant solution is to use 'foreach' which will only be executed if the sender is defined in the 'sender' member 'Option' field. If it is not, then the operation in the 'foreach' method is ignored.

// receiver code
case request =>
  val result = process(request)
  self.sender.foreach(_ ! result)

The same pattern holds for using the 'senderFuture' in the section below.

Reply using the sender future


If a message was sent with the '!!' or '!!!' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.

The reference to the Future resides in the 'senderFuture: Option[CompletableFutureResult]' member field in the 'ActorRef' class.

Here is an example of how it can be used:

case request =>
  try {
    val result = process(request)
    self.senderFuture.foreach(_.completeWithResult(result))
  } catch {
    case e =>
      senderFuture.foreach(_.completeWithException(this, e))
  }

Starting actors


Actors are started by invoking the ‘start’ method.
val actor = actorOf[MyActor]
actor.start

You can create and start the Actor in a oneliner like this:
val actor = actorOf[MyActor].start

When you start the actor then it will automatically call the 'def init' callback method on the 'Actor' trait. This is an excellent place to add initialization code for the actor.
override def init = {
... // initialization code
}

Stopping actors


Actors are stopped by invoking the ‘stop’ method.
actor.stop

When stop is called then a calll to the ‘def shutdown’ callback method will take place. The Actor can use this callback to implement shutdown behavior.
override def shutdown = {
  ... // clean up resources
}

You can shut down all Actors in the system by invoking:

ActorRegistry.shutdownAll

ActorRegistry: Finding Actors


Actors can be looked up using the 'se.scalablesolutions.akka.actor.ActorRegistry' object. Through this registry you can look up actors by:
  • uuid string – this uses the ‘uuid’ field in the Actor class, returns all actor instances with that uuid
  • id string – this uses the ‘id’ field in the Actor class, which can be set by the user (default is the class name), returns instances of a specific Actor
  • specific actor class - returns a 'List[Actor]' with all actors of this exact class
  • parameterized type - returns a 'List[Actor]' with all actors that are a subtype of this specific type

Actors are automatically registered in the ActorRegistry when they are started and removed when they are stopped. But you can explicitly register and unregister ActorRef's if you need to using the 'register' and 'unregister' methods.

Here is a summary of the API for finding actors:

def actors: List[ActorRef]
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef]
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef]
def actorsFor(id : String): List[ActorRef]

Examples of how to use them:

val actors = ActorRegistry.actorsFor(classOf[...])

val actors =  ActorRegistry.actorsFor(id)

val actors = ActorRegistry.actorsFor[MyActorType]

The ActorRegistry also has a 'shutdownAll' and 'foreach' method:

def foreach(f: (ActorRef) => Unit)
def shutdownAll

If you want to know when a new Actor is added or to or removed from the registry, you can use the subscription API. You can register an Actor that should be notified when an event happens in the ActorRegistry:

def addRegistrationListener(listener: ActorRef)
def removeRegistrationListener(listener: ActorRef)

The messages sent to this Actor are:

case class ActorRegistered(actor: ActorRef)
case class ActorUnregistered(actor: ActorRef)

So your listener Actor needs to be able to handle these two messages.

HotSwap


Akka supports hotswapping the Actor’s message loop (e.g. its implementation) at runtime. All you need to do is to send a message ‘HotSwap’ with the new 'PartialFunction' defining the new message loop for the Actor. The old version will be kept so you can “downgrade” the Actor any time you want.

To hotswap the actor body:
actor ! HotSwap(Some({
  case message => println("Hotswapped body...")
}))

To hotswap back to the original actor body:
actor ! HotSwap(None)

Killing an Actor


You can kill an actor by sending a 'Kill' message. This will restart the actor through regular supervisor semantics.

Use it like this:
// kill the actor called 'victim'
victim ! Kill

Actor life-cycle


The actor has a well-defined non-circular life-cycle.

NEW (newly created actor) - can't receive messages (yet)
    => STARTED (when 'start' is invoked) - can receive messages
        => SHUT DOWN (when 'exit' or 'stop' is invoked) - can't do anything

Extending Actors using PartialFunction chaining


A bit advanced but very useful way of defining a base message handler and then extend that, either through inheritance or delegation, is to use 'PartialFunction' 'orElse' chaining.

In generic base Actor:
abstract class GenericActor extends Actor {
 
  // to be defined in subclassing actor
  def specificMessageHandler: PartialFunction[Any, Unit]
 
  // generic message handler
  def genericMessageHandler = {
      ... // generic message handler
  }
 
  def receive = specificMessageHandler orElse genericMessageHandler
}

In subclassing Actor:
class SpecificActor extends GenericActor {
  def specificMessageHandler = {
    ... // specific message handler
  }
}
Home
close
Loading...
Home Turn Off "Getting Started"
close
Loading...