9.3.3 编写 Kafka 监听器

除了 send()sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。

对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener@RabbitListener,其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。

程序清单 9.9 使用 @KafkaListener 接收订单

package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {

  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @KafkaListener(topics="tacocloud.orders.topic")
  public void handle(TacoOrder order) {
    ui.displayOrder(order);
  }
}

handle() 方法由 @KafkaListener 注解,表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的,只为 handle() 方法提供了一个 Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个 ConsumerRecord 或 Message 对象。

例如,handle() 的以下实现接受一个 ConsumerRecord,这样就可以记录消息的分区和时间戳:

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(
    TacoOrder order, ConsumerRecord<String, TacoOrder> record) {
  log.info("Received from partition {} with timestamp {}",
        record.partition(), record.timestamp());

  ui.displayOrder(order);
}

类似地,可以使用 Message 而不是 ConsumerRecord,并达到同样的效果:

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
  MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
    headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}

值得注意的是,消息有效负载也可以通过 ConsumerRecord.value()Message.getPayload() 获得。这意味着可以通过这些对象请求 Order,而不是直接将其作为 handle() 的参数。

results matching ""

    No results matching ""