NATS Server本身不支持持久化,本例中使用的是NATS的JetStream
[TOC]
生产者
:one: 创建连接
这部分与原生的NATS并无二致
//DurablePub.java
Options options = new Options.Builder()
.server("nats://localhost:4222")
.reconnectWait(Duration.ofSeconds(1))
.build();
Connection nc = Nats.connect(options);
复制代码
:two: 配置和创建流
相比原生NATS Server,这里多了流的属性配置,需要稍加注意:
//DurablePub.java
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.*";
复制代码
//DurablePub.java
//Configure and create Stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECTS)
.retentionPolicy(RetentionPolicy.WorkQueue)
.replicas(1)
.discardPolicy(DiscardPolicy.Old)
.duplicateWindow(Duration.ofSeconds(30))
.build();
StreamInfo streamInfo = nc.jetStreamManagement().addStream(streamConfiguration);
复制代码
这里我配置的属性解释如下:
name
: 流的名称subjects
: 流中的subjectsretentionPolicy
: 消息的保留策略,默认为LimitsPolicy
,即通过各种限制条件来决定消息的保留replicas
: 消息在集群中的副本数量,只有集群才用得到,最大值为5discardPolicy
: 丢弃策略,默认为DiscardOld
,即消息存储到达上限时,将老的消息删除duplicateWindow
:对消息去重时使用的时间窗,建议尽可能小
全属性见下表(引自官网):
属性 | 描述 |
---|---|
MaxAge | 流中消息的最大年龄,以微秒为单位 |
MaxBytes | 流的最大存储容量,当合并的流大小超过这个大小时,旧的消息就会被删除 |
MaxMsgSize | 流能接收的最大消息尺寸 |
MaxMsgs | 流中能存储的最大消息条数,当数量超过这个值时,旧的消息就会被删除 |
MaxConsumers | 流最多能有多少个消费者,为-1 时无限制 |
Name | 流的名称,不能带有空格,tab或者. |
NoAck | 禁用流接收的消息的ACK |
Replicas | 消息在集群中的副本数量,最大值为5 |
Retention | 消息的保留策略,有LimitsPolicy (默认),InterestPolicy ,WorkQueuePolicy |
Discard | 当流触及限制后,DiscardNew 策略会拒绝新的消息,DiscardOld (默认)会删除旧的消息 |
Storage | 消息的存储方式,有file 和memory 两种 |
Subjects | 待消费的subjects 集合,支持通配符 |
Duplicates | 消息去重使用的时间窗 |
保留策略(Retention)
保留策略 | 描述 |
---|---|
LimitsPolicy |
对消息的数量、存储容量和年龄进行限制 |
WorkQueuePolicy |
直到被一个观察者消耗之前,消息都会保存 |
InterestPolicy |
只要有消费者处于活跃状态,消息就会被保存下来 |
上面提到的MaxMsgs
,MaxMsgs
,MaxAge
用来对消息进行限制,它们也是LimitsPolicy
策略仅能使用的属性。
在WorkQueuePolicy
策略下,一旦有消费者收到确认,消息就会被立即删除。而启用InterestPolicy
时,如果没有消费者在线就会立即将消息删除。
需要注意,在WorkQueuePolicy
和InterestPolicy
下,MaxMsgs
,MaxMsgs
,MaxAge
这三个属性依然是生效的,并且是作为前置条件存在。
确认模型(Acknowledgement Models)
消费者有三种确认模式:
模式 | 描述 |
---|---|
AckExplicit |
要求每个消息都进行手动确认,这也是拉模型下唯一支持的方式 |
AckAll |
这个模式下,如果你确认了第100 个消息,那么1 –99 个消息都会自动确认,适用于批处理任务,以减少确认带来的额外开销 |
AckNone |
不支持任何确认 |
:three: 发送消息
在配置和创建好流之后,就可以发送消息了
//Pub
for (int i = 0; i < 10; i++) {
nc.jetStream().publish("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));
}
复制代码
//Generate Data
private static String generateData() {
Contract contract = new Contract(UUID.randomUUID().toString(), "EFFECT", new Date());
GsonBuilder builder = new GsonBuilder();
Gson gson = builder.create();
return gson.toJson(contract);
}
private static class Contract {
public String contractId;
public String status;
public Date signDate;
public Contract(String contractId, String status, Date signDate) {
this.contractId = contractId;
this.status = status;
this.signDate = signDate;
}
}
复制代码
:four:最后关闭连接
nc.flush(Duration.ZERO);
nc.close();
复制代码
消费者
消费者的连接方面同生产者,这里就直接跳过了
:one: 配置和创建消费者:
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.EFFECT";
private static final String CONSUMER = "consumer-1";
复制代码
//Configure and create consumer
ConsumerConfiguration configuration = ConsumerConfiguration.builder()
//Durable Consumer Name
.durable(CONSUMER)
.filterSubject(SUBJECTS)
.replayPolicy(ReplayPolicy.Instant)
//This requires every message to be specifically acknowledged, it's the only supported option for pull-based Consumers
.ackPolicy(AckPolicy.Explicit)
.ackWait(Duration.ofSeconds(30))
.deliverPolicy(DeliverPolicy.All)
.maxDeliver(20)
.rateLimit(100)
.maxAckPending(20000)
.build();
ConsumerInfo consumerInfo = nc.jetStreamManagement()
.addOrUpdateConsumer(STREAM, configuration);
复制代码
:two:创建拉模式的配置(推模式就是PushSubscribeOptions
,大体类似)
PullSubscribeOptions pullSubscribeOptions = PullSubscribeOptions
.builder()
.configuration(configuration)
.build();
复制代码
:three:创建订阅,并批量读取,打印消息内容并且手动确认
JetStreamSubscription jetStreamSubscription = js.subscribe(SUBJECTS, pullSubscribeOptions);
Iterator<Message> iter = jetStreamSubscription.iterate(10, Duration.ofMillis(1000));
while (iter.hasNext()) {
Message message = iter.next();
System.out.printf("Message Received : %s\n",
new String(message.getData(), StandardCharsets.UTF_8));
message.ack();
}
复制代码
消费者的配置属性列表如下:
属性 | 描述 |
---|---|
AckPolicy | 消息的确认方式,支持AckNone``, ``AckAll 和 AckExplicit |
AckWait | 在消息被重新投递之前,允许消息在多长时间之内处于未确认状态 |
DeliverPolicy | 消费者的起始位置策略,支持DeliverAll , DeliverLast , DeliverNew , DeliverByStartSequence 和DeliverByStartTime |
DeliverySubject | 传递观察到的消息的subject,如果没有设置的话,会创建一个拉模式的消费者 |
Durable | 消费者的名称 |
FilterSubject | 当从一个有许多subject或通配符的流中消费时,只选择一个特定的传入subject,支持通配符。 |
MaxDeliver | 特定消息的最大传递次数,以此来避免使你的系统陷入崩溃的毒药类型的消息(如陷入死循环) |
OptStartSeq | 当第一次从流中消费消息时,从集合中的这个特定的消息开始读取 |
ReplayPolicy | 消息的发送方式,支持ReplayInstant 和ReplayOriginal |
SampleFrequency | 确认的消息应占观测样本的百分比,区间为0-100 |
OptStartTime | 当第一次从流中消费消息时,从这个时间或之后的消息开始 |
RateLimit | 以bit/s为单位的消息传递速率 |
MaxAckPending | 未确认的最大消息数量,一旦达到此限制,将暂停发送消息 |
消费者起始位置(Consumer Starting Position)
配置消费者时可以决定从什么位置开始消费,NATS支持以下DeliverPolicy
:
策略 | 描述 |
---|---|
all |
传递所有可用的消息 |
last |
传递最近的消息,类似于tail -n 1 -f |
new |
只传递在订阅之后新到来的消息 |
by_start_time |
传递特定时间之后的消息,需要设置OptStartTime |
by_start_sequence |
从流中特定序号的消息开始,需要设置OptStartSeq |
结语
NATS的使用方面比RabbitMQ,Kafka这类消息中间件要简单的多,但是Java的客户端感觉用起来并不是那么友好,而且官方的Spring客户端也搁置了,这是一个比较麻烦的点,需要做一些额外的工作来简化使用。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END