使用NATS Java客户端实现流数据持久化Pub/Sub

NATS Server本身不支持持久化,本例中使用的是NATS的JetStream

[TOC]

生产者

:one: 创建连接

这部分与原生的NATS并无二致

//DurablePub.java
Options options = new Options.Builder()
    .server("nats://localhost:4222")
    .reconnectWait(Duration.ofSeconds(1))
    .build();
Connection nc = Nats.connect(options);
复制代码

:two: 配置和创建流

相比原生NATS Server,这里多了流的属性配置,需要稍加注意:

//DurablePub.java
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.*";
复制代码
//DurablePub.java
//Configure and create Stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
    .name(STREAM)
    .subjects(SUBJECTS)
    .retentionPolicy(RetentionPolicy.WorkQueue)
    .replicas(1)
    .discardPolicy(DiscardPolicy.Old)
    .duplicateWindow(Duration.ofSeconds(30))
    .build();

StreamInfo streamInfo = nc.jetStreamManagement().addStream(streamConfiguration);
复制代码

这里我配置的属性解释如下:

  • name: 流的名称
  • subjects: 流中的subjects
  • retentionPolicy: 消息的保留策略,默认为LimitsPolicy,即通过各种限制条件来决定消息的保留
  • replicas: 消息在集群中的副本数量,只有集群才用得到,最大值为5
  • discardPolicy: 丢弃策略,默认为DiscardOld,即消息存储到达上限时,将老的消息删除
  • duplicateWindow:对消息去重时使用的时间窗,建议尽可能小

全属性见下表(引自官网):

属性 描述
MaxAge 流中消息的最大年龄,以微秒为单位
MaxBytes 流的最大存储容量,当合并的流大小超过这个大小时,旧的消息就会被删除
MaxMsgSize 流能接收的最大消息尺寸
MaxMsgs 流中能存储的最大消息条数,当数量超过这个值时,旧的消息就会被删除
MaxConsumers 流最多能有多少个消费者,为-1时无限制
Name 流的名称,不能带有空格,tab或者.
NoAck 禁用流接收的消息的ACK
Replicas 消息在集群中的副本数量,最大值为5
Retention 消息的保留策略,有LimitsPolicy(默认),InterestPolicyWorkQueuePolicy
Discard 当流触及限制后,DiscardNew策略会拒绝新的消息,DiscardOld(默认)会删除旧的消息
Storage 消息的存储方式,有filememory两种
Subjects 待消费的subjects集合,支持通配符
Duplicates 消息去重使用的时间窗

保留策略(Retention)

保留策略 描述
LimitsPolicy 对消息的数量、存储容量和年龄进行限制
WorkQueuePolicy 直到被一个观察者消耗之前,消息都会保存
InterestPolicy 只要有消费者处于活跃状态,消息就会被保存下来

上面提到的MaxMsgsMaxMsgsMaxAge用来对消息进行限制,它们也是LimitsPolicy策略仅能使用的属性。

WorkQueuePolicy策略下,一旦有消费者收到确认,消息就会被立即删除。而启用InterestPolicy时,如果没有消费者在线就会立即将消息删除。

需要注意,在WorkQueuePolicyInterestPolicy下,MaxMsgsMaxMsgsMaxAge这三个属性依然是生效的,并且是作为前置条件存在。

确认模型(Acknowledgement Models)

消费者有三种确认模式:

模式 描述
AckExplicit 要求每个消息都进行手动确认,这也是拉模型下唯一支持的方式
AckAll 这个模式下,如果你确认了第100个消息,那么199个消息都会自动确认,适用于批处理任务,以减少确认带来的额外开销
AckNone 不支持任何确认

:three: 发送消息

在配置和创建好流之后,就可以发送消息了

//Pub
for (int i = 0; i < 10; i++) {
  nc.jetStream().publish("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));
}
复制代码
//Generate Data
private static String generateData() {
  Contract contract = new Contract(UUID.randomUUID().toString(), "EFFECT", new Date());
  GsonBuilder builder = new GsonBuilder();
  Gson gson = builder.create();
  return gson.toJson(contract);
}

 private static class Contract {

    public String contractId;

    public String status;

    public Date signDate;

    public Contract(String contractId, String status, Date signDate) {
      this.contractId = contractId;
      this.status = status;
      this.signDate = signDate;
    }
  }
复制代码

:four:最后关闭连接

nc.flush(Duration.ZERO);
nc.close();
复制代码

消费者

消费者的连接方面同生产者,这里就直接跳过了

:one: 配置和创建消费者:

private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.EFFECT";
private static final String CONSUMER = "consumer-1";
复制代码
//Configure and create consumer
ConsumerConfiguration configuration = ConsumerConfiguration.builder()
    //Durable Consumer Name
    .durable(CONSUMER)
    .filterSubject(SUBJECTS)
    .replayPolicy(ReplayPolicy.Instant)
    //This requires every message to be specifically acknowledged, it's the only supported option for pull-based Consumers
    .ackPolicy(AckPolicy.Explicit)
    .ackWait(Duration.ofSeconds(30))
    .deliverPolicy(DeliverPolicy.All)
    .maxDeliver(20)
    .rateLimit(100)
    .maxAckPending(20000)
    .build();

ConsumerInfo consumerInfo = nc.jetStreamManagement()
    .addOrUpdateConsumer(STREAM, configuration);
复制代码

:two:创建拉模式的配置(推模式就是PushSubscribeOptions,大体类似)

PullSubscribeOptions pullSubscribeOptions = PullSubscribeOptions
          .builder()
          .configuration(configuration)
          .build();
复制代码

:three:创建订阅,并批量读取,打印消息内容并且手动确认

JetStreamSubscription jetStreamSubscription = js.subscribe(SUBJECTS, pullSubscribeOptions);

Iterator<Message> iter = jetStreamSubscription.iterate(10, Duration.ofMillis(1000));
while (iter.hasNext()) {
  Message message = iter.next();
  System.out.printf("Message Received : %s\n",
      new String(message.getData(), StandardCharsets.UTF_8));
  message.ack();
}
复制代码

消费者的配置属性列表如下:

属性 描述
AckPolicy 消息的确认方式,支持AckNone``, ``AckAllAckExplicit
AckWait 在消息被重新投递之前,允许消息在多长时间之内处于未确认状态
DeliverPolicy 消费者的起始位置策略,支持DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequenceDeliverByStartTime
DeliverySubject 传递观察到的消息的subject,如果没有设置的话,会创建一个拉模式的消费者
Durable 消费者的名称
FilterSubject 当从一个有许多subject或通配符的流中消费时,只选择一个特定的传入subject,支持通配符。
MaxDeliver 特定消息的最大传递次数,以此来避免使你的系统陷入崩溃的毒药类型的消息(如陷入死循环)
OptStartSeq 当第一次从流中消费消息时,从集合中的这个特定的消息开始读取
ReplayPolicy 消息的发送方式,支持ReplayInstantReplayOriginal
SampleFrequency 确认的消息应占观测样本的百分比,区间为0-100
OptStartTime 当第一次从流中消费消息时,从这个时间或之后的消息开始
RateLimit 以bit/s为单位的消息传递速率
MaxAckPending 未确认的最大消息数量,一旦达到此限制,将暂停发送消息

消费者起始位置(Consumer Starting Position)

配置消费者时可以决定从什么位置开始消费,NATS支持以下DeliverPolicy:

策略 描述
all 传递所有可用的消息
last 传递最近的消息,类似于tail -n 1 -f
new 只传递在订阅之后新到来的消息
by_start_time 传递特定时间之后的消息,需要设置OptStartTime
by_start_sequence 从流中特定序号的消息开始,需要设置OptStartSeq

结语

NATS的使用方面比RabbitMQ,Kafka这类消息中间件要简单的多,但是Java的客户端感觉用起来并不是那么友好,而且官方的Spring客户端也搁置了,这是一个比较麻烦的点,需要做一些额外的工作来简化使用。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享