Endpoints that support two-way communications need to wait for a response from an actor or active object before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way which is described in the documentation of the individual Camel components. Another option is to configure timeouts for consumer actors and active objects. For active objects, timeout values for invoking methods that return a result can be set when the active object is created. In the following example, the timeout is set to 20 seconds (default is 5 seconds).
Two-way communications between a Camel endpoint and an actor are initiated by sending the request message to the actor with the ! (bang) operator and the actor replies to the endpoint when the response is ready. In order to support timeouts on actor-level, endpoints need to send the request message with the !! (bangbang) operator for which a timeout value is applicable. This can be achieved by overriding the Consumer.blocking method to return true.
This is a valid approach for all endpoint types that do not "natively" support asynchronous two-way message exchanges. For all other endpoint types (like jetty endpoints) it is not recommended to switch to blocking mode but rather to configure timeouts in an endpoint-specific way (see also asynchronous routing).
Publishing remote actors via Camel endpoint works for both client-initiated and server-initiated remote actors. It is always the actor on the remote node that is published, never the proxy. For example, the following actor is a remote actor that should be published at the endpoint URI jetty:http://localhost:6644/remote-actor-1.
To prepare a remote node to publish this actor, applications need to start a CamelService on that node.
Explicitly starting a CamelService can be omitted when Akka is running in Kernel mode, for example. The remote actor gets started and published on the remote node once a client application sends a message to that actor e.g.
Remote active objects are created with one of the ActiveObject.newRemoteInstance(..) methods. As with actors, the client must send an initial message (i.e. make an initial remote method call) to create and publish the active object remotely. The remote node must also have a CamelService running.
For sending messages to Camel endpoints actors need to mixin the Producer trait and implement the endpointUri method. For example, to POST a message to http://localhost:8080/news, the endpointUri method must be implemented as follows.
The actor inherits a default implementation of the receive method from the Producer trait. For Akka versions lower than 0.10, the Producer trait doesn't provide a default implementation of receive and actors need to implement this method as shown in the next example.
Any message sent to a Producer actor will be sent to the associated Camel endpoint, in the above example to http://localhost:8080/news. Response messages (if supported by the configured endpoint) will by default be sent to the original sender. The following example uses the !! operator to send a message to a Producer actor and waits for a response.
If the message is sent by an actor using the ! operator then the response message is sent back asynchronously as outlined in the following example.
Instead of replying to the initial sender, producer actors can implement custom response processing by overriding the receiveAfterProduce method. In the following example, the response message is forwarded to a target actor instead of being replied to the original sender.
Before producing messages to endpoints producer actors can pre-process them by overriding the receiveBeforeProduce method.
The interaction of producer actors with Camel endpoints can be configured to be one-way or two-way (by initiating in-only or in-out message exchanges, respectively). By default, the producer initiates an in-out message exchange with the endpoint. For initiating an in-only exchange, producer actors either have to override the oneway method to return true
or mixin the Oneway trait.
To correlate request with response messages, applications can set the Message.MessageExchangeId message header.
Responses of type Message or Failure will contain that header as well. When receiving messages from Camel endpoints this message header is already set (see Consume messages).
The following code snippet shows how to best match responses when sending messages with the !! operator.
The Producer trait is a very convenient way for actors to produce messages to Camel endpoints. Actors and active objects may also use a Camel ProducerTemplate for producing messages to endpoints. For active objects it's the only way to produce messages to Camel endpoints. A managed ProducerTemplate instance can be obtained via CamelContextManager.template. In the following example, an actor uses a ProducerTemplate to send a one-way message to a direct:news endpoint.
For a two-way message exchange, one of the ProducerTemplate.request* methods must be used.
Active objects get access to a managed ProducerTemplate in the same way, as shown in the next example.
At the moment, only the Producer trait fully supports asynchronous in-out message exchanges with Camel endpoints without allocating a thread for the full duration of the exchange. For example, when using endpoints that support non-blocking IO (such as jetty endpoints that internally use Jetty's asynchronous HTTP client) then usage of the Producer trait is highly recommended (see also asynchronous routing).
Since Akka 0.10, in-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both consumer and producer actors.
However, asynchronous two-way message exchanges, without allocating a thread for the full duration of the exchange, cannot be generically supported by Camel's asynchronous routing engine alone. This must be supported by the individual Camel components (from which endpoints are created) as well. They must be able to suspend any work started for request processing (thereby freeing threads to do other work) and resume processing when the response is ready. This is currently the case for a subset of components such as the jetty component. All other Camel components can still be used, of course, but they will cause allocation of a thread for the duration of an in-out message exchange. There's also a running example that implements both an asynchronous consumer and an asynchronous producer with the jetty component.
Consumer actors and active objects can also be managed by supervisors. If a consumer is configured to be restarted upon failure, the associated Camel endpoint is not restarted. Its behaviour during restart is as follows.
If a consumer is configured to be shut down upon failure, the associated endpoint is shut down as well. For details refer to the consumer un-publishing section.
For publishing consumer actors and active object methods, applications must start a CamelService. When starting Akka in Kernel mode or using the Akka Initializer in a web application, then a CamelService is started automatically. In all other cases (standalone applications) it must be started manually.
Here's an example how a standalone application should create and start a CamelService.
Internally, the CamelService uses the CamelContextManager singleton to manage a CamelContext. A CamelContext manages the routes from endpoints to consumer actors and active objects. These routes are added and removed at runtime (when actors and active objects are started and stopped). Applications may additionally want to add their own custom routes or modify the CamelContext in some other way. This can be done by initializing the CamelContextManager manually and making modifications to CamelContext before the CamelService is started.
Applications may even provide their own CamelContext instance as argument to the init method call as shown in the following snippet. Here, a DefaultCamelContext is created using a Spring application context as registry.
A better approach to configure a Spring application context as registry for the CamelContext is to use Camel's Spring support. Furthermore, Akka's Spring module additionally supports a <camel-service> element for creating and starting a CamelService. An optional reference to a custom CamelContext can be defined as well. Here's an example.
Creating a CamelContext this way automatically adds the defining Spring application context as registry to that CamelContext. The CamelService is started when the application context is started and stopped when the application context is closed. A simple usage example is shown in the following snippet.
If the CamelService doesn't reference a custom CamelContext then a default CamelContext is created (and accessible via the CamelContextManager).
For classes that are loaded by the Kernel or the Initializer, creation and starting of a CamelService instance can be omitted, as discussed in the previous section. Since these classes are loaded and instantiated before the CamelService is started (by Akka), applications can make modifications to a CamelContext here as well (and even provide their own CamelContext). Assume there's a boot class sample.camel.Boot configured in akka.conf.
Modifications to the CamelContext can be done like in the following snippet.
Besides automated setup of routes, applications may also define custom routes to actors and active objects using the actor and active-object Camel components, respectively. These components are provided by the akka-camel module and are described in the following subsections.
To access actors from custom Camel routes, the actor Camel component should be used. It fully supports Camel's asynchronous routing engine. This component accepts the following endpoint URI formats.
where <actorid> and <actoruuid> refer to actorRef.id and actorRef.uuid, respectively. By default, the actorRef.id is the actor's class name (but may be overridden by Actor implementations). In the following example, a custom route to an actor is created, using the actor's uuid (i.e. actorRef.uuid). The route starts from a jetty endpoint and ends at the target actor.
When the class is booted by the Akka Kernel, the target actor accepts HTTP requests to http://localhost:8877/camel/custom.
To access active object methods from custom Camel routes, the active-object Camel component should be used. It is a specialization of the Camel bean component. Applications should use the interface (endpoint URI syntax and options) as described in the bean component documentation. Active objects must be added to a Camel registry for being accessible by the active-object component.
The following example shows how to access active objects in a Spring application context. The akka-spring module is used for adding active objects to the application context and for configuring and starting a CamelService. It offers an <active-object> element to define active object factory beans and a <camel-service> element to start a CamelService.
SampleBean is a POJO that is turned into an active object by the <active-object> element.
The SampleRouteBuilder defines the custom route from the direct:test endpoint to the sample active object using an active-object endpoint URI.
The active-object endpoint URI syntax is
where bean-id is the id of the bean in the Spring application context and method-name is the name of the active object method to invoke.
Usage of the custom route for sending a message to the active object is shown in the following snippet.
The application uses a Camel producer template to access the active object via the direct:test endpoint.
Usage of akka-spring for adding active objects to the Camel registry and starting a CamelService is optional. Setting up a Spring-less application for accessing active objects is shown in the next example.
Here, SimpleRegistry, a java.util.Map based registry, is used to register active objects. The CamelService is started and stopped programmatically.
For all features described so far, there's running sample code in akka-sample-camel. The examples in sample.camel.Boot are started during Kernel startup because this class has been added to the boot configuration in akka-reference.conf.
If you don't want to have these examples started during Kernel startup, delete the sample.camel.Boot class from the boot configuration in akka-reference.conf (or in akka.conf if you have a custom boot configuration). Alternatively, remove the akka-sample-camel jar from $AKKA_HOME/deploy. Other examples are standalone applications (i.e. classes with a main method) that can be started from sbt.
Some of the examples in akka-sample-camel are described in more detail in the following subsections.
This example demonstrates how to implement consumer and producer actors that support asynchronous in-out message exchanges with their Camel endpoints. The sample application transforms the content of the Akka homepage by replacing every occurrence of Akka with AKKA. After starting the Akka Kernel, direct the browser to http://localhost:8875 and the transformed Akka homepage should be displayed. Please note that this example will probably not work if you're behind a proxy.
The following figure gives an overview of how the example actors interact with external systems and with each other. A browser sends a GET request to http://localhost:8875 which is the published endpoint of the HttpConsumer actor. The HttpConsumer actor forwards the requests to the HttpProducer actor which retrieves the Akka homepage from http://akkasource.org. The retrieved HTML is then forwarded to the HttpTransformer actor which replaces all occurrences of Akka with AKKA. The transformation result is sent back to the HttpConsumer which finally returns it to the browser.

Implementing the example actor classes and wiring them together is rather easy as shown in the following snippet (see also sample.camel.Boot).

This section also demonstrates the combined usage of a Producer and a Consumer actor as well as the inclusion of a custom Camel route. The following figure gives an overview.

The example is part of sample.camel.Boot. The consumer, transformer and producer actor implementations are as follows.
The producer actor knows where to reply the message to because the consumer and transformer actors have forwarded the original sender reference as well. The application configuration and the route starting from direct:welcome are as follows.
To run the example, start the Akka Kernel and POST a message to http://localhost:8877/camel/welcome.
The response should be
This section demonstrates how akka-camel can be used to implement publish/subscribe for actors. The following figure sketches an example for JMS-based publish/subscribe.

A consumer actor receives a message from an HTTP client. It sends the message to a JMS producer actor (publisher). The JMS producer actor publishes the message to a JMS topic. Two other actors that subscribed to that topic both receive the message. The actor classes used in this example are shown in the following snippet.
Wiring these actors to implement the above example is as simple as
To publish messages to subscribers one could of course also use the JMS API directly; there's no need to do that over a JMS producer actor as in this example. For the example to work, Camel's jms component needs to be configured with a JMS connection factory which is done in a Spring application context XML file (context-jms.xml).
To run the example, start the Akka Kernel and POST a message to http://localhost:8877/camel/pub/jms.
The HTTP response body should be
On the console, where you started the Akka Kernel, you should see something like
Publish/subscribe with CometD is equally easy using Camel's cometd component.

All actor classes from the JMS example can be re-used; only the endpoint URIs need to be changed.

Camel
Module stability: STABLE
Notice
Documentation on this page refers to the latest development version of the akka-camel module. Changes since Akka 0.9.1 are summarized in the Akka 0.10 release notes and the migration guide.
Introduction
Since version 0.7, Akka offers a new feature that lets actors and active objects receive and send messages over a great variety of protocols and APIs. This section gives a brief overview of the general ideas behind Akka's Camel integration; the remaining sections go into the details. In addition to the native Scala actor API, actors can now exchange messages with other systems over a large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported. This new feature is provided by the akka-camel module.
At the core of this new feature is Apache Camel, a powerful and lightweight integration framework for the JVM. For an introduction to Apache Camel you may want to read this article. Camel comes with a large number of components that provide bindings to different protocols and APIs. The camel-extra project provides further components. Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example.
    import se.scalablesolutions.akka.actor.Actor
    import se.scalablesolutions.akka.actor.Actor._
    import se.scalablesolutions.akka.camel.{Message, Consumer}

    class MyActor extends Actor with Consumer {
      def endpointUri = "mina:tcp://localhost:6200?textline=true"

      def receive = {
        case msg: Message => { /* ... */ }
        case _            => { /* ... */ }
      }
    }

    // start and expose actor via tcp
    val myActor = actorOf[MyActor].start
The above example exposes an actor over a tcp endpoint on port 6200 via Apache Camel's Mina component. The endpointUri is an abstract method declared in the Consumer trait. After starting the actor, tcp clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel's Jetty component), only the actor's endpointUri must be redefined.
    class MyActor extends Actor with Consumer {
      def endpointUri = "jetty:http://localhost:8877/example"

      def receive = {
        case msg: Message => { /* ... */ }
        case _            => { /* ... */ }
      }
    }
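Switching from the Mina to the Jetty component only changes the URI because a Camel endpoint URI names its component in the prefix before the first colon, followed by a component-specific address and optional `?key=value` options. The following is a minimal, library-independent sketch of that structure. The class `EndpointUriSketch` and its methods are hypothetical helpers for illustration only; they are not part of Camel or akka-camel, and Camel's own URI handling is more elaborate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Camel or akka-camel. */
public class EndpointUriSketch {

    /** Returns the component name, i.e. the text before the first ':'. */
    static String componentOf(String endpointUri) {
        int colon = endpointUri.indexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("no component prefix in: " + endpointUri);
        }
        return endpointUri.substring(0, colon);
    }

    /** Returns the options after '?' as an ordered map (empty if there are none). */
    static Map<String, String> optionsOf(String endpointUri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = endpointUri.indexOf('?');
        if (q < 0 || q == endpointUri.length() - 1) {
            return options;
        }
        for (String pair : endpointUri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, "");
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        String tcp = "mina:tcp://localhost:6200?textline=true";
        String http = "jetty:http://localhost:8877/example";
        System.out.println(componentOf(tcp) + " " + optionsOf(tcp));   // mina {textline=true}
        System.out.println(componentOf(http) + " " + optionsOf(http)); // jetty {}
    }
}
```

Seen this way, the two `MyActor` variants above differ only in which component the prefix selects; the `receive` logic stays the same.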
Actors can also trigger message exchanges with external systems, i.e. produce to Camel endpoints.
    import se.scalablesolutions.akka.actor.Actor
    import se.scalablesolutions.akka.camel.Producer

    class MyActor extends Actor with Producer {
      def endpointUri = "jms:queue:example"
    }
In the above example, any message sent to this actor will be added (produced) to the example JMS queue. Producer actors may choose from the same set of Camel components as Consumer actors do.
The number of Camel components is constantly increasing. The akka-camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and therefore makes it very easy to support a large number of protocols through a uniform Camel component interface. The akka-camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage, for example.
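To illustrate why an immutable representation is convenient for actors, here is a minimal sketch of a message snapshot that copies its headers on construction and returns new instances on transformation. `ImmutableMessageSketch` is a hypothetical class written for this illustration; it is not akka-camel's `Message` class and makes no claim about how that class is implemented.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an immutable message; not akka-camel's Message class. */
public final class ImmutableMessageSketch {
    private final Object body;
    private final Map<String, Object> headers;

    public ImmutableMessageSketch(Object body, Map<String, Object> headers) {
        this.body = body;
        // Defensive copy: later changes to the caller's map are not visible here.
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    public Object body() { return body; }
    public Map<String, Object> headers() { return headers; }

    /** Returns a new message with a replaced body; this instance is unchanged. */
    public ImmutableMessageSketch withBody(Object newBody) {
        return new ImmutableMessageSketch(newBody, headers);
    }

    /** Returns a new message with one additional header; this instance is unchanged. */
    public ImmutableMessageSketch withHeader(String name, Object value) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.put(name, value);
        return new ImmutableMessageSketch(body, copy);
    }

    public static void main(String[] args) {
        Map<String, Object> mutableHeaders = new HashMap<>();
        mutableHeaders.put("Content-Type", "text/plain");
        ImmutableMessageSketch original = new ImmutableMessageSketch("hello", mutableHeaders);

        mutableHeaders.put("X-Late", "ignored"); // does not affect the snapshot
        ImmutableMessageSketch upper = original.withBody("HELLO");

        System.out.println(original.body() + " " + original.headers().size()); // hello 1
        System.out.println(upper.body() + " " + upper.headers().size());       // HELLO 1
    }
}
```

Note that the sketch is only shallowly immutable: a mutable body object could still be changed by whoever holds a reference to it.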
Consume messages
Both actors and active objects can receive (consume) messages from Camel endpoints. For actors to receive messages, they must mix in the Consumer trait. For example, the following actor class (Consumer1) implements the endpointUri method, which is declared in the Consumer trait, in order to receive messages from the file:data/input/actor Camel endpoint.
    import se.scalablesolutions.akka.actor.Actor
    import se.scalablesolutions.akka.camel.{Message, Consumer}
    import se.scalablesolutions.akka.util.Logging

    class Consumer1 extends Actor with Consumer with Logging {
      def endpointUri = "file:data/input/actor"

      def receive = {
        case msg: Message => log.info("received %s" format msg.bodyAs[String])
      }
    }
Whenever a file is put into the data/input/actor directory, its content is picked up by the Camel file component and sent as a message to the actor. Messages consumed by actors from Camel endpoints are of type Message. These are immutable representations of Camel messages.
Here's another example that sets the endpointUri to jetty:http://localhost:8877/camel/default. It causes Camel's jetty component to start an embedded Jetty server, accepting HTTP connections from localhost on port 8877.
    import se.scalablesolutions.akka.actor.Actor
    import se.scalablesolutions.akka.camel.{Message, Consumer}

    class Consumer2 extends Actor with Consumer {
      def endpointUri = "jetty:http://localhost:8877/camel/default"

      def receive = {
        case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
      }
    }
After starting the actor, clients can send messages to that actor by POSTing to http://localhost:8877/camel/default. The actor sends a response by using the self.reply method. For returning a message body and headers to the HTTP client the response type should be Message. For any other response type, a new Message object is created by akka-camel with the actor response as message body.
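The rule for response types can be pictured as a small normalization step: a reply that already is a message passes through unchanged, and anything else becomes the body of a new message without headers. The sketch below models that rule with a hypothetical `Msg` type and a hypothetical `normalize` method; it illustrates the described behaviour and is not akka-camel's actual code.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical illustration of reply normalization; not akka-camel code. */
public class ReplyWrappingSketch {

    /** Minimal hypothetical message type: a body plus headers. */
    static final class Msg {
        final Object body;
        final Map<String, Object> headers;

        Msg(Object body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    /** Keeps a Msg reply as-is; wraps any other reply as the body of a new Msg. */
    static Msg normalize(Object reply) {
        if (reply instanceof Msg) {
            return (Msg) reply;
        }
        return new Msg(reply, Collections.<String, Object>emptyMap());
    }

    public static void main(String[] args) {
        Msg wrapped = normalize("Hello world");
        Msg custom = normalize(
            new Msg("created", Collections.<String, Object>singletonMap("Location", "/items/1")));
        System.out.println(wrapped.body + " " + wrapped.headers); // Hello world {}
        System.out.println(custom.body + " " + custom.headers);   // created {Location=/items/1}
    }
}
```

An actor that needs to control response headers therefore replies with a message object, while a plain string reply is enough when only the body matters.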
Not all Camel endpoints support two-way communications as jetty endpoints do. For example, file endpoints only support one-way communications. Using self.reply in this case would throw an exception (because there's no sender reference).
Active objects can also receive messages from Camel endpoints. In contrast to actors, which only implement a single receive method, the implementation POJO may define several (message processing) methods, each of which can receive messages from a different Camel endpoint. For an active object method to be exposed as a Camel endpoint it must be annotated with the @consume annotation. For example, the following ConsumerPojo defines two methods, foo and bar.
    import org.apache.camel.Body;
    import org.apache.camel.Header;

    import se.scalablesolutions.akka.actor.annotation.consume;

    public class ConsumerPojo {

        @consume("file:data/input/pojo")
        public void foo(String body) {
            System.out.println("Received message:");
            System.out.println(body);
        }

        @consume("jetty:http://localhost:8877/camel/pojo")
        public String bar(@Body String body, @Header("X-Whatever") String header) {
            return String.format("body=%s header=%s", body, header);
        }
    }
The foo method can be invoked by placing a file in the data/input/pojo directory. Camel picks up the file from this directory and akka-camel invokes foo with the file content as argument (converted to a String). Camel automatically tries to convert messages to appropriate types as defined by the method parameter(s). The conversion rules are described in detail on the following pages:
- Bean integration
- Bean binding
- Parameter binding
The bar method can be invoked by POSTing a message to http://localhost:8877/camel/pojo. Here, parameter binding annotations are used to tell Camel how to extract data from the HTTP message. The @Body annotation binds the HTTP request body to the first parameter, the @Header annotation binds the X-Whatever header to the second parameter. The return value is sent as HTTP response message body to the client.
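How annotation-driven parameter binding can work in principle is sketched below with plain Java reflection. The annotations `BodyParam` and `HeaderParam`, the `Pojo` class and the `invoke` and `call` helpers are all hypothetical names defined for this illustration; Camel ships its own `@Body` and `@Header` annotations and a far more capable binding mechanism (including type conversion), which this sketch does not reproduce.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of annotation-based parameter binding. */
public class ParameterBindingSketch {

    // Hypothetical annotations for this sketch only.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
    @interface BodyParam {}

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
    @interface HeaderParam { String value(); }

    public static class Pojo {
        public String bar(@BodyParam String body, @HeaderParam("X-Whatever") String header) {
            return String.format("body=%s header=%s", body, header);
        }
    }

    /** Resolves each parameter from its annotation, then invokes the method reflectively. */
    static Object invoke(Object target, Method method, String body, Map<String, String> headers)
            throws ReflectiveOperationException {
        Annotation[][] annotations = method.getParameterAnnotations();
        Object[] args = new Object[annotations.length]; // unannotated parameters stay null
        for (int i = 0; i < annotations.length; i++) {
            for (Annotation a : annotations[i]) {
                if (a instanceof BodyParam) {
                    args[i] = body;
                } else if (a instanceof HeaderParam) {
                    args[i] = headers.get(((HeaderParam) a).value());
                }
            }
        }
        return method.invoke(target, args);
    }

    /** Simulates one incoming request with a body and an X-Whatever header. */
    static String call(String body, String headerValue) {
        Map<String, String> headers = new HashMap<>();
        headers.put("X-Whatever", headerValue);
        try {
            Method bar = Pojo.class.getMethod("bar", String.class, String.class);
            return (String) invoke(new Pojo(), bar, body, headers);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("hello", "42")); // body=hello header=42
    }
}
```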
Consumer publishing
Publishing a consumer actor at its Camel endpoint occurs when the actor is started. Publication is done asynchronously; setting up an endpoint (more precisely, the route from that endpoint to the actor) may still be in progress after the ActorRef.start method has returned.
    import se.scalablesolutions.akka.actor.Actor._

    val actor = actorOf[Consumer1] // create Consumer actor
    actor.start                    // activate endpoint in background
Publishing of active object methods is done when the active object is created with one of the ActiveObject.newInstance(..) methods. Publication is done in the background here as well, i.e. it may still be in progress when ActiveObject.newInstance(..) returns.
    import se.scalablesolutions.akka.actor.ActiveObject;

    // create ConsumerPojo object,
    // turn into active object and
    // activate endpoint(s) in background
    ConsumerPojo activeObject = ActiveObject.newInstance(ConsumerPojo.class);
Publishing of consumer actors or active object methods requires a running CamelService. The Akka Kernel or the Akka Initializer (a ServletContextListener) starts a CamelService automatically. When using Akka in other environments, a CamelService must be started manually. Applications can do that by creating a new CamelService instance and then starting the service.
import se.scalablesolutions.akka.camel.CamelService // Create and start a new CamelService instance val service = CamelService.newInstance.load
If applications need to wait for a certain number of consumer actors or active object methods to be published, they can do so with the CamelService.expectEndpointActivationCount method.
import se.scalablesolutions.akka.camel.CamelService

// Create and start a new CamelService instance
val service = CamelService.newInstance.load

// Expect three endpoints to be activated
val completion = service.expectEndpointActivationCount(3)

// Start three consumer actors (for example)
// ...

// Wait for consumer actor publication (i.e. endpoint activation) to complete
completion.await

// Communicate with consumer actors via their activated endpoints
// ...
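The waiting pattern itself can be illustrated without Akka. The following is a minimal sketch, assuming that the object returned by expectEndpointActivationCount behaves like a java.util.concurrent.CountDownLatch (an assumption; this page only shows that it offers an await method). Plain threads stand in for consumer actors, and ActivationWaitSketch, activateInBackground and awaitActivations are hypothetical names used for illustration only.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object ActivationWaitSketch {
  // Stand-in for asynchronous endpoint activation: each "consumer" counts
  // the latch down from a background thread once it is "published".
  def activateInBackground(latch: CountDownLatch, consumers: Int): Unit =
    (1 to consumers).foreach { _ =>
      val t = new Thread(new Runnable {
        def run(): Unit = latch.countDown()
      })
      t.setDaemon(true)
      t.start()
    }

  // Returns true if all expected activations were seen within the timeout,
  // false if the timeout elapsed first.
  def awaitActivations(expected: Int, started: Int, timeoutMillis: Long): Boolean = {
    val completion = new CountDownLatch(expected)
    activateInBackground(completion, started)
    completion.await(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
```

Waiting with a timeout, rather than indefinitely, keeps an application from blocking forever when fewer endpoints are activated than expected.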
The section Application configuration additionally describes how the CamelContext that is managed by the CamelService can be customized before starting the service. The CamelContext should be shut down when it is no longer needed.
// Shutdown the service
service.unload
Consumer un-publishing
When an actor is stopped, the route from the endpoint to that actor is stopped as well. For example, stopping an actor that has previously been published at http://localhost:8877/camel/test will cause an HTTP 404 when trying to access that endpoint. Stopping the route is done asynchronously; it may still be in progress after the ActorRef.stop method has returned.
import se.scalablesolutions.akka.actor.Actor._

val actor = actorOf[Consumer1] // create Consumer actor
actor.start                    // activate endpoint in background
// ...
actor.stop                     // deactivate endpoint in background
When an active object is stopped, routes to @consume annotated methods of this active object are stopped as well. Stopping the routes is done asynchronously; it may still be in progress after the ActiveObject.stop method has returned.
// Create active object and activate endpoints in background
ConsumerPojo obj = (ConsumerPojo) ActiveObject.newInstance(ConsumerPojo.class);

// Deactivate endpoints in background
ActiveObject.stop(obj);
Consumer timeout
Endpoints that support two-way communications need to wait for a response from an actor or active object before returning it to the initiating client. For some endpoint types, timeout values can be defined in an endpoint-specific way, which is described in the documentation of the individual Camel components. Another option is to configure timeouts for consumer actors and active objects. For active objects, timeout values for invoking methods that return a result can be set when the active object is created. In the following example, the timeout is set to 20 seconds (default is 5 seconds).
import se.scalablesolutions.akka.actor.ActiveObject;

ConsumerPojo activeObject = ActiveObject.newInstance(ConsumerPojo.class, 20000 /* 20 seconds */);
Two-way communications between a Camel endpoint and an actor are initiated by sending the request message to the actor with the ! (bang) operator; the actor replies to the endpoint when the response is ready. To support timeouts at the actor level, endpoints need to send the request message with the !! (bangbang) operator, for which a timeout value is applicable. This can be achieved by overriding the Consumer.blocking method to return true.
class Consumer2 extends Actor with Consumer {
  self.timeout = 20000 // timeout set to 20 seconds

  override def blocking = true

  def endpointUri = "direct:example"

  def receive = {
    case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
  }
}
This is a valid approach for all endpoint types that do not "natively" support asynchronous two-way message exchanges. For all other endpoint types (like jetty endpoints) it is not recommended to switch to blocking mode but rather to configure timeouts in an endpoint-specific way (see also asynchronous routing).
Remote consumers
Publishing remote actors via Camel endpoints works for both client-initiated and server-initiated remote actors. It is always the actor on the remote node that is published, never the proxy. For example, the following actor is a remote actor that should be published at the endpoint URI jetty:http://localhost:6644/remote-actor-1.
import se.scalablesolutions.akka.actor.RemoteActor
import se.scalablesolutions.akka.annotation.consume
import se.scalablesolutions.akka.camel.Consumer

class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
  def endpointUri = "jetty:http://localhost:6644/remote-actor-1"

  protected def receive = {
    case msg => self.reply("response from remote actor 1")
  }
}
To prepare a remote node to publish this actor, applications need to start a CamelService on that node.
import se.scalablesolutions.akka.camel.CamelService
import se.scalablesolutions.akka.remote.RemoteNode

// ...
val camelService = CamelService.newInstance.load
RemoteNode.start("localhost", 7777)
// ...
Explicitly starting a CamelService can be omitted when Akka is running in Kernel mode, for example. The remote actor gets started and published on the remote node once a client application sends a message to that actor e.g.
import se.scalablesolutions.akka.actor.RemoteActor

// ...
val actor1 = actorOf[RemoteActor1].start
actor1 ! "init" // initial message triggers creation and
                // publication of actor on remote node
This 'initialization message' is only needed when using client-managed remote actors. Server-managed remote actors are published immediately because they are explicitly created and started on the remote node.
Remote active objects are created with one of the ActiveObject.newRemoteInstance(..) methods. As with actors, the client must send an initial message (i.e. make an initial remote method call) to create and publish the active object remotely. The remote node must also have a CamelService running.
import se.scalablesolutions.akka.actor.ActiveObject

// ...
val obj = ActiveObject.newRemoteInstance(classOf[RemoteConsumerPojo1], "localhost", 7777)
obj.foo("x", "y") // initial remote method call triggers creation and
                  // publication of active object methods on remote node
Produce messages
Producer trait
For sending messages to Camel endpoints, actors need to mix in the Producer trait and implement the endpointUri method. For example, to POST a message to http://localhost:8080/news, the endpointUri method must be implemented as follows.
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class Producer1 extends Actor with Producer {
  def endpointUri = "http://localhost:8080/news"
}
The actor inherits a default implementation of the receive method from the Producer trait. For Akka versions lower than 0.10, the Producer trait doesn't provide a default implementation of receive, and actors need to implement this method as shown in the next example.
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class Producer1 extends Actor with Producer {
  def endpointUri = "http://localhost:8080/news"

  def receive = produce // can be omitted with Akka 0.10 or higher
}
Any message sent to a Producer actor will be sent to the associated Camel endpoint, in the above example to http://localhost:8080/news. Response messages (if supported by the configured endpoint) will by default be sent to the original sender. The following example uses the !! operator to send a message to a Producer actor and waits for a response.
import se.scalablesolutions.akka.actor.ActorRef

class Client(producer: ActorRef) {
  def foo = {
    val response = producer !! "bar"
    // ...
  }
}
If the message is sent by an actor using the ! operator, then the response message is sent back asynchronously, as outlined in the following example.
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.Message

class Sender(producer: ActorRef) extends Actor {
  def receive = {
    case request: String   => producer ! request
    case response: Message => { /* process response ... */ }
    // ...
  }
}
Instead of replying to the initial sender, producer actors can implement custom response processing by overriding the receiveAfterProduce method. In the following example, the response message is forwarded to a target actor instead of being sent back to the original sender.
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.Producer

class Producer1(target: ActorRef) extends Actor with Producer {
  def endpointUri = "http://localhost:8080/news"

  override protected def receiveAfterProduce = {
    // do not reply but forward result to target
    case msg => target forward msg
  }
}
Before producing messages to endpoints, producer actors can pre-process them by overriding the receiveBeforeProduce method.
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Message, Producer}

class Producer1(target: ActorRef) extends Actor with Producer {
  def endpointUri = "http://localhost:8080/news"

  override protected def receiveBeforeProduce = {
    case msg: Message => {
      // do some pre-processing (e.g. add endpoint-specific message headers)
      // ...

      // and return the modified message
      msg
    }
  }
}
Producer configuration options
The interaction of producer actors with Camel endpoints can be configured to be one-way or two-way (by initiating in-only or in-out message exchanges, respectively). By default, the producer initiates an in-out message exchange with the endpoint. For initiating an in-only exchange, producer actors either have to override the oneway method to return true
class Producer2 extends Actor with Producer {
  def endpointUri = "direct:welcome"

  override def oneway = true
}
or mix in the Oneway trait.
// ...
import se.scalablesolutions.akka.camel.Oneway

class Producer2 extends Actor with Oneway {
  def endpointUri = "direct:welcome"
}
Message correlation
To correlate request with response messages, applications can set the Message.MessageExchangeId message header.
import se.scalablesolutions.akka.camel.Message

producer ! Message("bar", Map(Message.MessageExchangeId -> "123"))
Responses of type Message or Failure will contain that header as well. When receiving messages from Camel endpoints this message header is already set (see Consume messages).
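How an application might use such a header to pair responses with requests can be sketched in plain Scala. The following is only an illustration under stated assumptions: Msg is a hypothetical stand-in for akka-camel's Message (a body plus a header map), ExchangeId is a hypothetical header key used in place of Message.MessageExchangeId, and correlate is a helper invented for this example.

```scala
object CorrelationSketch {
  // Hypothetical stand-in for akka-camel's Message: a body plus headers.
  case class Msg(body: Any, headers: Map[String, Any] = Map.empty)

  // Hypothetical header key; real code would use Message.MessageExchangeId.
  val ExchangeId = "MessageExchangeId"

  // Pair each response with the request that carries the same exchange id.
  // Messages without the header, or without a partner, are ignored.
  def correlate(requests: Seq[Msg], responses: Seq[Msg]): Map[Msg, Msg] = {
    val requestsById: Map[Any, Msg] =
      requests.flatMap(req => req.headers.get(ExchangeId).map(id => id -> req)).toMap
    responses.flatMap { resp =>
      for {
        id  <- resp.headers.get(ExchangeId)
        req <- requestsById.get(id)
      } yield req -> resp
    }.toMap
  }
}
```

Because responses may arrive in a different order than the requests were sent, the lookup is keyed by the exchange id rather than by position.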
Matching responses
The following code snippet shows how to best match responses when sending messages with the !! operator.
val response = producer !! message

response match {
  case Some(Message(body, headers))      => ...
  case Some(Failure(exception, headers)) => ...
  case _                                 => ...
}
ProducerTemplate
The Producer trait is a very convenient way for actors to produce messages to Camel endpoints. Actors and active objects may also use a Camel ProducerTemplate for producing messages to endpoints. For active objects it's the only way to produce messages to Camel endpoints. A managed ProducerTemplate instance can be obtained via CamelContextManager.template. In the following example, an actor uses a ProducerTemplate to send a one-way message to a direct:news endpoint.
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.CamelContextManager

class ProducerActor extends Actor {
  protected def receive = {
    // one-way message exchange with direct:news endpoint
    case msg => CamelContextManager.template.sendBody("direct:news", msg)
  }
}
For a two-way message exchange, one of the ProducerTemplate.request* methods must be used.
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.CamelContextManager

class ProducerActor extends Actor {
  protected def receive = {
    // two-way message exchange with direct:news endpoint
    case msg => self.reply(CamelContextManager.template.requestBody("direct:news", msg))
  }
}
Active objects get access to a managed ProducerTemplate in the same way, as shown in the next example.
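import org.apache.camel.ProducerTemplate;
import se.scalablesolutions.akka.camel.CamelContextManager;

public class ProducerPojo {
    // one-way message exchange with direct:news endpoint (nothing is returned)
    public void foo(String msg) {
        ProducerTemplate template = CamelContextManager.template();
        template.sendBody("direct:news", msg);
    }
}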
At the moment, only the Producer trait fully supports asynchronous in-out message exchanges with Camel endpoints without allocating a thread for the full duration of the exchange. For example, when using endpoints that support non-blocking IO (such as jetty endpoints that internally use Jetty's asynchronous HTTP client) then usage of the Producer trait is highly recommended (see also asynchronous routing).
Asynchronous routing
Since Akka 0.10, in-out message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both consumer and producer actors.
- A consumer endpoint sends request messages to its consumer actor using the ! (bang) operator and the actor returns responses with self.reply once they are ready. The sender reference used for reply is an adapter to Camel's asynchronous routing engine that implements the ActorRef trait.
- A producer actor sends request messages to its endpoint using Camel's asynchronous routing engine. Asynchronous responses are wrapped and added to the producer actor's mailbox for later processing. By default, response messages are returned to the initial sender but this can be overridden by Producer implementations (see also the description of the receiveAfterProduce method).
However, asynchronous two-way message exchanges that do not allocate a thread for the full duration of the exchange cannot be generically supported by Camel's asynchronous routing engine alone. They must be supported by the individual Camel components (from which endpoints are created) as well. Components must be able to suspend any work started for request processing (thereby freeing threads to do other work) and resume processing when the response is ready. This is currently the case for a subset of components such as the jetty component. All other Camel components can still be used, of course, but they will cause allocation of a thread for the duration of an in-out message exchange. There's also a running example that implements both an asynchronous consumer and an asynchronous producer with the jetty component.
Fault tolerance
Consumer actors and active objects can also be managed by supervisors. If a consumer is configured to be restarted upon failure, the associated Camel endpoint is not restarted. Its behaviour during restart is as follows.
- A one-way (in-only) message exchange will be queued by the consumer and processed once restart completes.
- A two-way (in-out) message exchange will wait and either succeed after restart completes or time-out when the restart duration exceeds the configured timeout.
If a consumer is configured to be shut down upon failure, the associated endpoint is shut down as well. For details refer to the consumer un-publishing section.
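The in-out behaviour during a restart can be pictured with a toy model in plain Scala. This is not how Akka implements supervision or restarts; it is a sketch under the assumption that a waiting exchange can be modelled as a bounded wait on a latch that is released when the restart completes. RestartTimeoutSketch and exchange are hypothetical names.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object RestartTimeoutSketch {
  // Models an in-out exchange that arrives while the consumer is restarting:
  // it waits for the restart to finish, but no longer than timeoutMillis.
  def exchange(restartMillis: Long, timeoutMillis: Long): Either[String, String] = {
    val restarted = new CountDownLatch(1)
    val restarter = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(restartMillis) // simulated restart duration
        restarted.countDown()       // restart complete
      }
    })
    restarter.setDaemon(true)
    restarter.start()

    if (restarted.await(timeoutMillis, TimeUnit.MILLISECONDS)) Right("response")
    else Left("timeout")
  }
}
```

In this model, a restart that is shorter than the timeout yields a response, while a restart that outlasts the timeout yields a timeout, which mirrors the two outcomes listed above for two-way exchanges.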
CamelService configuration
For publishing consumer actors and active object methods, applications must start a CamelService. When Akka is started in Kernel mode or the Akka Initializer is used in a web application, a CamelService is started automatically. In all other cases (standalone applications) it must be started manually.
Standalone applications
Here's an example of how a standalone application should create and start a CamelService.
import se.scalablesolutions.akka.camel.CamelService

val service = CamelService.newInstance.load
Internally, the CamelService uses the CamelContextManager singleton to manage a CamelContext. A CamelContext manages the routes from endpoints to consumer actors and active objects. These routes are added and removed at runtime (when actors and active objects are started and stopped). Applications may additionally want to add their own custom routes or modify the CamelContext in some other way. This can be done by initializing the CamelContextManager manually and making modifications to CamelContext before the CamelService is started.
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.camel.CamelService

// initialize CamelContextManager
CamelContextManager.init

// add a custom route to the managed CamelContext
CamelContextManager.context.addRoutes(new CustomRouteBuilder)

// create and start a CamelService
val service = CamelService.newInstance.load

// an application-specific route builder
class CustomRouteBuilder extends RouteBuilder {
  def configure {
    // ...
  }
}
Applications may even provide their own CamelContext instance as argument to the init method call as shown in the following snippet. Here, a DefaultCamelContext is created using a Spring application context as registry.
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.camel.CamelService

// create a custom Camel registry backed up by a Spring application context
val context = new ClassPathXmlApplicationContext("/context.xml")
val registry = new ApplicationContextRegistry(context)

// initialize CamelContextManager with a DefaultCamelContext using the custom registry
CamelContextManager.init(new DefaultCamelContext(registry))

// ...

// create and start a CamelService
val service = CamelService.newInstance.load
Standalone Spring applications
A better approach to configure a Spring application context as registry for the CamelContext is to use Camel's Spring support. Furthermore, Akka's Spring module additionally supports a <camel-service> element for creating and starting a CamelService. An optional reference to a custom CamelContext can be defined as well. Here's an example.
<!-- context.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:akka="http://akkasource.org/schema/akka"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!-- A custom CamelContext (SpringCamelContext) -->
  <camel:camelContext id="camelContext">
    <!-- ... -->
  </camel:camelContext>

  <!-- Create a CamelService using a custom CamelContext -->
  <akka:camel-service>
    <akka:camel-context ref="camelContext" />
  </akka:camel-service>

</beans>
Creating a CamelContext this way automatically adds the defining Spring application context as registry to that CamelContext. The CamelService is started when the application context is started and stopped when the application context is closed. A simple usage example is shown in the following snippet.
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.camel.CamelContextManager

// Create and start application context (start CamelService)
val appctx = new ClassPathXmlApplicationContext("/context.xml")

// Access to CamelContext (SpringCamelContext)
val ctx = CamelContextManager.context
// Access to ProducerTemplate of that CamelContext
val tpl = CamelContextManager.template

// use ctx and tpl ...

// Close application context (stop CamelService)
appctx.close
If the CamelService doesn't reference a custom CamelContext then a default CamelContext is created (and accessible via the CamelContextManager).
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:akka="http://akkasource.org/schema/akka"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">

  <!-- Create a CamelService using DefaultCamelContext -->
  <akka:camel-service />

</beans>
Kernel mode
For classes that are loaded by the Kernel or the Initializer, creation and starting of a CamelService instance can be omitted, as discussed in the previous section. Since these classes are loaded and instantiated before the CamelService is started (by Akka), applications can make modifications to a CamelContext here as well (and even provide their own CamelContext). Assume there's a boot class sample.camel.Boot configured in akka.conf.
<akka>
  ...
  boot = ["sample.camel.Boot"]
  ...
</akka>
Modifications to the CamelContext can be done like in the following snippet.
package sample.camel

import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.camel.CamelContextManager

class Boot {
  CamelContextManager.init()

  // Customize CamelContext with application-specific routes
  CamelContextManager.context.addRoutes(new CustomRouteBuilder)

  // no need to create and start CamelService here ...
}

class CustomRouteBuilder extends RouteBuilder {
  def configure {
    // ...
  }
}
Custom Camel routes
Besides automated setup of routes, applications may also define custom routes to actors and active objects using the actor and active-object Camel components, respectively. These components are provided by the akka-camel module and are described in the following subsections.
Access to actors
To access actors from custom Camel routes, the actor Camel component should be used. It fully supports Camel's asynchronous routing engine. This component accepts the following endpoint URI formats.
- actor:<actorid>
- actor:id:<actorid>
- actor:uuid:<actoruuid>
where <actorid> and <actoruuid> refer to actorRef.id and actorRef.uuid, respectively. By default, the actorRef.id is the actor's class name (but may be overridden by Actor implementations). In the following example, a custom route to an actor is created, using the actor's uuid (i.e. actorRef.uuid). The route starts from a jetty endpoint and ends at the target actor.
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.Exchange
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.{CamelContextManager, Message}

class Boot {
  val target = actorOf[CustomRouteTarget].start

  CamelContextManager.init()
  CamelContextManager.context.addRoutes(new CustomRouteBuilder(target.uuid))
}

class CustomRouteTarget extends Actor {
  def receive = {
    case msg: Message => self.reply("Hello %s" format msg.bodyAs[String])
  }
}

class CustomRouteBuilder(uuid: String) extends RouteBuilder {
  def configure {
    val actorUri = "actor:uuid:%s" format uuid
    from("jetty:http://localhost:8877/camel/custom").to(actorUri)
  }
}
When the class is booted by the Akka Kernel, the target actor accepts HTTP requests to http://localhost:8877/camel/custom.
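The three endpoint URI formats accepted by the actor component can be captured in a few small helpers. This is a minimal sketch in plain Scala; ActorUris and its methods are hypothetical names that are not part of akka-camel, and the helpers only build strings in the formats listed above.

```scala
object ActorUris {
  // actor:<actorid> addresses an actor by its id (short form).
  def byIdShort(actorId: String): String = "actor:%s".format(actorId)

  // actor:id:<actorid> addresses an actor by its id (explicit form).
  def byId(actorId: String): String = "actor:id:%s".format(actorId)

  // actor:uuid:<actoruuid> addresses an actor by its uuid.
  def byUuid(actorUuid: String): String = "actor:uuid:%s".format(actorUuid)
}
```

A route builder could then write to(ActorUris.byUuid(uuid)) instead of formatting the URI inline.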
Access to active objects
To access active object methods from custom Camel routes, the active-object Camel component should be used. It is a specialization of the Camel bean component. Applications should use the interface (endpoint URI syntax and options) as described in the bean component documentation. Active objects must be added to a Camel registry for being accessible by the active-object component.
Using Spring
The following example shows how to access active objects in a Spring application context. The akka-spring module is used for adding active objects to the application context and for configuring and starting a CamelService. It offers an <active-object> element to define active object factory beans and a <camel-service> element to start a CamelService.
<!-- context.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:akka="http://akkasource.org/schema/akka"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="routeBuilder" class="sample.SampleRouteBuilder" />

  <camel:camelContext id="camelContext">
    <camel:routeBuilder ref="routeBuilder" />
  </camel:camelContext>

  <akka:camel-service>
    <akka:camel-context ref="camelContext" />
  </akka:camel-service>

  <akka:active-object id="sample" target="sample.SampleBean" timeout="1000" />

</beans>
SampleBean is a POJO that is turned into an active object by the <active-object> element.
package sample;

public class SampleBean {
    public String foo(String s) {
        return "hello " + s;
    }
}
The SampleRouteBuilder defines the custom route from the direct:test endpoint to the sample active object using an active-object endpoint URI.
package sample

import org.apache.camel.builder.RouteBuilder

class SampleRouteBuilder extends RouteBuilder {
  def configure = {
    // route to active object
    from("direct:test").to("active-object:sample?method=foo")
  }
}
The active-object endpoint URI syntax is
- active-object:<bean-id>?method=<method-name>
where bean-id is the id of the bean in the Spring application context and method-name is the name of the active object method to invoke.
Usage of the custom route for sending a message to the active object is shown in the following snippet.
package sample

import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.camel.CamelContextManager

// load Spring application context (starts CamelService)
val appctx = new ClassPathXmlApplicationContext("/context.xml")

// access 'sample' active object via custom route
assert("hello akka" == CamelContextManager.template.requestBody("direct:test", "akka"))

// close Spring application context (stops CamelService)
appctx.close
The application uses a Camel producer template to access the active object via the direct:test endpoint.
Without Spring
Usage of akka-spring for adding active objects to the Camel registry and starting a CamelService is optional. Setting up a Spring-less application for accessing active objects is shown in the next example.
package sample

import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager}
import se.scalablesolutions.akka.actor.ActiveObject

// register active object
val registry = new SimpleRegistry
registry.put("sample", ActiveObject.newInstance(classOf[SampleBean]))

// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new SampleRouteBuilder)

// start CamelService
val camelService = CamelService.newInstance.load

// access 'sample' active object via custom route
assert("hello akka" == CamelContextManager.template.requestBody("direct:test", "akka"))

// stop CamelService
camelService.unload
Here, SimpleRegistry, a java.util.Map based registry, is used to register active objects. The CamelService is started and stopped programmatically.
Examples
For all features described so far, there's running sample code in akka-sample-camel. The examples in sample.camel.Boot are started during Kernel startup because this class has been added to the boot configuration in akka-reference.conf.
<akka>
  ...
  boot = ["sample.camel.Boot", ...]
  ...
</akka>
If you don't want to have these examples started during Kernel startup, remove sample.camel.Boot from the boot configuration in akka-reference.conf (or in akka.conf if you have a custom boot configuration). Alternatively, remove the akka-sample-camel jar from $AKKA_HOME/deploy. Other examples are standalone applications (i.e. classes with a main method) that can be started from sbt.
$ sbt
[info] Building project akka 0.10 against Scala 2.8.0.RC3
[info]    using AkkaParent with sbt 0.7.4 and Scala 2.7.7
> project akka-sample-camel
Set current project to akka-sample-camel 0.10
> run
...
Multiple main classes detected, select one to run:

 [1] sample.camel.ClientApplication
 [2] sample.camel.ServerApplication
 [3] sample.camel.StandaloneApplication
 [4] sample.camel.StandaloneSpringApplication
Some of the examples in akka-sample-camel are described in more detail in the following subsections.
Asynchronous consumer-producer example
This example demonstrates how to implement consumer and producer actors that support asynchronous in-out message exchanges with their Camel endpoints. The sample application transforms the content of the Akka homepage by replacing every occurrence of Akka with AKKA. After starting the Akka Kernel, direct the browser to http://localhost:8875 and the transformed Akka homepage should be displayed. Please note that this example will probably not work if you're behind a proxy.
The following figure gives an overview of how the example actors interact with external systems and with each other. A browser sends a GET request to http://localhost:8875, which is the published endpoint of the HttpConsumer actor. The HttpConsumer actor forwards the request to the HttpProducer actor, which retrieves the Akka homepage from http://akkasource.org. The retrieved HTML is then forwarded to the HttpTransformer actor, which replaces all occurrences of Akka with AKKA. The transformation result is sent back to the HttpConsumer, which finally returns it to the browser.

Implementing the example actor classes and wiring them together is rather easy, as shown in the following snippet (see also sample.camel.Boot).
import org.apache.camel.Exchange
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Producer, Message, Failure, Consumer}

class HttpConsumer(producer: ActorRef) extends Actor with Consumer {
  def endpointUri = "jetty:http://0.0.0.0:8875/"

  protected def receive = {
    case msg => producer forward msg
  }
}

class HttpProducer(transformer: ActorRef) extends Actor with Producer {
  def endpointUri = "jetty://http://akkasource.org/?bridgeEndpoint=true"

  override protected def receiveBeforeProduce = {
    // only keep Exchange.HTTP_PATH message header (which is needed by bridge endpoint)
    case msg: Message => msg.setHeaders(msg.headers(Set(Exchange.HTTP_PATH)))
  }

  override protected def receiveAfterProduce = {
    // do not reply but forward result to transformer
    case msg => transformer forward msg
  }
}

class HttpTransformer extends Actor {
  protected def receive = {
    case msg: Message => self.reply(msg.transformBody[String] {_ replaceAll ("Akka ", "AKKA ")})
    case msg: Failure => self.reply(msg)
  }
}

// Wire and start the example actors
val httpTransformer = actorOf(new HttpTransformer).start
val httpProducer = actorOf(new HttpProducer(httpTransformer)).start
val httpConsumer = actorOf(new HttpConsumer(httpProducer)).start
The jetty endpoints of HttpConsumer and HttpProducer support asynchronous in-out message exchanges and do not allocate threads for the full duration of the exchange. This is achieved by using Jetty continuations on the consumer side and by using Jetty's asynchronous HTTP client on the producer side. The following high-level sequence diagram illustrates that.

Custom Camel route example
This section also demonstrates the combined usage of a Producer and a Consumer actor as well as the inclusion of a custom Camel route. The following figure gives an overview.

- A consumer actor receives a message from an HTTP client.
- It forwards the message to another actor that transforms the message (encloses the original message into hyphens).
- The transformer actor forwards the transformed message to a producer actor.
- The producer actor sends the message to a custom Camel route beginning at the direct:welcome endpoint.
- A processor (transformer) in the custom Camel route prepends "Welcome" to the original message and creates a result message.
- The producer actor sends the result back to the consumer actor which returns it to the HTTP client.
The example is part of sample.camel.Boot. The consumer, transformer and producer actor implementations are as follows.
package sample.camel

import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Message, Consumer, Producer}

class Consumer3(transformer: ActorRef) extends Actor with Consumer {
  def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"

  def receive = {
    // Forward a string representation of the message body to transformer
    case msg: Message => transformer.forward(msg.setBodyAs[String])
  }
}

class Transformer(producer: ActorRef) extends Actor {
  protected def receive = {
    // example: transform message body "foo" to "- foo -" and forward result to producer
    case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
  }
}

class Producer1 extends Actor with Producer {
  def endpointUri = "direct:welcome"
}
The producer actor knows where to send the reply because the consumer and transformer actors have forwarded the original sender reference along with the message. The application configuration and the route starting from direct:welcome are as follows.
package sample.camel

import org.apache.camel.builder.RouteBuilder
import org.apache.camel.{Exchange, Processor}

import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager

class Boot {
  CamelContextManager.init()
  CamelContextManager.context.addRoutes(new CustomRouteBuilder)

  val producer = actorOf[Producer1]
  val mediator = actorOf(new Transformer(producer))
  val consumer = actorOf(new Consumer3(mediator))

  producer.start
  mediator.start
  consumer.start
}

class CustomRouteBuilder extends RouteBuilder {
  def configure {
    from("direct:welcome").process(new Processor() {
      def process(exchange: Exchange) {
        // Create a 'welcome' message from the input message
        exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
      }
    })
  }
}
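Why forwarding matters for the reply path can be illustrated with a small model. This is a sketch in plain Scala under stated assumptions, not akka-camel code: Envelope, send, forward and replyTarget are hypothetical names, and the model assumes the usual actor convention that a plain send from an intermediate actor makes that actor the sender, while a forward keeps the original sender.

```scala
object ForwardSketch {
  // Hypothetical envelope: a message body plus the name of the party a reply should go to.
  final case class Envelope(body: String, sender: String)

  // Assumption: a plain send from an intermediate actor replaces the sender with that actor.
  def send(env: Envelope, from: String): Envelope = env.copy(sender = from)

  // A forward passes the envelope on with the original sender untouched.
  def forward(env: Envelope): Envelope = env

  // The final actor replies to whatever sender the envelope carries.
  def replyTarget(env: Envelope): String = env.sender
}
```

With two forwards in a row, as in Consumer3 and Transformer, replyTarget still names the original sender. If the second hop were a plain send, the reply would go to the intermediate actor instead.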
To run the example, start the Akka Kernel and POST a message to http://localhost:8877/camel/welcome.
curl -H "Content-Type: text/plain" -d "Anke" http://localhost:8877/camel/welcome
The response should be
Welcome - Anke -
Publish/Subscribe example
JMS
This section demonstrates how akka-camel can be used to implement publish/subscribe for actors. The following figure sketches an example for JMS-based publish/subscribe.

A consumer actor receives a message from an HTTP client. It sends the message to a JMS producer actor (publisher). The JMS producer actor publishes the message to a JMS topic. Two other actors that subscribed to that topic both receive the message. The actor classes used in this example are shown in the following snippet.
package sample.camel

import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging

class Subscriber(name: String, uri: String) extends Actor with Consumer with Logging {
  def endpointUri = uri

  protected def receive = {
    case msg: Message => log.info("%s received: %s" format (name, msg.body))
  }
}

class Publisher(name: String, uri: String) extends Actor with Producer {
  self.id = name
  def endpointUri = uri

  // one-way communication with JMS
  override def oneway = true
}

class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
  def endpointUri = uri

  protected def receive = {
    case msg: Message => {
      publisher ! msg.bodyAs[String]
      self.reply("message published")
    }
  }
}
Wiring these actors to implement the above example is as simple as
package sample.camel

import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext

import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager

class Boot {
  // Create CamelContext with Spring-based registry and custom route builder
  val context = new ClassPathXmlApplicationContext("/context-jms.xml", getClass)
  val registry = new ApplicationContextRegistry(context)
  CamelContextManager.init(new DefaultCamelContext(registry))

  // Setup publish/subscribe example
  val jmsUri = "jms:topic:test"
  val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
  val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
  val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start

  val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
}
To publish messages to subscribers, one could of course also use the JMS API directly; the JMS producer actor used in this example is not required. For the example to work, Camel's jms component needs to be configured with a JMS connection factory, which is done in a Spring application context XML file (context-jms.xml).
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

  <!-- ================================================================== -->
  <!--  Camel JMS component and ActiveMQ setup                            -->
  <!-- ================================================================== -->

  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://testbroker"/>
  </bean>

</beans>
To run the example, start the Akka Kernel and POST a message to http://localhost:8877/camel/pub/jms.
curl -H "Content-Type: text/plain" -d "Happy hAkking" http://localhost:8877/camel/pub/jms
The HTTP response body should be
message published
On the console, where you started the Akka Kernel, you should see something like
...
INF [20100622-11:49:57.688] camel: jms-subscriber-2 received: Happy hAkking
INF [20100622-11:49:57.688] camel: jms-subscriber-1 received: Happy hAkking
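The fan-out seen in the log output, where one published message reaches both subscribers while the HTTP client only gets an acknowledgement, can be modelled in a few lines. The following is a minimal in-memory sketch in plain Scala, assuming hypothetical names (PubSubSketch, Topic, bridge); it does not use JMS or akka-camel and makes no claim about the delivery order of a real broker.

```scala
import scala.collection.mutable.ListBuffer

object PubSubSketch {
  // Hypothetical in-memory topic; with JMS the topic lives in the message broker.
  final class Topic {
    private val subscribers = ListBuffer.empty[String => Unit]

    def subscribe(handler: String => Unit): Unit = {
      subscribers += handler
      ()
    }

    // Every subscriber is called with the published body.
    def publish(body: String): Unit = subscribers.foreach(handler => handler(body))
  }

  // Mirrors the role of PublisherBridge: publish one-way, then acknowledge to the caller.
  def bridge(topic: Topic, body: String): String = {
    topic.publish(body)
    "message published"
  }
}
```

Registering two handlers on one Topic and calling bridge delivers the body to both handlers, and the caller receives only the fixed acknowledgement string.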
Cometd
Publish/subscribe with CometD is equally easy using Camel's cometd component.

All actor classes from the JMS example can be reused; only the endpoint URIs need to be changed.
package sample.camel

import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext

import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager

class Boot {
  // ...

  // Setup publish/subscribe example
  val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target"
  val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start
  val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start

  val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
}