JMS详解

8/18/2021 MQ

# JMS标准

# JMS概述

全称:Java Message Service 中文:Java 消息服务。

JMS 是 Java 的一套 API 标准,最初的目的是为了使应用程序能够访问现有的 MOM 系 统(MOM 是 Message Oriented Middleware 的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。) ;

后来被许多现有 的 MOM 供应商采用,并实现为 MOM 系统。【常见 MOM 系统包括 Apache 的 ActiveMQ、 阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。 (并 非全部的 MOM 系统都遵循 JMS 规范)】

基于 JMS 实现的 MOM,又被称为 JMSProvider。

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。

“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。

队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

**消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。**所以主要的使 用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦和。如: 跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电 话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。 多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调 用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和 接收方同时在线。 在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以 作为集成的方法。 应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应 用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步 通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。 消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处 理流程可以根据需要拆成多个阶段(Stage) ,阶段之间用队列连接起来,前一个阶段处理的 结果放入队列,后一个阶段从队列中获取消息继续处理。 应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。 跨局域网,甚至跨城市的通讯,比如北京机房与广州机房的应用程序的通信。

# 消息中间件应用场景

# 异步通信

​ 有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

# 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

# 解耦

降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

# 冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

# 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

# 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

# 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

# 过载保护

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

# 数据流处理

分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

# 常用消息队列比较

特性MQ ActiveMQ RabbitMQ RocketMQ Kafka
生产者消费者模式 支持 支持 支持 支持
发布订阅模式 支持 支持 支持 支持
请求回应模式 支持 支持 不支持 不支持
Api完备性
多语言支持 支持 支持 java 支持
单机吞吐量 万级 万级 万级 十万级
消息延迟 微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式) 非常高(分布式)
消息丢失 理论上不会丢失 理论上不会丢失
文档的完备性
提供快速入门
社区活跃度
商业支持 商业云 商业云

# JMS中的一些角色

# Broker

消息服务器,作为server提供消息核心服务,一个Server实例

# Producer

生产者

消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。

# Consumer

消费者

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:

  • 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
  • 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

# p2p

基于点对点的消息模型

消息生产者生产消息发送到 queue 中,然后消息消费者从 queue 中取出并且消费消息。 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它 的则不能消费此消息了。 当消费者不存在时,消息会一直保存,直到有消费消费

image-20200110192535698

# pub/sub

基于订阅/发布的消息模型

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。 和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。 当生产者发布消息,不管是否有消费者。都不会保存消息 一定要先有消息的消费者,后有消息的生产者。

image-20200110192613518

# P2P 和 PUB/SUB 简单对比

1 Topic Queue
Publish Subscribe messaging 发布 订阅消息 Point-to-Point 点对点
有无状态 topic 数据默认不落地,是无状态的。 Queue 数据默认会在 mq 服务器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存储。
完整性保障 并不保证 publisher 发布的每条数据,Subscriber 都能接受到。 Queue 保证每条数据都能被receiver 接收。消息不超时。
消息是否会丢失 一般来说 publisher 发布消息到某 一个 topic 时,只有正在监听该 topic 地址的 sub 能够接收到消息;如果没有 sub 在监听,该 topic 就丢失了。 Sender发送消息到目标Queue,receiver 可以异步接收这个 Queue 上的消息。Queue 上的 消息如果暂时没有 receiver 来取,也不会丢失。前提是消息不超时。
消息发布接 收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到 publisher 发送的消息。Sub 接收完 通知 mq 服务器 一对一的消息发布接收策 略,一个 sender 发送的消息,只能有一个 receiver 接收。 receiver 接收完后,通知 mq 服务器已接收,mq 服务器对 queue 里的消息采取删除或其他操作。

# Queue

队列存储,常用于点对点消息模型

默认只能由唯一的一个消费者处理。一旦处理消息删除。

# Topic

主题存储,用于订阅/发布消息模型

主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。

Queue/Topic都是 Destination 的子接口

# ConnectionFactory

连接工厂,jms中用它创建连接

连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。

# Connection

JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

# Destination

消息的目的地

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。 点对点消息传递域的特点如下:

  • 每个消息只能有一个消费者。
  • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

发布/订阅消息传递域的特点如下:

  • 每个消息可以有多个消费者。
  • 生产者和消费者之间有时间上的相关性。
  • 订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求 。持久订阅允许消费者消费它在未处于激活状态时发送的消息。 在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

# Session

JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

# JMS的消息格式

# JMS消息由以下三部分组成的:

  • 消息头。

    每个消息头字段都有相应的getter和setter方法。

  • 消息属性。

    如果需要除消息头字段以外的值,那么可以使用消息属性。

  • 消息体。

    JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

# TextMessage

文本消息

# MapMessage

k/v

# BytesMessage

字节流

# StreamMessage

java原始的数据流

# ObjectMessage

序列化的java对象

# 消息可靠性机制

# 确认 JMS消息

只有在被确认之后,才认为已经被成功地消费了。

消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

在事务性会话中,当一个事务被提交的时候,确认自动发生。

在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

  • Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
  • Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
  • Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。

# 持久性

JMS 支持以下两种消息提交模式:

  • PERSISTENT。指示JMS Provider持久保存消息,以保证消息不会因为JMS Provider的失败而丢失。
  • NON_PERSISTENT。不要求JMS Provider持久保存消息。

# 优先级

​ 可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

# 消息过期

可以设置消息在一定时间后过期,默认是永不过期。

# 临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

# 持久订阅

​ 首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。 JMS Provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS Provider会象客户发送客户处于非激活状态时所发布的消息。 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

# 本地事务

在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。 需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。