9.1.2 使用 JmsTemplate 发送消息

在构建中有 JMS starter 依赖(无论 Artemis 还是 ActiveMQ),Spring Boot 将会自动配置 JmsTemplate,这样就可以将其注入并使用它发送和接收消息了。

JmsTemplate 是 Spring JMS 集成支持的核心。与 Spring 的其他面向模板的组件非常相似,JmsTemplate 消除了大量与 JMS 协同工作所需的样板代码。如果没有 JmsTemplate,将需要编写代码来创建与消息代理的连接和会话,并编写更多代码来处理在发送消息过程中可能抛出的任何异常。JmsTemplate 专注于真正想做的事情:发送消息。

JmsTemplate 有几个发送消息的有用方法,包括:

// 发送原始消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator)
                            throws JmsException;
void send(String destinationName, MessageCreator messageCreator)
                            throws JmsException;
// 发送转换自对象的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message)
                            throws JmsException;
void convertAndSend(String destinationName, Object message)
                            throws JmsException;
// 发送经过处理后从对象转换而来的消息
void convertAndSend(Object message,
                MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message,
                MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message,
                MessagePostProcessor postProcessor) throws JmsException;

实际上只有两个方法,send()convertAndSend(),每个方法都被重载以支持不同的参数。如果仔细观察,会发现 convertAndSend() 的各种形式可以分为两个子类。在试图理解所有这些方法的作用时,请考虑以下细分:

  • send() 方法需要一个 MessageCreator 来制造一个 Message 对象。
  • convertAndSend() 方法接受一个 Object,并在后台自动将该 Object 转换为一条 Message。
  • 三种 convertAndSend() 方法会自动将一个 Object 转换成一条 Message,但也会接受一个 MessagePostProcessor,以便在 Message 发送前对其进行定制。

此外,这三个方法类别中的每一个都由三个重载的方法组成,它们是通过指定 JMS 目的地(队列或主题)的方式来区分的:

  • 一个方法不接受目的地参数,并将消息发送到默认目的地。
  • 一个方法接受指定消息目的地的 Destination 对象。
  • 一个方法接受一个 String,该 String 通过名称指定消息的目的地。

要使这些方法工作起来,请考虑下面程序清单中的 JmsOrderMessagingService,它使用 send() 方法的最基本形式。

程序清单 9.1 使用 .send() 发送订到到默认目的地

package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;


@Service
public class JmsOrderMessagingService implements OrderMessagingService {
  private JmsTemplate jms;

  @Autowired
  public JmsOrderMessagingService(JmsTemplate jms) {
    this.jms = jms;
  }

  @Override
  public void sendOrder(TacoOrder order) {
    jms.send(new MessageCreator() {
      @Override
      public Message createMessage(Session session)
        throws JMSException {
          return session.createObjectMessage(order);
        }
      }
    );
  }
}

sendOrder() 方法调用 jms.send(),传递 MessageCreator 的匿名内部类实现。该实现重写 createMessage() 以从给定的 Order 对象创建新的对象消息。

因为特定于 JMS 的 JMSOrdMessageService 实现了更通用的 OrderMessagingService 接口,我们可以通过将此服务注入 OrderApicController 并在创建订单时调用 sendOrder()

@RestController
@RequestMapping(path="/api/orders",
        produces="application/json")
@CrossOrigin(origins="*")
public class OrderApiController {

private OrderRepository repo;
private OrderMessagingService messageService;

  public OrderApiController(
            OrderRepository repo,
            OrderMessagingService messageService) {
    this.repo = repo;
    this.messageService = messageService;
  }

  @PostMapping(consumes="application/json")
  @ResponseStatus(HttpStatus.CREATED)
  public TacoOrder postOrder(@RequestBody TacoOrder order) {
    messageService.sendOrder(order);
    return repo.save(order);
  }

  ...

}

现在,当您通过 Taco Cloud 网站创建订单时,应向代理消息,以便其他应用程序接收订单信息。不过我们的程序还没法接收消息,如果想收到这条信息。您也可以使用 Artemis 控制台查看消息队列数据。请参阅 Artemis 文档 https://activemq.apache.org/components/artemis/documentation/latest/management-console.html 以了解如何执行此操作的详细信息。

我不太清楚您怎么想,但我认为清单 9.1 中的代码虽然简单,但有点笨拙。代码中包括声明匿名内部类,但只是提供给另一个类进行简单的方法调用。如果发现 MessageCreator 是一个函数式接口,您可以使用 lambda 表达式使 sendOrder() 方法看起来更简洁:

@Override
public void sendOrder(TacoOrder order) {
  jms.send(session -> session.createObjectMessage(order));
}

但是请注意,对 jms.send() 的调用没有指定目的地。为了实现这一点,还必须使用 spring.jms.template.default-destination 属性指定一个默认的目的地名称。例如,可以在 application.yml 中设置属性:

spring:
  jms:
    template:
      default-destination: tacocloud.order.queue

在许多情况下,使用缺省目的地是最简单的选择。它让您指定一次目的地名称,允许代码只关心发送消息,而不关心消息被发送到哪里。但是,如果需要将消息发送到缺省目的地之外的目的地,则需要将该目的地指定为 send() 方法的参数。

一种方法是传递目标对象作为 send() 的第一个参数。最简单的方法是声明一个 Destination bean,然后将其注入执行消息传递的 bean。例如,下面的 bean 声明了 Taco Cloud 订单队列 Destination:

public Destination orderQueue() {
    return new ActiveMQQueue("tacocloud.order.queue");
}

这个 bean 方法可以添加到,需要通过 JMS 接收发送消息的应用程序的任何配置类中。为了便于分类管理,最好将其添加到专门的消息配置类中,例如 MessagingConfig。

需要注意的是,这里使用的 ActiveMQQueue 实际上来自于 Artemis(来自 org.apache.activemq.artemis.jms.client 包)。如果正在使用 ActiveMQ(而不是 Artemis),那么还有一个名为 ActiveMQQueue 的类(来自 org.apache.activemq.command 包)。

如果这个 Destination bean 被注入到 JmsOrderMessagingService 中,那么可以在调用 send() 时使用它来指定目的地:

private Destination orderQueue;

@Autowired
public JmsOrderMessagingService(JmsTemplate jms,
            Destination orderQueue) {
  this.jms = jms;
  this.orderQueue = orderQueue;
}

...

@Override
public void sendOrder(TacoOrder order) {
  jms.send(
    orderQueue,
    session -> session.createObjectMessage(order));
}

使用类似这样的 Destination 对象指定目的地,使您有机会配置 Destination,而不仅仅是目的地的名称。但是在实践中,几乎只指定了目的地名称,将名称作为 send() 的第一个参数通常更简单:

@Override
public void sendOrder(TacoOrder order) {
  jms.send(
    "tacocloud.order.queue",
    session -> session.createObjectMessage(order));
}

虽然 send() 方法并不是特别难以使用(特别是当 MessageCreator 以 lambda 形式给出时),但是提供 MessageCreator 还是会增加一些复杂性。如果只需要指定要发送的对象(以及可选的目的地),不是会更简单吗?这简要地描述了 convertAndSend() 的工作方式,让我们来看看。

在发送前转换消息

JmsTemplates 的 convertAndSend() 方法不需要提供 MessageCreator,从而简化了消息发布。相反,将要直接发送的对象传递给 convertAndSend(),在发送之前会将该对象转换为消息。

例如,sendOrder() 的以下新实现使用 convertAndSend() 将 Order 发送到指定的目的地:

@Override
public void sendOrder(TacoOrder order) {
  jms.convertAndSend("tacocloud.order.queue", order);
}

send() 方法一样,convertAndSend() 将接受 Destination 或 String 值来指定目的地,或者可以完全忽略目的地来将消息发送到默认目的地。

无论选择哪种形式的 convertAndSend(),传递给 convertAndSend() 的 Order 都会在发送之前转换为消息。实际上,这是通过 MessageConverter 实现的,它完成了将对象转换为消息的复杂工作。

配置消息转换器

MessageConverter 是 Spring 定义的接口,它只有两个用于实现的方法:

public interface MessageConverter {
  Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException;
  Object fromMessage(Message message)
}

这个接口的实现很简单,都不需要创建自定义实现。Spring 已经提供了一些有用的实现,就像表 9.3 中描述的那样。

表 9.3 用于常见转换任务的 Spring 消息转换器(全部在 org.springframework.jms.support.converter 包中)

消息转换器 功能
MappingJackson2MessageConverter 使用 Jackson 2 JSON 库对消息进行与 JSON 的转换
MarshallingMessageConverter 使用 JAXB 对消息进行与 XML 的转换
MessagingMessageConverter 使用底层 MessageConverter(用于有效负载)和JmsHeaderMapper(用于将 Jms 信息头映射到标准消息标头)将 Message 从消息传递抽象转换为 Message,并从 Message 转换为 Message
SimpleMessageConverter 将 String 转换为 TextMessage,将字节数组转换为 BytesMessage,将 Map 转换为 MapMessage,将Serializable 转换为 ObjectMessage

SimpleMessageConverter 是默认的消息转换器,但是它要求发送的对象实现 Serializable 接口。这样要求可能还不错,但是可能更喜欢使用其他的消息转换器,如 MappingJackson2MessageConverter,来避免上述限制。

为了应用不同的消息转换器,需要做的是将选择的转换器声明为一个 bean。例如,下面这个 bean 声明将会使用 MappingJackson2MessageConverter 而不是 SimpleMessageConverter:

@Bean
public MappingJackson2MessageConverter messageConverter() {
  MappingJackson2MessageConverter messageConverter =
                  new MappingJackson2MessageConverter();
  messageConverter.setTypeIdPropertyName("_typeId");
  return messageConverter;
}

注意一下,您在返回 MappingJackson2MessageConverter 之前调用了 setTypeIdPropertyName()。这是非常重要的,因为它使接收者知道要将传入消息转换成什么类型。默认情况下,它将包含被转换类型的完全限定类名。但这有点不灵活,要求接收方也具有相同的类型,具有相同的完全限定类名。

为了实现更大的灵活性,可以通过调用消息转换器上的 setTypeIdMappings() 将合成类型名称映射到实际类型。例如,对消息转换器 bean 方法的以下更改将合成订单类型 ID 映射到 Order 类:

@Bean
public MappingJackson2MessageConverter messageConverter() {
  MappingJackson2MessageConverter messageConverter =
                new MappingJackson2MessageConverter();
  messageConverter.setTypeIdPropertyName("_typeId");

  Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
  typeIdMappings.put("order", TacoOrder.class);
  messageConverter.setTypeIdMappings(typeIdMappings);

  return messageConverter;
  }

与在消息的 _typeId 属性中发送完全限定的类名不同,将发送值 order。在接收应用程序中,将配置类似的消息转换器,将 order 映射到它自己对 order 的理解。订单的实现可能在不同的包中,有不同的名称,甚至有发送者 Order 属性的一个子集。

后期处理消息

让我们假设,除了利润丰厚的网络业务,Taco Cloud 还决定开几家实体 Taco 连锁店。考虑到他们的任何一家餐馆也可以成为 web 业务的执行中心,他们需要一种方法来将订单的来源传达给餐馆的厨房。这将使厨房工作人员能够对商店订单采用与网络订单不同的流程。

在 Order 对象中添加一个新的 source 属性来携带此信息是合理的,可以用 WEB 来填充在线订单,用 STORE 来填充商店中的订单。但这将需要更改网站的 Order 类和厨房应用程序的 Order 类,而实际上,这些信息只需要为 taco 准备人员提供。

更简单的解决方案是在消息中添加一个自定义头信息,以承载订单的源。如果正在使用 send() 方法发送 taco 订单,这可以通过调用消息对象上的 setStringProperty() 轻松实现:

jms.send("tacocloud.order.queue",
  session -> {
    Message message = session.createObjectMessage(order);
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
  });

这里的问题是没有使用 send()。通过选择使用 convertAndSend(),Message 对象是在幕后创建的,并且不能访问它。

幸运的是,有一种方法可以在发送消息之前调整在幕后创建的 Message。通过将 MessagePostProcessor 作为最后一个参数传递给 convertAndSend(),可以在消息创建之后对其进行任何操作。下面的代码仍然使用 convertAndSend(),但是它也使用 MessagePostProcessor 在消息发送之前添加 X_ORDER_SOURCE 头信息:

jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
  @Override
  public Message postProcessMessage(Message message) throws JMSException {
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
    return message;
  }
});

可能注意到了 MessagePostProcessor 是一个函数接口,这意味着可以使用 lambda 将其简化为匿名内部类:

jms.convertAndSend("tacocloud.order.queue", order,
  message -> {
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
    return message;
  });

尽管只需要这个特定的 MessagePostProcessor 来处理对 convertAndSend() 的调用,但是可能会发现自己使用同一个 MessagePostProcessor 来处理对 convertAndSend() 的几个不同调用。在这些情况下,也许方法引用是比 lambda 更好的选择,避免了不必要的代码重复:

@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
  TacoOrder order = buildOrder();
  jms.convertAndSend("tacocloud.order.queue", order,
        this::addOrderSource);
  return "Convert and sent order";
}

private Message addOrderSource(Message message) throws JMSException {
  message.setStringProperty("X_ORDER_SOURCE", "WEB");
  return message;
}

已经看到了几种发送消息的方法。但是,如果没有人收到信息,就没有什么用处。让我们看看如何使用 Spring 和 JMS 接收消息。

results matching ""

    No results matching ""