10.2.8 通道适配器

通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。如图 10.9。

图 10.9 通道适配器是集成流的入口和出口点。

入站信道的适配器可以采取多种形式,这取决于它们引入到流的数据源。例如,声明一个入站通道适配器,它采用从 AtomicInteger 到流递增的数字。使用 Java 配置,它可能是这样的:

@Bean
@InboundChannelAdapter(
  poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
  return () -> {
    return new GenericMessage<>(source.getAndIncrement());
  };
}

@Bean 方法声明了一个入站信道适配器 bean,后面跟随着 @InboundChannelAdapter 注解,它们每 1 秒(1000 ms)从注入的 AtomicInteger 提交一个数字到名 numberChannel 的通道中。

当使用 Java 配置时,@InboundChannelAdapter 意味着是一个入站通道适配器,from() 方法就是使用 Java DSL 来定义流的时候,表明它是怎么处理的。下面对于流定义的一个片段展示了在 Java DSL 配置中类似的输入通道适配器:

@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
  return IntegrationFlows
      .from(integerSource, "getAndIncrement",
        c -> c.poller(Pollers.fixedRate(1000)))
    ...
      .get();
}

通常情况下,通道适配器通过的 Spring Integration 的多端点模块之一进行提供。举个例子,假设需要一个入站通道适配器,用它来监视指定的目录,同时将任何写入到那个目录中的文件作为消息,提交到名为 file-channel 的通道中。下面的 Java 配置使用 FileReadingMessageSource 从 Spring Integration 的文件端点模块来实现这一目标:

@Bean
@InboundChannelAdapter(channel="file-channel",
  poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
  FileReadingMessageSource sourceReader = new FileReadingMessageSource();
  sourceReader.setDirectory(new File(INPUT_DIR));
  sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
  return sourceReader;
}

当在 Java DSL 中写入同样的 file-reading 入站通道适配器时,来自 Files 类的 inboundAdapter() 方法达到的同样的目的。出站通道适配器位于集成信息流的最后位置,将最终消息扇出到应用程序或是其他系统中:

@Bean
public IntegrationFlow fileReaderFlow() {
  return IntegrationFlows
    .from(Files.inboundAdapter(new File(INPUT_DIR))
      .patternFilter(FILE_PATTERN))
    .get();
}

服务激活器(作为消息处理的实现)往往是为出站通道适配器而存在的,特别是当数据需要被扇出到应用程序本身的时候。

值得一提的,Spring Integration 的端点模块为几种常见的用例提供了有用的消息处理程序。如在程序清单 10.3 中所看到的 FileWritingMessageHandler 出站通道适配器,这就是一个很好的例子。说到 Spring Integration 端点模块,让我们快速浏览一下准备使用的集成端点模块。

results matching ""

    No results matching ""