消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。它可以实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。
消息队列在电商系统、消息通讯、日志收集等应用中扮演着关键作用,以阿里为例,其研发的消息队列(RocketMQ)在历次天猫 “双十一” 活动中支撑了万亿级的数据洪峰,为大规模交易提供了有力保障。
作为提升应用性能的重要手段,分布式消息队列技术在互联网领域得到了越来越广泛的关注 。本文将介绍四种常用的分布式消息队列开源软件:Kafka、ActiveMQ、RabbitMQ 及 RocketMQ。
在分布式消息队列的江湖里,Kafka 凭借其优秀的性能占据重要一席。它最初由 LinkedIn 公司开发,Linkedin 于 2010 年贡献给了 Apache基金会,之后成为顶级开源项目。
关于 Kafka,网上有很多介绍,经过不断地复制、洗稿、演绎后,难免背离原意,因此,我们还是来看一下官网给出的定义:
Apache Kafka is a distributed streaming platform.
Kafka 作为流平台具有以下三种能力:
Kafka 适用于两类应用:
kafka 包含四种核心 API。
作为一种高吞吐量的分布式发布订阅消息系统,Kafka 具有如下特性:
操作系统
环境要求
如上图所示,一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 Broker(Kafka 支持水平扩展,一般 Broker 数量越多,集群吞吐率越高),若干 Consumer(Group),以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 Leader,以及在 Consumer Group 发生变化时进行 Rebalance。Producer 使用 Push(推)模式将消息发布到 Broker,Consumer 使用 Pull(拉)模式从 Broker 订阅并消费消息。
各个名词的解释请见下表:
Kafka 高可用性的保障来源于其健壮的副本(Replication)策略。为了提高吞吐能力,Kafka 中每一个 Topic 分为若干 Partitions;为了保证可用性,每一个 Partition 又设置若干副本(Replicas);为了保障数据的一致性,Zookeeper 机制得以引入。基于 Zookeeper,Kafka 为每一个 Partition 找一个节点作为 Leader,其余备份作为 Follower,只有 Leader 才能处理客户端请求,而 Follower 仅作为副本同步 Leader 的数据,如下示意图:TopicA 分为两个 Partition,每个 Partition 配置两个副本。
基于上图的架构,当 Producer Push 的消息写入 Partition(分区) 时,Leader 所在的 Broker(Kafka 节点)会将消息写入自己的分区,同时还会将此消息复制到各个 Follower,实现同步。如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息;如果 Leader 挂了,将会从 Follower 中选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的。
优点主要包括以下几点:
缺点主要有:
ActiveMQ 是 Apache 下的一个子项目。之所以把它放在第二位介绍,是因为它官网上的说明:
Apache ActiveMQ is the most popular and powerful open source messaging and Integration Patterns server.
居然没有“之一”,不太谦虚呀,放在第二位,以示“诫勉”。
ActiveMQ 由 Apache 出品,据官网介绍,它是最流行和最强大的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,非常快速,支持多种语言的客户端和协议,而且可以非常容易地嵌入到企业的应用环境中,并有许多高级功能。
ActiveMQ 基于 Java 语言开发,目前最新版本为 5.1.5.6。
ActiveMQ 的特点,官网在 Features 一栏中做了非常详细的说明,我做了下翻译,如下:
相较于 Kafka,ActiveMQ 的部署简单很多,支持多个版本的 Windows 和 Unix 系统,此外,ActiveMQ 由 Java 语言开发而成,因此需要 JRE 支持。
硬件要求
操作系统
环境要求
JAVA_HOME
;ActiveMQ 的主体架构如下图所示。
传输协议: 消息之间的传递,无疑需要协议进行沟通,启动一个 ActiveMQ 便打开一个监听端口。ActiveMQ 提供了广泛的连接模式,主要包括 SSL、STOMP、XMPP。ActiveMQ 默认的使用协议为 OpenWire,端口号为 61616。
通信方式: ActiveMQ 有两种通信方式,Point-to-Point Model(点对点模式),Publish/Subscribe Model (发布/订阅模式),其中在 Publich/Subscribe 模式下又有持久化订阅和非持久化订阅两种消息处理方式。
消息存储: 在实际应用中,重要的消息通常需要持久化到数据库或文件系统中,确保服务器崩溃时,信息不会丢失。
Cluster(集群): 最常见到集群方式包括 Network of Brokers 和 Master Slave。
Monitor(监控): ActiveMQ 一般由 JMX 进行监控。
默认配置下的 ActiveMQ 只适合学习而不适用于实际生产环境,ActiveMQ 的性能需要通过配置挖掘,其性能提高包括代码级性能、规则性能、存储性能、网络性能以及多节点协同方法(集群方案),所以我们优化 ActiveMQ 的中心思路也是这样的:
在生产环境中,ActiveMQ 集群的部署方式主要有下面两种。
在生产环境中,高可用(High Availability,HA)可谓 “刚需”, ActiveMQ 的高可用性架构基于 Master/Slave 模型。ActiveMQ 总共提供了四种配置方案来配置 HA,其中 Shared Nothing Master/Slave 在 5.8 版本之后不再使用了,并在 ActiveMQ 5.9 版本中引入了基于 Zookeeper 的 Replicated LevelDB Store HA 方案。
关于几种 HA 方案的详细介绍,读者可查看官网说明,在此,我仅做简单介绍。
方案一:Shared Nothing Master/Slave
这是一种最简单最典型的 Master-Slave 模式,Master 与 Slave 有各自的存储系统,不共享任何数据。“Shared Nothing” 模式有很多局限性,存在丢失消息、“双主”等问题。目前,在要求严格的生产环境中几乎没有应用,是一种趋于淘汰的方案,因此,本文就不作介绍了。
方案二:Shared Storage Master/Slave
这是很常用的一种架构。“共享存储”意味着 Master 与 Slave 之间的数据是共享的。为了实现数据共享,有两种方式:
(1)Shared File System Master/Slaves
这是基于共享文件系统的 Master/Slaves 模式。此处所谓的“共享文件系统”目前只能是基于 POSIX 接口可以访问的文件系统,比如本地文件系统或者 SAN 分布式共享文件系统(如 glusterFS)。对于 Broker 而言,启动时将会首先获取存储引擎的文件锁,如果获取成功才能继续初始化 transportConnector,否则它将一直尝试获取锁(tryLock),这对于共享文件系统而言,需要严格确保任何时候只能有一个进程获取排他锁。如果你选择的 SAN 文件系统不能保证此条件,那么将不能作为 Master/Slavers 的共享存储引擎。
“Shared File System”这种方式是最常用的模式,架构简单,可靠实用。我们只需要一个 SAN 文件系统即可。
(2)JDBC Store Master/Slaves
显而易见,数据存储引擎为 Database,ActiveMQ 通过 JDBC 方式与 Database 交互,排他锁使用 Database 的表级排他锁。JDBC Store 相对于日志文件而言,通常被认为是低效的,尽管数据的可见性较好,但是 Database 的扩容能力非常弱,无法良好地适应高并发、大数据情况(严格来说,单组 M-S 架构是无法支持大数据的),况且 ActiveMQ 的消息通常存储时间较短,频繁地写入,频繁地删除,都是性能的影响点。我们通常在研究 ActiveMQ 存储原理时使用 JDBC Store,或者在对数据一致性(可靠性、可见性)要求较高的中小型应用环境中使用,比如订单系统中交易流程支撑系统等。但由于 JDBC 架构实施简便,易于管理,我们仍然倾向于首选这种方式。
在使用 JDBC Store 之前,必须有一个稳定的 Database,且为 AcitveMQ 中的链接用户授权“创建表”和普通
CRUD 的权限。Master 与 Slave 中的配置文件基本一样,开发者需要注意 brokerName 和 brokerId
全局不可重复。此外还需要把相应的 jdbc-connector 的 Jar 包复制到 ${acitvemq}/lib/optional
目录下。
方案三: Replicated LevelDB Store
基于复制的 LevelDB Store,是 ActiveMQ 最新的 HA 方案,在 5.9+ 版本中获得支持。相较于方案二中的两种“Shared Storage”模式,本方案在存储和通讯机制上,更符合“Master-Slave”模型。
“Replicated LevelDB”同样允许有多个 Slaves,而且 Slaves 的个数有了约束性的限制,这归结于其使用 ZooKeeper 选举 Master。要进行选举,则需要多数派的“参与者”。因为 Replicated LevelDB Store 中有多个 Broker,从多个 Broker 中选举出一个成为 Master,其他的则成为 Slave。只有 Master 接收 Client 的连接,Slave 负责连接到 Master,并接收(同步方式、异步方式)Master 上的数据。每个 Broker 实例将消息数据保存本地(类似于“Shared Nothing”),它们之间并不共享任何数据,因此,某种意义上把“Replicated LevelDB”归类为“Shared Storage”并不妥当。
特别说明:ActiveMQ 官网警告,LevelDB 不再作为推荐的存储方案,取而代之的是 KahaDB。
在 2.5 节中介绍的几种 HA 方案,本质上都只有一个 Master 节点,无法满足高并发、大吞吐量的商用场景,因此,ActiveMQ 官方推出了 “网桥”架构模式,即真正的“分布式消息队列”。该模式可应对大规模 Clients、高密度的消息增量的场景;它以集群的模式,承载较大数据量的应用。
如上图所示,集群由多个子 Groups 构成,每个 Group 为 M-S 模式、共享存储;多个 Groups 之间基于“Network Connector”建立连接(Master-Slave 协议),通常为双向连接,所有的 Groups 之间彼此相连,Groups 之间形成“订阅”关系,比如 G2 在逻辑上为 G1 的订阅者(订阅的策略是根据各个 Broker 上消费者的 Destination 列表进行分类),消息的转发原理也基于此。对于 Client 而言,仍然支持 Failover,Failover 协议中可以包含集群中“多数派”的节点地址。
Topic 订阅者的消息,将会在所有 Group 中复制存储,对于 Queue 的消息,将会在 Brokers 之间转发,并最终到达 Consumer 所在的节点。
Producers 和 Consumers 可以与任何 Group 中的 Master 建立连接并进行消息通信,当 Brokers 集群拓扑变化时,Producers 或 Consumers 的个数变化时,将会动态平衡 Clients 的连接位置。Brokers 之间通过“Advisory”机制来同步 Clients 的连接信息,比如新的 Consumers 加入,Broker 将会发送 Advisory 消息(内部的通道)通知其他 Brokers。
集群模式提供了较好的可用性担保能力,在某些特性上或许需要权衡,比如 Queue 消息的有序性将会打破,因为同一个 Queue 的多个 Consumer 可能位于不同的 Group 上,如果某个 Group 实现,那么保存在其上的消息只有当其恢复后才能对 Clients 可见。
“网络转发桥”集群模式,构建复杂,维护成本高,可以在生产环境中使用。
优点主要有以下几点。
其缺点主要有以下几点:
RabbitMQ 是由 RabbitMQ Technologies Ltd 开发并提供技术支持的开源软件。该公司在 2010 年 4 月被 SpringSource(VMWare 的一个部门)收购。在 2013 年 5 月被并入 Pivotal。事实上 VMWare、Pivotal 和 EMC 同属一家,不同的是 VMWare 是独立上市子公司,而 Pivotal 整合了 EMC 的某些资源,现在并没有上市。
RabbitMQ 是流行的开源消息队列系统,最新版本为 3.7.8。RabbitMQ 是 AMQP(Advanced Message Queuing Protocol)的标准实现。支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX、持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 采用 Erlang 语言开发。Erlang 是一种面向并发运行环境的通用编程语言。该语言由爱立信公司在 1986 年开始开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。Erlang 问世于 1987 年,经过十年的发展,于 1998 年发布开源版本。
Erlang 是一个结构化、动态类型编程语言,内建并行计算支持。使用 Erlang 编写出的应用运行时通常由成千上万个轻量级进程组成,并通过消息传递相互通讯。进程间上下文切换对于 Erlang 来说仅仅只是一两个环节,比起 C 程序的线程切换要高效得多。Erlang 运行时环境是一个虚拟机,有点像 Java 虚拟机,这样代码一经编译,同样可以随处运行。它的运行时系统甚至允许代码在不被中断的情况下更新。另外字节代码也可以编译成本地代码运行。
根据官方介绍,RabbitMQ 是部署最广泛的消息代理,有以下特点:
综上所述,RabbitMQ 是一个“体系较为完善”的消息代理系统,性能好、安全、可靠、分布式,支持多种语言的客户端,且有专门的运维管理工具。
RabbitMQ 支持多个版本的 Windows 和 Unix 系统,此外,ActiveMQ 由 Erlang 语言开发而成,因此需要 Erlang 环境支持。某种意义上,RabbitMQ 具有在所有支持 Erlang 的平台上运行的潜力,从嵌入式系统到多核心集群还有基于云端的服务器。
操作系统
环境要求:
根据官方文档说明,RabbitMQ 的架构图如下所示:
接下来解释几个重要的概念。
消息队列的使用过程如下:
有三种类型的 Exchange,即 Direct、Fanout、Topic,每个实现了不同的路由算法(Routing Algorithm)。
Direct Exchange: 完全根据 Key 投递。如果 Routing Key 匹配,Message 就会被传递到相应的 Queue 中。其实在 Queue 创建时,它会自动地以 Queue 的名字作为 Routing Key 来绑定 Exchange。例如,绑定时设置了 Routing Key 为“abc”,那么客户端提交的消息,只有设置了 Key为“abc”的才会投递到队列中。
Fanout Exchange: 该类型 Exchange 不需要 Key。它采取广播模式,一个消息进来时,便投递到与该交换机绑定的所有队列中。
Topic Exchange: 对 Key 进行模式匹配后再投递。比如符号“#”匹配一个或多个词,符号“.”正好匹配一个词。例如“abc.#”匹配“abc.def.ghi”,“abc.”只匹配“abc.def”。
就分布式系统而言,实现高可用(High Availability,HA)的策略基本一致,即副本思想,当主节点宕机之后,作为副本的备节点迅速“顶上去”继续提供服务。此外,单机的吞吐量是极为有限的,为了提升性能,通常都采用“人海战术”,也就是所谓的集群模式。
RabbitMQ 集群配置方式主要包括以下几种。
RabbitMQ 节点类型有以下几种。
问题说明:RabbitMQ 要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知给至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行操作(增删改查),直到节点恢复。
解决方案:设置两个磁盘节点,至少有一个是可用的,可以保存元数据的更改。
Erlang Cookie
Erlang Cookie 是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的
Erlang Cookie。具体的目录存放在 /var/lib/rabbitmq/.erlang.cookie
。
它的起源要从 rabbitmqctl 命令的工作原理说起。RabbitMQ 底层基于 Erlang 架构实现,所以 rabbitmqctl 会启动 Erlang 节点,并基于 Erlang 节点使用 Erlang 系统连接 RabbitMQ 节点,在连接过程中需要正确的 Erlang Cookie 和节点名称,Erlang 节点通过交换 Erlang Cookie 以获得认证。
镜像队列
RabbitMQ 的 Cluster 集群模式一般分为两种,普通模式和镜像模式。
优点主要有以下几点:
缺点主要有:
RocketMQ 由阿里研发团队开发的分布式队列,侧重于消息的顺序投递,具有高吞吐量、可靠性等特征。RocketMQ 于 2013 年开源,2016 年捐赠给 Apache 软件基金会,并于 2017 年 9 月成为 Apache 基金会的顶级项目。
RocketMQ 用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的改进,在消息可靠性上比 Kafka 更好,目前最新版本为 4.3.1。RocketMQ 已经被业界多个大型互联网公司采用。
在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转(在 2017 年的双 11 当天,整个阿里巴巴集团通过 RocketMQ 流转的线上消息达到了万亿级,峰值 TPS 达到 5600 万),在阿里大中台策略上发挥着举足轻重的作用。
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:
操作系统
安装环境
RocketMQ 是一个具有高性能、高可靠、低延迟、分布式的万亿级容量,且可伸缩的分布式消息和流平台。它由 Name Servers、Brokers、 Producers 和 Consumers 四个部分组成。其架构如下图所示(取自官网)。
NameServer 集群
NameServer 是一个功能齐全的服务器,其角色类似 Kafka 中的 ZooKeeper,支持 Broker 的动态注册与发现。主要包括两个功能:
NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线,Broker 仍然可以向其它 NameServer 同步其路由信息,Produce、Consumer 仍然可以动态感知 Broker 的路由信息。
Broker 集群
Broker 主要负责消息的存储、投递、查询以及服务高可用保证。为了实现这些功能 Broker 包含了以下几个重要子模块。
Producer 集群
充当消息生产者的角色,支持分布式集群方式部署。Producers 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递。投递的过程支持快速失败并且低延迟。
Consumer 集群
充当消息消费者的角色,支持分布式集群方式部署。支持以 Push、pull 两种模式对消息进行消费。同时也支持集群方式和广播形式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
毫无悬念,RocketMQ 实现高可用(HA)的方案仍然是基于最淳朴的“副本思想”,但与 Kafka、Redis、Etcd 采用的副本机制有所不同:RocketMQ 的 Master 和 Slave 没有 Election 机制,也没有 Failover 机制。
RocketMQ 不具备选举功能,在集群模式下,Master、Slave 角色需预先设置,是固定的;Master 与 Slave 配对是通过指定相同的 brokerName 参数来实现,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。一个 Master 下面可以挂载多个 Slave,同一个 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。当 Master 节点宕机后,消费者仍然可以从 Slave 消费,从而保证生产者已经 Push 的消息不丢失;由于该 Master 宕机,生产者将消息 Push 到其它 Master,不影响可用性。RocketMQ 的 Broker 有 4 种部署方式。
1. 单个 Master 模式
除了配置简单,没什么优点。
它的缺点是不可靠。该机器重启或宕机,将导致整个服务不可用,因此,生产环境几乎不采用这种方案。
2. 多个 Master 模式
配置简单,性能最高,是它的优点。
它的缺点是:可能会有少量消息丢失(异步刷盘丢失少量消息,同步刷盘不丢失),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。
特别说明:当使用多 Master 无 Slave 的集群搭建方式时,Master 的 brokerRole 配置必须为 ASYNC_MASTER
。如果配置为
SYNC_MASTER
,则 producer 发送消息时,返回值的 SendStatus 会一直是 SLAVE_NOT_AVAILABLE
。
3. 多 Master 多 Slave 模式:异步复制
其优点为:即使磁盘损坏,消息丢失得非常少,消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样。
它的缺点为:Master 宕机或磁盘损坏时会有少量消息丢失。
4. 多 Master 多 Slave 模式:同步双写
其优点为:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
其缺点为:性能比异步复制模式稍低,大约低 10% 左右,发送单个消息的 RT 会稍高,目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
优点主要包括以下几点。
缺点主要有:
目前,消息队列相关的开源软件非常多,本文仅介绍了生产环境中最常见的 4 种。这些消息队列各有所长,没有哪一种消息队列具备 “一统江湖”的优势,某种程度上,增加了选型的难度。不像分布式缓存和分布式锁,Redis、Etcd 具备“绝对”优势,选型无需纠结。
所谓实践是检验真理的唯一标准,实际应用中的表现比文字更具说服力。在 RocketMQ 官方文档中,关于 RocketMQ 的研发背景是这样说的:在我们的研究中,随着使用 Queue 和 Topic 的增加,ActiveMQ IO 模块很快达到了瓶颈。我们试图通过节流、断路器或降级来解决这个问题,但效果不佳。所以我们开始关注当时流行的消息解决方案 Kafka。不幸的是,Kafka 不能满足我们的要求,特别是在低延迟和高可靠性方面。
简而言之,ActiveMQ 和 Kafka 的性能都不能满足阿里的超大规模应用场景。在此背景下,阿里自研了 RocketMQ,并捐赠给了开源社区,目前有超过 100 家企业在使用其开源版本。关于 ActiveMQ 、Kafka 以及 RocketMQ 的比较如下所示(取自 RocketMQ 官网文档):
消息队列利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq 等,也有直接使用数据库 Redis 充当消息队列的案例。而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
以下是四种消息队列的差异对比(图片源地址):