Protected
Protected

AMQP


Akka has an AMQP module which abstracts AMQP Producer and Consumer as Actors. It is fault-tolerant through supervisor hierarchies and does auto-reconnect on failure.
It is currently based on the RabbitMQ Java client but completely generic, works with any AMQP broker.

Here is an simple example showing how to create a Producer and Consumer and add a MessageConsumerListener (with an Actor listener) to the consumer to have it send the actor each new message posted to the consumer. The first method is configured using a 'direct' exchange and the second one a 'fanout' exchange.

import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.actor.Actor._
 
val CONFIG = new ConnectionParameters
val HOSTNAME = "localhost"
val PORT = 5672
 
val IM = "im.whitehouse.gov"
val CHAT = "chat.whitehouse.gov"
 
def direct = {
  val consumer = AMQP.newConsumer(
    CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
 
  consumer ! MessageConsumerListener("@george_bush", "direct", actor {
    case Message(payload, _, _, _, _) => l
      log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
  })
 
  val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
 
  producer ! Message("@me: Nobody expects the Spanish Inquisition!".getBytes, "direct")
}
 
def fanout = {
  val consumer = AMQP.newConsumer(
    CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
 
  consumer ! MessageConsumerListener("@george_bush", "", actor {
    case Message(payload, _, _, _, _) =>
      log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
  })
 
  consumer ! MessageConsumerListener("@barack_obama", "", actor {
    case Message(payload, _, _, _, _) =>
      log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
  })
 
  val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
 
  producer ! Message("@me: I'm going surfing".getBytes, "")
}
Home
close
Loading...
Home Turn Off "Getting Started"
close
Loading...