Spring Integration AWS 扩展 1.1.0 M1 发布

发布 | Artem Bilan | 2017年3月9日 | ...

我谨代表 Spring Integration 社区宣布 Spring Integration AWS 扩展 1.1 版本的首个里程碑版本。其构件为 spring-integration-aws.1.1.0.M1,可在 里程碑存储库 中获取。

当然,首先要感谢您,社区,感谢您的贡献!

此里程碑版本中包含的功能亮点

Kinesis 支持

提供了 KinesisMessageDrivenChannelAdapterKinesisMessageHandler 来与 Amazon Kinesis 集成。前者非常简单,允许将数据发送到 Kinesis 流中。目标 PutRecordRequest 的所有信息都可以从请求 Message 中确定。

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler() {
    KinesisMessageHandler kinesisMessageHandler =
                new KinesisMessageHandler(amazonKinesis());
    kinesisMessageHandler.setAsyncHandler(asyncHandler());
    kinesisMessageHandler.setStream("my_stream");
    kinesisMessageHandler.
             setPartitionKeyExpressionString("headers[aws_partitionKey]");
    return kinesisMessageHandler;
}

默认情况下,它使用 SerializingConverter 将请求数据转换为 byte[]com.amazonaws.handlers.AsyncHandler 可用于异步 putRecordAsync() 结果反应。

KinesisMessageDrivenChannelAdapter 提供了一个全面的 Kinesis 流数据摄取实现,包括 sequenceNumber 检查点和重新分片支持。concurrency 选项可用于在下游流中严格排序记录处理。在这种情况下,提供的分片在线程之间分配。如果没有提供并发性,则内部 ShardConsumer 将直接在 consumerExecutor 上执行。

@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
    adapter.setOutputChannel(kinesisChannel());
    adapter.setCheckpointStore(checkpointStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
}

如果 CheckpointModemanual,则 AwsHeaders.CHECKPOINTER 消息头将填充到每个发出的消息中。它是一个 Checkpointer 实例,其 checkpoint() 可在下游流中用于为分片中处理的记录checkpoint 一个 sequenceNumber

注意

Amazon Kinesis Channel Adapters 实现完全基于标准 aws-java-sdk-kinesis API,不使用 Kinesis 客户端库

S3 流式消息源

为了避免本地文件系统限制(尤其是在云环境中可能根本不可用),除了常规的 S3InboundFileSynchronizingMessageSource 之外,还引入了 S3StreamingMessageSource

@Bean
@InboundChannelAdapter(value = "s3FilesChannel",
                 poller = @Poller(fixedDelay = "1000"))
S3StreamingMessageSource s3InboundStreamingMessageSource(AmazonS3 amazonS3) {
    S3SessionFactory s3SessionFactory = new S3SessionFactory(amazonS3);
    S3RemoteFileTemplate s3FileTemplate =
                         new S3RemoteFileTemplate(s3SessionFactory);
    S3StreamingMessageSource s3MessageSource =
                        new S3StreamingMessageSource(s3FileTemplate,
                                    Comparator.comparing(FileInfo::getFilename));
    s3MessageSource.setRemoteDirectory("/myBucket");
    s3MessageSource.setFilter(
                      new S3PersistentAcceptOnceFileListFilter(
                                             new SimpleMetadataStore(),
                                             "streaming"));
    return s3MessageSource;
}

此消息源会生成远程 S3 对象的 InputStream 作为消息有效负载,并且与 FTP 流式入站通道适配器 完全相似。

我们接下来的计划是为 Amazon DynamoDB 提供 MetadataStore 实现,使 SnsMessageHandlerSqsMessageHandler 基于异步客户端实现。我们面前还有另一个挑战——为 Spring Cloud Stream 实现 Amazon Kinesis 绑定器。

请随时通过任何可用的通信渠道与我们联系!

项目页面 | 问题 | 帮助 | 聊天

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部