spring kafka - How to use ContainerTestUtils.waitForAssignment when this method is called multiple times for the same container?

I'm trying to write reliable tests for Kafka listeners. It's worth mentioning that I use an external Kafka container rather than @EmbeddedKafka.

The biggest problem I'm struggling with is how to ensure that the Kafka listener has been assigned its partition and is ready to consume messages before the tests start. I've found the ContainerTestUtils.waitForAssignment method, but it doesn't work as I expect.

Let's say there is a Kafka listener component:

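import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SampleKafkaConsumer {

    @KafkaListener(
        topics = "${kafka.listener.some-event.topic}",
        groupId = "${kafka.listener.some-event.group-id}"
    )
    void consume(final SomeEvent event) {
        // handle the event
    }
}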

And there is a base class for integration tests:

@ActiveProfiles("test")
@SpringBootTest
abstract class BaseKafkaIntegrationSpec extends Specification {

    @Autowired(required = true)
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

    void setup() {
        kafkaListenerEndpointRegistry.getAllListenerContainers()
            .stream()
            .forEach { ContainerTestUtils.waitForAssignment(it, 1) }
    }
}

The listener is assigned a fixed group ID:

kafka.listener.some-event.group-id=test.some-event

Assuming there are a few integration test classes extending BaseKafkaIntegrationSpec, the first test class works, but the second attempt to execute waitForAssignment() ends with an error:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

I tried guarding the waitForAssignment call in many different ways, with no luck. What should this look like so that it works? There is no @DirtiesContext, so the Spring context is cached, but I feel like something goes wrong with the container when a new test class runs.

Another thing is that when I set a random group ID:

kafka.listener.some-event.group-id=test.some-event-${random.uuid}

Another problem appears. Although Spring caches its context, a new instance of SampleKafkaConsumer is created for each subsequent test class (because of the random group ID). Looking at the logs, I can see that with, e.g., three test classes in a test run, three consumers for three different group IDs end up running and consuming events.

If I go with the random group ID approach, is it possible to configure Spring to replace the existing bean/containers for a specific topic instead of creating new ones?

Asked Nov 18, 2024 at 20:22 by BartekN (edited Nov 19, 2024 at 13:47)

1 Answer

I think @DirtiesContext is your friend. The point is that when a Spring application context is cached, all of its beans stay active, including its listener containers. So when a new test class runs in a different context, there is a high chance that its partitions are taken by the container in another cached context, because all of them are connected to the same Kafka broker and, with a fixed group ID, join the same consumer group.
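
As a minimal sketch of that suggestion, assuming JUnit 5 and Spring Boot's test support with spring-kafka-test on the classpath, the base class could be marked with @DirtiesContext so that each test class's context (and the listener containers inside it) is closed before the next class starts. The class name BaseKafkaIntegrationTest and the method name waitForAssignments are hypothetical. The question's base class is a Spock specification, where the same class-level annotation should apply.

```java
import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;

// Hypothetical JUnit 5 counterpart of the question's BaseKafkaIntegrationSpec.
@ActiveProfiles("test")
@SpringBootTest
// Close the application context after each test class, so its consumers
// leave the group before the next class starts its own containers.
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
abstract class BaseKafkaIntegrationTest {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @BeforeEach
    void waitForAssignments() {
        // Assumption carried over from the question: each listener
        // container is expected to own exactly one partition.
        for (MessageListenerContainer container
                : kafkaListenerEndpointRegistry.getAllListenerContainers()) {
            ContainerTestUtils.waitForAssignment(container, 1);
        }
    }
}
```

The trade-off is a slower test run, since the application context is rebuilt for every test class instead of being reused from the cache.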
