前言
本章是Netty源码最后一章,学习Netty的出栈事件处理。案例仍然使用上一章的自定义协议。
- ChannelOutboundBuffer缓冲区。
- 用户代码使用channel.write,触发出栈write事件。
- 用户代码使用channel.flush,触发出栈flush事件。
- 如何使用Netty提供MessageToByteEncoder实现自定义编码。
一、ChannelOutboundBuffer
ChannelOutboundBuffer和Channel是一对一关系,一个客户端和服务端的通讯,持有一个出栈缓冲区,出栈缓冲区的生命周期与Channel是一样的。
public final class ChannelOutboundBuffer {
private final Channel channel;
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Unsafe unsafe;
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
}
}
复制代码
ChannelOutboundBuffer是一个缓冲区,用户代码write只会将字节写入这个缓冲区,需要执行flush将缓冲区中的数据写入底层Channel。
即使是writeAndFlush,其底层仍然是先执行write,然后执行flush。
// AbstractChannelHandlerContext#invokeWriteAndFlush
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
}
//...
}
复制代码
1、流控
ChannelOutboundBuffer出栈缓冲区可以控制用户写入速率,unwritable变量标识一个Channel是否可读。
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
private volatile int unwritable;
复制代码
Channel是否可读,通过Channel关联的Unsafe拿到ChannelOutboundBuffer实例,调用缓冲区的isWritable方法判断。
// Channel
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable();
}
复制代码
ChannelOutboundBuffer判断unwritable=0即为可写。
// ChannelOutboundBuffer
public boolean isWritable() {
return unwritable == 0;
}
复制代码
unwritable的最后一位用于Netty框架判断是否可写。
// ChannelOutboundBuffer
// Netty设置可写
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
// 1取反 与 oldValue,设置最低一位为0,代表可写
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
// ...
}
}
}
// Netty设置不可写
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
// oldValue 或 1,设置最低一位为1,代表不可写
final int newValue = oldValue | 1;
// 尝试更新状态为不可写,并触发fireChannelWritabilityChanged
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
//...
}
}
}
复制代码
另外,unwritable前31位用于用户自定义是否可写,给用户代码扩展使用。
// ChannelOutboundBuffer
private static int writabilityMask(int index) {
if (index < 1 || index > 31) {
throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
}
return 1 << index;
}
// 设置用户自定义可写标志位
private void setUserDefinedWritability(int index) {
final int mask = ~writabilityMask(index);
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
//...
}
}
}
// 清除用户自定义可写标志位
private void clearUserDefinedWritability(int index) {
final int mask = writabilityMask(index);
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
//...
}
}
}
// 获取index位置的标志位是否为0
public boolean getUserDefinedWritability(int index) {
return (unwritable & writabilityMask(index)) == 0;
}
复制代码
当待处理字节数totalPendingSize发生改变时,可能会触发unwritable标识的更新。
// ChannelOutboundBuffer
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// 待处理字节数
private volatile long totalPendingSize;
// 增加待处理字节数
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 更新等待flush的数据大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 如果等待flush的数据大于高水位(默认64KB),设置标志位不可写,并fireChannelWritabilityChanged
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
// 减少待处理字节数
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
// 更新待处理字节数
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// 如果待处理字节数小于低水位线(默认32K),设置可写并可能触发fireChannelWritabilityChanged
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
复制代码
待处理字节数变化是否需要改变可写标志位,这取决于ChannelConfig配置的水位线,默认情况下超出64K待处理字节会设置为不可写,当待处理字节减少到32K以下会重新设置为可写。
public class DefaultChannelConfig implements ChannelConfig {
private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
}
public final class WriteBufferWaterMark {
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
public static final WriteBufferWaterMark DEFAULT =
new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
}
复制代码
**写入超过64K数据,会发生什么?**写入1M数据,发现没有任何影响。
// Example
private static void write64K(Channel channel) throws IOException {
for (int i = 0; i < 1024; i++) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(new byte[1024]);
channel.write(buffer);
}
channel.flush();
channel.close();
}
复制代码
那么,如何使用ChannelOutboundBuffer提供的流控功能呢?
回顾ShardingProxy中的两段代码,当write数据发现不可写时,同步等待Channel可写,注意这里不是Netty的IO线程,是用户业务线程。
public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
@Override
public void writeQueryData(final ChannelHandlerContext context,
final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
while (queryCommandExecutor.next()) {
count++;
// channel不可写时,flush一次,然后阻塞等待channel可写
while (!context.channel().isWritable() && context.channel().isActive()) {
context.flush(); // 1
backendConnection.getResourceSynchronizer().doAwait(); // 2
}
// ...
// DatabaseCommunicationEngine缓存的MergedResult一行一行取出来写入Packet
DatabasePacket dataValue = queryCommandExecutor.getQueryData();
context.write(dataValue);
// ...
}
}
}
复制代码
IO线程发现通道可写时,唤醒阻塞等待的线程。
public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelWritabilityChanged(final ChannelHandlerContext context) {
if (context.channel().isWritable()) {
backendConnection.getResourceSynchronizer().doNotify(); // 3
}
}
}
复制代码
问题:假设执行context.flush在用户业务线程执行,线程为A,肯定会提交一个异步任务到EventLoop执行(代码1)。如果异步任务在EventLoop的线程B中立即执行完成,通过notify唤醒其他阻塞线程(代码3)。然后线程A才真正调用await阻塞等待(代码2),那么线程A将永远不会被唤醒。
所以ShardingProxy只会在出栈缓冲区数据量非常大的情况下会进行流量控制,配置高水位16MB,低水位8MB,理论上要刷写8MB数据到对端,线程B才会执行唤醒,此时线程A应该已经在等待同步了。
private void groupsNio(final ServerBootstrap bootstrap) {
workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
// ...
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
// ...
}
复制代码
所以ShardingProxy中的ResourceSynchronizer同步器,仅仅是个轻量级同步器,使用受场景限制。
2、数据结构
ChannelOutboundBuffer使用两个链表结构队列存储数据。
unflushedEntry代表未发送entry,write方法就是将数据写入entry,放入这个链表。
flushedEntry代表”已发送”entry,flush方法会将unflushedEntry链表中的元素都移动到flushedEntry,循环所有flushedEntry中的元素执行flush并移除。
public final class ChannelOutboundBuffer {
// 已发送entry,flush之前,会将unflushedEntry中的entry移动到flushedEntry
private Entry flushedEntry;
// 未发送entry,write方法会将ByteBuf加入到这里链表里
private Entry unflushedEntry;
// 最后一个entry,代表最后一个写入该缓冲区的bytebuf
private Entry tailEntry;
// 执行flush的entry数量
private int flushed;
}
复制代码
每个链表节点封装为一个Entry存放待发数据和链表指针。
static final class Entry {
// 对象回收器钩子,使用了对象池
private final Handle<Entry> handle;
// 指向下一个entry
Entry next;
// 可以认为是ByteBuf
Object msg;
// ... 省略其他
}
复制代码
当用户通过write方法写入数据时,WriteTask最终会将ByteBuf作为这里的入参msg传入,封装为entry加入unflushedEntry。
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 封装ByteBuf到Entry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 处理tailEntry
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 增加待处理字节数
incrementPendingOutboundBytes(entry.pendingSize, false);
}
复制代码
用户调用flush方法,将缓冲区的数据刷入对端。首先会移动所有unflushedEntry到flushedEntry,可以看到这里仅仅通过移动一个指针,就将unflushed队列中的数据全量灌入了flushed队列。
// 移动所有unflushedEntry到flushedEntry
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
// 移动一个指针,将unflushedEntry里的数据全量放入flushedEntry
if (flushedEntry == null) {
flushedEntry = entry;
}
// 统计待flush数量
do {
flushed ++;
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
复制代码
然后将flushedEntry里的entry循环写入对端并移除。
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// 如果发送数量归0
// 已发送链表清空
flushedEntry = null;
// 如果移除的节点是最后一个节点,清空未发送链表和tail节点
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
// 将已发送节点指针移动到下一个
flushedEntry = e.next;
}
}
复制代码
二、write
write事件处理。write仅仅是写入内存(ChannelOutboundBuffer),并没有立即发送给对端,需要通过flush或writeAndflush将内存中的数据发送给对端。
// Example
private static void justWrite(Channel channel) throws InterruptedException {
while (channel.isActive()) {
String json = "{\"data\": \"hello\"}";
ByteBuf magic = ByteBufAllocator.DEFAULT.buffer().writeShort(XHeader.MAGIC);
channel.write(magic);
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer().writeByte(XHeader.VERSION_1).writeByte(XHeader.REQUEST).writeInt(json.length()).writeBytes(json.getBytes());
channel.write(byteBuf);
Thread.sleep(1000);
channel.flush();
}
}
复制代码
write出栈事件和其他出栈事件的传播路径相似,但是往往是个异步的过程,因为业务线程和IO线程往往是两个线程。所以在执行Channel.write时,实际TailContex提交一个WriteTask到Channel对应EventLoop中,并唤醒Selector。
// TailContext(AbstractChannelHandlerContext)
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
// 找到下一个Handler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// ... 暂时不会走这,因为业务线程和IO线程不同
} else {
// 业务线程执行write,提交WriteTask到channel对应EventLoop
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
复制代码
1、创建WriteTask
WriteTask绑定了下一个Handler(Context)、需要写入的msg、channelPromise、是否需要flush。因为WriteTask使用到了对象池,所以每次创建以后需要执行init方法给成员变量赋值。
static final class WriteTask implements Runnable {
// WriteTask对象池
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
@Override
public WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
});
static WriteTask newInstance(AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
WriteTask task = RECYCLER.get();
// 初始化WriteTask
init(task, ctx, msg, promise, flush);
return task;
}
private final Handle<WriteTask> handle;
private AbstractChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size; // sign bit controls flush
protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {// 默认为true
// 计算写入对象大小,单位字节,默认 = msg大小 + 32
task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
ctx.pipeline.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
// 通过位运算,标记这个task是否需要flush
// 如果或上MIN_VALUE,则task.size < 0
if (flush) {
task.size |= Integer.MIN_VALUE;
}
}
}
复制代码
WriteTask初始化时,预估写入msg对象的大小,调用DefaultChannelPipeline的incrementPendingOutboundBytes方法记录下来。这里调用channel对应的出栈缓冲区增加待处理的字节大小。
// DefaultChannelPipeline
protected void incrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
复制代码
如果ChannelOutboundBuffer认为待处理数据过多,会修改通道不可写,并通过pipeline触发fireChannelWritabilityChanged。
// ChannelOutboundBuffer
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 更新等待处理的数据大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 如果等待处理的数据大于高水位(默认64KB),设置标志位不可写,并fireChannelWritabilityChanged
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
// 尝试更新通道不可写,并触发fireChannelWritabilityChanged
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
复制代码
2、提交WriteTask
TailContext会将创建好的WriteTask提交到Channel对应EventLoop,这里会唤醒阻塞在select方法上的Selector。
private static boolean safeExecute(EventExecutor executor, Runnable runnable,
ChannelPromise promise, Object msg, boolean lazy) {
// ...
executor.execute(runnable);
}
复制代码
回顾一下EventLoop提交任务,这里immediate入参为true,表示立即唤醒阻塞的Selector。
// SingleThreadEventExecutor
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// 将task加入任务队列
addTask(task);
if (!inEventLoop) {
// 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
startThread();
if (isShutdown()) {
//...
}
}
// 唤醒EventLoop
// 重点看NioEventLoop的wakeup方法,会唤醒java.nio.channels.Selector#wakeup
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
// NioEventLoop
protected void wakeup(boolean inEventLoop) {
// 如果是另外一个线程唤醒,且之前并非唤醒状态,唤醒selector
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
复制代码
3、执行WriteTask
由于前面提交WriteTask的时候,唤醒了可能阻塞在Selector.select方法上的EventLoop线程,所以EventLoop一定能在处理完IO事件以后,立即开始处理任务队列中的WriteTask(见第11章EventLoop)。
这里重点看WriteTask的run方法。
@Override
public void run() {
try {
// 减少待发送字节
decrementPendingOutboundBytes();
if (size >= 0) {
// 如果size>=0 表示是write,只需要写入buffer
ctx.invokeWrite(msg, promise);
} else {
// 如果size<0 表示需要writeAndFlush
ctx.invokeWriteAndFlush(msg, promise);
}
} finally {
// WriteTask执行完毕,回收到对象池
recycle();
}
}
复制代码
与创建WriteTask相反,decrementPendingOutboundBytes会减少ChannlOutboundBuffer中记录的待处理字节数。
// WriteTask
private void decrementPendingOutboundBytes() {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
}
}
// DefaultChannelPipeline
protected void decrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
复制代码
同样的,如果出栈缓冲区中待处理的字节数小于水位线,尝试设置通道可写,并可能会触发fireChannelWritabilityChanged。
// ChannelOutboundBuffer
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
// 更新待处理字节数
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// 如果待处理字节数小于低水位线(默认32K),设置可写并可能触发fireChannelWritabilityChanged
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
复制代码
更新完ChannelOutboundBuffer之后,WriteTask会调用TailContext的下一个Context处理写入对象。下一个Context是编码器XRequestEncoder。
/**
* * +---------------------------------------------------------------+
* * | 魔数 2byte | 协议版本号 1byte | 报文类型 1byte | 数据长度 4byte |
* * +---------------------------------------------------------------+
*/
public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
out.writeShort(XHeader.MAGIC);
out.writeByte(XHeader.VERSION_1);
out.writeByte(XHeader.REQUEST);
byte[] bytes = objectMapper.writeValueAsBytes(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
复制代码
经过XRequestEncoder之后,最后传播到HeadContext,HeadContext调用Unsafe执行write方法。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
复制代码
AbstractUnsafe的write方法,先执行AbstractChannel的filterOutboundMessage钩子方法,然后再次计算msg大小,最后将msg封装为一个Entry,加入ChannelOutboundBuffer中的链表(加入ChannelOutboundBuffer会导致出栈缓冲区的pendingSIze上升,导致通道不可写,见第一部分)。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// ...
}
int size;
try {
// 一个钩子,在msg加入OutboundBuffer之前被执行
msg = filterOutboundMessage(msg);
// 再次计算msg占用空间大小,一般size = ByteBuf#readableBytes
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
// ...
return;
}
// 加入OutboundBuffer中的Entry链表
outboundBuffer.addMessage(msg, size, promise);
}
复制代码
对于NioSocketChannel,filterOutboundMessage主要是校验msg实例是ByteBuf对象或FileRegion对象,因为经过用户代码各种转换以后,msg无法保证是一个ByteBuf。
// AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
// 如果没使用直接内存,尝试转换为DirectBuffer
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(...);
}
复制代码
三、flush
flush的流程与write类似,这里直接定位到Unsafe的flush方法。
// AbstractUnsafe
@Override
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 移动unflushEntry到flushedEntry
outboundBuffer.addFlush();
// 循环发送flushedEntry里的ByteBuf到对端
flush0();
}
复制代码
首先,调用出栈缓冲区的addFlush方法,将待发送Entry放入已发送Entry,上面讲过不贴代码了。flush0方法调用抽象AbstractChannel#doWrite方法。
// AbstractUnsafe
protected void flush0() {
if (inFlush0) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
if (!isActive()) {
// ...
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
// ...
} finally {
inFlush0 = false;
}
}
/**
* Flush the content of the given buffer to the remote peer.
*/
// AbstractChannel
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
复制代码
NioSocketChannel的doWrite会循环处理出栈缓冲区中的所有flushedEntry,除非超出16次(writeSpinCount)循环。
// NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 默认一次flush只能循环16次
int writeSpinCount = config().getWriteSpinCount();
do {
// 当buffer中没有需要flush的Entry,结束
if (in.isEmpty()) {
clearOpWrite();
return;
}
// 获取批量写入对端的字节数上限
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 转换NettyByteBuf为JDKByteBuffer
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// ByteBuffer的数量
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
// in中存在FileRegion,不仅仅是bytebuf的情况
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 单个ByteBuffer写入对端
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
long attemptedBytes = in.nioBufferSize();
// 将ByteBuffer批量写入对端
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
// 调整批量写入对端的字节数上限
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
// 根据写入的byte数据多少,移除flushedEntry
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// 如果没处理完,设置关注WRITE事件
incompleteWrite(writeSpinCount < 0);
}
复制代码
doWrite循环内部主要做几个事情:
- ChannelOutboundBuffer.nioBuffers:转换NettyByteBuf为JDKByteBuffer,这样才可以调用底层JDK的Channel写数据。
- Channel.write:调用JDK的Channel的write方法,将JDKByteBuffer批量写入对端。
- adjustMaxBytesPerGatheringWrite:调整批量写入对端的字节数上限。
- ChannelOutboundBuffer.removeBytes:根据写入对端的字节数数量,移除flushedEntry。
- incompleteWrite:如果循环16次没处理完,设置SelectionKey关注WRITE事件。
adjustMaxBytesPerGatheringWrite如何调整每次写入对端的字节数上限?
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
复制代码
attempted表示循环内每次剩余待flush字节数,这个数字在经过几次循环后会慢慢变小;written表示实际写入对端的字节数;oldMaxBytesPerGatheringWrite表示当前flush对端字节数上限。
当attempted和written一致时,如果attempted * 2 > oldMaxBytesPerGatheringWrite,那么会放宽上限值为attempted * 2,这表明OS可能支持更多字节写入对端,动态调整上限以提高后续刷写的速率。
当attempted超过4KB,且written小于attempted/2,那么会收紧上限值为 attempted / 2,这表明OS不能满足目前的刷写速率,需要收紧刷写速率。
MaxBytesPerGatheringWrite的初始值是多少呢?
在Channel构造时,会同时构造NioSocketChannelConfig配置,此处通过calculateMaxBytesPerGatheringWrite方法计算得到默认的上限值。
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
}
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(newSendBufferSize);
}
}
}
复制代码
这个数字取决于JDKSocket的SO_SNDBUF,取决于操作系统,我本地等于128K,则MaxBytesPerGatheringWrite=128K << 1=256K。
// DefaultSocketChannelConfig
@Override
public int getSendBufferSize() {
try {
return javaSocket.getSendBufferSize();
} catch (SocketException e) {
throw new ChannelException(e);
}
}
// Socket
public synchronized int getSendBufferSize() throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
int result = 0;
Object o = getImpl().getOption(SocketOptions.SO_SNDBUF);
if (o instanceof Integer) {
result = ((Integer)o).intValue();
}
return result;
}
复制代码
如何配置呢?
配置ChannelOption.SO_SNDBUF,单位为byte字节,可以调用Socket的native方法设置SO_SNDBUF大小。
public class XClient {
public static void main(String[] args) throws Exception {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup(1);
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
// ...
}
}
}
复制代码
在Channel初始化时,会调用DefaultSocketChannelConfig#setSendBufferSize设置SO_SNDBUF选项。
@Override
public SocketChannelConfig setSendBufferSize(int sendBufferSize) {
try {
javaSocket.setSendBufferSize(sendBufferSize);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
复制代码
ChannelOutboundBuffer.removeBytes如何根据写入对端字节数,移除flushedEntry?
Channel.write方法是个批量写入对端的方法,所以这里要根据批量写入的字节数,反过来判断哪些entry需要从链表中移除。
逻辑也比较普通,循环至writtenBytes等于0,每次循环要么移除链表头节点entry,要么最后剩余的writtenBytes不满头节点entry的可读字节,则移动头节点entry的读指针。
// writtenBytes代表批量写入对端的字节数
public void removeBytes(long writtenBytes) {
for (;;) {
// 获取flushedEntry指向的entry
Object msg = current();
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
// 如果这个ByteBuf的可读字节,小于实际写入对端字节数,
// 表示这个ByteBuf已经写完了,可以移除对应entry
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
// 移除当前指向的flushedEntry,可能触发低水位,通道变为可写
remove();
} else { // readableBytes > writtenBytes
// 剩余的writtenBytes不满头节点的可读字节,移动头节点的读指针
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
复制代码
四、encode
为了方便用户实现编码,Netty提供了编码骨架抽象类MessageToByteEncoder。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
}
复制代码
继承MessageToByteEncoder后,用户只需要实现decode方法即可。实现起来没什么讲究,只需要将实体类按照自己的编码方式写入ByteBuf即可。
如自定义XRequestEncoder,将XRequest写入ByteBuf。
public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
out.writeShort(XHeader.MAGIC);
out.writeByte(XHeader.VERSION_1);
out.writeByte(XHeader.REQUEST);
byte[] bytes = objectMapper.writeValueAsBytes(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
复制代码
如自定义XResponseEncoder,将XResponse写入ByteBuf。
public class XResponseEncoder extends MessageToByteEncoder<XResponse> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void encode(ChannelHandlerContext ctx, XResponse msg, ByteBuf out) throws Exception {
out.writeShort(XHeader.MAGIC);
out.writeByte(XHeader.VERSION_1);
out.writeByte(XHeader.RESPONSE);
byte[] bytes = objectMapper.writeValueAsBytes(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
复制代码
1、泛型I的作用
MessageToByteEncoder的泛型I,代表了需要进行编码的实体类,MessageToByteEncoder支持在一个Pipeline中存在多个编码器,只需要write方法传入的实体类与泛型I匹配。如果channel.write传入的是XRequest,那么将走XRequestEncoder,如果channel.write传入的是XResponse,那么将走XResponseEncdoer。
MessageToByteEncoder构造时,会创建一个TypeParameterMatcher。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
protected MessageToByteEncoder(boolean preferDirect) {
// 传入 当前实例,泛型类,泛型标记 得到一个TypeParameterMatcher
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
}
复制代码
TypeParameterMatcher是一个抽象接口,只有一个match方法用于判断入参是否匹配,实现类是ReflectiveMatcher,通过instanceof的方式判断入参msg是否是泛型I对应Class实例。
private static final class ReflectiveMatcher extends TypeParameterMatcher {
private final Class<?> type;
ReflectiveMatcher(Class<?> type) {
this.type = type;
}
@Override
public boolean match(Object msg) {
return type.isInstance(msg);
}
}
复制代码
MessageToByteEncoder的write方法使用责任链模式,判断msg instanceof I,如果入参msg是I实例才会执行编码,反之会走下一个Handler(可能是另外一个Encoder)。
// 使用匹配器判断msg是否能够处理
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 如果msg是泛型I对应类的实例,当前编码器才会处理
if (acceptOutboundMessage(msg)) {
I cast = (I) msg;
// ...
} else {
// 否则传递给下一个Handler
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
复制代码
2、调用encode
调用用户encode方法逻辑也比较简单。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
// 安全强转
I cast = (I) msg;
// 分配ByteBuf,子类可以重写
buf = allocateBuffer(ctx, cast, preferDirect);
// 用户encode
encode(ctx, cast, buf);
if (buf.isReadable()) {
// 如果用户向Bytebuf写入了数据,则继续向后传播,
// 最后会传播到TailContext写入出栈缓冲区
ctx.write(buf, promise);
} else {
// 用户没有向ByteBuf写入数据,释放资源,向后传播空Buffer
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
}
}
}
复制代码
给用户的扩展点是allocateBuffer方法,允许用户根据实际情况创建合适的ByteBuf,用于写入数据,这里默认preferDirect=true。
/**
* Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
* Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
*/
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg,
boolean preferDirect) throws Exception {
if (preferDirect) {
return ctx.alloc().ioBuffer();
} else {
return ctx.alloc().heapBuffer();
}
}
复制代码
总结
-
ChannelOutboundBuffer缓冲区:
用户write是写入这个缓冲区;用户flush是将这个缓冲区的数据写入对端;用户writeAndFlush是组合了上述两个操作。
ChannelOutboundBuffer同时具备流控功能,根据用户配置的WriteBufferWaterMark决定高低水位线,默认低水位线是32k,高水位线是64k。
框架内部会调用incrementPendingOutboundBytes增加当前buffer中的pendingSize,如果超出高水位线,会设置Channel不可写,触发channelWritabilityChanged入栈事件;
框架内部会调用decrementPendingOutboundBytes减少当前buffer中的pendingSize,如果低于低水位线,会设置Channel可写,触发channelWritabilityChanged入栈事件。
用户代码可以处理channelWritabilityChanged事件,控制write速率,参照ShardingProxy的MySQLCommandExecuteEngine实现。
ChannelOutboundBuffer使用两个链表结构队列存储数据。
unflushedEntry代表未发送entry,write方法就是将数据写入entry,放入这个链表。
flushedEntry代表”已发送”entry,flush方法会将unflushedEntry链表中的元素都移动到flushedEntry,循环所有flushedEntry中的元素执行flush并移除。
-
用户代码使用channel.write,触发出栈write事件。
TailContext会提交一个WriteTask到Channel对应的EventLoop处理,而不是直接写入对端。
创建WriteTask,会增加ChannelOutboundBuffer的pendingSize;
提交WriteTask,会减少ChannelOutboundBuffer的pendingSize。
执行WriteTask,首先可能会经过用户定义的编码器,最后进入Unsafe(AbstractUnsafe)将Bytebuf封装为Entry,放入ChannelOutboundBuffer缓冲区,在放入缓冲区的时候,也会增加ChannelOutboundBuffer的pendingSize。
对应ChannelOutboundBuffer的pendingSize增加和减少会触发channelWritabilityChanged事件。
-
用户代码使用channel.flush,触发出栈flush事件。
flush事件最终会交由Unsafe处理,将缓存在ChannelOutboundBuffer中的Entry里的ByteBuf通过Channel写入对端。
Entry里的ByteBuf写完后,会被从链表结构中移除,导致ChannelOutboundBuffer的pendingSize减少,如果低于低水位线,Channel变为可写,会触发channelWritabilityChanged事件。
-
继承Netty提供MessageToByteEncoder编码器抽象类,可以实现自定义编码。
MessageToByteEncoder里的泛型,代表当前编码器可以处理的实体类。
public class XRequestEncoder extends MessageToByteEncoder<XRequest> { private final ObjectMapper objectMapper = new ObjectMapper(); @Override protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception { out.writeShort(XHeader.MAGIC); out.writeByte(XHeader.VERSION_1); out.writeByte(XHeader.REQUEST); byte[] bytes = objectMapper.writeValueAsBytes(msg); out.writeInt(bytes.length); out.writeBytes(bytes); } } 复制代码
一个ChannelPipeline里,可以有多个编码器,每个编码器负责处理不同Class的编码工作。
如果编码器负责处理对应Class的编码工作,但是encode方法不写入ByteBuf out,对应msg会被丢弃。