Reactive Kafka + Slow Consumers = Diagnosis Nightmare

Recently I’ve been working with the combination of Reactive Streams (in the form of Akka Streams) and Kafka, as it’s a good fit for some of the systems we’re building at work.

I hope it’ll be beneficial to others to share a particular nuance I discovered whilst working with this combination: a problem with slow downstream consumers.

To give a brief overview, we were taking messages from a Kafka topic and then sending them as the body of HTTP POST requests. This worked fine the majority of the time, as we normally only get a message every couple of seconds.

However, the issues came when we had to deal with an influx of messages, from an upstream source that every now and then batches about 300 messages and pushes them onto Kafka. What made it even more difficult is that we didn’t know this was a contributing factor at the time…

So what happened? We saw a lot of rebalancing exceptions in the consumer and, because we were using non-committing Kafka consumers, all the messages from offset 0 were being re-read every second or so as a result of the constant rebalancing. Also, when you try to use the kafka-consumer-groups script that comes with Kafka, you don’t get a list of partitions and consumers, but a notification that the consumer group either doesn’t exist or is rebalancing.

It turns out Kafka was constantly redistributing the partitions across the 2 nodes within my affected consumer group. I can’t recall how I eventually figured this out, but the root cause was combining Kafka in a reactive stream with a slow downstream consumer (HTTP).

At the time of writing we’re using akka-stream-kafka 0.11-M3, and it has an “interesting” behaviour when working with slow downstream consumers – it stops its scheduled polling when there is no downstream demand, which in turn stops its heartbeating back to Kafka. Because of this, whenever the stream was applying backpressure (because we were waiting on HTTP responses), the backpressure propagated all the way back to the Kafka consumer, which in turn stopped heartbeating.
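To make the failure mode concrete, here is a minimal sketch in Python (not the Akka Streams code itself) of the behaviour described above. It assumes a simplified model in which every poll doubles as a heartbeat and the broker gives up on a consumer once a session timeout passes without one. The function name and the 30 second timeout are illustrative assumptions, not values taken from our setup.

```python
def first_eviction_time(poll_times_ms, session_timeout_ms, end_ms):
    """Return when the broker would give up on the consumer, or None.

    Simplified model (an assumption): every poll doubles as a heartbeat,
    the consumer joins at t=0, and the broker evicts it as soon as
    session_timeout_ms passes without a heartbeat.
    """
    last_heartbeat = 0
    for t in sorted(poll_times_ms) + [end_ms]:
        if t - last_heartbeat > session_timeout_ms:
            return last_heartbeat + session_timeout_ms
        last_heartbeat = t
    return None


# Hypothetical numbers: a 30 s session timeout, observed for 60 s.
steady = list(range(0, 60001, 1000))   # downstream keeps up: poll every second
stalled = [0, 1000, 2000]              # backpressure stops polling after 2 s

print(first_eviction_time(steady, 30000, 60000))   # None
print(first_eviction_time(stalled, 30000, 60000))  # 32000
```

In the stalled case the consumer is perfectly healthy, it is simply waiting on the slow downstream, yet from the broker’s point of view it has gone silent.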

To replicate this, I created the following Kafka topic:
./ --create --zookeeper --topic test_topic --replication-factor 3 --partitions 6

Then I used this code to publish messages onto Kafka, and ran two of these consumers to consume in parallel within the same Kafka consumer group.

What the missing heartbeats cause the Kafka broker to do (at least with its default configuration) is to consider that node as slow or unavailable, which triggers a rebalancing of partitions to other nodes (which it deems might be available to pick up the slack). That’s why when I kept reviewing the state of kafka-consumer-groups, you’d eventually see all partitions being consumed by one node, then the other, then getting the rebalancing message. And because both of our nodes were using non-committing consumers, they both kept receiving the full backlog of messages, meaning they both became overwhelmed with messages and applied backpressure, which meant Kafka kept reassigning partitions… it was a vicious cycle!

Using the kafka-consumer-groups script you can see this happening:

benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
test-consumer, test_topic, 3, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 4, unknown, 2, unknown, consumer-1_/
test-consumer, test_topic, 5, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 0, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 1, unknown, 2, unknown, consumer-1_/
test-consumer, test_topic, 2, unknown, 3, unknown, consumer-1_/
benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
test-consumer, test_topic, 3, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 4, unknown, 2, unknown, consumer-1_/
test-consumer, test_topic, 5, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 0, unknown, 3, unknown, consumer-1_/
test-consumer, test_topic, 1, unknown, 2, unknown, consumer-1_/
test-consumer, test_topic, 2, unknown, 3, unknown, consumer-1_/
benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
test-consumer, test_topic, 0, unknown, 75, unknown, consumer2_/
test-consumer, test_topic, 1, unknown, 74, unknown, consumer2_/
test-consumer, test_topic, 2, unknown, 75, unknown, consumer2_/
test-consumer, test_topic, 3, unknown, 75, unknown, consumer2_/
test-consumer, test_topic, 4, unknown, 75, unknown, consumer2_/
test-consumer, test_topic, 5, unknown, 75, unknown, consumer2_/
benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
Consumer group `test-consumer` does not exist or is rebalancing.
benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
Consumer group `test-consumer` does not exist or is rebalancing.

And within my consumer’s app logs, you can see it constantly rereading the same messages:

2016-07-01 09:37:37,171 [] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 0, offset = 0, key = null, value = test2)
2016-07-01 09:42:07,344 [] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 0, offset = 0, key = null, value = test2)
2016-07-01 09:38:57,217 [] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 1, offset = 3, key = null, value = test24)
2016-07-01 09:43:37,390 [] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 1, offset = 3, key = null, value = test24)

So how did we fix this? Thankfully for us we knew that the number of elements ever to appear in a batch would be small (a few hundred elements), so we added an in-memory buffer to the stream, which meant we could buffer all of these for the HTTP endpoint to eventually process and Kafka would be unaffected. This was a quick fix and got us what we needed.
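The effect of the buffer can be illustrated with a toy simulation. This is a sketch in Python under stated assumptions rather than our actual stream: the source polls (and therefore heartbeats) only when the buffer has room, each poll takes one message, and the downstream completes one slow request every fixed number of ticks. All names and numbers are hypothetical. In Akka Streams terms the fix is presumably a buffer stage with a backpressure overflow strategy placed between the Kafka source and the HTTP call.

```python
from collections import deque


def longest_poll_gap(burst_size, buffer_capacity, ticks_per_request, total_ticks):
    """Longest stretch of ticks without a poll, for a demand-driven source."""
    buffer = deque()
    remaining = burst_size
    last_poll = 0
    longest_gap = 0
    for tick in range(1, total_ticks + 1):
        # Downstream finishes one slow HTTP request every ticks_per_request ticks.
        if buffer and tick % ticks_per_request == 0:
            buffer.popleft()
        # The source polls only while the buffer signals demand.
        if len(buffer) < buffer_capacity:
            longest_gap = max(longest_gap, tick - last_poll)
            last_poll = tick
            if remaining:
                buffer.append(remaining)  # one message per poll (simplification)
                remaining -= 1
    return max(longest_gap, total_ticks - last_poll)


# A burst of 300 messages, downstream takes 50 ticks per request.
print(longest_poll_gap(300, 1, 50, 1000))    # 50: polling stalls behind the slow consumer
print(longest_poll_gap(300, 500, 50, 1000))  # 1: the buffer absorbs the burst, polling never stops
```

The catch is the one noted in the text: this only works while the burst fits in the buffer. Once the buffer fills, the poll gaps come straight back.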

As soon as you add a buffer, the two consumers behave, and you get this:

benfoster$ ./ --new-consumer --bootstrap-server --describe --group test-consumer
test-consumer, test_topic, 3, unknown, 87, unknown, consumer2_/
test-consumer, test_topic, 4, unknown, 86, unknown, consumer2_/
test-consumer, test_topic, 5, unknown, 86, unknown, consumer2_/
test-consumer, test_topic, 0, unknown, 87, unknown, consumer1_/
test-consumer, test_topic, 1, unknown, 86, unknown, consumer1_/
test-consumer, test_topic, 2, unknown, 86, unknown, consumer1_/

Is it the right fix? Probably not. If we were dealing with greater volume or velocity we’d have to treat this app as a slow consumer, and possibly ditch the reactive streams abstraction in favour of the lower-level Kafka API to ensure we had full control over our partition and heartbeat management. But that deserves a dedicated article (or several) in its own right.

I hope someone else finds this useful. It’s one of the mishaps you can have when you abstract so far away that you don’t realise the issues that could occur beneath the woodwork.

I’ve uploaded the source code for the reactive producer (shamelessly ripped from an Activator template, with the ticker publisher code from my friend Neil Dunlop) and the consumer I used, if you’d like to replicate the scenario. You’ll need a Kafka broker running:

Road to Continuous Deployment Part 1 – Performance Testing

The next set of articles documents a presentation I’m working on to demonstrate Continuous Delivery, and how, in a world where CD is desired, it’s becoming increasingly important for the appropriate level of testing to happen. I’ll be breaking them down to tackle one topic at a time, finally building up to a full demo of how I feel CD can work.

Firstly, I’m going to start with a problem CI/CD can address – Performance Testing. Not so much on how to performance test or how CI/CD can build systems that don’t require it, but how a continuous delivery pipeline can quickly alert developers to potential problems that could otherwise remain undetected and take ages to debug once in production.

Nothing new or groovy here, but a problem most developers (and definitely SQL developers) are familiar with: the N+1 problem. Java developers using Hibernate will be familiar with the @OneToMany annotation, but less so with the @Fetch annotation, and even less so with the implications of changing the FetchMode. I aim to demonstrate how something as simple as changing a Hibernate @OneToMany fetch strategy can drastically affect the performance of a system.

There are several good reasons why you may want to do this:
* It can be a reasonable thing to do: maybe you want to lazy load children, as there’s no point eagerly loading all details with a join
* It might not always be a bad thing to do (perhaps most of the time only a few children are accessed, which negates the overall performance impact), but testing should be done to assess the potential impact on performance
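For anyone who hasn’t met the N+1 problem before, the difference between the two strategies can be sketched without Hibernate at all. The following uses Python’s built-in sqlite3 module with a hypothetical two-table schema (the table and column names are assumptions, not the demo project’s). It loads the same data once with a single join and once with one query for the parents plus one per parent, and counts the queries issued.

```python
import sqlite3


def build_db(num_classes=3, students_per_class=4):
    """Create an in-memory database with classes and their students."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE school_class (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute(
        "CREATE TABLE student (id INTEGER PRIMARY KEY, class_id INTEGER, name TEXT)"
    )
    for c in range(1, num_classes + 1):
        conn.execute("INSERT INTO school_class VALUES (?, ?)", (c, f"Class {c}"))
        for s in range(students_per_class):
            conn.execute(
                "INSERT INTO student (class_id, name) VALUES (?, ?)",
                (c, f"Student {c}-{s}"),
            )
    return conn


def load_with_join(conn):
    """JOIN-style: classes and students arrive in a single query."""
    rows = conn.execute(
        "SELECT c.id, s.name FROM school_class c "
        "LEFT JOIN student s ON s.class_id = c.id ORDER BY c.id, s.id"
    ).fetchall()
    classes = {}
    for class_id, student in rows:
        classes.setdefault(class_id, []).append(student)
    return classes, 1


def load_with_select(conn):
    """SELECT-style: 1 query for the classes, then 1 more per class (N+1)."""
    queries = 1
    classes = {}
    for (class_id,) in conn.execute("SELECT id FROM school_class ORDER BY id").fetchall():
        students = conn.execute(
            "SELECT name FROM student WHERE class_id = ? ORDER BY id", (class_id,)
        ).fetchall()
        queries += 1
        classes[class_id] = [name for (name,) in students]
    return classes, queries


conn = build_db(num_classes=3, students_per_class=4)
print(load_with_join(conn)[1])    # 1
print(load_with_select(conn)[1])  # 4
```

Both loaders return identical data. Only the number of round trips differs, and it grows with the number of parent rows.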

Side-bar: The demo project for this article was originally ported from the spring boot data rest example within their samples, however the @Fetch annotation appears to be ignored, which makes it difficult to demonstrate.
This article gives a good direction on what I expected to happen and what the problem likely is:
I suspect the spring boot configuration doesn’t use the Criteria API behind the scenes, which means the @Fetch annotation will be ignored.

The application is a simple school class registration system, with the domain modelled around classes and students. One GET resource is available which returns all classes with their students as child nodes. Below is a sample of the returned JSON:

    {
        "id": 3,
        "className": "Oakleaf Grammar School - Class 3",
        "students": [
            {
                "id": 970,
                "firstName": "Marie",
                "lastName": "Robinson"
            },
            {
                "id": 2277,
                "firstName": "Steve",
                "lastName": "Parker"
            },
            {
                "id": 4303,
                "firstName": "Lillian",
                "lastName": "Carpenter"
            },
            {
                "id": 9109,
                "firstName": "Samuel",
                "lastName": "Woods"
            }
        ]
    }

So at this point, I have a simple application whose performance can be altered by simply changing the @Fetch annotation, but how can we test the effect of this?

This is what Gatling is designed for. It is a performance and load testing library written in Scala, which has a really nice DSL that can be used to express scenarios for testing load on systems.

This is the code required to define a scenario for testing our system:

import io.gatling.core.Predef._
import io.gatling.http.Predef._

class SchoolClassLoadTest extends Simulation {

  val httpConf = http
    .baseURL("http://localhost:8080") // Here is the root for all relative URLs

  // Each virtual user repeats the request 10 times
  val scn = scenario("Scenario Name").repeat(10, "n") {
    exec(http("request_1") // Using http protocol
      .get("/v1/class") // Get http method with the relative url
      .check(status.is(200))) // Check response status code is 200 (OK)
  }

  // The setUp(...) call that injects users into scn goes here, inside the class
}

Not much, is it?

Side-bar: Gatling also comes with a recorder UI. This allows you to record all interactions with applications over HTTP and save them as scenarios. Gatling achieves this by acting as a proxy, which you can use like any other web proxy. You can route web browser or application traffic via the Gatling Recorder Proxy, and it will record all interactions made through it and save them as a scenario similar to the one above, which can then be tweaked later.

Once you have your scenario defined, you can configure your simulations to meet whatever load and duration parameters you like. For example, in our app I’m going to run the test simulating 10 concurrent users, each making 10 requests:


This is just a simple example of what you can do with Gatling: you can specify ramp up and down of users to test how an app scales; pause between steps to simulate human interaction; and much more. For more about what you can do with Gatling check out their QuickStart Page

Back to our app. I’m now going to hit this simulation against our app whilst it’s using the FetchMode.JOIN config:

N.B. I’ve warmed up the DB cache before running both the before and after benchmarks by running the simulation once before I’ve recorded the results.

Above is the baseline for our app – you can see the mean response time is 221ms, and the max is 378ms. The 95th percentile is 318ms. Now look what happens when I simply change the @Fetch strategy from JOIN to SELECT:

The average response time has increased from 221ms to 3.4 seconds, with the 95th percentile rocketing up to 8.3 seconds! Something as simple as an annotation change can have such a dramatic impact on performance. The worrying thing is this is so easy to do: it could be done by someone unfamiliar with Hibernate and database performance tuning; by someone frantically looking to find some performance gain and changing a number of things arbitrarily; or what I consider worse – someone who believes that the benefits of doing this outweigh the performance knock, but fails to gather the evidence to back it up!
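As an aside on why the report shows both a mean and a 95th percentile: a handful of very slow requests drags the mean up, while the percentile tells you what the slowest 5% of users actually experienced. Here is a minimal Python sketch using the nearest-rank method. Gatling’s exact percentile calculation may differ, and the sample data is made up purely for illustration.

```python
def mean(samples):
    """Arithmetic mean of the response times."""
    return sum(samples) / len(samples)


def percentile(samples, pct):
    """Nearest-rank percentile for an integer pct between 1 and 100."""
    ordered = sorted(samples)
    rank = max(1, (pct * len(ordered) + 99) // 100)  # integer ceil(pct/100 * n)
    return ordered[rank - 1]


# Hypothetical response times in ms: 18 fast requests and 2 very slow ones.
samples = [200] * 18 + [8000] * 2
print(mean(samples))            # 980.0
print(percentile(samples, 95))  # 8000
print(percentile(samples, 50))  # 200
```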

Now that we’ve got the tools required to test performance, in my next article I’ll look into how this can be proactively monitored between releases, stay tuned…

Monitoring Solutions for Scakka Applications

Over the past few days I have been looking into monitoring Scala and Akka applications, as a number of new integration projects at work are using this framework. Scakka is designed to allow easy building of concurrent applications, and is a great middle-ground for transitioning from OOP to a functional-based approach.

The choices within this first investigation were between AppDynamics and a combination of Kamon, StatsD, Graphite and Grafana. So what are the two?

AppDynamics: An Application Performance Management tool, capable of providing detailed analytics of a system, not just for the JVM but a variety of technologies, single apps or distributed interactions across a network.

The second choice is actually a combination of open source tools, which provide a subset of AppDynamics’ functionality:
Kamon: Modular API for logging metrics for JVM based applications
StatsD: Metric collator and aggregator service, which can take metrics from a number of sources (Kamon being one of them)
Graphite: A highly scalable real-time graphing system
Grafana: A dashboard builder and service capable of presenting charts provided by Graphite, Elasticsearch and a number of others in a unified dashboard

Disclaimer: AppDynamics has saved my bacon in the past on a previous project, providing detailed insight to a concurrent system which was suffering from performance issues. Needless to say I’m a little biased to it but still interested to see what else there is out there.

I am aware that there is also New Relic, but given it’s fairly comparable to AppDynamics I dropped it from this investigation (mainly because it didn’t provide a broad enough contrast in choice, and it isn’t free).

So I started off with AppDynamics, as that was familiar ground and I suspected it wouldn’t be too much effort given my previous experience with it.

I used the latest version at the time (4.1), and set up the Controller in a VM. This was fairly straightforward as usual (although it takes quite a while) and after this I downloaded a Java Agent and hooked it up to my app using the -javaagent VM argument.

Sometimes you’re lucky, and AppDynamics will just detect everything going on in your app straight away… This wasn’t one of those times. No metrics appeared for any Business Transactions, and the App overview looked very bare. Digging into the App Servers list however showed that my app had indeed been registered, and when I clicked on the Memory tab I could see that memory usage was being monitored.

Ok, so I’m guessing AppDynamics wasn’t able to detect any entrypoints, as the app establishes its own outward connections, so nothing from the outside initiates connections to it. I also couldn’t use any of the automated instrumentation that the Spring framework benefits from.

I got around this by adding some custom POJO instrumentation, by monitoring any objects that implemented the Actor class, and listening for invocations for the aroundReceive method. Splitting the transactions by the simple class name gives a breakdown per Actor type. If necessary you could go a level further and split by message type, but I felt this was a bit OTT to start with.
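The idea behind that instrumentation (wrap the method that receives messages, then bucket the measurements by the simple class name of whichever actor handled the call) can be sketched in a few lines. This is an illustration in Python of the general technique, not how AppDynamics implements it. `ReceiveStats`, `Greeter` and `Counter` are hypothetical names.

```python
import time
from collections import defaultdict


class ReceiveStats:
    """Collects call counts and total time per actor class (simple class name)."""

    def __init__(self):
        self.calls = defaultdict(int)
        self.total_seconds = defaultdict(float)

    def instrument(self, actor_class):
        """Wrap actor_class.receive so each invocation is timed and counted."""
        original = actor_class.receive

        def timed_receive(actor, message):
            started = time.perf_counter()
            try:
                return original(actor, message)
            finally:
                name = type(actor).__name__  # split by simple class name
                self.calls[name] += 1
                self.total_seconds[name] += time.perf_counter() - started

        actor_class.receive = timed_receive
        return actor_class


stats = ReceiveStats()


@stats.instrument
class Greeter:  # hypothetical actor
    def receive(self, message):
        return f"hello {message}"


@stats.instrument
class Counter:  # hypothetical actor
    def __init__(self):
        self.seen = 0

    def receive(self, message):
        self.seen += 1
        return self.seen


greeter, counter = Greeter(), Counter()
greeter.receive("a")
greeter.receive("b")
counter.receive("tick")
print(dict(stats.calls))  # {'Greeter': 2, 'Counter': 1}
```

Splitting by message type as well would just mean using a (class name, message type) pair as the key, at the cost of many more buckets.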

From here, you get high level metrics about each Actor, such as response times, calls per minute, error rates, each accompanied by historical sparklines. Double Clicking each of the Business Transactions provides very detailed information, from a graphical view of interactions (i.e. http calls made) right down to elapsed time at method level.

AppDynamics’ ability to track errors logged using SLF4J pretty much gives me everything I’d like from an app monitoring perspective; however, there are a couple of drawbacks. Firstly, there is a free version, but it limits you to storing 24 hours of data, and you can only track 1 JVM instance with it. This makes it very limited if you want to monitor interactions across a number of co-ordinated microservices. The other is that AD’s default refresh rate of metrics appears to be a minute, although this may be configurable. This might not be realtime enough for some people but hasn’t been a problem for me.

Secondly I tried using the Kamon stack to see what comparable metrics I could get from my app. The benefits of this stack are that it’s free (as it’s open source) and, as it’s modular, you can mix and match many of the components within it. For example, you could use a different metrics aggregator (instead of StatsD) or a different dashboarding tool (instead of Grafana), or you can embed it within your own app monitoring framework. As it’s open source it shouldn’t be too difficult to write your own extensions to give you extra functionality.

I used an example stack that runs entirely within a single Docker app, which makes starting up and tearing down a breeze. From what I’ve seen this appears to be the standard platform for monitoring Akka so far.

Setting it up was a bit more involved compared to AppDynamics, which put me off a little as I’d like a monitoring platform to be as unobtrusive as possible. Compared to AD, I had to add a number of Kamon components as dependencies, and also the AspectJ Weaver (as this is how Kamon instruments methods within the Actor system). The AspectJWeaver.jar also needs adding as a Java Agent – there are 3 ways provided to do this; I used the -javaagent argument option as this allowed me to run it directly from IntelliJ (without having to run sbt externally).

Once I got Kamon working correctly with my app, I configured the monitoring. Again this is more intrusive than AD as it’s done within the app’s config file. However, it provides a number of configuration options, such as filters for including or excluding Actors, Routers and Dispatchers. You can also change the tick interval (to make dashboard updates more frequent) and include System and JVM metrics (such as CPU usage, host memory and jvm heap memory consumption, network utilisation etc.)

Once this was running and posting data to StatsD, I connected to the Grafana app on the webapp port advertised by Docker (use “docker ps” to find out what this is) and started playing about with what was available. Firstly I created a row by selecting the button on the left, added a chart, and then started looking through all the metrics that were available. Most of them appear under the stats.timers hierarchy, and are collated under your app name (which is declared within the Kamon configuration in your app’s config file, under the kamon.statsd.simple-metric-key-generator.application element). The Akka-specific metrics live under the akka-actor section, and are constantly updated with your Actors as the system creates new ones.

I didn’t like how Grafana didn’t know what scales or format to use for any of the metrics; this had to be done manually. I suspect this is because there’s no metadata about the metrics available, which is understandable given it’s a pretty generic framework for aggregating whatever metrics you want. For example, when adding the processing time, it doesn’t provide any suggestions, and leaves you to pick between seconds, milliseconds, nanoseconds etc. Given I didn’t know at which level Kamon was recording these, it was difficult to pick the right one!

Kamon doesn’t currently provide much in the way of metrics, although I’m sure that’s set to increase with future releases. Alongside the system metrics it provides (i.e. CPU, memory, heap usage etc.), Kamon specifically monitors Akka, so it can give you information on processing time, time spent in mailbox etc., which may be more useful than AD as that only provides generic metrics. Along with the few Akka metrics already available, another one I’d like is messages processed, so I can see if an Actor is getting swamped or processing more than it should.

For now, I think I’ll be sticking with AppDynamics – yes, it’s more heavyweight (it needs a dedicated VM) and not free, but it provides more than enough information for me to make informed decisions about an application’s performance or issues. When I need to monitor more than one microservice simultaneously, however, I might need to look elsewhere.

Then again, it’s quite likely that my lack of knowledge of the Kamon stack has limited my understanding of its potential. If anybody has examples or suggestions on how it can be better refined to monitor this kind of application, please comment below. It would be greatly appreciated!

Using Spotify Docker Plugin on Windows

So recently I have been looking into using Docker to automate parts of my testing and deployment, using my lightweight CEP component (which uses Siddhi under the covers) as a basis to develop my skills in Continuous Integration / Deployment.

Currently using Windows 10 on my personal laptop and never one to give up a challenge, I finally got Docker Toolbox running on Windows with version 1.8.2, with boot2docker and Docker Quickstart Terminal.

My next goal was to configure Docker builds directly to my boot2docker VM using Spotify’s Docker Maven Plugin, as a step towards Continuous Integration / Deployment. The idea being that whenever I run the deploy phase of my Maven build, a Docker image of my software would be built, ready to run immediately afterwards.

This is the Maven configuration I used within my “runner” module’s pom.xml:

            <entryPoint>["java", "-jar", "/app/${}.jar"]</entryPoint>
            <!-- copy the service's jar file from target into the root directory of the image -->

I was trying to run it within IntelliJ on Windows, and was getting the following error:

Docker - Socket Write Error

The Spotify Docker readme on GitHub states that the plugin uses the DOCKER_HOST environment variable to determine the location of the Docker daemon, and if this isn’t set it uses localhost:2375. So I tried setting this to the IP of the VM.

Unsuccessful again with this error:

Docker Error 2

This hadn’t worked either. The next thing to do was ssh onto the boot2docker VM and run “netstat -apn” to see what ports Docker was listening on.

Docker Machine Netstat

Docker was actually listening on 2376, not 2375! So after changing this I got back to the Socket write error from previous attempts.

A quick Google for this brought up a bug for the plugin, mentioning other environment variables that potentially needed setting up, but at this point I didn’t know if they’d fix my specific problem or, even if they did, what to set them to.

So next thing I tried to do was go back to Docker basics and build an image of my Spring Boot uberjar using a standard Dockerfile (completely outside of Maven):

FROM nimmis/java:oracle-8-jdk

ADD smalldata-cep-runner-1.0-SNAPSHOT.jar /app/smalldata-cep-runner.jar

CMD ["java", "-jar", "/app/smalldata-cep-runner.jar"]

Then from within the Docker Quickstart Terminal I was able to run this command within the folder that had my Dockerfile in it: “docker build -t foyst/smalldata-cep .”

This successfully built my image in my boot2docker VM, which I could then run with “docker run --rm -p 8080:8080 foyst/smalldata-cep”

Ok, so using Vanilla Docker build I could successfully create an image, happy days! So from there I moved on to running “mvn docker:build” from PowerShell…

Still an error. Then it occurred to me to try “mvn docker:build” from the Docker Quickstart Terminal:

Success!! So what’s the difference? Eventually it occurred to me that the Docker Quickstart Terminal must somehow be configured out of the box to communicate with the boot2docker VM. So I started focusing my search on that

Always review the documentation, this came in handy:

This allowed me to actually configure the shell of my choice (PowerShell), and knowing how that worked I could then apply the same to my IntelliJ configuration.
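For reference, the configuration in question boils down to a handful of environment variables that the Docker client reads. The sketch below assumes the missing pieces were the TLS-related variables that `docker-machine env` prints alongside DOCKER_HOST (DOCKER_TLS_VERIFY and DOCKER_CERT_PATH). The IP address and certificate path are placeholders, not the values from my machine, and the helper name `docker_env` is made up for illustration.

```python
import os


def docker_env(host_ip, cert_dir, port=2376, base_env=None):
    """Return a copy of the environment with the Docker client variables set."""
    env = dict(os.environ if base_env is None else base_env)
    env["DOCKER_HOST"] = f"tcp://{host_ip}:{port}"  # the TLS port the daemon listens on
    env["DOCKER_TLS_VERIFY"] = "1"                  # talk TLS to the daemon
    env["DOCKER_CERT_PATH"] = cert_dir              # client certificates for the VM
    return env


# Hypothetical values; substitute the VM's real IP and certificate directory.
env = docker_env(
    "192.168.99.100",
    r"C:\Users\me\.docker\machine\machines\default",
    base_env={},
)
for name in sorted(env):
    print(f"{name}={env[name]}")
```

The same three variables can be set in a PowerShell session or in an IntelliJ run configuration, and the resulting dictionary could equally be handed to a child process (for example via the env argument of Python’s subprocess.run) when launching the Maven build.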

Using Docker in Windows is still quite a PITA, but now that the client works natively on Windows, and once you understand what goes on behind the scenes, it’s possible to get it working quite nicely.

Next I will be looking into automated builds and deployments using a combination of GitHub, Jenkins and Docker…

Docker Toolbox and COMODO Internet Security

So after a couple of hours fighting trying to get the latest Docker Toolbox (1.8.2b) installed on my Windows laptop, I thought I’d share my adventure!

tl;dr – disable ALL of your security software whilst installing. COMODO’s HIPS module was preventing changes being made to the file system: even though I thought I’d closed COMODO (right click and close), it was still interfering. Disabling this explicitly through the UI allowed me to install Docker Toolbox.


So after opening the DockerToolbox-1.8.2b.exe and allowing all of the requests coming through COMODO, I get this error message before any sight of a Docker Installer UI:

Docker Toolbox Install Error


After clicking OK the installer drops out, and I get another request from COMODO for the Docker install to modify a file.

So after repeating this frustrating loop a couple of times, I turned off COMODO internet security by right clicking the COMODO icon in the notifications area, and selecting Exit. This allowed me to get a bit further in the install right up to starting the install process.

Then the pain arrives…

MoveFile Failed Code 5 Access Denied – whilst trying to configure the uninstaller for Docker (the first thing that the installer does). So I aborted at this point and googled for that error message.

Nothing specific to Docker, however as I suspected it’s quite a general error message, which returned support posts for a number of other app installs all suffering the same issue. And they all confirmed the same thing – disable your anti-virus and security software.

Just to double check, I tried installing the Toolbox within a Windows 8.1 VM I use for specific development (and with no security software installed), and of course it was fine in there.

As I’d seen a couple of the support posts mention HIPS modules within security software being the culprit, I tried disabling COMODO’s HIPS module specifically (without turning the whole thing off)

Worked first time after that. COMODO raised an alarm about a threat being detected from the installer, but I suspect it was a false positive so I let it carry on. I can now say it works fine on my machine 🙂


Sometimes trying to be over-secure can be a right PITA…

WCF or ASP.NET Web API – Which to use?!

Found this interesting article when deciding whether to go with WCF or the newer ASP.NET Web API framework when creating new web services. The Web API looks like the way to go when creating RESTful services, without all the complex and verbose configuration that comes with WCF.

However, along with that you lose the extra configuration WCF provides you with, such as multiple transport types, duplex communication, message queues etc. So it’s definitely worth considering what the service will be used for before committing to one of them.

COM woes – Retrieving COM class factory failed due to the following error: 80040154

I spent ages trying to figure out this problem when trying to reference Microsoft Office Interop libraries in my project (for manipulating an Excel file programmatically).

Turns out, the DLLs I was trying to reference were 32-bit, and the platform my project was set to run on was 64-bit. As I was building this project on a 64-bit machine, Visual Studio set the default platform to be 64-bit.

This is a simple fix: Go to the “Project” menu, select “Properties”, then click the “Build” tab on the side. From here you just need to select x86 from the Platform combo box and rebuild your project. Sorted!

Create SQL Table from .NET Datatable in C#

In one of my recent projects I needed to generate a datatable in C# whose fields could vary depending on which properties of a variable needed to be exported from the system. It needed to be able to generate the SQL CREATE TABLE script on the fly and create the table in the database, and then populate the table with the data to export, either using a SqlBulkCopy object or on a row-by-row basis.
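The core of that approach, turning a list of column names and types into a CREATE TABLE statement, is small enough to sketch here. This is a Python illustration of the idea rather than the C# code from the project. The type mapping and the choice to make every column nullable are assumptions, and a real implementation would map .NET DataColumn types instead.

```python
# Assumed mapping from Python types to SQL Server column types.
SQL_TYPES = {int: "INT", float: "FLOAT", bool: "BIT", str: "NVARCHAR(MAX)"}


def quote(identifier):
    """Bracket-quote an identifier, escaping any closing bracket."""
    return "[" + identifier.replace("]", "]]") + "]"


def create_table_sql(table_name, columns):
    """Build a CREATE TABLE statement from (column name, Python type) pairs."""
    if not columns:
        raise ValueError("at least one column is required")
    column_defs = ", ".join(
        f"{quote(name)} {SQL_TYPES[py_type]} NULL" for name, py_type in columns
    )
    return f"CREATE TABLE {quote(table_name)} ({column_defs})"


print(create_table_sql("Export", [("Id", int), ("Name", str), ("Score", float)]))
# CREATE TABLE [Export] ([Id] INT NULL, [Name] NVARCHAR(MAX) NULL, [Score] FLOAT NULL)
```

Quoting the identifiers matters because the column names come from data that varies at runtime. Without it, a property name containing a space or a reserved word would break the generated script.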

The tutorial below provides a good code base for developing this kind of functionality, as well as providing a link to a similar project on MSDN.