Implementing The Pipes and Filters Pattern using Actors in Akka for Java

Written on December 26, 2016

In this article we introduce a couple of interesting concepts from Akka by showing how to implement the Pipes and Filters enterprise integration pattern. Akka is a popular library for writing modern reactive software in Java and Scala.

If you are interested in learning how your team can adopt and benefit from different reactive approaches, check out our two-day “Reactive and Asynchronous Java” intensive course.

The Business Problem

Recently we came across an author publishing application made available as a service. It processed Markdown text by executing a series of operations back to back:

  1. Warn authors if they were using forbidden short forms such as “isn’t” or “I’m” (let’s call this the “text checker”)
  2. Transform LaTeX code to Unicode, e.g. \alpha to α (let’s call this the “latex to unicode transformer”)
  3. Upload the result to an S3 bucket so it is publicly available (an “S3 uploader”)

After playing with the service, it became clear that it would benefit from several extensions:

  1. Use the LaTeX to Unicode functionality to transform text independently of the text checking functionality
  2. Add another functionality to translate the text into American English spelling
  3. Create a bespoke chain of functionalities, for example just text checker + S3 uploader

The software engineering problem

The problem with this application is that the service was written in a way that violates the single responsibility principle: it was implemented as a single god class. The different functionalities were coupled together in a single pass. For example, one could not use the LaTeX to Unicode transformer without also calling the text checker. Consequently, it was difficult to add new behaviour or to reuse existing behaviour in a different pipeline of functionalities.

Here’s a simplified version of the problematic code and associated diagram to illustrate the problem:

import java.util.Arrays;

public class PipesAndFilterProblem {
    public static void main(String[] args) {
        String[] forbiddenWords = {"isn't", "i'm", "don't"};
        String message = "I'm feeling \\beta!";
        String messageLowerCase = message.toLowerCase();
        // Step 1: text checker, hard-wired to run first
        if (Arrays.stream(forbiddenWords).noneMatch(messageLowerCase::contains)) {
            // Step 2: LaTeX to Unicode, only reachable once the check has passed
            String converted = message.replaceAll("\\\\alpha", "α").replaceAll("\\\\beta", "β");
            // Step 3: ... upload to S3
            System.out.println(converted);
        }
    }
}

Tightly coupled publishing pipeline

The above code raises two key issues. How do you reorder functionalities, for example running the LaTeX to Unicode transformer before the text checker? And how do you add a new functionality, such as text translation?

Pipes and filters

The pipes and filters pattern is a solution to this problem. You break down a complex task into independent components, which you can then compose into a chain. This approach has several benefits:

  1. Each component is decoupled from one another and can be maintained independently
  2. Each component can also be tested in isolation
  3. You can re-use components to create different chains
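To make these benefits concrete, here is a minimal plain-Java sketch, without Akka. It assumes a hypothetical Filter interface whose apply method returns an empty Optional to drop a message; the class names and the tiny AmericanSpellingFilter word list are illustrative assumptions, not part of the original service.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical filter abstraction: an empty Optional means "drop this message".
interface Filter {
    Optional<String> apply(String message);
}

// Drops messages that contain a forbidden short form (case-insensitive).
class TextCheckerFilter implements Filter {
    private static final String[] FORBIDDEN = {"isn't", "i'm", "don't"};

    @Override
    public Optional<String> apply(String message) {
        String lower = message.toLowerCase();
        boolean forbidden = Arrays.stream(FORBIDDEN).anyMatch(lower::contains);
        return forbidden ? Optional.empty() : Optional.of(message);
    }
}

// Replaces two LaTeX commands with Unicode; String.replace matches literally.
class LatexToUnicodeFilter implements Filter {
    @Override
    public Optional<String> apply(String message) {
        return Optional.of(message.replace("\\alpha", "α").replace("\\beta", "β"));
    }
}

// Hypothetical new filter with a tiny British-to-American spelling table.
class AmericanSpellingFilter implements Filter {
    @Override
    public Optional<String> apply(String message) {
        return Optional.of(message.replace("colour", "color").replace("behaviour", "behavior"));
    }
}

// A pipeline is an ordered list of filters; changing the chain means changing the list.
class Pipeline {
    private final List<Filter> filters;

    Pipeline(List<Filter> filters) {
        this.filters = filters;
    }

    Optional<String> run(String message) {
        Optional<String> current = Optional.of(message);
        for (Filter filter : filters) {
            // Once a filter drops the message, the remaining filters are skipped.
            current = current.flatMap(filter::apply);
        }
        return current;
    }
}
```

Reordering the chain, reusing the LaTeX filter on its own, or plugging in the new spelling filter only changes the list handed to Pipeline; none of the filter classes are touched. For example, new Pipeline(Arrays.<Filter>asList(new LatexToUnicodeFilter())) runs the transformer without the text checker.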

Pipes and filters EIP

Think about the way commands work in Unix. For example, cat file | uniq | wc -l counts the lines of a file after collapsing adjacent duplicate lines. Each program is loosely coupled from the others, yet the programs can be arbitrarily composed to create complex pipelines.

A filter is a piece of functionality to execute. A pipe is simply a mechanism that passes the output of one filter on as the input of the next.

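One can distinguish two approaches for delivering the output of one filter to another filter:

  1. Point-to-point: each message produced by a filter is consumed by exactly one receiving filter
  2. Publish-subscribe: each message produced by a filter is delivered to every filter that has subscribed to it

For the purpose of our business problem, we will implement the point-to-point semantics.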
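The difference between the two delivery semantics can be sketched without any messaging library. The hypothetical PointToPointChannel below hands each message to exactly one receiver (round-robin here, which is only one possible policy), while PublishSubscribeChannel hands every message to all subscribers. Both classes are illustrative assumptions, not Akka or EIP library types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Point-to-point: each message goes to exactly one receiver.
// Round-robin is just one assumed policy for choosing that receiver.
class PointToPointChannel {
    private final List<Consumer<String>> receivers = new ArrayList<>();
    private int next = 0;

    void addReceiver(Consumer<String> receiver) {
        receivers.add(receiver);
    }

    void send(String message) {
        if (receivers.isEmpty()) {
            throw new IllegalStateException("no receiver attached");
        }
        receivers.get(next).accept(message);
        next = (next + 1) % receivers.size();
    }
}

// Publish-subscribe: every subscriber gets its own copy of each message.
class PublishSubscribeChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}
```

In the actor pipeline built in this article, each filter sends its output to exactly one next filter, which corresponds to the point-to-point case.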

Akka

Akka is a library, available for both Java and Scala, for building distributed, message-based applications. We will focus on the core abstraction it provides: actors.

In simplest terms, an actor is a component that processes messages off a queue (its “mailbox”). Actors have other interesting characteristics:

  * Actors do not expose their internal state, which reduces the scope for concurrency bugs
  * Messages are sent to actors asynchronously, which enables flexible scheduling of message processing
  * Actors can run on a distributed system
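To make the mailbox idea concrete, here is a deliberately minimal, single-JVM sketch of an actor-like component in plain Java. It is not how Akka is implemented: the MiniActor class, its tell and stop methods, and the use of a single-thread ExecutorService as the processing loop are assumptions for illustration only. Messages are enqueued by tell and processed one at a time, in order, by a single thread, so the behaviour never runs concurrently with itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy actor, NOT Akka: the single-thread executor's queue plays the mailbox.
class MiniActor<T> {
    // One worker thread: messages are handled one at a time, in arrival order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // The behaviour is only ever invoked on that worker thread.
    private final Consumer<T> behaviour;

    MiniActor(Consumer<T> behaviour) {
        this.behaviour = behaviour;
    }

    // Asynchronous send: enqueue the message and return immediately.
    void tell(T message) {
        mailbox.execute(() -> behaviour.accept(message));
    }

    // Stop accepting messages and wait (up to 5 seconds) for queued ones to finish.
    boolean stop() {
        mailbox.shutdown();
        try {
            return mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Akka adds much more on top of this idea, such as supervision, location transparency and dispatchers whose threads are shared between many actors, so treat the sketch only as a mental model of a component processing messages off its mailbox.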

The diagram below illustrates the interaction between two actors.

Two actors communicating

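So how do you implement an actor? The code below implements the text checker functionality as an independent Akka actor. It receives messages and checks them for short forms. If a message contains none, it is sent back to the sender; otherwise it is silently dropped. Don’t worry about all the details of this code; if you are curious, we encourage you to take a deeper look at the Akka API. In a nutshell:

  * The actor extends UntypedActor and overrides onReceive, which Akka calls for each message taken from the actor’s mailbox
  * getSender() returns an ActorRef pointing to the sender of the current message
  * tell(message, getSelf()) sends a message asynchronously, with this actor as its sender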

You can experiment with the code by modifying the unit tests below. The code is kept simple for the purpose of the example, but in practice a text checker would likely go through an extensive list of words loaded from a file.

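import akka.actor.UntypedActor;
import java.util.Arrays;

public class SingleTextCheckerActor extends UntypedActor {
    private static final String[] forbiddenWords = {"isn't", "i'm", "don't"};

    // Called by Akka for each message taken from this actor's mailbox
    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof String) {
            String lowercaseMessage = ((String) message).toLowerCase();
            if (!containsForbiddenWord(lowercaseMessage)) {
                // Reply to the sender; messages with forbidden words are dropped
                getSender().tell(message, getSelf());
            }
        } else {
            // Let Akka deal with message types this actor does not understand
            unhandled(message);
        }
    }

    private boolean containsForbiddenWord(String message) {
        return Arrays.stream(forbiddenWords)
                     .anyMatch(message::contains);
    }
}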
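
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class SingleTextCheckerActorTest {
    private static final FiniteDuration TIMEOUT = Duration.create(1, TimeUnit.SECONDS);
    private ActorSystem system;

    @Test
    public void testMessageDoesNotContainForbiddenWord() {
        JavaTestKit testProbe = new JavaTestKit(system);
        Props props = Props.create(SingleTextCheckerActor.class);
        ActorRef subject = system.actorOf(props);
        String msg = "I am waiting for you.";
        // The probe is passed as the sender, so the actor's reply goes to it
        subject.tell(msg, testProbe.getRef());
        testProbe.expectMsgEquals(TIMEOUT, msg);
    }

    @Test
    public void testMessageContainsForbiddenWord() {
        JavaTestKit testProbe = new JavaTestKit(system);
        Props props = Props.create(SingleTextCheckerActor.class);
        ActorRef subject = system.actorOf(props);
        subject.tell("I'm feeling good!", testProbe.getRef());
        // A message with a forbidden word must not be echoed back
        testProbe.expectNoMsg(TIMEOUT);
    }

    @Before
    public void setup() {
        system = ActorSystem.create();
    }

    @After
    public void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }
}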
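The text checker above keeps its forbidden words in a hard-coded array, while a real checker would likely load an extensive list from a file. A minimal sketch, assuming one forbidden word per line in a UTF-8 text file; the file format and the ForbiddenWords class are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: forbidden words read from a UTF-8 file, one per line.
class ForbiddenWords {
    private final Set<String> words;

    ForbiddenWords(Set<String> words) {
        this.words = words;
    }

    static ForbiddenWords load(Path file) throws IOException {
        Set<String> words = Files.readAllLines(file, StandardCharsets.UTF_8).stream()
                .map(String::trim)                // tolerate stray spaces
                .filter(line -> !line.isEmpty())  // skip blank lines
                .map(line -> line.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
        return new ForbiddenWords(words);
    }

    // Same substring check as SingleTextCheckerActor, against the loaded set.
    boolean containsForbiddenWord(String message) {
        String lower = message.toLowerCase(Locale.ROOT);
        return words.stream().anyMatch(lower::contains);
    }
}
```

Inside an actor, the set would sensibly be loaded once, for example when the actor is created, rather than on every message.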

So how can you extend this to implement the pipes and filters pattern? In a nutshell, a filter can be seen as an actor processing a message, and a pipe as the mailbox of the next actor.

Pipes and filters using actors

We need a way for our text checker actor to send its output to another actor that will process the message. In fact, every filter actor should be able to pass its result on to another actor; this is how you build a chain of functionalities. Let’s extend our text checker actor so that it sends its output to a next actor:

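import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Arrays;

public class TextCheckerActor extends UntypedActor {

    // The next filter in the chain: its mailbox acts as the pipe
    private final ActorRef nextActor;
    private static final String[] forbiddenWords = {"isn't", "i'm", "don't"};
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public TextCheckerActor(ActorRef nextActor) {
        this.nextActor = nextActor;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        log.info("Received Message: " + message);
        if (message instanceof String) {
            String lowercaseMessage = ((String) message).toLowerCase();
            if (!containsForbiddenWord(lowercaseMessage)) {
                // Forward to the next filter instead of replying to the sender
                nextActor.tell(message, getSelf());
            }
        } else {
            unhandled(message);
        }
    }

    private boolean containsForbiddenWord(String message) {
        return Arrays.stream(forbiddenWords)
                .anyMatch(message::contains);
    }
}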

We can create another actor to convert LaTeX symbols to Unicode. This actor acts as a second filter:

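import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class LatexToUnicodeActor extends UntypedActor {

    // The next filter in the chain
    private final ActorRef nextActor;
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public LatexToUnicodeActor(ActorRef nextActor) {
        this.nextActor = nextActor;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        log.info("Received Message: " + message);
        if (message instanceof String) {
            String msg = (String) message;
            // replaceAll takes a regex, so the backslash is escaped twice
            String result = msg.replaceAll("\\\\alpha", "α")
                               .replaceAll("\\\\beta", "β");
            // Every transformed message is forwarded to the next filter
            nextActor.tell(result, getSelf());
        } else {
            unhandled(message);
        }
    }
}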
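The chained replaceAll calls in LatexToUnicodeActor grow unwieldy as more symbols are supported. One possible refactoring, sketched here under the assumption that a literal lookup table is enough, keeps the mapping in a LinkedHashMap and applies String.replace, which matches literally and so avoids the double escaping that the regex-based replaceAll needs. The LatexSymbols class and the extra \infty and \in entries are illustrative additions, not part of the original service.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative lookup table from LaTeX commands to Unicode symbols.
class LatexSymbols {
    private static final Map<String, String> SYMBOLS = new LinkedHashMap<>();

    static {
        // Applied in insertion order: list longer commands before their prefixes
        // (\infty before \in), otherwise "\infty" would become "∈fty".
        SYMBOLS.put("\\alpha", "α");
        SYMBOLS.put("\\beta", "β");
        SYMBOLS.put("\\infty", "∞");
        SYMBOLS.put("\\in", "∈");
    }

    // String.replace matches literally, so no regex escaping is needed.
    static String toUnicode(String text) {
        String result = text;
        for (Map.Entry<String, String> entry : SYMBOLS.entrySet()) {
            result = result.replace(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```

The actor's onReceive could then delegate to LatexSymbols.toUnicode((String) message) before forwarding the result. Because entries are applied in insertion order, a command must be listed before any shorter command that is a prefix of it.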

Now that you have two filters, how do you connect them together? Here’s a test that puts it all together:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class PipesAndFilterTest {

    private static final FiniteDuration TIMEOUT = Duration.create(1, TimeUnit.SECONDS);
    private ActorSystem system;

    @Test
    public void testTextCheckerAndLatexToUnicodePipeline() {
        // The probe is the end of the pipe: it receives whatever the last filter emits
        JavaTestKit endProbe = new JavaTestKit(system);
        // LatexToUnicode filter, created first so it can be handed to the text checker
        Props latexToUnicodeProps = Props.create(LatexToUnicodeActor.class, endProbe.getRef());
        ActorRef latexToUnicodeActor = system.actorOf(latexToUnicodeProps, "latex-to-unicode-actor");
        // TextChecker filter, which forwards to the LatexToUnicode filter
        Props textCheckerProps = Props.create(TextCheckerActor.class, latexToUnicodeActor);
        ActorRef textCheckerActor = system.actorOf(textCheckerProps, "text-checker-actor");
        // test: filters forward to the next actor, so no sender is needed
        textCheckerActor.tell("I think the answer is \\alpha + \\beta", ActorRef.noSender());
        endProbe.expectMsgEquals(TIMEOUT, "I think the answer is α + β");
    }

    @Test
    public void testTextCheckerDropsMessageWithForbiddenWord() {
        JavaTestKit endProbe = new JavaTestKit(system);
        // LatexToUnicode filter
        Props latexToUnicodeProps = Props.create(LatexToUnicodeActor.class, endProbe.getRef());
        ActorRef latexToUnicodeActor = system.actorOf(latexToUnicodeProps, "latex-to-unicode-actor");
        // TextChecker filter
        Props textCheckerProps = Props.create(TextCheckerActor.class, latexToUnicodeActor);
        ActorRef textCheckerActor = system.actorOf(textCheckerProps, "text-checker-actor");
        // test: "I'm" is forbidden, so nothing should reach the end of the pipe
        textCheckerActor.tell("I'm wondering whether the answer is \\alpha + \\beta", ActorRef.noSender());
        endProbe.expectNoMsg(TIMEOUT);
    }

    @Before
    public void setup() {
        system = ActorSystem.create();
    }

    @After
    public void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }
}
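The test wires the pipeline back to front: the last filter is created first so that its ActorRef can be passed to the constructor of the filter feeding it. The same idea generalises to a bespoke chain of any length. The sketch below shows only that wiring logic, in plain Java rather than Akka: a hypothetical Stage interface stands in for ActorRef, and factory functions stand in for Props.create(SomeActor.class, next). Unlike actor mailboxes, delivery here is synchronous.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Stand-in for an ActorRef: something that accepts messages.
interface Stage {
    void tell(String message);
}

// Hypothetical builder that wires filters back to front, like the test does with Props.
class PipelineBuilder {
    private static final List<String> FORBIDDEN = Arrays.asList("isn't", "i'm", "don't");

    // Each factory receives the next stage, as each actor receives nextActor.
    static Stage build(List<Function<Stage, Stage>> factories, Stage sink) {
        Stage next = sink;
        // Walk backwards: a filter can only be created once the stage after it exists.
        for (int i = factories.size() - 1; i >= 0; i--) {
            next = factories.get(i).apply(next);
        }
        return next; // head of the pipeline
    }

    // Mirrors TextCheckerActor: forward only messages without forbidden words.
    static Stage textChecker(Stage next) {
        return message -> {
            String lower = message.toLowerCase();
            if (FORBIDDEN.stream().noneMatch(lower::contains)) {
                next.tell(message);
            }
        };
    }

    // Mirrors LatexToUnicodeActor: transform, then forward.
    static Stage latexToUnicode(Stage next) {
        return message -> next.tell(message.replace("\\alpha", "α").replace("\\beta", "β"));
    }
}
```

Reordering or extending the chain then only means editing the list of factories, for example putting PipelineBuilder::latexToUnicode first.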

Benefits and Summary

We hope this short article has given you an overview of how to get started with Akka and actors. You may be wondering what the benefits of using actors are here, compared with implementing this yourself or using a message queue system.

Note that if distributing the services is not a requirement, we could have solved the problem with plain function composition, which is more lightweight than introducing actors. With actors, however, you get distribution and elasticity thanks to their location transparency, as well as the logging and monitoring support provided by Akka. If you are looking for a library that provides ready-made implementations of Enterprise Integration Patterns, take a look at Apache Camel and Spring Integration.
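For comparison, the function-composition alternative might look like the following minimal sketch, which uses only java.util.function.Function and Optional: no actors, threads or distribution. The Steps class and its then helper are illustrative names, not an existing API. A dropped message becomes an empty Optional, and swapping the arguments of then reorders the pipeline.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Function;

// Each step maps a message to an optional result; empty means "dropped".
class Steps {
    private static final String[] FORBIDDEN = {"isn't", "i'm", "don't"};

    static Optional<String> textCheck(String message) {
        String lower = message.toLowerCase();
        boolean forbidden = Arrays.stream(FORBIDDEN).anyMatch(lower::contains);
        return forbidden ? Optional.empty() : Optional.of(message);
    }

    static Optional<String> latexToUnicode(String message) {
        return Optional.of(message.replace("\\alpha", "α").replace("\\beta", "β"));
    }

    // Compose two optional-returning steps: run first, then feed any result into second.
    static Function<String, Optional<String>> then(
            Function<String, Optional<String>> first,
            Function<String, Optional<String>> second) {
        return first.andThen(result -> result.flatMap(second));
    }
}
```

The trade-off is the one described above: this runs in a single call stack with no mailboxes, which is simpler, but gives up the asynchrony and location transparency that actors provide.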

To learn more about how to use actors, we recommend you check out the Akka documentation. We’ll be following up in the future with additional articles that explore the facilities offered by Akka.

If you are interested in learning more about how you can benefit from reactive approaches, check out our popular two-day “Reactive and Asynchronous Java” in-house course.