@EmbeddedKafka and Spring Boot Kafka auto-configuration in a @SpringBootTest joined with bootstrapServersProperty
We are all used to Spring coming with reasonable defaults for everything, and to everything working together smoothly out of the box.
Surprisingly, however, this does not appear to be the case by default with @EmbeddedKafka and @EnableAutoConfiguration in a @SpringBootTest.
In the end, something as simple as bootstrapServersProperty = "spring.kafka.bootstrap-servers" does the trick.
Mismatching property name defaults
Everything is documented as it should be, but I did not read all the documentation up front, and I suppose many of us developers would not either.
Spring Boot Kafka auto-configuration, consisting of KafkaAutoConfiguration and a few more classes, picks up what Kafka originally refers to as bootstrap.servers from the context property "spring.kafka.bootstrap-servers", which is bound to KafkaProperties.bootstrapServers.
By default, however, @EmbeddedKafka provides that value in a property with a different name, "spring.embedded.kafka.brokers".
spring.kafka.bootstrap-servers is not equal to spring.embedded.kafka.brokers.
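To make the mismatch concrete, here is a toy model in plain Java. It is not Spring's actual binding code, and the class and method names are made up for illustration. The only facts taken from Spring are the two property names and Spring Boot's fallback to localhost:9092 when spring.kafka.bootstrap-servers is not set.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the property name mismatch; not Spring's actual binding code.
class BootstrapServersMismatch {

    // Property name read by Spring Boot's Kafka auto-configuration.
    static final String BOOT_PROPERTY = "spring.kafka.bootstrap-servers";
    // Property name set by @EmbeddedKafka by default.
    static final String EMBEDDED_PROPERTY = "spring.embedded.kafka.brokers";
    // Spring Boot's default when BOOT_PROPERTY is not set.
    static final String BOOT_DEFAULT = "localhost:9092";

    // What the auto-configured clients would connect to for the given context properties.
    static String resolveBootstrapServers(Map<String, String> contextProperties) {
        return contextProperties.getOrDefault(BOOT_PROPERTY, BOOT_DEFAULT);
    }

    // Simulates an embedded broker publishing its address under the given property name.
    static Map<String, String> contextWithBrokerPublishedAs(String propertyName, String brokerAddress) {
        Map<String, String> contextProperties = new HashMap<>();
        contextProperties.put(propertyName, brokerAddress);
        return contextProperties;
    }

    public static void main(String[] args) {
        String brokerAddress = "127.0.0.1:54321"; // hypothetical random port of the embedded broker
        // Default: the broker address lands in a property nobody reads; prints localhost:9092
        System.out.println(resolveBootstrapServers(
                contextWithBrokerPublishedAs(EMBEDDED_PROPERTY, brokerAddress)));
        // With bootstrapServersProperty set, the names match; prints 127.0.0.1:54321
        System.out.println(resolveBootstrapServers(
                contextWithBrokerPublishedAs(BOOT_PROPERTY, brokerAddress)));
    }
}
```

In the first case the clients would try to reach a broker on the default port, where none is listening, which is why the producers and consumers in a test never connect.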
Make Spring Kafka auto-configuration pick up @EmbeddedKafka's bootstrap.servers
The easiest and quickest way to make the context property names match is to configure @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers"). Alternatively,
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}",
})
would do just as well, among many other possibilities.
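The @TestPropertySource variant relies on two things: the inlined entry is written in the "key = value" syntax of java.util.Properties, and the ${...} placeholder is resolved against the other context properties. The following plain Java sketch imitates both steps. It is a simplified stand-in, not Spring's resolver, and the class name, method names, and broker address are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model of an inlined test property with a ${...} placeholder; not Spring's resolver.
class InlinedPropertySketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Parses one inlined "key = value" entry using java.util.Properties syntax.
    static Properties parseInlined(String inlinedProperty) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(inlinedProperty));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected when reading from a String
        }
        return properties;
    }

    // Replaces each ${name} with its value from the given properties; fails if a name is unknown.
    static String resolvePlaceholders(String value, Map<String, String> knownProperties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String replacement = knownProperties.get(name);
            if (replacement == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + name + "'");
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

Parsing "spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}" yields the key spring.kafka.bootstrap-servers with the raw placeholder as its value; the spaces around the equals sign are dropped. Resolving that value against a map that contains spring.embedded.kafka.brokers then yields the embedded broker's address.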
The essential parts of a Spring Boot test with @EmbeddedKafka might look like this:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class EmbeddedKafkaTest {
    @Configuration
    @EnableAutoConfiguration
    static class TestConfiguration {
    }
    ...
}
With that configuration, @KafkaListener annotated consumer methods and all kinds of @Autowired dependencies such as KafkaTemplate, KafkaAdmin, ConsumerFactory, and ProducerFactory already resolve.
With some more minor tweaks explained further below, a working example looks like this:
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@EmbeddedKafka(
        // publish the embedded broker's address under the name Spring Boot reads
        bootstrapServersProperty = "spring.kafka.bootstrap-servers",
        topics = EmbeddedKafkaTest.TOPIC_NAME
)
@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset = earliest")
@TestInstance(Lifecycle.PER_CLASS)
class EmbeddedKafkaTest {

    @Configuration
    @EnableAutoConfiguration
    static class TestConfiguration {
    }

    static final String TOPIC_NAME = "topic";

    @Autowired
    private KafkaAdmin admin;

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Autowired
    private ProducerFactory<String, String> producerFactory;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // hands records received by the listener over to the test thread
    private BlockingQueue<ConsumerRecord<String, String>> consumptionQueue = new LinkedBlockingDeque<>();

    @KafkaListener(topics = TOPIC_NAME, groupId = "listener")
    private void listen(ConsumerRecord<String, String> consumerRecord) throws InterruptedException {
        consumptionQueue.put(consumerRecord);
    }

    @Test
    void testProducerAndConsumer() throws Exception {
        final String KEY = "key1", VALUE = "value1";
        try (
            Consumer<String, String> consumer = consumerFactory.createConsumer("consumer", null);
            Producer<String, String> producer = producerFactory.createProducer();
        ) {
            consumer.subscribe(asList(TOPIC_NAME));
            // get() blocks until the broker has acknowledged the record
            producer.send(new ProducerRecord<>(TOPIC_NAME, KEY, VALUE), (metadata, exception) -> {
            }).get();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
            assertThat(records).singleElement().satisfies(singleRecord -> {
                assertThat(singleRecord.key()).isEqualTo(KEY);
                assertThat(singleRecord.value()).isEqualTo(VALUE);
            });
            consumer.commitSync();
            consumer.unsubscribe();
        }
    }

    @Test
    void testTemplateAndListener() throws Exception {
        final String KEY = "key2", VALUE = "value2";
        consumptionQueue.clear();
        kafkaTemplate.send(TOPIC_NAME, KEY, VALUE).get();
        ConsumerRecord<String, String> consumerRecord = consumptionQueue.poll(3, TimeUnit.SECONDS);
        // poll returns null on timeout; fail with a clear message instead of a NullPointerException
        assertThat(consumerRecord).isNotNull();
        assertThat(consumerRecord.key()).isEqualTo(KEY);
        assertThat(consumerRecord.value()).isEqualTo(VALUE);
        assertThat(consumptionQueue).isEmpty();
    }

    @Test
    void checkBootstrapServersParameterResolutionExample(
        @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
        @Autowired EmbeddedKafkaBroker broker
    ) throws Exception {
        assertThat(broker.getBrokersAsString()).isEqualTo(bootstrapServers);
    }

    @Test
    void testAdmin() {
        assertThat(admin.describeTopics(TOPIC_NAME)).containsKey(TOPIC_NAME);
    }
}
Unique consumer group.ids
Whenever there is more than one consumer, no matter whether created explicitly or indirectly by a @KafkaListener, each consumer's group.id has to be specified so that it is unique.
There are plenty of other ways, but it can be achieved, for example, like this:
@KafkaListener(..., groupId = "unique-listener-group-id")
or
consumerFactory.createConsumer("unique-consumer-group-id", null);
This is by no means meant to explain consumer groups comprehensively; it is just a potentially useful hint.
Applying @TestInstance(Lifecycle.PER_CLASS) to the test classes prevents JUnit from creating another instance of the test class for each @Test method and, in turn, Spring from setting up yet another @KafkaListener each time. The group.id of that additional listener would no longer be unique, because it is always based on the very same configuration (obviously, this does not matter when the test class has no more than one @Test method).
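Why shared group.ids get in the way of such tests can be illustrated with a toy model in plain Java. It is a strong simplification, assuming round-robin partition assignment and a plain hash of the key, whereas real Kafka assignment and partitioning are more involved; the class and its methods are hypothetical. What it keeps is the essential rule: within one group, each record is delivered to exactly one member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Kafka consumer groups; real assignment and partitioning are more involved.
class ConsumerGroupSketch {

    private final int partitionCount;
    // group.id -> consumer names that joined that group, in joining order
    private final Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
    // consumer name -> values delivered to that consumer
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    ConsumerGroupSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    void join(String groupId, String consumerName) {
        membersByGroup.computeIfAbsent(groupId, g -> new ArrayList<>()).add(consumerName);
        received.put(consumerName, new ArrayList<>());
    }

    // Delivers a record to exactly one member of each group: the owner of the record's partition.
    void send(String key, String value) {
        int partition = Math.floorMod(key.hashCode(), partitionCount); // Kafka hashes differently
        for (List<String> members : membersByGroup.values()) {
            String owner = members.get(partition % members.size()); // round-robin assignment
            received.get(owner).add(value);
        }
    }

    List<String> receivedBy(String consumerName) {
        return received.get(consumerName);
    }
}
```

With two consumers joined under the same group.id, a single record reaches only one of them, so a test waiting on the other one times out. With one group.id per consumer, as in "listener" and "consumer" above, both receive every record.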
auto.offset.reset = earliest
Also useful might be:
@TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset = earliest")
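A likely reason this helps: Kafka's default for auto.offset.reset is latest, so a consumer group without committed offsets starts reading at the end of the log. In a test, a record is easily sent before the consumer has actually joined its group, and with latest that record would be skipped. The following toy model of a single-partition log sketches the difference; the class and its methods are hypothetical and leave out everything else about offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of auto.offset.reset for a consumer group without committed offsets.
class OffsetResetSketch {

    enum OffsetReset { EARLIEST, LATEST }

    // A single-partition log; a record's index in the list is its offset.
    private final List<String> log = new ArrayList<>();

    void send(String value) {
        log.add(value);
    }

    // Position at which a consumer without a committed offset starts reading.
    int startPosition(OffsetReset reset) {
        return reset == OffsetReset.EARLIEST ? 0 : log.size();
    }

    // Records visible to a consumer that starts reading at the given position.
    List<String> pollFrom(int position) {
        return new ArrayList<>(log.subList(position, log.size()));
    }
}
```

If "value1" is sent first and the consumer only then determines its start position, LATEST yields an empty poll while EARLIEST yields the record.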
A side-note to alternatives
Docker Compose
There are many examples out there that work with Docker Compose. While that certainly has its merits, in my opinion it is generally preferable that tests include all the setup they need, which is the case with @EmbeddedKafka and is not with Docker Compose.
With @EmbeddedKafka, anyone can just check out the project and start a test from within their favorite IDE.
With Docker Compose, some Docker containers usually have to be started before running tests, and things get worse if there are, for example, port collisions or leftover data from previous test runs.
Testcontainers
Probably the closest alternative to the in-memory Kafka broker setup of @EmbeddedKafka with Spring is Testcontainers.
It supports, for example, environment variables as documented, as opposed to similarly but not identically named Spring context properties (for example bootstrap.servers versus "spring.kafka.bootstrap-servers"), and in that respect probably comes closer to a real Kafka setup than @EmbeddedKafka ever will.
Testcontainers provides KafkaContainer for configuring containers specifically for Kafka brokers.
One minor difference to @EmbeddedKafka might be that the latter supports configuring the number of brokers as a plain number with EmbeddedKafka.count, whereas Testcontainers does not support multiple brokers quite as easily.
For a cluster setup with multiple nodes, I'd recommend checking out KafkaContainerCluster as a starting point with Testcontainers.
Conclusion
There are plenty of examples out there on the Internet, and their sheer number and variety make it cumbersome to find appropriate ones.
As far as I remember, I never had anything from Spring that did not come with working defaults prior to @EmbeddedKafka and Spring Boot Kafka auto-configuration, which is a big surprise and, I figure, justifies spreading the word about how to make it work.
All in all, with the example shown, just one piece of additional configuration is unexpectedly necessary, and writing tests with Spring is as easy as ever, also with Kafka.
References
- Sources accompanying this article on Github: https://github.com/dev-qnz/embedded-kafka-spring-auto-configuration-test/tree/main/src/test/java
- Testing an Apache Kafka Integration within a Spring Boot Application and JUnit 5 - mimacom Tech Blog: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/
- Spring Boot Kafka auto-configuration: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka
- spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers - Spring for Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/#same-broker-multiple-tests
- @EmbeddedKafka(..., bootstrapServersProperty = "spring.kafka.bootstrap-servers") - Spring Messaging: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.embedded
- @EmbeddedKafka(..., bootstrapServersProperty = "spring.kafka.bootstrap-servers") - Spring for Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-testing-embeddedkafka-annotation
- "bootstrap.servers" producers configuration property - Apache Kafka configuration reference: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers
- "group.id" consumers configuration property - Apache Kafka configuration reference: https://kafka.apache.org/documentation/#consumerconfigs_group.id