Netty源码(十)启动

前言

本章学习Netty启动流程,分为客户端和服务端。

  • 理解Future和Promise
  • 服务端启动:创建Channel、初始化Channel、注册Channel、bind
  • 客户端启动:创建Channel、初始化Channel、注册Channel、connect
  • 出栈事件的传播:服务端read/bind、客户端connect

一、Future与Promise

先来回顾一下JUC的Future。一般我们将Callable提交到Executor得到JUC的Future,通过Future可以得到异步任务的执行结果。

public interface Future<V> {
    // 取消任务
    // mayInterruptIfRunning=true,可以中断执行中的任务;否则只能取消未执行的任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否在正常执行完成前被取消
    boolean isCancelled();
    // 返回任务是否结束,包括正常终止、异常终止、取消等情况。
    boolean isDone();
    // 等待任务执行完成后返回结果V,如果任务执行发生异常,会抛出ExecutionException
    V get() throws InterruptedException, ExecutionException;
    // 等待任务执行完成后返回结果V,超时抛出TimeoutException
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

Future与Promise.png

Netty的Future继承了JUC的Future。主要扩展了几点:

  • JUC的isDone只能表示异步任务的终态,但是具体是因为什么原因结束的任务,客户端不知道。所以Netty的Future区分了isDone的细分状态:成功、异常、取消
  • 和JUC的get方法类似,提供sync和wait方法返回当前Future,根据Future的成功/异常/取消可以做一些其他事情。
  • 增加监听器,监听Future处理完成事件
  • 增加getNow方法,支持非阻塞获取执行结果
public interface Future<V> extends java.util.concurrent.Future<V> {
    // isDone为true,且是执行成功的场景
    boolean isSuccess();
    // isDone为true,且是取消的场景
    boolean isCancellable();
    // 如果sDone为false,返回null
    // 否则,返回执行过程中的异常
    Throwable cause();
		// 为当前future添加监听器
    // 如果调用时,future没执行完,那在future.isDone=true的时候,会通知Listener
    // 如果调用时,future已经执行完了,会立即同步通知Listener
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
		// 删除监听器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // await等待任务执行完毕,与JDK的get不同,不会抛出任务执行异常
		Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeoutMillis);
    // 等待任务执行完成,如果任务执行发生异常会抛出 类似JDK Future的get
    // 而wait只会等待任务完成,不会主动抛出任务执行异常
    Future<V> sync() throws InterruptedException;
    // 与sync的区别是,不会抛出中断异常,而是设置Thread的interrupt标志位为true并吃掉中断异常
    Future<V> syncUninterruptibly();
    // 立即返回执行结果,如果future没执行完,这里返回null
    // 不能依赖getNow=null判断任务是否执行完,还是要通过isDone判断任务是否执行完
    V getNow();
}
复制代码

Future是面向方法调用方的,用来获取异步执行结果;Promise是面向方法执行方的,用来设置任务执行结果。一般来说,当Promise设置结果后,Future的状态就会改变,继而唤醒阻塞等待的方法调用方。

Netty的Promise继承了Future,负责设置异步任务执行成功或失败

public interface Promise<V> extends Future<V> {
    // 标记future成功,如果标记失败会抛出IllegalStateException
    Promise<V> setSuccess(V result);
    // 标记future成功,成功标记返回true
    boolean trySuccess(V result);
    // 标记future失败,如果标记失败会抛出IllegalStateException
    Promise<V> setFailure(Throwable cause);
    // 标记future失败,成功标记返回true
    boolean tryFailure(Throwable cause);
    // 设置future不可取消
    boolean setUncancellable();
}
复制代码

ChannelFuture是关联了Channel实例的Future,并且Future任务返回为Void。

public interface ChannelFuture extends Future<Void> {
    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();
}
复制代码

ChannelPromise是关联了Channel实例的Promise,并且任务返回为Void。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    @Override
    Channel channel();
    // setSuccess(null)的快捷方法
    ChannelPromise setSuccess();
    // trySuccess(null)的快捷方法
    boolean trySuccess();
}
复制代码

接下来看一下Future和Promise的默认实现。

首先是AbstractFuture,实现了JUC的Future的get方法,其实都是委托NettyFuture的await方法,只是最后适配了JDK的接口实现。言外之意,NettyFuture的await约等于JDKFuture的get

public abstract class AbstractFuture<V> implements Future<V> {
    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (await(timeout, unit)) {
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }
        throw new TimeoutException();
    }
}
复制代码

DefaultPromise是Netty的Future和Promise的骨架实现。

成员变量如下:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    // result原子更新器
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // 任务执行结果
    private volatile Object result;
    // 对应EventLoop
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    // GenericFutureListener或DefaultFutureListeners
    private Object listeners;
    /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    // 等待synchronized(this)锁的线程数
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    // 是否已经通知了所有listeners
    private boolean notifyingListeners;
}
复制代码

接下来看几个代表性方法。

首先是Future接口的抽象方法await。注意这里的checkDeadLock方法,是一种防御性编程。框架的提供者不知道调用方是否理解框架,防止调用方错误调用await方法导致程序异常,不如主动检测来提高程序的鲁棒性。另外这也意味着,await方法只会在非Promise对应的EventLoop线程中被调用,而唤醒操作只有对应EventLoop才能唤醒

@Override
public Promise<V> await() throws InterruptedException {
    // 判断future是否已经完成
    if (isDone()) {
        return this;
    }
    // 如果如果当前线程被中断,抛出InterruptedException
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // 校验调用await方法的线程(当前线程)是否是对应EventLoop线程
    // 如果是的话要抛出异常,否则会造成其他线程一直wait在这个Future上
    checkDeadLock();
    // Object.wait,等待EventLoop线程唤醒
    synchronized (this) {
        while (!isDone()) {
            incWaiters(); // waiters++
            try {
                wait();
            } finally {
                decWaiters(); // waiters--
            }
        }
    }
    return this;
}

protected void checkDeadLock() {
  EventExecutor e = executor();
  if (e != null && e.inEventLoop()) {
    throw new BlockingOperationException(toString());
  }
}
复制代码

可超时的await方法,底层只是循环调用jdk的wait(timout)方法,不说了。接下来看一下sync方法的实现。sync方法也是调用await方法实现阻塞等待,唯一不同的是,如果Future由于异常结束,会抛出异常

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}
private void rethrowIfFailed() {
  Throwable cause = cause();
  if (cause == null) {
    return;
  }
  PlatformDependent.throwException(cause);
}
复制代码

getNow方法,直接取成员变量result返回,但是对特殊情况返回null。

@Override
public V getNow() {
    Object result = this.result;
    // 异常 / 成功但是V是void / 无法取消
    if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
        return null;
    }
    return (V) result;
}
复制代码

然后来看个Promise抽象方法的实现,setSuccess

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
  // 如果result = null,设置为SUCCESS常量Object,呼应上面的getNow
  return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
  // cas设置this.result = objectResult
  if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
      RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    // notifyAll
    if (checkNotifyWaiters()) {
      // 通知所有listeners
      notifyListeners();
    }
    return true;
  }
  return false;
}
// 如果有等待Future的线程,全部唤醒
private synchronized boolean checkNotifyWaiters() {
  if (waiters > 0) {
    notifyAll();
  }
  return listeners != null;
}
// 通知所有listeners
private void notifyListenersNow() {
  Object listeners;
  // 修改notifyingListeners标志位,并获取所有待通知listeners
  synchronized (this) {
    if (notifyingListeners || this.listeners == null) {
      return;
    }
    notifyingListeners = true;
    listeners = this.listeners;
    this.listeners = null;
  }
  // 通知listeners
  for (;;) {
    if (listeners instanceof DefaultFutureListeners) {
      // 如果是DefaultFutureListeners循环里面所有Listener通知
      notifyListeners0((DefaultFutureListeners) listeners);
    } else {
      // 如果是GenericFutureListener,直接通知这个Listener
      notifyListener0(this, (GenericFutureListener<?>) listeners);
    }
    // 每次通知完成后,再次检测是否有新的listeners需要被通知,如果有继续循环
    synchronized (this) {
      if (this.listeners == null) {
        notifyingListeners = false;
        return;
      }
      listeners = this.listeners;
      this.listeners = null;
    }
  }
}
// 通知多个listeners
private void notifyListeners0(DefaultFutureListeners listeners) {
  GenericFutureListener<?>[] a = listeners.listeners();
  int size = listeners.size();
  for (int i = 0; i < size; i ++) {
    notifyListener0(this, a[i]);
  }
}
// 通知单个listener
private static void notifyListener0(Future future, GenericFutureListener l) {
  try {
    l.operationComplete(future);
  } catch (Throwable t) {
  }
}
复制代码

DefaultChannelPromise是一个常见的默认Future和Promise实现,绑定了channel。

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    public DefaultChannelPromise(Channel channel) {
        this.channel = checkNotNull(channel, "channel");
    }
    
    @Override
    public Channel channel() {
        return channel;
    }
}
复制代码

二、服务端启动

Server端启动的入口在AbstractBootstrap#doBind方法。

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. 创建并注册Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 2. 绑定端口
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
    	// 如果注册没完成,异步处理...
    }
}
复制代码

无论是客户端还是服务端都需要创建/初始化/注册Channel,只不过Channel的类型不同。根据BootStrap配置,Server端是NioServerSocketChannel,而Client端是NioSocketChannel。

Channel初始化并注册的入口在AbstractBootstrap#initAndRegister

// 构建Channel实例的工厂,根据配置channel类型不同,创建不同Channel实例
private volatile ChannelFactory<? extends C> channelFactory;
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. 创建Channel
        channel = channelFactory.newChannel();
        // 2. 初始化Channel
        init(channel);
    } catch (Throwable t) {
      // 关闭channel返回失败
    }
    // 3. 注册Channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // 关闭channel返回失败
    }
    return regFuture;
}
复制代码

1、创建Channel

首先ReflectiveChannelFactory会根据channel的类型,通过反射创建不同的channel实例,比如ServerBootStrap一般配置了channel(NioServerSocketChannel.class),这里就会创建NioServerSocketChannel实例。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException();
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException();
        }
    }
}
复制代码

NioServerSocketChannel的构造方法并不简单,继承结构很深。

NioServerSocketChannel.png

首先NioServerSocketChannel创建了JDK的ServerSocketChannel,将JDK的channel和关注的ACCEPT事件传入父类构造。

public NioServerSocketChannel() {
  this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 1. 创建JDK的ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
  try {
    return provider.openServerSocketChannel();
  } catch (IOException e) {
    throw new ChannelException();
  }
}
public NioServerSocketChannel(ServerSocketChannel channel) {
    // 2. 调用父类构造 传入JDK channel和ACCEPT事件枚举
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

AbstractNioMessageChannel继续往上透传。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}
复制代码

AbstractNioChannel将JDK的channel和关注事件保存下来,并设置channel非阻塞。

public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    protected final int readInterestOp;
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
     	  // 设置非阻塞
        ch.configureBlocking(false);
    }
}
复制代码

最后AbstractChannel创建了io.netty.channel.Channel.Unsafe真正负责底层通讯,对于NioServerSocketChannel由AbstractNioMessageChannel创建了NioMessageUnsafe,创建了DefaultChannelPipeline负责传播入栈出栈事件并组合ChannelHandler链表。

protected AbstractChannel(Channel parent) {
  this.parent = parent;
  id = newId();
  unsafe = newUnsafe();
  pipeline = newChannelPipeline();
}
// AbstractNioMessageChannel -> NioMessageUnsafe
protected abstract AbstractUnsafe newUnsafe();

protected DefaultChannelPipeline newChannelPipeline() {
  return new DefaultChannelPipeline(this);
}
复制代码

2、初始化Channel

接下来会调用AbstractBootstrap的init抽象方法,由ServerBootStrap和BootStrap分别实现。服务端由ServerBootStrap初始化NioServerSocketChannel

@Override
void init(Channel channel) {
    // 1.设置ServerChannel的option和attr
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    // 2.设置ServerChannel的handler
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // 针对ServerChannel的Handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // 额外加入Netty自己的ServerBootstrapAcceptor负责处理连接事件
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 构造ServerBootstrapAcceptor时,将child相关配置都传入
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
复制代码

Server端初始化NioServerSocketChannel主要做了两件事情:

  • 根据配置的option和attr,将这些配置放到channel的config和attr里。
  • 在channel的Pipeline中,加入ChannelInitializer,这个特殊的ChannelHandler会在Channel注册到Selector上后,触发initChannel钩子。这里加入了Netty自己的ServerBootstrapAcceptor负责处理ACCEPT事件。

构造ServerBootstrapAcceptor这个Netty的InboundChannelHandler时,将配置的childXXX都传入了,为了处理客户端channel连接,对客户端channel初始化和绑定selector。

接下来看一下DefaultChannelPipeline的addLast方法如何将ChannelHandler转换为ChannelHandlerContext加入Pipeline构成链表。

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 检测非线程安全的handler是否被重复添加进不同的pipeline
        // @Sharable注解注释的handler认为是线程安全的,可以共享
        checkMultiplicity(handler);

        // 创建DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);

        // 链表操作 head -> newCtx -> tail
        addLast0(newCtx);

        // 当channel没注册到selector上时,先构建一个Task放在pendingHandlerCallbackHead链表里
        // 待channel注册到selector上时,再执行ChannelHandler.handlerAdded钩子
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
	// 当channel已经注册完成,异步执行ChannelHandler.handlerAdded钩子
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // channel已经注册完成,同步执行ChannelHandler.handlerAdded钩子
    callHandlerAdded0(newCtx);
    return this;
}
复制代码

这里有几个关键点。

1、@Shareable注解

Netty为了防止客户端错误的使用ChannelHandler,默认ChannelHandler不能作为单例被多次加入不同的Pipeline中,只有当客户端确认编写的ChannelHandler是线程安全的并用@Shareable注解标记,才允许将单例Handler多次加入不同的Pipeline。

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        // Sharable注解的作用,如果Handler是线程安全的才能被多次add到不同的Pipeline中
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException();
        }
        h.added = true;
    }
}
复制代码

2、ChannelHandlerContext

第二个是Pipeline中的链表结构,是由ChannelHandlerContext连接而成,并非ChannelHandler。

当addLast没传入指定EventLoopGroup时,默认情况下创建的DefaultChannelHandlerContext是不会直接绑定EventLoop的,如果希望某个Handler单独分配一个线程执行,addLast可以传入一个EventLoopGroup。

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 从EventLoopGroup中选择一个EventLoop与Context做绑定
private EventExecutor childExecutor(EventExecutorGroup group) {
  if (group == null) {
    return null;
  }
  //...
}
复制代码

DefaultChannelHandlerContext继承AbstractChannelHandlerContext,没有什么特殊的实现,只是实现了handler方法获取与当前上下文绑定的ChannelHandler。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
复制代码

那么Context如果没直接绑定EventLoop,如何实现Netty的串行化处理呢?实际上Context如果没绑定EventLoop,默认会取channel绑定的EventLoop。见AbstractChannelHandlerContext的executor方法返回EventLoop。

@Override
public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}
复制代码

3、HeadContext与TailContext

在ChannelPipeline中,除了用户通过addLast方法加入的Context之外,Context双向链表头部会有一个HeadContext。HeadContext主要负责传播入栈事件、操作unsafe实例进行IO操作

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
  @Override
  public void bind(
    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
  }

  @Override
  public void connect(
    ChannelHandlerContext ctx,
    SocketAddress remoteAddress, SocketAddress localAddress,
    ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
  }
    @Override
    public void read(ChannelHandlerContext ctx) {
    	unsafe.beginRead();
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    	unsafe.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx) {
    	unsafe.flush();
    }
}
复制代码

此外Context双向链表末尾有一个TailContext,主要用于传播出栈事件、回收资源

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, TailContext.class);
        setAddComplete();
    }
  
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      onUnhandledInboundMessage(ctx, msg);
    }

}
// AbstractChannelHandlerContext兜底处理未处理msg,释放资源
protected void onUnhandledInboundMessage(Object msg) {
  try {
    logger.debug(
      "Discarded inbound message {} that reached at the tail of the pipeline. " +
      "Please check your pipeline configuration.", msg);
  } finally {
    ReferenceCountUtil.release(msg);
  }
}
复制代码

4、何时触发channelAdded钩子

addLast方法一般情况下会触发ChannelHandler的channelAdded钩子,但是在Channel初始化阶段这里并不会触发channelAdded,而是推迟到Channel注册到Selector上之后触发。

所以第一个触发channelAdded钩子的场景是注册Channel完成后触发,在addLast方法内会构造一个Task组成的链表,当Channel注册完毕后执行所有Task。

// DefaultChannelPipeline
private PendingHandlerCallback pendingHandlerCallbackHead;
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    // 构造一个callback,加入callback链表,等待后续合适时机触发
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
private abstract static class PendingHandlerCallback implements Runnable {
  final AbstractChannelHandlerContext ctx;
  PendingHandlerCallback next;

  PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
    this.ctx = ctx;
  }

  abstract void execute();
}
复制代码

除了上一个场景以外,判断当前线程是否是EventLoop对应线程,如果是的话同步执行钩子,否则提交到EventLoop执行钩子。

3、注册Channel

initAndRegister方法的最后一步,是将初始化好的channel实例,注册到JDK的selector上。

final ChannelFuture initAndRegister() {
    // ...
    // 3. 注册Channel
    ChannelFuture regFuture = config().group().register(channel);
    // ...
    return regFuture;
}
复制代码

首先根据配置获取对应的EventLoopGroup,对于Server端EventLoopGroup实例是MultithreadEventLoopGroup

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
复制代码

首先next方法选择EventLoopGroup中的某个EventLoop。

接着调用SingleThreadEventLoop#register方法注册channel,这里会把channel封装到一个DefaultChannelPromise中调用另外一个重载register方法,注意到这里将EventLoop自身绑定到Promise中了(Netty串行化的秘诀就是把执行线程和实例绑定)。

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
复制代码

最终调用Unsafe的register方法,注册ChannelPromise。

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
复制代码

这里调用的是AbstractChannel内部类AbstractUnsafe的register方法,AbstractUnsafe是Unsafe的骨架实现。这里做的最重要的事情有两点,一点是将EventLoop与当前Channel实例绑定,另一点就是register0真正的注册逻辑

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  // 不允许重复注册
  if (isRegistered()) {
    promise.setFailure(...);
    return;
  }
  // 判断EventLoop是不是Unsafe支持的EventLoop,防止传入错误的EventLoop
  if (!isCompatible(eventLoop)) {
    promise.setFailure(...));
    return;
  }

  // 【重要】绑定channel与eventLoop
  AbstractChannel.this.eventLoop = eventLoop;

  if (eventLoop.inEventLoop()) {
    register0(promise);
  } else {
    // 服务端启动往往是main线程,与EventLoop不是同一个线程,执行这里异步调用register0
    try {
      eventLoop.execute(new Runnable() {
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
      // ...
    }
  }
}
复制代码

NioEventLoop的抽象父类,SingleThreadEventExecutor将task放入队列返回,EventLoop后续再深入了解。

// 默认情况下immediate=true
private void execute(Runnable task, boolean immediate) {
   // main != this.thread -> inEventLoop = false
    boolean inEventLoop = inEventLoop(); 
    // 将task加入任务队列
    addTask(task);
    if (!inEventLoop) {
        // 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
        startThread();
        if (isShutdown()) {
          // ... 
        }
    }
    // 唤醒EventLoop
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
复制代码

看一下AbstractUnsafe#register0方法的实现。

private void register0(ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 注册到JDK的Selector上
        doRegister();
        neverRegistered = false;
        registered = true;
        // 触发init阶段添加的pendingHandlerCallbackHead任务,目的是执行ChannelHandler的channelAdded钩子
        // 这里会触发ChannelInitializer的init方法
        pipeline.invokeHandlerAddedIfNeeded();

        // 调用promise的trySuccess方法,唤醒阻塞在future上的线程
        safeSetSuccess(promise);
        // 触发入栈channelRegistered事件
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                // 如果是首次注册,触发入栈channelActive事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // 非首次注册,关注selector上的READ事件
                beginRead();
            }
        }
}
复制代码

首先doRegister会注册Channel到JDK的Selector上,doRegister是抽象方法,由AbstractNioChannel实现。

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
复制代码

接着,如果是首次注册,Pipeline#invokeHandlerAddedIfNeeded会触发之前init阶段缓存的handlerAdded钩子任务(见2-4何时触发channelAdded),也就是说这里会触发配置的ChannelInitializer的initChannel方法,将配置的ChannelHandler加入ChannelPipeline

执行完上面一步后,设置Promise结果为成功,唤醒阻塞在Future上的线程并触发所有Listener。

最后触发入栈channelRegistered事件,如果首次注册还会触发入栈channelActive事件,非首次注册是将channel关注事件设置为READ。

其实channelActive事件最终也是为了设置关注事件为READ。这里Pipeline会将入栈channelActive事件从HeadContext开始传播

final AbstractChannelHandlerContext head;
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
复制代码

HeadContext传播完channelActive事件后,会走readIfIsAUtoRead方法,默认情况下会直接触发channel的read方法。

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}
private void readIfIsAutoRead() {
  if (channel.config().isAutoRead()) {
    channel.read();
  }
}
复制代码

read事件是为了设置SelectionKey关注的事件为READ(ACCEPT),先看看read出栈事件的传播链路。

read.png

AbstractChannel又会把read出栈事件放到Pipeline中传播。

@Override
public Channel read() {
    pipeline.read();
    return this;
}
复制代码

DefaultChannelPipeline将read事件交给TailContext传播。

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}
复制代码

这个read事件从tail一直传播到head,head调用unsafe的beginRead方法修改关注事件为READ。

// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
复制代码

最终调用AbstractNioChannel的doBeginRead方法,对于NioServerSocketChannel来说,这里并不是关注READ事件,而是关注ACCEPT事件。将关注事件注册到JDK的selectionKey上。

// 构造Channel时传入关注事件 = OP_ACCEPT = 1 << 4
protected final int readInterestOp; 
@Override
protected void doBeginRead() throws Exception {
  // ...
  final int interestOps = selectionKey.interestOps();
  if ((interestOps & readInterestOp) == 0) {
    selectionKey.interestOps(interestOps | readInterestOp);
  }
}
复制代码

4、绑定端口

回到AbstractBootstrap#doBind方法。此时initAndRegister可能仍然在执行register方法(EventLoop线程在执行)也可能register方法已经执行完了,会同步或异步地调用doBind0方法。

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. 创建并注册Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 2. 绑定端口
    if (regFuture.isDone()) {
        // register方法已经执行完了,同步执行doBind0
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // register方法没执行完,异步执行doBind0
       // (虽然addListener还会校验isDone,可能也是一个同步操作,为了方便理解就认为是异步吧)
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {

              doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
复制代码

doBind0基于串行化设计,仍然使用EventLoop线程执行Channel的bind方法。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
复制代码

看一下Bind出栈事件的出栈链路,和read出栈事件一样。

bind2.png

AbstractChannel#bind调用channel对应的pipeline传播bind出栈事件。

private final DefaultChannelPipeline pipeline;
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
复制代码

DefaultChannelPipeline直接触发TailContext的bind方法

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
复制代码

这里实际执行的是抽象AbstractChannelHandlerContext的bind方法。

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // 找到下一个关注bind事件的Handler
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    // 串行化执行invokeBind方法
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
  ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
}
复制代码

bind事件会从TailContext一直传播到HeadContext。比如EchoServer案例中,我们加入了两个Handler,一个是LoggingHandler负责打印日志,一个是Netty服务端自带的ServerBootstrapAcceptor负责接入客户端Channel。

LoggingHandler关注bind事件,所以会被执行。

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "BIND", localAddress));
    }
    // 传播给下一个Context
    ctx.bind(localAddress, promise);
}
复制代码

LoggingHandler最后又调用了Context.bind,那么bind出栈事件会继续向下传播。虽然下一个Handler是ServerBootstrapAcceptor,但是由于ServerBootstrapAcceptor是个纯粹的InboundHandler,所以并不会被实际执行。

对于EchoServer来说,真正的下一个Handler是HeadContext。而HeadContext就是操作Unsafe进行底层IO操作。

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
复制代码

AbstractChannel.AbstractUnsafe调用子类NioServerSocketChannel的doBind方法,执行JDK的Channel绑定端口。

// AbstractChannel.AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    // 调用子类doBind
     doBind(localAddress);
    if (!wasActive && isActive()) {
        // 如果之前initAndRegister没触发过channelActive,这里触发一下
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
  if (PlatformDependent.javaVersion() >= 7) {
    javaChannel().bind(localAddress, config.getBacklog());
  } 
}
复制代码

三、客户端启动

客户端启动的流程与服务端大致一样:

  • 创建Channel
  • 初始化Channel
  • 注册Channel
  • 连接服务端connect,传播connect出栈事件

重点关注几个点:

  • 创建的Channel类型不同,NioSocketChannel的构造方法
  • Channel关注的事件不同,服务端是ACCEPT,客户端是READ
  • 初始化Channel时,走的是BootStrap的init方法
  • connect出栈事件传播

Bootstrap#doResolveAndConnect是客户端启动的入口。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 1. 创建、初始化、注册Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
		// 2. connect
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
      // ... 通过注册Listener的方式,异步处理,忽略
    }
}
复制代码

1、创建Channel

NioSocketChannel.png

NioSocketChannel的继承体系与NioServerSocketChannel大致一样,不同点是直接父类AbstractNioByteChannel。

public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}
// 1. 创建JDK的SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) {
  return provider.openSocketChannel();
}
// 2. 调用父类构造
public NioSocketChannel(Channel parent, SocketChannel socket) {
  super(parent, socket);
  config = new NioSocketChannelConfig(this, socket.socket());
}
复制代码

AbstractNioByteChannel关注的事件是READ,不同于AbstractNioMessageChannel是ACCEPT。

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
复制代码

2、初始化Channel

BootStrapinit方法如下,相对于ServerBootStrap而言比较简单,没有增加特殊Handler也没有涉及childGroup配置。重点在ChannelPipeline的addLast方法,这个在Server端启动已经看过了。

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    // 将配置的handler加入ChannelPipeline
    p.addLast(config.handler());
    // 父类方法设置Channel的option
    setChannelOptions(channel, newOptionsArray(), logger);
    // 父类方法设置Channel的attr
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
复制代码

3、注册Channel

客户端注册NioSocketChannel到Selector上的流程与服务端NioServerSocketChannel一致。

注册完成后会触发channelAdded钩子,触发配置的ChannelInitializer的initChannel方法。

4、connect

connect.png

Bootstrap最终调用doConnect,执行Channel的connect方法,Channel是ChannelOutboundInvoker可以触发出栈事件。

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
		// EventLoop串行化
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
复制代码

接着调用NioSocketChannel父类AbstractChannel的connect方法,这里直接调用DefaultChannelPipeline的connect方法,DefaultChannelPipeline也是一个ChannelOutboundInvoker可以触发出栈事件。

// AbstractChannel
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, localAddress, promise);
}
复制代码

Pipeline调用TailContext触发connect出栈事件,TailContext继承AbstractChannelHandlerContext也是一个ChannelOutboundInvoker

@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}
复制代码

TailContext的抽象父类AbstractChannelHandlerContext传播connect事件给下一个Handler,一直传播到HeadContext这个ChannelOutboundHandler

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // 找到下一个关注connect事件的handler上下文执行
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
      // ...
    }
    return promise;
}

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
	((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}
复制代码

HeadContext调用Unsafe执行实际的connect操作。

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
复制代码

AbstractNioChannel.AbstractNioUnsafe#connect

  • 调用SocketChannel执行非阻塞connect
  • 通过提交一个延迟任务到EventLoop解决连接超时问题
  • 给Future新增Listener,处理连接完成后的异步回调
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  boolean wasActive = isActive();
  // 1. SocketChannel.connect,如果非阻塞这里返回false
  if (doConnect(remoteAddress, localAddress)) {
    fulfillConnectPromise(promise, wasActive);
  } else {
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;
     // 2. 处理连接超时,通过提交一个延迟任务到EventLoop中,默认超时时间30s
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
      connectTimeoutFuture = eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
          ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
          ConnectTimeoutException cause =
            new ConnectTimeoutException("connection timed out: " + remoteAddress);
          if (connectPromise != null && connectPromise.tryFailure(cause)) {
            close(voidPromise());
          }
        }
      }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    // 3. 添加一个Listener,当非阻塞的SocketChannel连接完成时,收到回调
    promise.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
          if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
          }
          connectPromise = null;
          close(voidPromise());
        }
      }
    });
  }
}
复制代码

NioSocketChannel#doConnect,执行JDK的SocketChannel的connect方法,AbstractNioChannel的构造方法在Server端启动看到过,一定会设置Channel非阻塞,所以这里会将Channel关注事件设置为CONNECT

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
  // ...
  // 执行JDK的SocketChannel的connect方法
  boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
  // 如果SocketChannel非阻塞,这里返回false,设置关注事件为CONNECT
  if (!connected) {
    selectionKey().interestOps(SelectionKey.OP_CONNECT);
  }
  return connected;
}
复制代码

这里不去关注EventLoop如何轮询Selector.select,关注CONNECT事件发生后的回调,入口是NioUnsafe#finishConnect,实现是AbstractNioUnsafe#finishConnect

public final void finishConnect() {
  assert eventLoop().inEventLoop();

  try {
    boolean wasActive = isActive();
    // 1. 调用JDK的SocketChannel.finishConnect
    doFinishConnect();
    // 2. 设置Promise结果,触发ChannelActive入栈事件
    fulfillConnectPromise(connectPromise, wasActive);
  } catch (Throwable t) {
    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
  } finally {
    // 3. 取消之前提交到EventLoop的连接超时检测任务
    if (connectTimeoutFuture != null) {
      connectTimeoutFuture.cancel(false);
    }
    connectPromise = null;
  }
}

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
  // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
  // We still need to ensure we call fireChannelActive() in this case.
  boolean active = isActive();
  // trySuccess() will return false if a user cancelled the connection attempt.
  boolean promiseSet = promise.trySuccess();
  // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
  // because what happened is what happened.
  if (!wasActive && active) {
    pipeline().fireChannelActive();
  }
  // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
  if (!promiseSet) {
    close(voidPromise());
  }
}
复制代码

总结

  • Future是面向方法调用方的,用来获取异步执行结果;Promise是面向方法执行方的,用来设置任务执行结果
  • 服务端启动:创建Channel、初始化Channel、注册Channel、bind。
  • 客户端启动:创建Channel、初始化Channel、注册Channel、connect。
  • Channel注册到Selector上后,触发ChannelAdded事件,执行ChannelInitializer的initChannel方法。
  • 一个ChannelHandler封装为一个ChannelHandlerContext放入ChannelPipeline。一个ChannelPipeline组装了ChannelHandlerContext形成的双向链表,头节点是HeadContext,尾节点是TailContext。出栈事件从TailContext开始传播到HeadContext,入栈事件从HeadContext传播到TailContext。

Pipeline.png

  • 出栈事件的传播方式,案例:服务端注册Channel后触发的read出栈事件(设置关注事件为ACCEPT)、服务端bind出栈事件、客户端connect出栈事件。出栈事件是通过ChannelOutboundInvoker->ChannelOutboundHandler->Unsafe的方式传播的。

    其中涉及的关键类如下:Channel(ChannelOutboundInvoker)->Pipeline(ChannelOutboundInvoker)->TailContext(ChannelOutboundInvoker)->ChannelOutboundHandlers->HeadContext(ChannelOutboundHandler) ->Unsafe。

  • ChannelActive触发时机:服务端bind完成后,客户端connect完成后。

  • ChannelInitializer的作用:当服务端NioServerSocketChannel/客户端NioSocketChannel注册完成后,触发initChannel方法加入业务ChannelHandler。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享