
spring integration - What is the effect of streamInitialSequence in case there are already checkpoints in DynamoDB?


I would like to use KinesisMessageDrivenChannelAdapter to read records from a Kinesis stream. When the consumer application starts for the very first time, it should receive all records that already exist in the stream. On subsequent starts, though, it should continue reading from the latest checkpointed sequence number stored in DynamoDB.

Is my assumption correct that adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon()) provides this behaviour?


Asked Nov 18, 2024 at 16:21 by Shuffling2241

1 Answer

The streamInitialSequence setting only takes effect for new consumers in the group, that is, when no checkpoint has been stored yet. If there is already a checkpoint for this consumer group and that shard, then the adapter goes like this:

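        // A reset offset discards the stored checkpoint, so the configured initial sequence applies
        if (this.shardOffset.isReset()) {
            this.checkpointer.remove();
        }
        else {
            // Otherwise a stored checkpoint, if present, overrides the configured initial sequence
            String checkpoint = this.checkpointer.getCheckpoint();
            if (checkpoint != null) {
                this.shardOffset.setSequenceNumber(checkpoint);
                this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            }
        }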

So, it is going to consume from a stored checkpoint.

If you'd like to start from trimHorizon again even though checkpoints exist, then you call resetCheckpoints() on your KinesisMessageDrivenChannelAdapter instance.
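To make the three cases concrete (first start, restart with a checkpoint, restart after a reset), here is a minimal plain-Java sketch of the branch quoted above. It is not the spring-integration-aws code: the class CheckpointResolutionSketch, its Offset and IteratorType types, the in-memory map standing in for the DynamoDB table, and the key format are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in model; none of these types come from spring-integration-aws.
class CheckpointResolutionSketch {

    enum IteratorType { TRIM_HORIZON, AFTER_SEQUENCE_NUMBER }

    // Simplified shard offset: iterator type, optional sequence number, reset flag.
    static final class Offset {
        IteratorType iteratorType;
        String sequenceNumber;
        final boolean reset;

        Offset(IteratorType iteratorType, boolean reset) {
            this.iteratorType = iteratorType;
            this.reset = reset;
        }
    }

    // Mirrors the quoted branch, using a map in place of the checkpoint store.
    static Offset resolve(Offset initial, Map<String, String> store, String key) {
        Offset offset = new Offset(initial.iteratorType, initial.reset);
        if (offset.reset) {
            // Reset requested: drop the stored checkpoint and keep the initial offset.
            store.remove(key);
        } else {
            String checkpoint = store.get(key);
            if (checkpoint != null) {
                // A stored checkpoint wins over the configured initial sequence.
                offset.sequenceNumber = checkpoint;
                offset.iteratorType = IteratorType.AFTER_SEQUENCE_NUMBER;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        String key = "myGroup:myStream:shardId-0"; // assumed key format

        // First start: nothing stored, so the configured trim horizon is used.
        Offset first = resolve(new Offset(IteratorType.TRIM_HORIZON, false), store, key);
        System.out.println(first.iteratorType + " " + first.sequenceNumber);

        // Later start: a checkpoint exists and overrides the trim horizon.
        store.put(key, "100");
        Offset restart = resolve(new Offset(IteratorType.TRIM_HORIZON, false), store, key);
        System.out.println(restart.iteratorType + " " + restart.sequenceNumber);

        // Start with a reset offset: the checkpoint is removed and trim horizon applies again.
        Offset reset = resolve(new Offset(IteratorType.TRIM_HORIZON, true), store, key);
        System.out.println(reset.iteratorType + " " + store.containsKey(key));
    }
}
```

In this model the configured trim horizon only matters while the store has no entry for the shard, which matches the behaviour described in the answer.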
