This is the second part of Testing with Kafka for JUnit. See the table underneath for links to the other parts.

Part 1: Writing component tests for Kafka producers
Part 2: Writing component tests for Kafka consumers
Part 3: Writing system tests for a Kafka-enabled microservice
Part 4: Using Kafka for JUnit with Spring Kafka

We have seen how easy it is to write concise and readable component tests for Kafka producers in the last article. In this installment, we will focus on the read-side and write component tests for a Kafka consumer. The example is centered around the same small lifecycle event service that we saw in the last article.

Suppose we have a generic Subscriber<T> interface that provides the means to read from a log.

public interface Subscriber<T> extends Runnable {

  void onEvent(T event);

  void close();

  @Override
  void run();
}

The core idea is that the onEvent(T) method will be called for each and every record that has been successfully deserialized into an instance of the parameterized type T. Recall from the last article that our lifecycle event service deals with subtypes of TurbineEvents. The base class is shown underneath.

@JsonTypeInfo(
  use = JsonTypeInfo.Id.NAME,
  include = JsonTypeInfo.As.EXISTING_PROPERTY,
  property = "type")
@JsonSubTypes({
  @JsonSubTypes.Type(
    value = TurbineRegisteredEvent.class, 
    name = TurbineRegisteredEvent.TYPE),
  @JsonSubTypes.Type(
    value = TurbineDeregisteredEvent.class, 
    name = TurbineDeregisteredEvent.TYPE)
})
public abstract class TurbineEvent {

  private final UUID eventId;
  private final String turbineId;
  private final long timestamp;

  @JsonCreator
  public TurbineEvent(@JsonProperty("eventId") UUID eventId,
                      @JsonProperty("turbineId") String turbineId,
                      @JsonProperty("timestamp") long timestamp) {
    this.eventId = eventId;
    this.turbineId = turbineId;
    this.timestamp = timestamp;
  }

  public TurbineEvent(String turbineId) {
    this(UUID.randomUUID(), turbineId, System.currentTimeMillis());
  }

  public abstract String getType();

  /** getters omitted for brevity **/
  /** toString() implementation omitted for brevity **/
}

If you're interested in the full description of this base event type and all of its derivatives, then please have a look at this GitHub repository. The repository contains the full source that was used for this article.

We have to implement a Deserializer for TurbineEvents that turns the byte[] payload of a record into a JSON string, which Jackson then converts to the proper POJO.

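public final class TurbineEventDeserializer implements Deserializer<TurbineEvent> {

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public TurbineEvent deserialize(String topic, byte[] data) {
    // Kafka passes null for records without a value (e.g. tombstones)
    if (data == null) {
      return null;
    }
    // Decode explicitly as UTF-8 instead of relying on the platform default charset
    final var rawPayload = new String(data, StandardCharsets.UTF_8);
    try {
      return mapper.readValue(rawPayload, TurbineEvent.class);
    } catch (JsonProcessingException e) {
      throw new SubscriberException(e);
    }
  }
}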

With the deserializer in place, we are ready to provide a specific implementation of the Subscriber<T> interface that is able to handle TurbineEvents. To keep things simple, we will implement this in a single class - call it TurbineEventSubscriber. The core idea is that our subscriber runs in a separate Thread that looks for new records until it is explicitly stopped or receives an interrupt signal. The run() method is implemented as follows.

@Override
public void run() {
  initialize();
  while (isRunning()) {
    poll();
  }
  shutdown();
}

The initialize() method sets up the underlying Kafka consumer, while poll() simply uses the Consumer.poll(Duration) method to check for new records. If any of the aforementioned shutdown conditions applies, isRunning() evaluates to false and the subscriber proceeds to the shutdown() method, which tries to free up all previously acquired resources. Nothing too fancy; this basic design should not be a surprise to you if you've already done some work with Apache Kafka.
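The loop described above can be sketched without a broker. The following is a minimal illustration, not the implementation from the repository: the class name PollingLoopSketch is hypothetical, a BlockingQueue stands in for the Kafka topic, and its timed poll plays the role of Consumer.poll(Duration). It shows the two shutdown conditions (explicit close and thread interrupt) and that shutdown() runs once the loop exits.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the subscriber; the queue replaces the Kafka topic.
class PollingLoopSketch implements Runnable {

  private final BlockingQueue<String> log;
  private final List<String> handled = new CopyOnWriteArrayList<>();
  private final AtomicBoolean running = new AtomicBoolean(true);
  private volatile boolean resourcesReleased = false;

  PollingLoopSketch(BlockingQueue<String> log) {
    this.log = log;
  }

  @Override
  public void run() {
    // initialize() would create and subscribe the Kafka consumer here
    while (isRunning()) {
      poll();
    }
    shutdown();
  }

  // The loop ends after an explicit close() or once the thread has been interrupted.
  private boolean isRunning() {
    return running.get() && !Thread.currentThread().isInterrupted();
  }

  private void poll() {
    try {
      // Bounded wait, analogous to Consumer.poll(Duration)
      String record = log.poll(100, TimeUnit.MILLISECONDS);
      if (record != null) {
        onEvent(record);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so that isRunning() observes it
      Thread.currentThread().interrupt();
    }
  }

  void onEvent(String event) {
    handled.add(event);
  }

  void close() {
    running.set(false);
  }

  private void shutdown() {
    // A real subscriber would close its Kafka consumer here
    resourcesReleased = true;
  }

  List<String> handled() {
    return handled;
  }

  boolean resourcesReleased() {
    return resourcesReleased;
  }
}
```

Because the wait inside poll() is bounded, a call to close() takes effect after at most one polling interval.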

The implementation of onEvent(TurbineEvent) is the heart of the subscriber. For the sake of simplicity, we will simply consume the event and log its details to the console.

@Override
public void onEvent(final TurbineEvent event) {
  if (event instanceof TurbineRegisteredEvent) {
    log.info("A new wind turbine has been registered. Its ID is {} and it is located at {}/{} (lat/lon).",
      event.getTurbineId(),
      ((TurbineRegisteredEvent) event).getLatitude(),
      ((TurbineRegisteredEvent) event).getLongitude());
  } else if (event instanceof TurbineDeregisteredEvent) {
    log.info("The wind turbine with ID {} has been de-registered.", event.getTurbineId());
  } else {
    log.warn("Received an unknown event type: {}", event);
  }
}

To clarify, here's an overview of all the classes and interfaces that contribute to the read-path.

Figure: Overview of classes and components of the read-path

Let's write a simple component test that verifies that the subscriber is able to consume TurbineEvents. We don't need to explicitly use the TurbineEventPublisher that we implemented in the last article. Instead, we will use Kafka for JUnit to inject Kafka records with the proper payload (or faulty ones to see how our subscriber reacts) and see if they get picked up as expected. So, let's create a test called TurbineEventSubscriberTest and bring up an embedded Kafka cluster with default settings.

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;

public class TurbineEventSubscriberTest {

  private EmbeddedKafkaCluster kafka;

  @BeforeEach
  void setupKafka() {
    kafka = provisionWith(defaultClusterConfig());
    kafka.start();
  }

  @AfterEach
  void tearDownKafka() {
    kafka.stop();
  }
}

This setup hasn't changed from the last time. We use the same factory methods to create builders that help us parameterize the cluster in a concise and readable way. The next step is to write the actual test. We want to make sure that a TurbineEvent that has been successfully published to the log is consumed by our subscriber.

Step 1. First off, we create an instance of TurbineRegisteredEvent and use Kafka for JUnit to publish it to the topic turbine-events. Since we are publishing a record that has a key associated with it, we have to wrap it inside an instance of KeyValue<String, TurbineEvent>. The KeyValue class comes with Kafka for JUnit and lets you not only set the key and the value of the record, but also individual header values.

var event = new TurbineRegisteredEvent("1a5c6012", 49.875114, 8.978702);
var kv = new KeyValue<>(event.getTurbineId(), event);

Step 2. Once this is done, we can use the EmbeddedKafkaCluster to send the event to the designated topic. Be sure to parameterize the send operation with the appropriate value serializer.

kafka
  .send(inTransaction("turbine-events", Collections.singletonList(kv))
  .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,     
        TurbineEventSerializer.class.getName()));

Step 3. With the proper setup and data provisioning of the topic, we're all set to start up the subscriber and see if it properly consumes the record. Our implementation of TurbineEventSubscriber does not have any collaborating classes that consume a received event; it implements the business logic directly. Thus, mocking is not an option, but instrumentation is. Have a look at the following static class that we define as part of our test case.

static class InstrumentingTurbineEventSubscriber extends TurbineEventSubscriber {

  private final List<TurbineEvent> receivedEvents = new ArrayList<>();

  private final CountDownLatch latch;

  public InstrumentingTurbineEventSubscriber(String topic,
                                             Map<String, Object> config,
                                             CountDownLatch latch) {
    super(topic, config);
    this.latch = latch;
  }

  @Override
  public void onEvent(final TurbineEvent event) {
    super.onEvent(event);
    receivedEvents.add(event);
    latch.countDown();
  }

  public List<TurbineEvent> getReceivedEvents() {
    return Collections.unmodifiableList(receivedEvents);
  }
}

This class overrides the onEvent(TurbineEvent) method and uses a CountDownLatch that we can leverage to provide a feedback channel to the test case that signals if the record has been consumed - or a timeout has elapsed.

With this instrumentation in place, we can instantiate our subject-under-test and start it in a dedicated thread. The configuration is simple: the underlying Kafka consumer needs an entry point into the cluster, which we obtain by fetching the list of active brokers using EmbeddedKafkaCluster.getBrokerList(). We also set auto.offset.reset to earliest, because the record has already been published by the time the subscriber joins the topic.

var latch = new CountDownLatch(1);
var config = Map.<String, Object>of(
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList(),
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
var consumer = new InstrumentingTurbineEventSubscriber("turbine-events", config, latch);
var consumerThread = new Thread(consumer);
consumerThread.start();

Step 4. We'll pause the test execution by waiting on our barrier, giving it a timeout of 10 seconds. This should be more than enough time for the subscriber to connect with the broker, register itself as a consumer for the designated topic and consume the event. Please note that the timeout of 10 seconds is an upper bound on the time that the barrier blocks. Immediately after the subscriber has consumed the record it executes CountDownLatch.countDown(), which lets the test case pass through the barrier as soon as possible.

latch.await(10, TimeUnit.SECONDS);
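CountDownLatch.await(long, TimeUnit) returns a boolean: true if the count reached zero, false if the timeout elapsed first. The snippet above discards that value. The following sketch, with the hypothetical helper class LatchTimeoutSketch, shows both outcomes in isolation; the worker thread plays the role of the subscriber calling countDown() from onEvent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that demonstrates the two possible results of a timed await.
class LatchTimeoutSketch {

  // Returns true if the worker signalled before the timeout, false if the timeout elapsed.
  static boolean awaitSignal(boolean workerSignals, long timeoutMillis)
      throws InterruptedException {
    var latch = new CountDownLatch(1);
    var worker = new Thread(() -> {
      if (workerSignals) {
        latch.countDown(); // what onEvent(...) does in the instrumented subscriber
      }
    });
    worker.start();
    try {
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } finally {
      worker.join();
    }
  }
}
```

In the test case, wrapping the call as assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue() would make a timeout fail right at the latch, rather than later at the assertion on the received events.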

Step 5. After the barrier is passed, we'll check if our instrumented TurbineEventSubscriber has seen any event.

assertThat(consumer.getReceivedEvents().size()).isEqualTo(1);

Step 6. Close the consumer, wait for its thread to finish, and end the test.

consumer.close();
consumerThread.join(TimeUnit.SECONDS.toMillis(1));
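Note that Thread.join(long) returns silently when the timeout elapses, so the call alone does not tell us whether the subscriber thread has actually terminated. A minimal sketch of a stricter shutdown check follows; the class and method names (JoinCheckSketch, stopAndAwait) are made up for illustration and are not part of Kafka for JUnit.

```java
// Hypothetical helper: stop a worker and report whether it really terminated in time.
class JoinCheckSketch {

  static boolean stopAndAwait(Thread worker, Runnable stopSignal, long timeoutMillis)
      throws InterruptedException {
    stopSignal.run();           // e.g. consumer::close in the test above
    worker.join(timeoutMillis); // returns silently if the timeout elapses
    return !worker.isAlive();   // only this tells us whether the thread has ended
  }
}
```

In the test, the equivalent would be to add assertThat(consumerThread.isAlive()).isFalse() after the join.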

For the sake of completeness, here is the whole test class.

/** regular imports omitted for brevity */
import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.SendKeyValuesTransactional.inTransaction;
import static org.assertj.core.api.Assertions.assertThat;

public class TurbineEventSubscriberTest {

  private EmbeddedKafkaCluster kafka;

  @BeforeEach
  void setupKafka() {
    kafka = provisionWith(defaultClusterConfig());
    kafka.start();
  }

  @AfterEach
  void tearDownKafka() {
    kafka.stop();
  }

  @Test
  void shouldConsumeTurbineRegisteredEvent() throws Exception {

    var event = new TurbineRegisteredEvent("1a5c6012", 49.875114, 8.978702);
    var kv = new KeyValue<>(event.getTurbineId(), event);

    kafka
      .send(inTransaction("turbine-events", Collections.singletonList(kv))
      .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TurbineEventSerializer.class.getName()));

    var latch = new CountDownLatch(1);
    var config = Map.<String, Object>of(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList(),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    var consumer = new InstrumentingTurbineEventSubscriber("turbine-events", config, latch);
    var consumerThread = new Thread(consumer);
    consumerThread.start();

    latch.await(10, TimeUnit.SECONDS);

    assertThat(consumer.getReceivedEvents().size()).isEqualTo(1);

    consumer.close();
    consumerThread.join(TimeUnit.SECONDS.toMillis(1));
  }

  static class InstrumentingTurbineEventSubscriber extends TurbineEventSubscriber {

    private final List<TurbineEvent> receivedEvents = new ArrayList<>();

    private final CountDownLatch latch;

    public InstrumentingTurbineEventSubscriber(String topic,
                                               Map<String, Object> userSuppliedConfig,
                                               CountDownLatch latch) {
      super(topic, userSuppliedConfig);
      this.latch = latch;
    }

    @Override
    public void onEvent(TurbineEvent event) {
      super.onEvent(event);
      receivedEvents.add(event);
      latch.countDown();
    }

    public List<TurbineEvent> getReceivedEvents() {
      return Collections.unmodifiableList(receivedEvents);
    }
  }
}

And that's it for this test!

Kafka for JUnit comes with many more features. For instance, if our TurbineEventSubscriber uses a transactional consumer, we can check that it only reads records that have been published to the log as part of a valid and successful transaction. Using the following code, we can publish a TurbineEvent within a transaction and fail that transaction on purpose to see if our subscriber implementation behaves correctly.

kafka
  .send(inTransaction("turbine-events", record)
  .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TurbineEventSerializer.class.getName())
  .failTransaction());
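For records from the failed transaction to stay invisible, the Kafka consumer has to read with isolation level read_committed; Kafka's default is read_uncommitted. The sketch below shows what such a consumer configuration could look like. It uses plain string keys so that it compiles without kafka-clients on the classpath (they correspond to the constants in ConsumerConfig), the class name ReadCommittedConfigSketch is hypothetical, and it assumes that TurbineEventSubscriber passes user-supplied settings through to the underlying consumer.

```java
import java.util.Map;

// Hypothetical helper that builds a consumer configuration for reading committed records only.
class ReadCommittedConfigSketch {

  static Map<String, Object> readCommittedConfig(String brokerList) {
    return Map.of(
      "bootstrap.servers", brokerList,       // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
      "auto.offset.reset", "earliest",       // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
      "isolation.level", "read_committed");  // ConsumerConfig.ISOLATION_LEVEL_CONFIG
  }
}
```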

This makes Kafka for JUnit a valuable tool when testing the resilience of your Kafka-based components. Check out the comprehensive user's guide to learn more!

We'll take a look at writing system tests for a microservice that publishes domain events to Kafka in the next article.