最新消息:Welcome to the puzzle paradise for programmers! Here, a well-designed puzzle awaits you. From code logic puzzles to algorithmic challenges, each level is closely centered on the programmer's expertise and skills. Whether you're a novice programmer or an experienced tech guru, you'll find your own challenges on this site. In the process of solving puzzles, you can not only exercise your thinking skills, but also deepen your understanding and application of programming knowledge. Come to start this puzzle journey full of wisdom and challenges, with many programmers to compete with each other and show your programming wisdom! Translated with DeepL.com (free version)

java - Spring Integration - Kafka OutboundChannelAdapter propagates exception even if sendFailureChannel set - Stack Overflow

matteradmin6PV0评论

Spring integration: 6.2.0

I have noticed that even if I set a sendFailureChannel on a Kafka.outboundChannelAdapter an avro SerializationException is still rethrown and ends up in the error output whether a errorChannel header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel is set, or alternatively to take into account the errorChannel.

More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL is a direct channel)

public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";

@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
    return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
            .log(log.getName(),
                    m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
                            + "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
                            + "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                    .sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
            .get();
}

@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
    log.error("SendFailure:", exception);
}

@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
    log.error("Failed to send message to Kafka.", exception);
}

Spring integration: 6.2.0

I have noticed that even if I set a sendFailureChannel on a Kafka.outboundChannelAdapter an avro SerializationException is still rethrown and ends up in the error output whether a errorChannel header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel is set, or alternatively to take into account the errorChannel.

More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL is a direct channel)

public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";

@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
    return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
            .log(log.getName(),
                    m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
                            + "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
                            + "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                    .sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
            .get();
}

@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
    log.error("SendFailure:", exception);
}

@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
    log.error("Failed to send message to Kafka.", exception);
}
Share Improve this question asked Nov 18, 2024 at 15:35 Maxime DutautMaxime Dutaut 1326 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

That's correct.

The logic there is like this:

    catch (RuntimeException rtex) {
        sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
        throw rtex;
    }

So, indeed such an error is sent to the sendFailureChannel and re-thrown back to the caller.

The header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL) is not involved here because process is fully direct. The errorChannel is only considered when we deal with async channels.

We may consider to change such a logic, but only for tomorrow's 6.4.0 release. What I'm trying to say, that I am agreed with you that it is not supposed to be like that: if we provide sendFailureChannel, then no reason to re-throw such an exception.

Feel free to raise a GH issue and we will address it shortly!

Articles related to this article

Post a comment

comment list (0)

  1. No comments so far