@EmbeddedKafka and Spring Boot Kafka auto-configuration in a @SpringBootTest joined with bootstrapServersProperty

June 15, 2023

We are all used to Spring coming with reasonable defaults and everything working together smoothly out of the box. Surprisingly, however, that is not the case by default with @EmbeddedKafka and @EnableAutoConfiguration in a @SpringBootTest.

In the end, something as simple as bootstrapServersProperty = "spring.kafka.bootstrap-servers" does the trick.

Mismatching property name defaults

Everything is documented as it should be, but I did not read all the documentation up front, as presumably many of us developers don't either.

Spring Boot Kafka auto-configuration, consisting of KafkaAutoConfiguration and a few more classes, reads the value of Kafka's property originally known as "bootstrap.servers" from the context property named "spring.kafka.bootstrap-servers" and binds it to KafkaProperties.bootstrapServers.
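For illustration only, a minimal sketch of how that binding can be observed by injecting the auto-configured KafkaProperties bean (the class name and its use here are hypothetical, not part of the original setup):

import java.util.List;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.stereotype.Component;

// Sketch: whatever "spring.kafka.bootstrap-servers" resolves to in the context
// is what KafkaProperties.getBootstrapServers() returns.
@Component
class BootstrapServersInspector {

    private final KafkaProperties kafkaProperties;

    BootstrapServersInspector(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    List<String> bootstrapServers() {
        return kafkaProperties.getBootstrapServers();
    }
}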

By default, however, @EmbeddedKafka provides that value under a property with a different name, "spring.embedded.kafka.brokers", so the two do not match out of the box: spring.kafka.bootstrap-servers is not equal to spring.embedded.kafka.brokers.

Make Spring Kafka auto-configuration pick up @EmbeddedKafka's bootstrap.servers

The easiest and quickest way to make the property names match is to configure

@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")

Alternatively,

@TestPropertySource(properties = {
    "spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}",
})

would do just as well, among many more possibilities.
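Another of those possibilities, shown here only as a sketch and not taken from the original setup, is to inline the same mapping via the properties attribute of @SpringBootTest:

// Sketch: equivalent to the @TestPropertySource variant above, just inlined.
@SpringBootTest(properties = "spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}")
@EmbeddedKafka
class AlternativeEmbeddedKafkaTest {
    // ...
}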

The essential parts of a Spring Boot test with @EmbeddedKafka might look like this:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class EmbeddedKafkaTest {

    @Configuration
    @EnableAutoConfiguration
    static class TestConfiguration {
    }

   ...
}

With that configuration, @KafkaListener annotated consumer methods and all kinds of @Autowired dependencies such as KafkaTemplate, KafkaAdmin, ConsumerFactory, and ProducerFactory resolve out of the box.

With some more minor tweaks explained further below, a working example looks like this:

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@EmbeddedKafka(
    bootstrapServersProperty = "spring.kafka.bootstrap-servers",
    topics = EmbeddedKafkaTest.TOPIC_NAME
)
@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset = earliest")
@TestInstance(Lifecycle.PER_CLASS)
class EmbeddedKafkaTest {

    @Configuration
    @EnableAutoConfiguration
    static class TestConfiguration {
    }

    static final String TOPIC_NAME = "topic";

    @Autowired
    private KafkaAdmin admin;

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Autowired
    private ProducerFactory<String, String> producerFactory;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private BlockingQueue<ConsumerRecord<String, String>> consumptionQueue = new LinkedBlockingDeque<>();

    @KafkaListener(topics = TOPIC_NAME, groupId = "listener")
    private void listen(ConsumerRecord<String, String> consumerRecord) throws InterruptedException {
        consumptionQueue.put(consumerRecord);
    }

    @Test
    void testProducerAndConsumer() throws Exception {
        final String KEY = "key1", VALUE = "value1";
        try (
            Consumer<String, String> consumer = consumerFactory.createConsumer("consumer", null);
            Producer<String, String> producer = producerFactory.createProducer();
        ) {
            consumer.subscribe(asList(TOPIC_NAME));

            producer.send(new ProducerRecord<>(TOPIC_NAME, KEY, VALUE), (metadata, exception) -> {
            }).get();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));

            assertThat(records).singleElement().satisfies(singleRecord -> {
                assertThat(singleRecord.key()).isEqualTo(KEY);
                assertThat(singleRecord.value()).isEqualTo(VALUE);
            });
            consumer.commitSync();
            consumer.unsubscribe();
        }
    }

    @Test
    void testTemplateAndListener() throws Exception {
        final String KEY = "key2", VALUE = "value2";
        consumptionQueue.clear();

        kafkaTemplate.send(TOPIC_NAME, KEY, VALUE).get();
        ConsumerRecord<String, String> consumerRecord = consumptionQueue.poll(3, TimeUnit.SECONDS);

        assertThat(consumerRecord.key()).isEqualTo(KEY);
        assertThat(consumerRecord.value()).isEqualTo(VALUE);
        assertThat(consumptionQueue).isEmpty();
    }

    @Test
    void checkBootstrapServersParameterResolutionExample(
        @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
        @Autowired EmbeddedKafkaBroker broker
    ) throws Exception {
        assertThat(broker.getBrokersAsString()).isEqualTo(bootstrapServers);
    }

    @Test
    void testAdmin() {
        assertThat(admin.describeTopics(TOPIC_NAME)).containsKey(TOPIC_NAME);
    }

}

Unique consumer group.ids

Whenever there is more than one consumer, whether created explicitly or indirectly by a @KafkaListener, each consumer's group.id has to be specified so that it is unique. There are plenty of other possible ways, but it can be achieved, for example, like this:

@KafkaListener(..., groupId = "unique-listener-group-id")

or

consumerFactory.createConsumer("unique-consumer-group-id", null);

This is by no means meant to comprehensively explain consumer groups, just a potentially useful hint.
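As another sketch, not taken from the example above, a unique group.id can also be generated per application context by letting a SpEL expression produce it inside the test class:

// Sketch: SpEL generates a random group.id per application context, avoiding clashes
// between contexts created from the very same configuration.
@KafkaListener(topics = TOPIC_NAME, groupId = "#{T(java.util.UUID).randomUUID().toString()}")
void listenWithRandomGroupId(ConsumerRecord<String, String> consumerRecord) {
    // handle the record
}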

Applying

@TestInstance(Lifecycle.PER_CLASS)

to the test classes prevents JUnit from creating a new instance of the test class for each @Test method, and in turn prevents Spring from creating another context with yet another @KafkaListener whose group.id, always derived from the very same configuration, would then no longer be unique (except, obviously, when the test class has no more than one @Test method).

auto.offset.reset = earliest

It might also be useful to set:

@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset = earliest")

With auto.offset.reset set to earliest, a consumer that subscribes only after records have already been produced still reads the topic from the beginning instead of silently receiving nothing.
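For orientation only, a small sketch of what that Spring property corresponds to in plain Kafka consumer configuration (not part of the original example):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

class AutoOffsetResetExample {

    // Sketch: "spring.kafka.consumer.auto-offset-reset" maps to the plain Kafka
    // consumer property "auto.offset.reset" (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).
    static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}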

A side note on alternatives

Docker Compose

There are many examples out there that work with Docker Compose. While that certainly has its merits, in my opinion it is generally preferable that tests include all the setup they need, which is the case with @EmbeddedKafka and is not with Docker Compose. With @EmbeddedKafka, anyone can just check out the code and start a test from within their favorite IDE. With Docker Compose, some Docker containers usually have to be started before running the tests, or worse, there may be port collisions or leftover data from previously run tests.

Testcontainers

Probably the closest alternative to @EmbeddedKafka's in-memory Kafka broker setup with Spring is Testcontainers.

It supports, for example, environment variables as documented, as opposed to the similarly but not identically named Spring context properties (for example "bootstrap.servers" vs. "spring.kafka.bootstrap-servers"), and generally it might feel closer to plain Kafka or a production setup than @EmbeddedKafka ever will.

Testcontainers provides KafkaContainer for configuring containers specifically for Kafka brokers. One minor difference to @EmbeddedKafka might be that the latter supports configuring the number of brokers with a plain number via EmbeddedKafka.count, whereas Testcontainers does not support multiple brokers quite as easily. For an example cluster setup with multiple nodes, I'd recommend checking out KafkaContainerCluster as a starting point with Testcontainers.
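For comparison, a minimal Testcontainers sketch (assuming the org.testcontainers Kafka module on the classpath, a locally available Docker daemon, and some compatible confluentinc/cp-kafka image tag; none of this is taken from the original example):

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class KafkaContainerTest {

    // One single-broker Kafka container shared by the tests in this class.
    @Container
    static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // Wire the container's bootstrap servers into Spring Boot's Kafka auto-configuration,
    // analogous to what bootstrapServersProperty does for @EmbeddedKafka.
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    // ... tests as with @EmbeddedKafka ...
}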

Conclusion

There are plenty of examples out there on the Internet, and their sheer number and variety make it cumbersome to find an appropriate one.

As far as I remember, before @EmbeddedKafka and Spring Boot Kafka auto-configuration I had never come across anything from Spring that did not come with working defaults, which is a big surprise and, I figure, justifies spreading the word about how to make it work.

After all, as the example shows, only one unexpected piece of additional configuration is necessary, and writing tests with Spring is as easy as ever, also with Kafka.


About the author: Philipp Kunz

Works at mimacom as a software engineer.
