领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我谨代表 Spring Integration 社区宣布 Spring Integration AWS 扩展 1.1
版本的首个里程碑版本。其构件为 spring-integration-aws.1.1.0.M1
,可在 里程碑存储库 中获取。
当然,首先要感谢您,社区,感谢您的贡献!
此里程碑版本中包含的功能亮点
提供了 KinesisMessageDrivenChannelAdapter
和 KinesisMessageHandler
来与 Amazon Kinesis 集成。前者非常简单,允许将数据发送到 Kinesis 流中。目标 PutRecordRequest
的所有信息都可以从请求 Message
中确定。
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler() {
KinesisMessageHandler kinesisMessageHandler =
new KinesisMessageHandler(amazonKinesis());
kinesisMessageHandler.setAsyncHandler(asyncHandler());
kinesisMessageHandler.setStream("my_stream");
kinesisMessageHandler.
setPartitionKeyExpressionString("headers[aws_partitionKey]");
return kinesisMessageHandler;
}
默认情况下,它使用 SerializingConverter
将请求数据转换为 byte[]
。com.amazonaws.handlers.AsyncHandler
可用于异步 putRecordAsync()
结果反应。
KinesisMessageDrivenChannelAdapter
提供了一个全面的 Kinesis 流数据摄取实现,包括 sequenceNumber
检查点和重新分片支持。concurrency
选项可用于在下游流中严格排序记录处理。在这种情况下,提供的分片在线程之间分配。如果没有提供并发性,则内部 ShardConsumer
将直接在 consumerExecutor
上执行。
@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
adapter.setOutputChannel(kinesisChannel());
adapter.setCheckpointStore(checkpointStore());
adapter.setCheckpointMode(CheckpointMode.manual);
adapter.setListenerMode(ListenerMode.batch);
adapter.setStartTimeout(10000);
adapter.setDescribeStreamRetries(1);
adapter.setConcurrency(10);
return adapter;
}
如果 CheckpointMode
为 manual
,则 AwsHeaders.CHECKPOINTER
消息头将填充到每个发出的消息中。它是一个 Checkpointer
实例,其 checkpoint()
可在下游流中用于为分片中处理的记录checkpoint
一个 sequenceNumber
。
注意
Amazon Kinesis Channel Adapters 实现完全基于标准 aws-java-sdk-kinesis
API,不使用 Kinesis 客户端库。
为了避免本地文件系统限制(尤其是在云环境中可能根本不可用),除了常规的 S3InboundFileSynchronizingMessageSource
之外,还引入了 S3StreamingMessageSource
。
@Bean
@InboundChannelAdapter(value = "s3FilesChannel",
poller = @Poller(fixedDelay = "1000"))
S3StreamingMessageSource s3InboundStreamingMessageSource(AmazonS3 amazonS3) {
S3SessionFactory s3SessionFactory = new S3SessionFactory(amazonS3);
S3RemoteFileTemplate s3FileTemplate =
new S3RemoteFileTemplate(s3SessionFactory);
S3StreamingMessageSource s3MessageSource =
new S3StreamingMessageSource(s3FileTemplate,
Comparator.comparing(FileInfo::getFilename));
s3MessageSource.setRemoteDirectory("/myBucket");
s3MessageSource.setFilter(
new S3PersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(),
"streaming"));
return s3MessageSource;
}
此消息源会生成远程 S3 对象的 InputStream
作为消息有效负载,并且与 FTP 流式入站通道适配器 完全相似。
我们接下来的计划是为 Amazon DynamoDB 提供 MetadataStore
实现,使 SnsMessageHandler
和 SqsMessageHandler
基于异步客户端实现。我们面前还有另一个挑战——为 Spring Cloud Stream 实现 Amazon Kinesis 绑定器。
请随时通过任何可用的通信渠道与我们联系!