抢先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我谨代表 Spring Integration 社区宣布 Spring Integration Extension for Amazon Web Services 版本 1.1
的第一个里程碑版本。 其制品是 spring-integration-aws.1.1.0.M1
,可在 里程碑存储库 中找到。
当然,首先要感谢你们,社区,你们的贡献!
此里程碑中包含的功能的一些亮点
提供 KinesisMessageDrivenChannelAdapter
和 KinesisMessageHandler
以与 Amazon Kinesis 集成。 前者非常简单,允许将数据发送到 Kinesis 流中。 目标 PutRecordRequest
的所有信息都可以从请求 Message
中确定
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler() {
KinesisMessageHandler kinesisMessageHandler =
new KinesisMessageHandler(amazonKinesis());
kinesisMessageHandler.setAsyncHandler(asyncHandler());
kinesisMessageHandler.setStream("my_stream");
kinesisMessageHandler.
setPartitionKeyExpressionString("headers[aws_partitionKey]");
return kinesisMessageHandler;
}
默认情况下,它使用 SerializingConverter
将请求数据转换为 byte[]
。com.amazonaws.handlers.AsyncHandler
可用于异步 putRecordAsync()
结果反应。
KinesisMessageDrivenChannelAdapter
提供了一个全面的 Kinesis 流数据提取实现,包括 sequenceNumber
检查点和重新分片支持。 concurrency
选项可用于下游流程中的严格顺序记录处理。 在这种情况下,提供的分片在线程之间分配。 如果未提供并发性,则内部 ShardConsumer
直接在 consumerExecutor
上执行
@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
adapter.setOutputChannel(kinesisChannel());
adapter.setCheckpointStore(checkpointStore());
adapter.setCheckpointMode(CheckpointMode.manual);
adapter.setListenerMode(ListenerMode.batch);
adapter.setStartTimeout(10000);
adapter.setDescribeStreamRetries(1);
adapter.setConcurrency(10);
return adapter;
}
如果 CheckpointMode
是 manual
,则 AwsHeaders.CHECKPOINTER
消息头将填充到每个发出的消息中。 它是 Checkpointer
的一个实例,其 checkpoint()
可用于下游流程中 checkpoint
分片中已处理记录的 sequenceNumber
。
注意
Amazon Kinesis Channel Adapters 的实现完全基于标准的 aws-java-sdk-kinesis
API,不使用 Kinesis Client Library。
为了避免本地文件系统限制(尤其是在云环境中甚至可能不可用),除了常规的 S3InboundFileSynchronizingMessageSource
之外,还引入了 S3StreamingMessageSource
@Bean
@InboundChannelAdapter(value = "s3FilesChannel",
poller = @Poller(fixedDelay = "1000"))
S3StreamingMessageSource s3InboundStreamingMessageSource(AmazonS3 amazonS3) {
S3SessionFactory s3SessionFactory = new S3SessionFactory(amazonS3);
S3RemoteFileTemplate s3FileTemplate =
new S3RemoteFileTemplate(s3SessionFactory);
S3StreamingMessageSource s3MessageSource =
new S3StreamingMessageSource(s3FileTemplate,
Comparator.comparing(FileInfo::getFilename));
s3MessageSource.setRemoteDirectory("/myBucket");
s3MessageSource.setFilter(
new S3PersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(),
"streaming"));
return s3MessageSource;
}
此消息源为远程 S3 对象生成一个 InputStream
作为消息有效负载,并且与 FTP 流式入站通道适配器 完全相似。
我们接下来的计划是为 Amazon DynamoDB 提供 MetadataStore
实现,使 SnsMessageHandler
和 SqsMessageHandler
基于异步客户端实现。 我们面前的另一个挑战是 - 为 Spring Cloud Stream 实现 Amazon Kinesis Binder。
请随时通过任何可用的沟通渠道与我们联系!