高性能万亿级消息吞吐 MQ Pulsar设计原理

肖哥弹架构 跟大家“弹弹” 高性能万亿级消息吞吐Pulsar设计原理
欢迎 点赞,点赞,点赞。

关注本人的公众号Solomon肖哥弹架构获取更多精彩内容

高性能万亿级消息吞吐 MQ Pulsar设计原理

1、MQ 基本介绍

1.1 核心概念

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • 生产者:也称为消息发布者,负责生产并发送消息至 Topic。
  • 消费者:也称为消息订阅者,负责从 Topic 接收并消费消息。
  • 消息:生产者向 Topic 发送并最终传送给消费者的数据和(可选)属性的组合。
  • 消息属性:生产者可以为消息定义的属性,包含 Message Key 和 Tag。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

1.2 消息类型

  • 普通消息:消息队列 RocketMQ 版中无特性的消息,区别于有特性的定时/延时消息、顺序消息和事务消息。
  • 事务消息:实现类似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。
  • 定时和延时消息:允许消息生产者对指定消息进行定时(延时)投递,最长支持 40 天。
  • 顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费。

1.3 MQ应用场景

  • 交易系统
  • 红包秒杀
  • 关闭未支付订单
  • 用户信息注册
  • 在线抽奖设计
  • 任务系统设计
  • 跨私网数据交换
  • 跨区域数据同步
  • 异步解耦
  • 削峰填谷
  • 分布式事务消息
  • 大数据分析
  • 分布式模缓存同步
  • 异地多活
  • 异步通知
  • ……..

1.4 MQ消息模型

  • 点对点

  • 工作队列

  • 发布/订阅

  • 路由策略

  • 主题带过滤

  • RPC

1.5 MQ产品发展路线

1.6 MQ应用场景选型

1.7 MQ云原生场景

1.8 Pulsar MQ

Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 Pulsar 最初由 Yahoo开发,目前由 Apache 软件基金会管理。

1.8.1 Pulsar 集群包含

  • 一组 Broker(服务发布 – 订阅流量)
    • Pulsar Broker 是负责接收和交付消息的组件
  • Bookie(消息存储)
    • Bookie 则是为最终消费前的消息提供持久存储的 Apache BookKeeper 服务器。
  • 一个负责整体协调和配置管理的 Apache ZooKeeper

1.8.2 Pulsar 的关键特性如下:

  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。

  • 极低的发布延迟和端到端延迟。

  • 可无缝扩展到超过一百万个 topic

  • 支持多种topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。

  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。

  • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。

  • 基于 Pulsar Functionsserverless connector 框架 Pulsar IO使得数据更易移入、移出 Apache Pulsar。

  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

1.8.3 pulsar 云原⽣架构

Pulsar 能够无缝扩容、延迟低、吞吐高,支持多租户和跨地域复制。最重要的是,Pulsar 存储、计算分离的架构能够完美解决 Kafka 扩缩容的问题。Pulsar producer 把消息发送给 broker,broker 通过 bookie client 写到第二层的存储 BookKeeper 上。

特点

  • 水平扩容:能够无缝扩容到成百上千个节点。

  • 高吞吐:已经在 Yahoo! 的生产环境中经受了考验,支持每秒数百万条消息的发布 – 订阅(Pub-Sub)。

  • 低延迟:在大规模的消息量下依然能够保持低延迟(小于 5 ms)。

  • 持久化机制:Pulsar 的持久化机制构建在 Apache BookKeeper 上,实现了读写分离。

  • 读写分离:BookKeeper 的读写分离 IO 模型极大发挥了磁盘顺序写性能,对机械硬盘相对比较友好,单台 bookie 节点支撑的 topic 数不受限制。

2、消息系统概述

2.1 Product 分块发送

当启用分块(chunking) 时(chunkingEnabled=true) ,如果消息大小大于允许的最大发布有效载荷大小,则 producer 将原始消息分割成分块的消息,并将它们与块状的元数据一起单独和按顺序发布到 broker

处理一个 producer 和一个订阅 consumer 的分块消息

如下图所示,当生产者向主题发送一批大的分块消息和普通的非分块消息时。 假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。

  • 这个 broker 在其管理的ledger里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。
  • 消费者将在内存缓存所有的块消息,直到收到所有的消息块将这些消息合并成为原始的消息M1,发送给处理进程。

多个生产者和一个生产者处理块消息

如下所示,生产者1发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。 生产者2发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。 这些特定消息的所有分块是顺序排列的但是其在 ledger 里面可能不是连续的。 这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。

2.2 Topic结构

topic的名称为符合良好结构的URL

{persistent(持久)|non-persistent(非持久)}://tenant(租户)/namespace(命令空间)/topic(主题)
复制代码
  • 持久与非持久

    • 主题默认持久化类型,非持久需要指定。
      • 对于持久化的主题,所有的消息都会被持久化的保存到磁盘当中(如果 broker 不是单机模式,消息会被持久化到多块磁盘)
    • 非持久化的主题的数据不会被保存到磁盘里面。
  • 租户

    • 实例中的主题租户,对于Pulsar中的多租户至关重要,并且分布在集群中。
  • 命名空间

    • 将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。
    • 大多数对 topic 的管理都是对命名空间的一项配置。
    • 每个租户里面可以有一个或者多个命名空间。

2.3 消息保留和过期

Pulsar broker默认如下:

  • 立即删除消费者已确认的所有消息
  • 以消息backlog的形式,持久保存所有的未被确认消息

Pulsar有两个特性,让你可以覆盖上面的默认行为。

  • 消息保留使您能够存储消费者已确认的消息
  • 消息到期允许为尚未确认的消息设置生存时间(TTL)

所有消息保留和过期都在[命名空间]中管理级别。

下图说明了这两种概念:

  • 图中上面的是消息存留

    • 存留规则会被用于某namespace下所有的topic,指明哪些消息会被持久存储,即使已经被确认过。 没有被留存规则覆盖的消息将会被删除。 如果没有保留策略,所有已确认的消息都将被删除
  • 图中下面的是消息过期

    • 有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)

2.4 消息去重

消息去重保证了一条消息只能在 Pulsar 服务端被持久化一次。 消息去重是一个 Pulsar 可选的特性,它能够阻止不必要的消息重复,它保证了即使消息被消费了多次,也只会被保存一次。

下图展示了开启和关闭消息去重的场景:

  • 最上面的场景中,消息去重被关闭。 Producer发布消息1到一个topic,消息到达broker后,被持久化到BookKeeper。 然后producer又发送了消息1(可能因为某些重试逻辑),然后消息被接收后又持久化在BookKeeper,这意味着消息重复发生了。

  • 在第二个场景中,producer发送了消息1,消息被broker接收然后持久化,和第一个场景是一样的。 当producer再次发送消息时,broker知道已经收到个消息1,所以不会再持久化消息1.

2.4.1 生产者幂等

另一种可用的消息重复数据消除方法是确保每条消息只生成一次。这种方法通常称为生产者幂等。这种方式的缺点是,把消息去重的工作推给了应用去做。 在 Pulsar 中,消息去重是在 broker上处理的,用户不需要去修改客户端的代码。 相反,你只需要通过修改配置就可以实现。

命名空间级别启用去重

$ bin/pulsar-admin namespaces set-deduplication \
  public/default \
  --enable # or just -e
复制代码

在命名空间级别禁用去重

$ bin/pulsar-admin namespaces set-deduplication \
  public/default \
  --disable # or just -d
复制代码

2.5 消息延迟传递

延时消息功能允许你能够过一段时间才能消费到这条消息,而不是消息发布后,就马上可以消费到。

如下图所示,说明了延时消息的实现机制:

Broker 保存消息是不经过任何检查的。 当消费者消费一条消息时,如果这条消息是延时消息,那么这条消息会被加入到DelayedDeliveryTracker当中。 订阅检查机制会从DelayedDeliveryTracker获取到超时的消息,并交付给消费者。

3、消息系统架构设计

单个 Pulsar 集群由以下三部分组成:

  • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务等。
  • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储
  • 一个Zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

3.1 Pulsar 集群

下图为一个 Pulsar 集群

3.1.1 集群

Pulsar 集群由N个Pulsar 实例组成,包含了

  • 一个或者多个Pulsar brokers
  • 一个ZooKeeper协调器,用于集群级别的配置和协调
  • 一组BookKeeper的Bookies用于消息的持久化存储

集群间可以通过异地复制进行消息同步

3.1.2 元数据存储

Pulsar 使用 Zookeeper 存储元数据, 集群配置和协调信息。 包含了:

  • 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项
  • 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的)等。

3.1.3 brokers持久化存储

bookies特征适合Pulsar的应用场景

  • 为按条目复制的顺序数据提供了非常高效的存储。
  • 保证了多系统挂掉时ledgers的读取一致性。
  • 提供不同的Bookies之间均匀的IO分布的特性。
  • Bookies被设计成可以承载数千的并发读写的ledgers
  • 使用多个磁盘设备,一个用于日志,另一个用于存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开

3.2 Pulsar服务发现机制

客户端能够使用单个 URL 与整个 Pulsar 实例进行通信。 Pulsar内部提供了服务发现的机制。

下面这张图展示了Pulsar服务发现机制:

4、高吞吐量设计

4.1 分区主题提升高吞吐

普通主题仅仅保存单个 broker中,这限制了主题的最大吞吐量分区主题是一种特殊类型的主题,由多个代理处理,因此允许更高的吞吐量。

分区主题实际是通过在底层拥有 N 个内部主题来实现,这个 N 的数量就是等于分区的数量。

  • 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。

如上图:Topic1主题有五个分区(P0到P4)被分成三个代理。 因为分区多于broker数量,其中有两个broker要处理两个分区。第三个broker则只处理一个(分区的分布是Pulsar自动处理的)。

  • 这个topic的消息被广播给两个consumer。

    • 路由模式决定消息应该发送到哪个内部主题
    发送模式 Description
    RoundRobinPartition 如果消息没有指定 key,为了达到最大吞吐量,消息会以 round-robin 方式被路由所有分区。 请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。 这是默认的模式。
    SinglePartition 如果消息没有指定 key,生产者将会随机选择一个分区,并发送所有消息。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。
    CustomPartition 使用自定义消息路由,可以定制消息如何进入特定的分区。 可以使用 Java client 或实现MessageRouter 接口来实现自定义的路由模式。
    • 订阅模式确定消息应传递给哪个消费者
      • 分区topic和普通topic,对于订阅模式如何工作,没有任何不同

备注:

  • 路由和订阅模式可以分开制定。 吞吐能力的要求,决定了分区/路由的方式。

  • 分区只决定生产者的消息到消费者处理及确认消息过程中发生的事情。

注意:

  • 非持久性消息传递通常比持久性消息传递快,因为代理不会持久化消息,一旦消息传递到连接的代理,就会立即将ack发送回生产者。

5、跨机房复制

Pulsar 支持在不同的地理位置生产和消费消息。

​ 例如,应用程序在某一地区或市场中发布数据,但需要在其他地区或市场处理和消费数据。

5.1 跨域复制是怎样工作的

Pulsar 在不同集群之间跨地域复制的过程:

  • 在这个图中,每当P1、P2和P3生产者将消息分别发布到Cluster-A、Cluster-B和Cluster-C集群上的T1主题时,这些消息会立即在集群之间复制。一旦消息被复制,C1和C2消费者就可以从各自的集群中使用这些消息。如果没有地理复制,C1和C2使用者将无法使用P3生产者发布的消息

  • 在Pulsar中基于每个租户启用地理复制, 跨域复制是在命名空间级别管理的

5.2 本地存储和转发

  • 当消息发送到 Pulsar 的主题中,消息首先会被存储在本地的集群,然后再被异步转发到远程集群

    • 在正常情况下,消息会被立即复制,并同时分发给本地的消费者。
  • 应用程序可以在任何集群中创建生产者和消费者,可在 Pulsar 实例中的任意一个集群生产消息和消费消息。

  • 订阅不仅可本地集群订阅,启用复制订阅后在集群之间传输。 一旦启用订阅复制,你能够保持同步的订阅状态。

  • 主题能够异步的跨多个地域进行复制。 如果发生故障,消费者能够在其他的集群从这个失败的点重新消费消息。

如上图: T1主题在Cluster-ACluster-BCluster-C三个集群之间复制。这三个集群中的任何一个集群生成的所有消息都交付给其他集群中的所有订阅。在这种情况下,C1C2消费者接收P1P2P3生产者发布的所有消息。

5.3 主题跨域复制

命名空间级别创建了跨域复制,任何生产者或者消费者在这个命名空间创建的主题都会被复制到所有的集群中。

5.3.1 选择性复制

默认消息被复制到命名空间配置的所有集群 ,可以通过为消息指定需要复制的目标集群列表,来选择性地复制数据到目标集群

构建Message对象可以使用setReplicationClusters 方法来指定目标集群:

//限制复制集群
List<String> restrictReplicationTo = Arrays.asList(
        "us-west",
        "us-east"
);
//生产者
Producer producer = client.newProducer()
        .topic("some-topic")
        .create();
//发送消息
producer.newMessage()
        .value("my-payload".getBytes())
        .setReplicationClusters(restrictReplicationTo)
        .send();
复制代码

5.4 复制订阅

Pulsar 支持

  • 复制订阅关系,所以能够在不到1秒的时间内,在不同集群间保持订阅状态的同步。

  • 主题的上下文信息也能在跨多个物理地域间进行异步复制。

如果发生故障,消费者重启后能够在其他的集群从这个消费失败的点开始消费。

5.4.1 启用复制订阅

默认禁用复制订阅,创建使用者时可启动。

//创建消费者
Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic")//主题
            .subscriptionName("my-subscription")//订阅名
            .replicateSubscriptionState(true)//启用复制订阅
            .subscribe();
复制代码

6、多租户

Pulsar 的多租户能力按照设计可满足下列需求:

  • 确保严苛的 SLA 可顺利满足
  • 保证不同租户之间的隔离
  • 针对资源利用率强制实施配额
  • 提供每租户和系统级的安全性
  • 确保低成本运维以及尽可能简单的管理

Apache Pulsar 通过下列方式满足了上述需求:

  • 通过为每个租户进行身份验证、授权和 ACL(访问控制列表)获得所需安全性。
  • 为每个租户强制实施存储配额
  • 以策略的方式定义所有隔离机制,策略可在运行过程中更改,借此降低运维成本并简化管理工作。

6.1 命名空间

命名空间是租户内部逻辑上的命名术语

  • 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。
    • Namespace使得程序可以以层级的方式创建和管理主题 Topic 为my-tenant/app1
    • 它的namespace是app1这个应用,对应的租户是 my-tenant,可以在namespace下创建任意数量的topic

Pulsar 从设计之初就可以支持多租户。因此主题可按照与多租户有关的两个资源进行组织:资产(Property)和名称空间(Namespace)。

  • 资产代表系统中的租户租户可以在自己的资产内配置多个名称空间,每个名称空间可包含任意数量个主题
  • 名称空间是 Pulsar 中每个租户最基本的管理单位
    • 可针对名称空间设置 ACL
    • 调整副本数目设置
    • 管理跨集群的消息数据多地域复制
    • 控制消息的过期
    • …..

**如下图: **一个 Pulsar 部署中包含了三个相互独立的租户

6.2 订阅

订阅是命名好的配置规则,指导消息如何投递给消费者。

  • Pulsar 中有四种订阅模式:
    • 独占
    • 共享
    • 灾备
    • key共享

如下图:Pulsar4种订阅模型关系

  • 消费者如使用传统的”发布-订阅消息“,可以为每个消费者指定一个特定的订阅名称, 这就是独占模式

  • 消费者实现”消息队列“的效果,则多个消费者会拥有相同的订阅名称(如灾备模式,共享模式key共享模式)。

6.2.1 独占模式

只允许单个使用者附加到订阅。如果多个使用者使用相同的订阅订阅订阅主题,则会发生错误。

在下图中,仅允许使用者A-0使用消息

6.2.2 Failover(灾备)

故障转移模式下,多个消费者可以附加到同一订阅,主消费者消费非分区主题或者分区主题中的每个分区的消息,当主消费者断开连接时,所有(未确认和后续的)消息都会传递给队列中的下一个消费者。

在下图中,Consumer-B-0是主Consumer,而Consumer-B-1是在Consumer-B-0断开连接时接收消息的下一个Consumer。

备注:

  • 分区主题
    • Broker按照消费者优先级消费者名称词汇表顺序对消费者进行排序。 然后将主题均匀的分配给优先级最高的消费者。
  • 非分区主题
    • broker 会根据消费者订阅非分区主题的顺序选择消费者。

6.2.3 Shared(共享)

共享循环模式下,多个使用者可以附加到同一订阅。消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

在下图中,Consumer-C-1和Consumer-C-2可以订阅主题,Consumer-C-3和其他也可以订阅。

注意:

  • 共享模式的限制,不保证消息顺序,不能在共享模式下使用累积确认。

6.2.4 Key_Shared(键共享)

键共享模式下,多个使用者可以附加到同一订阅。消息以分发方式在消费者之间传递,并且具有相同密钥或相同排序密钥的消息只传递给一个消费者。无论消息被重新传递多少次,它都被传递给同一个消费者。当用户连接或断开连接时,将导致服务用户更改消息的某些键。

注意:

  • 键共享模式的限制,需要为消息指定键 或 orderingKey。不能在键共享模式下使用累积确认。

可以在 broker.config 中禁用 Key_Shared 模式。

6.3 Pulsar 实现多租户的机制

**核心手段为:**限制,为不同的租户设置权利范围 ,例如:身份验证和授权实现安全隔离,通过流控制、限流调节和存储配额实现共享物理资源的隔离,以及 通过 放置策略实现物理资源的隔离

安全性

  • 授权是在资产层面上管理,权限是在命名空间的层面上管理,也就是在资产内部管理。

  • 身份验证和授权实现了租户间的隔离,租户无法访问自己无权访问的主题或执行无权限的操作。通过插接式(Pluggable)的身份验证和授权机制实现的。

隔离

  • 隔离满足安全方面的需求
  • 多租户应用程序还需要满足 SLA 的要求,为此 Pulsar 还针对健壮性和性能进行了隔离。
    • 软隔离
      • 例如:磁盘配额流控制限流调节
    • 硬隔离
      • 例如:将某租户隔离在提供服务的某个 Broker 子网内部,并使用 BookKeeper bookie 实现存储隔离。

7、层级存储

Pulsar的面向segment的架构允许主题积压增长非常大,有效地没有限制。然而,随着时间的推移,这可能会变得昂贵,有一个降低消耗的办法,那就是使用分层存储(Tiered Storage) 通过分层存储,在 backlog 中的旧消息可以从 BookKeeper 转移到更廉价的存储中,不出其他问题,客户端将仍然可以访问 backlog。默认情况下写入到 BookKeeper 的数据会复制三个物理机副本。 但是,一旦一个 segment 被BookKeep 封存(sealed),该 segment 就变得不可改变,此时就可以复制到长期存储中去了。 长期存储可以达到节省存储费用的目的。通过使用 Reed-Solomon error correction机制,还可减少物理备份数量。

Pulsar 当前支持 S3, Google Cloud Storage (GCS) 和文件系统来做长期存储long term store可以通过 REST API 或者命令行接口,将数据卸载(Offloading)到长期存储中。 用户传入他们想要保留在 BookKeeper 上的 topic 数据总量,Broker 将复制所有 backlog 数据到长期存储。 在 BookKeeper 上的原始数据将会被删除(延迟可配,默认4小时)。


你的点赞与关注 是 Solomon_肖哥弹架构持续的动力。

历史热点文章

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享