前言
本章学习Netty启动流程,分为客户端和服务端。
- 理解Future和Promise
- 服务端启动:创建Channel、初始化Channel、注册Channel、bind
- 客户端启动:创建Channel、初始化Channel、注册Channel、connect
- 出栈事件的传播:服务端read/bind、客户端connect
一、Future与Promise
先来回顾一下JUC的Future。一般我们将Callable提交到Executor得到JUC的Future,通过Future可以得到异步任务的执行结果。
public interface Future<V> {
// 取消任务
// mayInterruptIfRunning=true,可以中断执行中的任务;否则只能取消未执行的任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成前被取消
boolean isCancelled();
// 返回任务是否结束,包括正常终止、异常终止、取消等情况。
boolean isDone();
// 等待任务执行完成后返回结果V,如果任务执行发生异常,会抛出ExecutionException
V get() throws InterruptedException, ExecutionException;
// 等待任务执行完成后返回结果V,超时抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
Netty的Future继承了JUC的Future。主要扩展了几点:
- JUC的isDone只能表示异步任务的终态,但是具体是因为什么原因结束的任务,客户端不知道。所以Netty的Future区分了isDone的细分状态:成功、异常、取消。
- 和JUC的get方法类似,提供sync和wait方法返回当前Future,根据Future的成功/异常/取消可以做一些其他事情。
- 增加监听器,监听Future处理完成事件。
- 增加getNow方法,支持非阻塞获取执行结果。
public interface Future<V> extends java.util.concurrent.Future<V> {
// isDone为true,且是执行成功的场景
boolean isSuccess();
// isDone为true,且是取消的场景
boolean isCancellable();
// 如果sDone为false,返回null
// 否则,返回执行过程中的异常
Throwable cause();
// 为当前future添加监听器
// 如果调用时,future没执行完,那在future.isDone=true的时候,会通知Listener
// 如果调用时,future已经执行完了,会立即同步通知Listener
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// await等待任务执行完毕,与JDK的get不同,不会抛出任务执行异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeoutMillis);
// 等待任务执行完成,如果任务执行发生异常会抛出 类似JDK Future的get
// 而wait只会等待任务完成,不会主动抛出任务执行异常
Future<V> sync() throws InterruptedException;
// 与sync的区别是,不会抛出中断异常,而是设置Thread的interrupt标志位为true并吃掉中断异常
Future<V> syncUninterruptibly();
// 立即返回执行结果,如果future没执行完,这里返回null
// 不能依赖getNow=null判断任务是否执行完,还是要通过isDone判断任务是否执行完
V getNow();
}
复制代码
Future是面向方法调用方的,用来获取异步执行结果;Promise是面向方法执行方的,用来设置任务执行结果。一般来说,当Promise设置结果后,Future的状态就会改变,继而唤醒阻塞等待的方法调用方。
Netty的Promise继承了Future,负责设置异步任务执行成功或失败。
public interface Promise<V> extends Future<V> {
// 标记future成功,如果标记失败会抛出IllegalStateException
Promise<V> setSuccess(V result);
// 标记future成功,成功标记返回true
boolean trySuccess(V result);
// 标记future失败,如果标记失败会抛出IllegalStateException
Promise<V> setFailure(Throwable cause);
// 标记future失败,成功标记返回true
boolean tryFailure(Throwable cause);
// 设置future不可取消
boolean setUncancellable();
}
复制代码
ChannelFuture是关联了Channel实例的Future,并且Future任务返回为Void。
public interface ChannelFuture extends Future<Void> {
/**
* Returns a channel where the I/O operation associated with this
* future takes place.
*/
Channel channel();
}
复制代码
ChannelPromise是关联了Channel实例的Promise,并且任务返回为Void。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
// setSuccess(null)的快捷方法
ChannelPromise setSuccess();
// trySuccess(null)的快捷方法
boolean trySuccess();
}
复制代码
接下来看一下Future和Promise的默认实现。
首先是AbstractFuture,实现了JUC的Future的get方法,其实都是委托NettyFuture的await方法,只是最后适配了JDK的接口实现。言外之意,NettyFuture的await约等于JDKFuture的get。
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}
复制代码
DefaultPromise是Netty的Future和Promise的骨架实现。
成员变量如下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// result原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 任务执行结果
private volatile Object result;
// 对应EventLoop
private final EventExecutor executor;
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
// GenericFutureListener或DefaultFutureListeners
private Object listeners;
/**
* Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
*/
// 等待synchronized(this)锁的线程数
private short waiters;
/**
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
*/
// 是否已经通知了所有listeners
private boolean notifyingListeners;
}
复制代码
接下来看几个代表性方法。
首先是Future接口的抽象方法await。注意这里的checkDeadLock方法,是一种防御性编程。框架的提供者不知道调用方是否理解框架,防止调用方错误调用await方法导致程序异常,不如主动检测来提高程序的鲁棒性。另外这也意味着,await方法只会在非Promise对应的EventLoop线程中被调用,而唤醒操作只有对应EventLoop才能唤醒。
@Override
public Promise<V> await() throws InterruptedException {
// 判断future是否已经完成
if (isDone()) {
return this;
}
// 如果如果当前线程被中断,抛出InterruptedException
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 校验调用await方法的线程(当前线程)是否是对应EventLoop线程
// 如果是的话要抛出异常,否则会造成其他线程一直wait在这个Future上
checkDeadLock();
// Object.wait,等待EventLoop线程唤醒
synchronized (this) {
while (!isDone()) {
incWaiters(); // waiters++
try {
wait();
} finally {
decWaiters(); // waiters--
}
}
}
return this;
}
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
复制代码
可超时的await方法,底层只是循环调用jdk的wait(timout)方法,不说了。接下来看一下sync方法的实现。sync方法也是调用await方法实现阻塞等待,唯一不同的是,如果Future由于异常结束,会抛出异常。
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
复制代码
getNow方法,直接取成员变量result返回,但是对特殊情况返回null。
@Override
public V getNow() {
Object result = this.result;
// 异常 / 成功但是V是void / 无法取消
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}
复制代码
然后来看个Promise抽象方法的实现,setSuccess。
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 如果result = null,设置为SUCCESS常量Object,呼应上面的getNow
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// cas设置this.result = objectResult
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// notifyAll
if (checkNotifyWaiters()) {
// 通知所有listeners
notifyListeners();
}
return true;
}
return false;
}
// 如果有等待Future的线程,全部唤醒
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
// 通知所有listeners
private void notifyListenersNow() {
Object listeners;
// 修改notifyingListeners标志位,并获取所有待通知listeners
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
// 通知listeners
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// 如果是DefaultFutureListeners循环里面所有Listener通知
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 如果是GenericFutureListener,直接通知这个Listener
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
// 每次通知完成后,再次检测是否有新的listeners需要被通知,如果有继续循环
synchronized (this) {
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
// 通知多个listeners
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
// 通知单个listener
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
}
}
复制代码
DefaultChannelPromise是一个常见的默认Future和Promise实现,绑定了channel。
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
public DefaultChannelPromise(Channel channel) {
this.channel = checkNotNull(channel, "channel");
}
@Override
public Channel channel() {
return channel;
}
}
复制代码
二、服务端启动
Server端启动的入口在AbstractBootstrap#doBind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 创建并注册Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2. 绑定端口
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 如果注册没完成,异步处理...
}
}
复制代码
无论是客户端还是服务端都需要创建/初始化/注册Channel,只不过Channel的类型不同。根据BootStrap配置,Server端是NioServerSocketChannel,而Client端是NioSocketChannel。
Channel初始化并注册的入口在AbstractBootstrap#initAndRegister。
// 构建Channel实例的工厂,根据配置channel类型不同,创建不同Channel实例
private volatile ChannelFactory<? extends C> channelFactory;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 创建Channel
channel = channelFactory.newChannel();
// 2. 初始化Channel
init(channel);
} catch (Throwable t) {
// 关闭channel返回失败
}
// 3. 注册Channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// 关闭channel返回失败
}
return regFuture;
}
复制代码
1、创建Channel
首先ReflectiveChannelFactory会根据channel的类型,通过反射创建不同的channel实例,比如ServerBootStrap一般配置了channel(NioServerSocketChannel.class),这里就会创建NioServerSocketChannel实例。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException();
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException();
}
}
}
复制代码
NioServerSocketChannel的构造方法并不简单,继承结构很深。
首先NioServerSocketChannel创建了JDK的ServerSocketChannel,将JDK的channel和关注的ACCEPT事件传入父类构造。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 1. 创建JDK的ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException();
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 2. 调用父类构造 传入JDK channel和ACCEPT事件枚举
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
AbstractNioMessageChannel继续往上透传。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}
复制代码
AbstractNioChannel将JDK的channel和关注事件保存下来,并设置channel非阻塞。
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
// 设置非阻塞
ch.configureBlocking(false);
}
}
复制代码
最后AbstractChannel创建了io.netty.channel.Channel.Unsafe真正负责底层通讯,对于NioServerSocketChannel由AbstractNioMessageChannel创建了NioMessageUnsafe,创建了DefaultChannelPipeline负责传播入栈出栈事件并组合ChannelHandler链表。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
// AbstractNioMessageChannel -> NioMessageUnsafe
protected abstract AbstractUnsafe newUnsafe();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
复制代码
2、初始化Channel
接下来会调用AbstractBootstrap的init抽象方法,由ServerBootStrap和BootStrap分别实现。服务端由ServerBootStrap初始化NioServerSocketChannel。
@Override
void init(Channel channel) {
// 1.设置ServerChannel的option和attr
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 2.设置ServerChannel的handler
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 针对ServerChannel的Handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 额外加入Netty自己的ServerBootstrapAcceptor负责处理连接事件
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 构造ServerBootstrapAcceptor时,将child相关配置都传入
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
Server端初始化NioServerSocketChannel主要做了两件事情:
- 根据配置的option和attr,将这些配置放到channel的config和attr里。
- 在channel的Pipeline中,加入ChannelInitializer,这个特殊的ChannelHandler会在Channel注册到Selector上后,触发initChannel钩子。这里加入了Netty自己的ServerBootstrapAcceptor负责处理ACCEPT事件。
构造ServerBootstrapAcceptor这个Netty的InboundChannelHandler时,将配置的childXXX都传入了,为了处理客户端channel连接,对客户端channel初始化和绑定selector。
接下来看一下DefaultChannelPipeline的addLast方法如何将ChannelHandler转换为ChannelHandlerContext加入Pipeline构成链表。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检测非线程安全的handler是否被重复添加进不同的pipeline
// @Sharable注解注释的handler认为是线程安全的,可以共享
checkMultiplicity(handler);
// 创建DefaultChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// 链表操作 head -> newCtx -> tail
addLast0(newCtx);
// 当channel没注册到selector上时,先构建一个Task放在pendingHandlerCallbackHead链表里
// 待channel注册到selector上时,再执行ChannelHandler.handlerAdded钩子
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 当channel已经注册完成,异步执行ChannelHandler.handlerAdded钩子
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// channel已经注册完成,同步执行ChannelHandler.handlerAdded钩子
callHandlerAdded0(newCtx);
return this;
}
复制代码
这里有几个关键点。
1、@Shareable注解
Netty为了防止客户端错误的使用ChannelHandler,默认ChannelHandler不能作为单例被多次加入不同的Pipeline中,只有当客户端确认编写的ChannelHandler是线程安全的并用@Shareable注解标记,才允许将单例Handler多次加入不同的Pipeline。
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// Sharable注解的作用,如果Handler是线程安全的才能被多次add到不同的Pipeline中
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException();
}
h.added = true;
}
}
复制代码
2、ChannelHandlerContext
第二个是Pipeline中的链表结构,是由ChannelHandlerContext连接而成,并非ChannelHandler。
当addLast没传入指定EventLoopGroup时,默认情况下创建的DefaultChannelHandlerContext是不会直接绑定EventLoop的,如果希望某个Handler单独分配一个线程执行,addLast可以传入一个EventLoopGroup。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 从EventLoopGroup中选择一个EventLoop与Context做绑定
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
//...
}
复制代码
DefaultChannelHandlerContext继承AbstractChannelHandlerContext,没有什么特殊的实现,只是实现了handler方法获取与当前上下文绑定的ChannelHandler。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
复制代码
那么Context如果没直接绑定EventLoop,如何实现Netty的串行化处理呢?实际上Context如果没绑定EventLoop,默认会取channel绑定的EventLoop。见AbstractChannelHandlerContext的executor方法返回EventLoop。
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
复制代码
3、HeadContext与TailContext
在ChannelPipeline中,除了用户通过addLast方法加入的Context之外,Context双向链表头部会有一个HeadContext。HeadContext主要负责传播入栈事件、操作unsafe实例进行IO操作。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
}
复制代码
此外Context双向链表末尾有一个TailContext,主要用于传播出栈事件、回收资源。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
}
// AbstractChannelHandlerContext兜底处理未处理msg,释放资源
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
复制代码
4、何时触发channelAdded钩子
addLast方法一般情况下会触发ChannelHandler的channelAdded钩子,但是在Channel初始化阶段这里并不会触发channelAdded,而是推迟到Channel注册到Selector上之后触发。
所以第一个触发channelAdded钩子的场景是注册Channel完成后触发,在addLast方法内会构造一个Task组成的链表,当Channel注册完毕后执行所有Task。
// DefaultChannelPipeline
private PendingHandlerCallback pendingHandlerCallbackHead;
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
// 构造一个callback,加入callback链表,等待后续合适时机触发
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
private abstract static class PendingHandlerCallback implements Runnable {
final AbstractChannelHandlerContext ctx;
PendingHandlerCallback next;
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
abstract void execute();
}
复制代码
除了上一个场景以外,判断当前线程是否是EventLoop对应线程,如果是的话同步执行钩子,否则提交到EventLoop执行钩子。
3、注册Channel
initAndRegister方法的最后一步,是将初始化好的channel实例,注册到JDK的selector上。
final ChannelFuture initAndRegister() {
// ...
// 3. 注册Channel
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}
复制代码
首先根据配置获取对应的EventLoopGroup,对于Server端EventLoopGroup实例是MultithreadEventLoopGroup。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
首先next方法选择EventLoopGroup中的某个EventLoop。
接着调用SingleThreadEventLoop#register方法注册channel,这里会把channel封装到一个DefaultChannelPromise中调用另外一个重载register方法,注意到这里将EventLoop自身绑定到Promise中了(Netty串行化的秘诀就是把执行线程和实例绑定)。
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
复制代码
最终调用Unsafe的register方法,注册ChannelPromise。
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
这里调用的是AbstractChannel内部类AbstractUnsafe的register方法,AbstractUnsafe是Unsafe的骨架实现。这里做的最重要的事情有两点,一点是将EventLoop与当前Channel实例绑定,另一点就是register0真正的注册逻辑。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 不允许重复注册
if (isRegistered()) {
promise.setFailure(...);
return;
}
// 判断EventLoop是不是Unsafe支持的EventLoop,防止传入错误的EventLoop
if (!isCompatible(eventLoop)) {
promise.setFailure(...));
return;
}
// 【重要】绑定channel与eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 服务端启动往往是main线程,与EventLoop不是同一个线程,执行这里异步调用register0
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ...
}
}
}
复制代码
NioEventLoop的抽象父类,SingleThreadEventExecutor将task放入队列返回,EventLoop后续再深入了解。
// 默认情况下immediate=true
private void execute(Runnable task, boolean immediate) {
// main != this.thread -> inEventLoop = false
boolean inEventLoop = inEventLoop();
// 将task加入任务队列
addTask(task);
if (!inEventLoop) {
// 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
startThread();
if (isShutdown()) {
// ...
}
}
// 唤醒EventLoop
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
复制代码
看一下AbstractUnsafe#register0方法的实现。
private void register0(ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 注册到JDK的Selector上
doRegister();
neverRegistered = false;
registered = true;
// 触发init阶段添加的pendingHandlerCallbackHead任务,目的是执行ChannelHandler的channelAdded钩子
// 这里会触发ChannelInitializer的init方法
pipeline.invokeHandlerAddedIfNeeded();
// 调用promise的trySuccess方法,唤醒阻塞在future上的线程
safeSetSuccess(promise);
// 触发入栈channelRegistered事件
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 如果是首次注册,触发入栈channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 非首次注册,关注selector上的READ事件
beginRead();
}
}
}
复制代码
首先doRegister会注册Channel到JDK的Selector上,doRegister是抽象方法,由AbstractNioChannel实现。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
复制代码
接着,如果是首次注册,Pipeline#invokeHandlerAddedIfNeeded会触发之前init阶段缓存的handlerAdded钩子任务(见2-4何时触发channelAdded),也就是说这里会触发配置的ChannelInitializer的initChannel方法,将配置的ChannelHandler加入ChannelPipeline。
执行完上面一步后,设置Promise结果为成功,唤醒阻塞在Future上的线程并触发所有Listener。
最后触发入栈channelRegistered事件,如果首次注册还会触发入栈channelActive事件,非首次注册是将channel关注事件设置为READ。
其实channelActive事件最终也是为了设置关注事件为READ。这里Pipeline会将入栈channelActive事件从HeadContext开始传播。
final AbstractChannelHandlerContext head;
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
复制代码
HeadContext传播完channelActive事件后,会走readIfIsAUtoRead方法,默认情况下会直接触发channel的read方法。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
复制代码
read事件是为了设置SelectionKey关注的事件为READ(ACCEPT),先看看read出栈事件的传播链路。
AbstractChannel又会把read出栈事件放到Pipeline中传播。
@Override
public Channel read() {
pipeline.read();
return this;
}
复制代码
DefaultChannelPipeline将read事件交给TailContext传播。
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
复制代码
这个read事件从tail一直传播到head,head调用unsafe的beginRead方法修改关注事件为READ。
// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
复制代码
最终调用AbstractNioChannel的doBeginRead方法,对于NioServerSocketChannel来说,这里并不是关注READ事件,而是关注ACCEPT事件。将关注事件注册到JDK的selectionKey上。
// 构造Channel时传入关注事件 = OP_ACCEPT = 1 << 4
protected final int readInterestOp;
@Override
protected void doBeginRead() throws Exception {
// ...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
4、绑定端口
回到AbstractBootstrap#doBind方法。此时initAndRegister可能仍然在执行register方法(EventLoop线程在执行)也可能register方法已经执行完了,会同步或异步地调用doBind0方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 创建并注册Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2. 绑定端口
if (regFuture.isDone()) {
// register方法已经执行完了,同步执行doBind0
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// register方法没执行完,异步执行doBind0
// (虽然addListener还会校验isDone,可能也是一个同步操作,为了方便理解就认为是异步吧)
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
复制代码
doBind0基于串行化设计,仍然使用EventLoop线程执行Channel的bind方法。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
复制代码
看一下Bind出栈事件的出栈链路,和read出栈事件一样。
AbstractChannel#bind调用channel对应的pipeline传播bind出栈事件。
private final DefaultChannelPipeline pipeline;
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
复制代码
DefaultChannelPipeline直接触发TailContext的bind方法。
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
复制代码
这里实际执行的是抽象AbstractChannelHandlerContext的bind方法。
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 找到下一个关注bind事件的Handler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
// 串行化执行invokeBind方法
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
}
复制代码
bind事件会从TailContext一直传播到HeadContext。比如EchoServer案例中,我们加入了两个Handler,一个是LoggingHandler负责打印日志,一个是Netty服务端自带的ServerBootstrapAcceptor负责接入客户端Channel。
LoggingHandler关注bind事件,所以会被执行。
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
// 传播给下一个Context
ctx.bind(localAddress, promise);
}
复制代码
LoggingHandler最后又调用了Context.bind,那么bind出栈事件会继续向下传播。虽然下一个Handler是ServerBootstrapAcceptor,但是由于ServerBootstrapAcceptor是个纯粹的InboundHandler,所以并不会被实际执行。
对于EchoServer来说,真正的下一个Handler是HeadContext。而HeadContext就是操作Unsafe进行底层IO操作。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
复制代码
AbstractChannel.AbstractUnsafe调用子类NioServerSocketChannel的doBind方法,执行JDK的Channel绑定端口。
// AbstractChannel.AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
// 调用子类doBind
doBind(localAddress);
if (!wasActive && isActive()) {
// 如果之前initAndRegister没触发过channelActive,这里触发一下
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
}
}
复制代码
三、客户端启动
客户端启动的流程与服务端大致一样:
- 创建Channel
- 初始化Channel
- 注册Channel
- 连接服务端connect,传播connect出栈事件
重点关注几个点:
- 创建的Channel类型不同,NioSocketChannel的构造方法
- Channel关注的事件不同,服务端是ACCEPT,客户端是READ
- 初始化Channel时,走的是BootStrap的init方法
- connect出栈事件传播
Bootstrap#doResolveAndConnect是客户端启动的入口。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 1. 创建、初始化、注册Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2. connect
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// ... 通过注册Listener的方式,异步处理,忽略
}
}
复制代码
1、创建Channel
NioSocketChannel的继承体系与NioServerSocketChannel大致一样,不同点是直接父类AbstractNioByteChannel。
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
// 1. 创建JDK的SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) {
return provider.openSocketChannel();
}
// 2. 调用父类构造
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
复制代码
AbstractNioByteChannel关注的事件是READ,不同于AbstractNioMessageChannel是ACCEPT。
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
复制代码
2、初始化Channel
BootStrap的init方法如下,相对于ServerBootStrap而言比较简单,没有增加特殊Handler也没有涉及childGroup配置。重点在ChannelPipeline的addLast方法,这个在Server端启动已经看过了。
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
// 将配置的handler加入ChannelPipeline
p.addLast(config.handler());
// 父类方法设置Channel的option
setChannelOptions(channel, newOptionsArray(), logger);
// 父类方法设置Channel的attr
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
复制代码
3、注册Channel
客户端注册NioSocketChannel到Selector上的流程与服务端NioServerSocketChannel一致。
注册完成后会触发channelAdded钩子,触发配置的ChannelInitializer的initChannel方法。
4、connect
Bootstrap最终调用doConnect,执行Channel的connect方法,Channel是ChannelOutboundInvoker可以触发出栈事件。
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// EventLoop串行化
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
复制代码
接着调用NioSocketChannel父类AbstractChannel的connect方法,这里直接调用DefaultChannelPipeline的connect方法,DefaultChannelPipeline也是一个ChannelOutboundInvoker可以触发出栈事件。
// AbstractChannel
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
复制代码
Pipeline调用TailContext触发connect出栈事件,TailContext继承AbstractChannelHandlerContext也是一个ChannelOutboundInvoker。
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
复制代码
TailContext的抽象父类AbstractChannelHandlerContext传播connect事件给下一个Handler,一直传播到HeadContext这个ChannelOutboundHandler。
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 找到下一个关注connect事件的handler上下文执行
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
// ...
}
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}
复制代码
HeadContext调用Unsafe执行实际的connect操作。
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
复制代码
AbstractNioChannel.AbstractNioUnsafe#connect:
- 调用SocketChannel执行非阻塞connect
- 通过提交一个延迟任务到EventLoop解决连接超时问题
- 给Future新增Listener,处理连接完成后的异步回调
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
// 1. SocketChannel.connect,如果非阻塞这里返回false
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// 2. 处理连接超时,通过提交一个延迟任务到EventLoop中,默认超时时间30s
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 3. 添加一个Listener,当非阻塞的SocketChannel连接完成时,收到回调
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
}
复制代码
NioSocketChannel#doConnect,执行JDK的SocketChannel的connect方法,AbstractNioChannel的构造方法在Server端启动看到过,一定会设置Channel非阻塞,所以这里会将Channel关注事件设置为CONNECT。
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// ...
// 执行JDK的SocketChannel的connect方法
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
// 如果SocketChannel非阻塞,这里返回false,设置关注事件为CONNECT
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
return connected;
}
复制代码
这里不去关注EventLoop如何轮询Selector.select,关注CONNECT事件发生后的回调,入口是NioUnsafe#finishConnect,实现是AbstractNioUnsafe#finishConnect。
public final void finishConnect() {
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
// 1. 调用JDK的SocketChannel.finishConnect
doFinishConnect();
// 2. 设置Promise结果,触发ChannelActive入栈事件
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// 3. 取消之前提交到EventLoop的连接超时检测任务
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
复制代码
总结
- Future是面向方法调用方的,用来获取异步执行结果;Promise是面向方法执行方的,用来设置任务执行结果。
- 服务端启动:创建Channel、初始化Channel、注册Channel、bind。
- 客户端启动:创建Channel、初始化Channel、注册Channel、connect。
- Channel注册到Selector上后,触发ChannelAdded事件,执行ChannelInitializer的initChannel方法。
- 一个ChannelHandler封装为一个ChannelHandlerContext放入ChannelPipeline。一个ChannelPipeline组装了ChannelHandlerContext形成的双向链表,头节点是HeadContext,尾节点是TailContext。出栈事件从TailContext开始传播到HeadContext,入栈事件从HeadContext传播到TailContext。
-
出栈事件的传播方式,案例:服务端注册Channel后触发的read出栈事件(设置关注事件为ACCEPT)、服务端bind出栈事件、客户端connect出栈事件。出栈事件是通过ChannelOutboundInvoker->ChannelOutboundHandler->Unsafe的方式传播的。
其中涉及的关键类如下:Channel(ChannelOutboundInvoker)->Pipeline(ChannelOutboundInvoker)->TailContext(ChannelOutboundInvoker)->ChannelOutboundHandlers->HeadContext(ChannelOutboundHandler) ->Unsafe。
-
ChannelActive触发时机:服务端bind完成后,客户端connect完成后。
-
ChannelInitializer的作用:当服务端NioServerSocketChannel/客户端NioSocketChannel注册完成后,触发initChannel方法加入业务ChannelHandler。