Netty源码(十三)出栈事件和编码

前言

本章是Netty源码最后一章,学习Netty的出栈事件处理。案例仍然使用上一章的自定义协议。

  • ChannelOutboundBuffer缓冲区。
  • 用户代码使用channel.write,触发出栈write事件。
  • 用户代码使用channel.flush,触发出栈flush事件。
  • 如何使用Netty提供MessageToByteEncoder实现自定义编码。

一、ChannelOutboundBuffer

ChannelOutboundBuffer1.png

ChannelOutboundBuffer和Channel是一对一关系,一个客户端和服务端的通讯,持有一个出栈缓冲区,出栈缓冲区的生命周期与Channel是一样的。

public final class ChannelOutboundBuffer {
    private final Channel channel;
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final Unsafe unsafe;
    protected abstract class AbstractUnsafe implements Unsafe {
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    }
}
复制代码

ChannelOutboundBuffer是一个缓冲区,用户代码write只会将字节写入这个缓冲区,需要执行flush将缓冲区中的数据写入底层Channel。

ChannelOutboundBuffer2.png

即使是writeAndFlush,其底层仍然是先执行write,然后执行flush

// AbstractChannelHandlerContext#invokeWriteAndFlush
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    }
    //...
}
复制代码

1、流控

ChannelOutboundBuffer出栈缓冲区可以控制用户写入速率,unwritable变量标识一个Channel是否可读

private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

private volatile int unwritable; 
复制代码

Channel是否可读,通过Channel关联的Unsafe拿到ChannelOutboundBuffer实例,调用缓冲区的isWritable方法判断。

// Channel
@Override
public boolean isWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();
}
复制代码

ChannelOutboundBuffer判断unwritable=0即为可写

// ChannelOutboundBuffer
public boolean isWritable() {
    return unwritable == 0;
}
复制代码

unwritable的最后一位用于Netty框架判断是否可写

// ChannelOutboundBuffer
// Netty设置可写
private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        // 1取反 与 oldValue,设置最低一位为0,代表可写
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            // ...
        }
    }
}
// Netty设置不可写
private void setUnwritable(boolean invokeLater) {
  for (;;) {
    final int oldValue = unwritable;
    // oldValue 或 1,设置最低一位为1,代表不可写
    final int newValue = oldValue | 1;
    // 尝试更新状态为不可写,并触发fireChannelWritabilityChanged
    if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
      //...
    }
  }
}
复制代码

另外,unwritable前31位用于用户自定义是否可写,给用户代码扩展使用。

// ChannelOutboundBuffer
private static int writabilityMask(int index) {
  if (index < 1 || index > 31) {
    throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
  }
  return 1 << index;
}
// 设置用户自定义可写标志位
private void setUserDefinedWritability(int index) {
    final int mask = ~writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            //...
        }
    }
}
// 清除用户自定义可写标志位
private void clearUserDefinedWritability(int index) {
    final int mask = writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            //...
        }
    }
}
// 获取index位置的标志位是否为0
public boolean getUserDefinedWritability(int index) {
  return (unwritable & writabilityMask(index)) == 0;
}
复制代码

待处理字节数totalPendingSize发生改变时,可能会触发unwritable标识的更新

// ChannelOutboundBuffer
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// 待处理字节数
private volatile long totalPendingSize;

// 增加待处理字节数
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // 更新等待flush的数据大小
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // 如果等待flush的数据大于高水位(默认64KB),设置标志位不可写,并fireChannelWritabilityChanged
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

// 减少待处理字节数
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
      return;
    }
    // 更新待处理字节数
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // 如果待处理字节数小于低水位线(默认32K),设置可写并可能触发fireChannelWritabilityChanged
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
      setWritable(invokeLater);
    }
}
复制代码

待处理字节数变化是否需要改变可写标志位,这取决于ChannelConfig配置的水位线,默认情况下超出64K待处理字节会设置为不可写,当待处理字节减少到32K以下会重新设置为可写。

public class DefaultChannelConfig implements ChannelConfig {
    private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
}

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
}
复制代码

**写入超过64K数据,会发生什么?**写入1M数据,发现没有任何影响。

// Example
private static void write64K(Channel channel) throws IOException {
    for (int i = 0; i < 1024; i++) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        buffer.writeBytes(new byte[1024]);
        channel.write(buffer);
    }
    channel.flush();
    channel.close();
}
复制代码

那么,如何使用ChannelOutboundBuffer提供的流控功能呢?

回顾ShardingProxy中的两段代码,当write数据发现不可写时,同步等待Channel可写,注意这里不是Netty的IO线程,是用户业务线程。

public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
    
    @Override
    public void writeQueryData(final ChannelHandlerContext context,
                               final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
        while (queryCommandExecutor.next()) {
            count++;
            // channel不可写时,flush一次,然后阻塞等待channel可写
            while (!context.channel().isWritable() && context.channel().isActive()) {
                context.flush(); // 1 
                backendConnection.getResourceSynchronizer().doAwait(); // 2
            }
            // ...
            // DatabaseCommunicationEngine缓存的MergedResult一行一行取出来写入Packet
            DatabasePacket dataValue = queryCommandExecutor.getQueryData();
            context.write(dataValue);
            // ...
        }
    }
}
复制代码

IO线程发现通道可写时,唤醒阻塞等待的线程。

public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelWritabilityChanged(final ChannelHandlerContext context) {
        if (context.channel().isWritable()) {
            backendConnection.getResourceSynchronizer().doNotify(); // 3
        }
    }
}
复制代码

问题:假设执行context.flush在用户业务线程执行,线程为A,肯定会提交一个异步任务到EventLoop执行(代码1)。如果异步任务在EventLoop的线程B中立即执行完成,通过notify唤醒其他阻塞线程(代码3)。然后线程A才真正调用await阻塞等待(代码2),那么线程A将永远不会被唤醒

ShardingProxy.png

所以ShardingProxy只会在出栈缓冲区数据量非常大的情况下会进行流量控制,配置高水位16MB,低水位8MB,理论上要刷写8MB数据到对端,线程B才会执行唤醒,此时线程A应该已经在等待同步了。

private void groupsNio(final ServerBootstrap bootstrap) {
    workerGroup = new NioEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
             // ...
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
             // ...
}
复制代码

所以ShardingProxy中的ResourceSynchronizer同步器,仅仅是个轻量级同步器,使用受场景限制。

2、数据结构

ChannelOutboundBuffer使用两个链表结构队列存储数据。

unflushedEntry代表未发送entry,write方法就是将数据写入entry,放入这个链表。

flushedEntry代表”已发送”entry,flush方法会将unflushedEntry链表中的元素都移动到flushedEntry,循环所有flushedEntry中的元素执行flush并移除。

public final class ChannelOutboundBuffer {
    // 已发送entry,flush之前,会将unflushedEntry中的entry移动到flushedEntry
    private Entry flushedEntry;
    // 未发送entry,write方法会将ByteBuf加入到这里链表里
    private Entry unflushedEntry;
    // 最后一个entry,代表最后一个写入该缓冲区的bytebuf
    private Entry tailEntry;
    // 执行flush的entry数量
    private int flushed;
}
复制代码

每个链表节点封装为一个Entry存放待发数据和链表指针

static final class Entry {
		// 对象回收器钩子,使用了对象池
    private final Handle<Entry> handle;
    // 指向下一个entry
    Entry next;
    // 可以认为是ByteBuf
    Object msg;
    // ... 省略其他
}
复制代码

addMessage.png

当用户通过write方法写入数据时,WriteTask最终会将ByteBuf作为这里的入参msg传入,封装为entry加入unflushedEntry。

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 封装ByteBuf到Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    // 处理tailEntry
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // 增加待处理字节数
    incrementPendingOutboundBytes(entry.pendingSize, false);
}
复制代码

addFlush.png

用户调用flush方法,将缓冲区的数据刷入对端。首先会移动所有unflushedEntry到flushedEntry,可以看到这里仅仅通过移动一个指针,就将unflushed队列中的数据全量灌入了flushed队列。

// 移动所有unflushedEntry到flushedEntry
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        // 移动一个指针,将unflushedEntry里的数据全量放入flushedEntry
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        // 统计待flush数量
        do {
            flushed ++;
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}
复制代码

flushEntry.png

然后将flushedEntry里的entry循环写入对端并移除。

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // 如果发送数量归0
        // 已发送链表清空
        flushedEntry = null;
        // 如果移除的节点是最后一个节点,清空未发送链表和tail节点
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        // 将已发送节点指针移动到下一个
        flushedEntry = e.next;
    }
}
复制代码

二、write

write事件处理。write仅仅是写入内存(ChannelOutboundBuffer),并没有立即发送给对端,需要通过flush或writeAndflush将内存中的数据发送给对端

// Example
private static void justWrite(Channel channel) throws InterruptedException {
    while (channel.isActive()) {
        String json = "{\"data\": \"hello\"}";
        ByteBuf magic = ByteBufAllocator.DEFAULT.buffer().writeShort(XHeader.MAGIC);
        channel.write(magic);
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer().writeByte(XHeader.VERSION_1).writeByte(XHeader.REQUEST).writeInt(json.length()).writeBytes(json.getBytes());
        channel.write(byteBuf);
        Thread.sleep(1000);
        channel.flush();
    }
}
复制代码

write.png

write出栈事件和其他出栈事件的传播路径相似,但是往往是个异步的过程,因为业务线程和IO线程往往是两个线程。所以在执行Channel.write时,实际TailContex提交一个WriteTask到Channel对应EventLoop中,并唤醒Selector

// TailContext(AbstractChannelHandlerContext)
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...
    // 找到下一个Handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // ... 暂时不会走这,因为业务线程和IO线程不同
    } else {
        // 业务线程执行write,提交WriteTask到channel对应EventLoop
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}
复制代码

1、创建WriteTask

WriteTask绑定了下一个Handler(Context)、需要写入的msg、channelPromise、是否需要flush。因为WriteTask使用到了对象池,所以每次创建以后需要执行init方法给成员变量赋值。

static final class WriteTask implements Runnable {
    // WriteTask对象池
    private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
        @Override
        public WriteTask newObject(Handle<WriteTask> handle) {
            return new WriteTask(handle);
        }
    });

    static WriteTask newInstance(AbstractChannelHandlerContext ctx,
            Object msg, ChannelPromise promise, boolean flush) {
        WriteTask task = RECYCLER.get();
        // 初始化WriteTask
        init(task, ctx, msg, promise, flush);
        return task;
    }
  
   	private final Handle<WriteTask> handle;
    private AbstractChannelHandlerContext ctx;
  	private Object msg;
  	private ChannelPromise promise;
  	private int size; // sign bit controls flush
    protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
                                   Object msg, ChannelPromise promise, boolean flush) {
        task.ctx = ctx;
        task.msg = msg;
        task.promise = promise;

        if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {// 默认为true
          // 计算写入对象大小,单位字节,默认 = msg大小 + 32
          task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
          ctx.pipeline.incrementPendingOutboundBytes(task.size);
        } else {
          task.size = 0;
        }
        // 通过位运算,标记这个task是否需要flush
        // 如果或上MIN_VALUE,则task.size < 0
        if (flush) {
          task.size |= Integer.MIN_VALUE;
        }
    }
}
复制代码

WriteTask初始化时,预估写入msg对象的大小,调用DefaultChannelPipeline的incrementPendingOutboundBytes方法记录下来。这里调用channel对应的出栈缓冲区增加待处理的字节大小

// DefaultChannelPipeline
protected void incrementPendingOutboundBytes(long size) {
    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
    if (buffer != null) {
        buffer.incrementPendingOutboundBytes(size);
    }
}
复制代码

如果ChannelOutboundBuffer认为待处理数据过多,会修改通道不可写,并通过pipeline触发fireChannelWritabilityChanged

// ChannelOutboundBuffer
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // 更新等待处理的数据大小
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // 如果等待处理的数据大于高水位(默认64KB),设置标志位不可写,并fireChannelWritabilityChanged
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
      final int oldValue = unwritable;
      final int newValue = oldValue | 1;
      // 尝试更新通道不可写,并触发fireChannelWritabilityChanged
      if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
        if (oldValue == 0 && newValue != 0) {
          fireChannelWritabilityChanged(invokeLater);
        }
        break;
      }
    }
}
复制代码

2、提交WriteTask

TailContext会将创建好的WriteTask提交到Channel对应EventLoop,这里会唤醒阻塞在select方法上的Selector。

private static boolean safeExecute(EventExecutor executor, Runnable runnable,
        ChannelPromise promise, Object msg, boolean lazy) {
    // ...
    executor.execute(runnable);
}
复制代码

回顾一下EventLoop提交任务,这里immediate入参为true,表示立即唤醒阻塞的Selector。

// SingleThreadEventExecutor
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 将task加入任务队列
    addTask(task);
    if (!inEventLoop) {
        // 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
        startThread();
        if (isShutdown()) {
					 //...
        }
    }
    // 唤醒EventLoop
    // 重点看NioEventLoop的wakeup方法,会唤醒java.nio.channels.Selector#wakeup
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
// NioEventLoop
protected void wakeup(boolean inEventLoop) {
  // 如果是另外一个线程唤醒,且之前并非唤醒状态,唤醒selector
  if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
    selector.wakeup();
  }
}
复制代码

3、执行WriteTask

由于前面提交WriteTask的时候,唤醒了可能阻塞在Selector.select方法上的EventLoop线程,所以EventLoop一定能在处理完IO事件以后,立即开始处理任务队列中的WriteTask(见第11章EventLoop)。

这里重点看WriteTask的run方法。

@Override
public void run() {
    try {
        // 减少待发送字节
        decrementPendingOutboundBytes();
        if (size >= 0) {
            // 如果size>=0 表示是write,只需要写入buffer
            ctx.invokeWrite(msg, promise);
        } else {
            // 如果size<0 表示需要writeAndFlush
            ctx.invokeWriteAndFlush(msg, promise);
        }
    } finally {
        // WriteTask执行完毕,回收到对象池
        recycle();
    }
}
复制代码

与创建WriteTask相反,decrementPendingOutboundBytes会减少ChannlOutboundBuffer中记录的待处理字节数。

// WriteTask
private void decrementPendingOutboundBytes() {
    if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
        ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
    }
}
// DefaultChannelPipeline
protected void decrementPendingOutboundBytes(long size) {
    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
    if (buffer != null) {
      buffer.decrementPendingOutboundBytes(size);
    }
}
复制代码

同样的,如果出栈缓冲区中待处理的字节数小于水位线,尝试设置通道可写,并可能会触发fireChannelWritabilityChanged

// ChannelOutboundBuffer
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    // 更新待处理字节数
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // 如果待处理字节数小于低水位线(默认32K),设置可写并可能触发fireChannelWritabilityChanged
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}
private void setWritable(boolean invokeLater) {
    for (;;) {
      final int oldValue = unwritable;
      final int newValue = oldValue & ~1;
      if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
        if (oldValue != 0 && newValue == 0) {
          fireChannelWritabilityChanged(invokeLater);
        }
        break;
      }
    }
}
复制代码

更新完ChannelOutboundBuffer之后,WriteTask会调用TailContext的下一个Context处理写入对象。下一个Context是编码器XRequestEncoder。

/**
 *  *     +---------------------------------------------------------------+
 *  *     | 魔数 2byte | 协议版本号 1byte |  报文类型 1byte | 数据长度 4byte  |
 *  *     +---------------------------------------------------------------+
 */
public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
复制代码

经过XRequestEncoder之后,最后传播到HeadContext,HeadContext调用Unsafe执行write方法。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}
复制代码

AbstractUnsafe的write方法,先执行AbstractChannel的filterOutboundMessage钩子方法,然后再次计算msg大小,最后将msg封装为一个Entry,加入ChannelOutboundBuffer中的链表(加入ChannelOutboundBuffer会导致出栈缓冲区的pendingSIze上升,导致通道不可写,见第一部分)。

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
      // ...
    }
    int size;
    try {
        // 一个钩子,在msg加入OutboundBuffer之前被执行
        msg = filterOutboundMessage(msg);
        // 再次计算msg占用空间大小,一般size = ByteBuf#readableBytes
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        // ...
        return;
    }
    // 加入OutboundBuffer中的Entry链表
    outboundBuffer.addMessage(msg, size, promise);
}
复制代码

对于NioSocketChannel,filterOutboundMessage主要是校验msg实例是ByteBuf对象或FileRegion对象,因为经过用户代码各种转换以后,msg无法保证是一个ByteBuf。

// AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // 如果没使用直接内存,尝试转换为DirectBuffer
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(...);
}
复制代码

三、flush

flush.png

flush的流程与write类似,这里直接定位到Unsafe的flush方法

// AbstractUnsafe
@Override
public final void flush() {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // 移动unflushEntry到flushedEntry
    outboundBuffer.addFlush();
    // 循环发送flushedEntry里的ByteBuf到对端
    flush0();
}
复制代码

首先,调用出栈缓冲区的addFlush方法,将待发送Entry放入已发送Entry,上面讲过不贴代码了。flush0方法调用抽象AbstractChannel#doWrite方法。

// AbstractUnsafe
protected void flush0() {
    if (inFlush0) {
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    if (!isActive()) {
        // ...
        return;
    }
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        // ...
    } finally {
        inFlush0 = false;
    }
}
/**
* Flush the content of the given buffer to the remote peer.
*/
// AbstractChannel
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
复制代码

NioSocketChannel的doWrite会循环处理出栈缓冲区中的所有flushedEntry,除非超出16次(writeSpinCount)循环。

// NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // 默认一次flush只能循环16次
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // 当buffer中没有需要flush的Entry,结束
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        // 获取批量写入对端的字节数上限
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        // 转换NettyByteBuf为JDKByteBuffer
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // ByteBuffer的数量
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // in中存在FileRegion,不仅仅是bytebuf的情况
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // 单个ByteBuffer写入对端
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                long attemptedBytes = in.nioBufferSize();
                // 将ByteBuffer批量写入对端
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                // 调整批量写入对端的字节数上限
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                // 根据写入的byte数据多少,移除flushedEntry
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);
    // 如果没处理完,设置关注WRITE事件
    incompleteWrite(writeSpinCount < 0);
}
复制代码

doWrite循环内部主要做几个事情

  • ChannelOutboundBuffer.nioBuffers:转换NettyByteBuf为JDKByteBuffer,这样才可以调用底层JDK的Channel写数据。
  • Channel.write:调用JDK的Channel的write方法,将JDKByteBuffer批量写入对端。
  • adjustMaxBytesPerGatheringWrite:调整批量写入对端的字节数上限。
  • ChannelOutboundBuffer.removeBytes:根据写入对端的字节数数量,移除flushedEntry。
  • incompleteWrite:如果循环16次没处理完,设置SelectionKey关注WRITE事件。

adjustMaxBytesPerGatheringWrite如何调整每次写入对端的字节数上限?

private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
    if (attempted == written) {
        if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
            ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
        }
    } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
        ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
    }
}
复制代码

attempted表示循环内每次剩余待flush字节数,这个数字在经过几次循环后会慢慢变小;written表示实际写入对端的字节数;oldMaxBytesPerGatheringWrite表示当前flush对端字节数上限。

当attempted和written一致时,如果attempted * 2 > oldMaxBytesPerGatheringWrite,那么会放宽上限值为attempted * 2,这表明OS可能支持更多字节写入对端,动态调整上限以提高后续刷写的速率

当attempted超过4KB,且written小于attempted/2,那么会收紧上限值为 attempted / 2,这表明OS不能满足目前的刷写速率,需要收紧刷写速率

MaxBytesPerGatheringWrite的初始值是多少呢?

在Channel构造时,会同时构造NioSocketChannelConfig配置,此处通过calculateMaxBytesPerGatheringWrite方法计算得到默认的上限值。

private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
    private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        super(channel, javaSocket);
        calculateMaxBytesPerGatheringWrite();
    }
    private void calculateMaxBytesPerGatheringWrite() {
      // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
        int newSendBufferSize = getSendBufferSize() << 1;
        if (newSendBufferSize > 0) {
          	setMaxBytesPerGatheringWrite(newSendBufferSize);
        }
    }
}
复制代码

这个数字取决于JDKSocket的SO_SNDBUF,取决于操作系统,我本地等于128K,则MaxBytesPerGatheringWrite=128K << 1=256K。

// DefaultSocketChannelConfig
@Override
public int getSendBufferSize() {
    try {
        return javaSocket.getSendBufferSize();
    } catch (SocketException e) {
        throw new ChannelException(e);
    }
}
// Socket
public synchronized int getSendBufferSize() throws SocketException {
  	if (isClosed())
    		throw new SocketException("Socket is closed");
  	int result = 0;
  	Object o = getImpl().getOption(SocketOptions.SO_SNDBUF);
  	if (o instanceof Integer) {
    		result = ((Integer)o).intValue();
  	}
  	return result;
}
复制代码

如何配置呢?

配置ChannelOption.SO_SNDBUF,单位为byte字节,可以调用Socket的native方法设置SO_SNDBUF大小。

public class XClient {
    public static void main(String[] args) throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
              // ...
        }
    }
}
复制代码

在Channel初始化时,会调用DefaultSocketChannelConfig#setSendBufferSize设置SO_SNDBUF选项。

@Override
public SocketChannelConfig setSendBufferSize(int sendBufferSize) {
    try {
        javaSocket.setSendBufferSize(sendBufferSize);
    } catch (SocketException e) {
        throw new ChannelException(e);
    }
    return this;
}
复制代码

ChannelOutboundBuffer.removeBytes如何根据写入对端字节数,移除flushedEntry?

Channel.write方法是个批量写入对端的方法,所以这里要根据批量写入的字节数,反过来判断哪些entry需要从链表中移除。

逻辑也比较普通,循环至writtenBytes等于0,每次循环要么移除链表头节点entry,要么最后剩余的writtenBytes不满头节点entry的可读字节,则移动头节点entry的读指针。

// writtenBytes代表批量写入对端的字节数
public void removeBytes(long writtenBytes) {
    for (;;) {
        // 获取flushedEntry指向的entry
        Object msg = current();
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            // 如果这个ByteBuf的可读字节,小于实际写入对端字节数,
            // 表示这个ByteBuf已经写完了,可以移除对应entry
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            // 移除当前指向的flushedEntry,可能触发低水位,通道变为可写
            remove();
        } else { // readableBytes > writtenBytes
            // 剩余的writtenBytes不满头节点的可读字节,移动头节点的读指针
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}
复制代码

四、encode

为了方便用户实现编码,Netty提供了编码骨架抽象类MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
}
复制代码

继承MessageToByteEncoder后,用户只需要实现decode方法即可。实现起来没什么讲究,只需要将实体类按照自己的编码方式写入ByteBuf即可。

如自定义XRequestEncoder,将XRequest写入ByteBuf。

public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
复制代码

如自定义XResponseEncoder,将XResponse写入ByteBuf。

public class XResponseEncoder extends MessageToByteEncoder<XResponse> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext ctx, XResponse msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.RESPONSE);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
复制代码

1、泛型I的作用

MessageToByteEncoder的泛型I,代表了需要进行编码的实体类,MessageToByteEncoder支持在一个Pipeline中存在多个编码器,只需要write方法传入的实体类与泛型I匹配。如果channel.write传入的是XRequest,那么将走XRequestEncoder,如果channel.write传入的是XResponse,那么将走XResponseEncdoer。

MessageToByteEncoder构造时,会创建一个TypeParameterMatcher。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;
    protected MessageToByteEncoder(boolean preferDirect) {
        // 传入 当前实例,泛型类,泛型标记 得到一个TypeParameterMatcher
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }
}
复制代码

TypeParameterMatcher是一个抽象接口,只有一个match方法用于判断入参是否匹配,实现类是ReflectiveMatcher,通过instanceof的方式判断入参msg是否是泛型I对应Class实例。

private static final class ReflectiveMatcher extends TypeParameterMatcher {
    private final Class<?> type;
    ReflectiveMatcher(Class<?> type) {
        this.type = type;
    }
    @Override
    public boolean match(Object msg) {
        return type.isInstance(msg);
    }
}
复制代码

MessageToByteEncoder的write方法使用责任链模式,判断msg instanceof I,如果入参msg是I实例才会执行编码,反之会走下一个Handler(可能是另外一个Encoder)

// 使用匹配器判断msg是否能够处理
public boolean acceptOutboundMessage(Object msg) throws Exception {
  	return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 如果msg是泛型I对应类的实例,当前编码器才会处理
        if (acceptOutboundMessage(msg)) {
            I cast = (I) msg;
            // ...
        } else {
            // 否则传递给下一个Handler
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
复制代码

2、调用encode

调用用户encode方法逻辑也比较简单。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            // 安全强转
            I cast = (I) msg;
            // 分配ByteBuf,子类可以重写
            buf = allocateBuffer(ctx, cast, preferDirect);
            // 用户encode
          	encode(ctx, cast, buf);
            if (buf.isReadable()) {
                // 如果用户向Bytebuf写入了数据,则继续向后传播,
                // 最后会传播到TailContext写入出栈缓冲区
                ctx.write(buf, promise);
            } else {
                // 用户没有向ByteBuf写入数据,释放资源,向后传播空Buffer
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } 
    }
}
复制代码

给用户的扩展点是allocateBuffer方法,允许用户根据实际情况创建合适的ByteBuf,用于写入数据,这里默认preferDirect=true。

/**
 * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
 * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
 */
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg,
                           boolean preferDirect) throws Exception {
    if (preferDirect) {
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}
复制代码

总结

  • ChannelOutboundBuffer缓冲区:

    用户write是写入这个缓冲区;用户flush是将这个缓冲区的数据写入对端;用户writeAndFlush是组合了上述两个操作。

    ChannelOutboundBuffer同时具备流控功能,根据用户配置的WriteBufferWaterMark决定高低水位线,默认低水位线是32k,高水位线是64k。

    框架内部会调用incrementPendingOutboundBytes增加当前buffer中的pendingSize,如果超出高水位线,会设置Channel不可写,触发channelWritabilityChanged入栈事件;

    框架内部会调用decrementPendingOutboundBytes减少当前buffer中的pendingSize,如果低于低水位线,会设置Channel可写,触发channelWritabilityChanged入栈事件。

    用户代码可以处理channelWritabilityChanged事件,控制write速率,参照ShardingProxy的MySQLCommandExecuteEngine实现。

    ChannelOutboundBuffer使用两个链表结构队列存储数据。

    unflushedEntry代表未发送entry,write方法就是将数据写入entry,放入这个链表。

    flushedEntry代表”已发送”entry,flush方法会将unflushedEntry链表中的元素都移动到flushedEntry,循环所有flushedEntry中的元素执行flush并移除。

  • 用户代码使用channel.write,触发出栈write事件。

    write.png

    TailContext会提交一个WriteTask到Channel对应的EventLoop处理,而不是直接写入对端。

    创建WriteTask,会增加ChannelOutboundBuffer的pendingSize;

    提交WriteTask,会减少ChannelOutboundBuffer的pendingSize。

    执行WriteTask,首先可能会经过用户定义的编码器,最后进入Unsafe(AbstractUnsafe)将Bytebuf封装为Entry,放入ChannelOutboundBuffer缓冲区,在放入缓冲区的时候,也会增加ChannelOutboundBuffer的pendingSize

    对应ChannelOutboundBuffer的pendingSize增加和减少会触发channelWritabilityChanged事件。

  • 用户代码使用channel.flush,触发出栈flush事件。

    flush.png

    flush事件最终会交由Unsafe处理,将缓存在ChannelOutboundBuffer中的Entry里的ByteBuf通过Channel写入对端。

    Entry里的ByteBuf写完后,会被从链表结构中移除,导致ChannelOutboundBuffer的pendingSize减少,如果低于低水位线,Channel变为可写,会触发channelWritabilityChanged事件。

  • 继承Netty提供MessageToByteEncoder编码器抽象类,可以实现自定义编码。

    MessageToByteEncoder里的泛型,代表当前编码器可以处理的实体类。

    public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
      private final ObjectMapper objectMapper = new ObjectMapper();
    
      @Override
      protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
      }
    }
    复制代码

    一个ChannelPipeline里,可以有多个编码器,每个编码器负责处理不同Class的编码工作。

    如果编码器负责处理对应Class的编码工作,但是encode方法不写入ByteBuf out,对应msg会被丢弃。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享