更多,请关注:liangye-xo.xyz/
解读服务端启动流程
先来个 demo:
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 服务端启动器
ServerBootstrap b = new ServerBootstrap();
// bossGroup - ServerChannel会去使用
// workerGroup - 就有 ServerChannel产生的客户端 Channel会去使用
b.group(bossGroup, workerGroup)
// 设置服务端 Channel类型, 内部创建出来一个反射工厂 ReflectiveChannelFactory, 其提供了一个 newInstance方法用于创建 Channel实例
.channel(NioServerSocketChannel.class)
// 保存 Server端的自定义选项
.option(ChannelOption.SO_BACKLOG, 100)
// 配置用户自定义的 Server端 pipeline处理器, 后续创建出来 NioServerChannel实例后, 会去将用户自定义的 handler添加到其 pipeline中去
.handler(new LoggingHandler(LogLevel.INFO))
// 配置服务端上连接进来的客户端, 设置客户端 Channel内部的 Pipeline初始信息
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 获取服务端 pipeline
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 以上步骤并没有创建出任何的 Channel, 只是将后续创建 Channel的配置信息保存到 ServerBootstrap中去而已
// Start the server.
// b.bind(port) 这里实际上返回来的是与 bind操作相关的 promise - 当 register0()对应操作结束时, 实现 channel注册与 bind的对接
// promise.sync() - 阻塞当前主线程直到绑定相关操作执行完毕, 才会被唤醒
ChannelFuture f = b.bind(PORT).sync();
/** 在这一步进行一下调试, 会发现 f.isSuccess() == true **/
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
doBind
// 真正的去 bind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// regFuture其实就是 register()返回来的 promise - 关联异步任务 register0() - Channel相关的 EventLoop会去执行
final ChannelFuture regFuture = initAndRegister();
//以服务端启动来讲, 这里是 NioServerSocketChannel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 判断 Channel是否已经注册完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 来分析这个分支
// Registration future is almost always fulfilled already, but just in case it's not.
// 这一步, 主线程来创建了一个 promise - 其实就是来与注册任务执行结果对接起来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 这里又来实现异步操作了
// 为 register0对应的 promise添加监听器(其实, 说是 "回调对象更为恰当一些"), 实现任务完后的回调 - EventLoop会去执行
regFuture.addListener(new ChannelFutureListener() {
// 可以看到, 此回调方法中主要来处理可注册任务失败与否对应的逻辑处理
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 获取
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause); // 表示注册失败
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered(); // 表示注册成功
doBind0(regFuture, channel, localAddress, promise);
}
}
});
// 这里来返回了与 bind操作相关的 promise
return promise;
}
}
复制代码
bind之 initAndRegister
// 顾名思义, 进行 Channel的初始化及其注册
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// channelFactory - ReflectiveChannelFactory
// 以服务端启动来将, 这里就是通过反射来创建 NioServerSocketChannel实例(调用无参构造)
// 以客户端连接来说, 这里通过反射创建的便是 NioSocketChannel实例
// 对于服务端主要干的事:设置 Channel非阻塞、内部创建出来了 pipeline(head、tail)
// 以及设置 Channel感兴趣事件:Accept,创建出 Unsafe实例,类型为 NioMessageUnsafe
channel = channelFactory.newChannel(); // 这里来获取的是 Netty层面的 Channel - 实际上是 Jdk层面的 Channel的封装
// 这一步主要来给当前服务端 Chanel的 pipeline添加了 ChannelInitializer
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config包含着 ServerBootstrap
// group()这里实际上获取的是 ServerBootstrap中的 bossGroup - NioEventLoopGroup
// 由此可知, 这里最终来调用的是 NioEventLoopGroup.register(channel)
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
复制代码
可以看到,代码中有去调用了 channelFactory.newChannel(),这里的 channelFactory是 ReflectiveChannelFactory类型,在 channel方法中进行了指定
Channel创建
# ReflectiveChannelFactory.newChannel()
@Override
public T newChannel() {
try {
// 通过反射来去创建 clazz的实例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
复制代码
可以看到,这里通过反射进行了实例的构造,而目前是服务器启动流程,结合 channel方法,不难想出这里实际上调用的是 无参的 NioServerSocketChannel构造
public NioServerSocketChannel() {
// newSocket - 创建出 jdk层面的 ServerSocketChannel
// Netty需要对 Jdk层面的 Channel进行封装成 Netty层面的 Channel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 这里来创建 jdk层面的 ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
// 这里来通过 jdk层面的 SelectorProvider创建出 jdk层面的 ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
复制代码
我们知道,通过 SelectorProvidor.openServerSocketChannel()来创建出来的实际上是 Jdk层面的 ServerSockerChannel,而这是 NioServerSocketChannel的构造,不难得出结论:Netty的 NioServerSocketChannel实际上就是对 Nio的 ServerSockerChannel的装饰增强
继续看 this的调用
public NioServerSocketChannel(ServerSocketChannel channel) {
// 参数一:parent - null
// 参数二:jdk层面的 ServerSocketChannel
// 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// AbstractNioMessageChannel是NioServerSocketChannel的父类
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 参数一:parent - null
// 参数二:jdk层面的 ServerSocketChannel
// 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
super(parent, ch, readInterestOp);
}
/** AbstractNioChannel是AbstractNioMessageChannel的父类 **/
/** 服务端类型 **/
// 参数一:parent - null
// 参数二:jdk层面的 ServerSocketChannel
// 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 注意, 这里只是来进行将 Acctpt事件对应值赋值到变量上而已, 还没有进行真正的注册!
this.readInterestOp = readInterestOp;
try {
// 配置当前 Channel为非阻塞状态 - 这也是使用多路复用选择器 selector的前提
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
复制代码
可以看到,上述来设置 channel状态为非阻塞,这不就是来实现 多路复用的前提吗?
即,NioServerSocketChannel构造中已设置自身为了非阻塞状态
调用 super,来到 AbstractChannel构造中:
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 为每个 Channel创建 ChannelId对象 - Netty层面
id = newId();
// 对于服务端而言, 这里是:NioMessageUnsafe实例
unsafe = newUnsafe();
// 创建出来当前 Channel内部的 pipeline
// 创建出来的 pipeline内部有两个默认的处理器:HeadContext、TailContext
// Head <---> Tail
pipeline = newChannelPipeline();
}
复制代码
可以看到,在创建 Channel的过程中,将 pipeline进行了初始化:newChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 可以看到, 创建出来的 pipeline不是空的, 里面默认有两个处理器:head、tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
可以看到,pipeline默认初始化时附带了两个 handler:Head、Tail
其实是:Head <—> Tail的关系
此时,已经完成了 Channel的创建,接下来进行 Channel的初始化:init()
Channel初始化
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
// 获取服务端的 pipeline
ChannelPipeline p = channel.pipeline();
// workerGroup
final EventLoopGroup currentChildGroup = childGroup;
/** 以下便是来获取为客户端配置的一些属性参数等 **/
// ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
// 获取 childOption设置的参数
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
// 获取 childAttrs设置的参数
// Netty的 Channel都是实现了 AttributeMap接口的, 因此可以在启动类中配置一些自定义数据, 这样创建出来的 Channel实例, 就会包含这些数据了
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
// 有这步, 可以看出服务端 pipeline:head <-> ChannelInitializer <-> tail
// 简单理解:ChannelInitializer就是时压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
// 获取服务端 pipeline
final ChannelPipeline pipeline = ch.pipeline();
// 获取在服务端配置的 handler - 这里指的便是 LoggingHandler
ChannelHandler handler = config.handler();
// 将 handler添加到服务端的 pipeline中去
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
简单来看,init()主要干了一件事:为 ServerSocketChannel的 pipeline中添加了一个特殊的 handler:ChannelInitializer,可以把它理解为是一个特殊的 handler,即 压缩包
// ChannelInitialize本身不是 handler, 而是通过了 handlerAdapter去适配了 handler
// 其存在意义:延迟初始化 pipeline, 什么时候进行初始化 ? 当 pipeline上的 Channel被激活时, 真正添加 handler的逻辑才会去执行
// 简单理解:ChannelInitializer就是时压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
复制代码
Channel注册
// config包含着 ServerBootstrap
// group()这里实际上获取的是 ServerBootstrap中的 bossGroup - NioEventLoopGroup
// 由此可知, 这里最终来调用的是 NioEventLoopGroup.register(channel)
ChannelFuture regFuture = config().group().register(channel);
复制代码
# SingleThreadEventLoop.register
// 返回值 ChannelFuture, 可知 register是可以来实现异步操作的
@Override
public ChannelFuture register(Channel channel) {
// 可以看到, 这里对 channel进行了些许封装 - 赋予 Channel可执行异步操作的能力(监听器 - 关联事件完成后, 自动去执行回调)
return register(new DefaultChannelPromise(channel, this));
}
复制代码
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 对于服务端:
// promise.channel() - NioServerSockerChanel
// promise.channel().unsafe() - NioMessageUnsafe - 这块调试一下就能知道结果了
// 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
接着来到,AbstractChannel内部类 AbstractUnsafe.register()
// 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 判断当前 Channel是否有注册过
if (isRegistered()) {
// 通过 promise设置失败结果, 并且会去回调监听者, 执行失败的逻辑
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 因为当前方法是在 Unsafe类中, 因此这里通过 AbstractChannel.this去获取外部对象, 这里我们来考虑是:NioServerSocketChannel的场景
// 这里就是来绑定下关系 (Channel & NioEventLoop), 后续 Channel上的事件或者任务都会依赖当前 EventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 这里来判读当前线程是不是与 EventLoop绑定的线程
// 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
// 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
由上述不难看出,Channel的注册任务并非是由主线程去完成的,而是由 Channel所绑定的 EventLoop去完成,即将 Channel注册作为一个任务提交到 EventLoop任务队列中去
不得不说,这块设计比较巧妙,一方面保证多线程下单线程执行的线程安全,另一方面实现了 Channel注册操作的异步化,即不阻塞 Main线程
注册操作结果如何得知?通过 NioServerSocketChannel.bind()返回了 ChannelFuture,这里实际上就是 register操作对应的 ChannelPromise,调用 isSuccess()就能够知道对应操作的执行结果了,这不就实现了异步操作吗?
至此,bind之 initAndRegister解读完毕!
解读 eventLoop中线程创建时机
经过前述分析,在 NioServerSocketChannel创建时,完成了对其内部 ChannelPipeline的创建,并且在 init()方法中对 ChannelPipeline进行了初始化,即其默认会有两个处理器:HeadContext、TailContext
但是,我们并没有看到 NioServerSocketChannel相关的 EventLoop中线程的创建,而 Channel与 EventLoop(BossGroup所管理的)的绑定是在 register()方法中的,那么 thread的创建究竟是在什么时候?
其实是在第一次往 EventLoop提交异步任务时,会去创建出线程
以服务端启动流程来将,第一次提交的任务:
# AbstractChannel.register()
...
// 这里来判读当前线程是不是与 EventLoop绑定的线程
// 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
// 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
try {
// 须清楚一点, 到这里为止, eventLoop对应的线程尚未被创建, 线程的创建时机是怎样的 ?
// 当往 eventLoop中第一次提交任务时, 会去创建线程!
// 提交了异步任务 ①
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
}
});
}
复制代码
显然,由此此时 EventLoop线程没有创建,此时执行的线程定是 Main线程
来看看 execute()方法中主要干了啥
SingleThreadEventExecutor.execute
创建 NioEventLoopGroup时,其内 child数组实例化时对应的元素便是 SingleThreadEventExecutor实例
// 以服务端启动分析, 这里的 task便是服务端 channel的注册任务
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
// 以服务端启动分析, 这里的 task便是服务端 channel的注册任务
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task); // 往任务队列中新增任务
// 第一次这块会是 true, 即 Main线程来执行这块逻辑
if (!inEventLoop) {
startThread(); // 关键
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
复制代码
startThread()方法是关键
private void startThread() {
// 由此来保证 eventLoop对应线程不会重复被创建 - 当然第一次这能走得通
if (state == ST_NOT_STARTED) {
// cas设置 state
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread(); // 第一次走到这时, 在此方法中会来完成 eventLoop对应线程的创建
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
复制代码
可以看到,start == ST_NOT_STARTED也就保证了该方法只会执行一次
doStartThread(),我们主要来看关键代码:
private void doStartThread() {
assert thread == null;
// executor这个是关键:在 Group构造时分析过了, 这里的 executor类型是 ThreadPerTaskExecutor,
// 其内部包含着线程工厂(DefaultThreadFactory类型), 以此来进行线程的创建.
executor.execute(new Runnable() {
// 这里设计得挺好的, 通过往 executor中提交任务, 提交的任务会去创建出一个线程来执行
@Override
public void run() {
// 将当前创建出来的线程赋予到 thread上, 依赖完成对 EventLoop中线程的绑定与创建
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
...
复制代码
可以看到,这一步来调用了 executor.execute()
executor是什么?是 ThreadPerTaskExecutor实例,详情可以去看 NioEventLoopGroup创建时对其内部管理的 NioEventLoop的创建
executor其内部属性 ThreadFactory,会去真正进行 thread的创建
# ThreadPerTaskExecutor
// 可以看到, 每提交一个任务会去创建一个线程并启动它
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
复制代码
可以看到,这里来完成了对线程的创建,并且在执行任务时完成了对 EventLoop中 thread的绑定
至此,我们知道了 eventLoop中线程的创建时机,便是在第一次往 eventLoop中添加异步任务时会去创建,当然,对应线程只会被创建一次
再加点彩蛋,来看看线程创建时会去干啥吧!
# DefaultThreadFactory.newThread
@Override
public Thread newThread(Runnable r) {
// 创建出线程, 并为线程指定名称 - 类似 nioEventLoopGroup-2-1
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
复制代码
这一步完成了对所创建线程的一个命名,如我们所常见的:nioEventLoopGroup-2-1
至于 prefix的属性值的赋予,其实也是在 Group的构造时被赋值了的
至此,我们已经清楚了 EventLoop中线程的创建时机
解读 ChannelInitializer
在 ServerBootstrap.init()中去添加了 ChannelInitializer
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
// 获取服务端的 pipeline
ChannelPipeline p = channel.pipeline();
// workerGroup
final EventLoopGroup currentChildGroup = childGroup;
/** 以下便是来获取为客户端配置的一些属性参数等 **/
// ChannelInitializer - 这个是为客户端所配置的
final ChannelHandler currentChildHandler = childHandler;
// 获取 childOption设置的参数
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
// 获取 childAttrs设置的参数
// Netty的 Channel都是实现了 AttributeMap接口的, 因此可以在启动类中配置一些自定义数据, 这样创建出来的 Channel实例, 就会包含这些数据了
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
// 由这步, 可以看出服务端 pipeline:head <-> ChannelInitializer <-> tail
// 简单理解:ChannelInitializer就是压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
p.addLast(new ChannelInitializer<Channel>() {
// 以服务端启动分析可知, 这里参数传递过来的实际上是 NioServerSocketChannel
@Override
public void initChannel(final Channel ch) {
// 获取服务端 pipeline
final ChannelPipeline pipeline = ch.pipeline();
// 获取在服务端配置的 handler - 这里指的便是 LoggingHandler
ChannelHandler handler = config.handler();
// 将 handler添加到服务端的 pipeline中去
if (handler != null) {
pipeline.addLast(handler);
}
// 可以看到, 这里来提交了一个异步任务 ②
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 这里来往 NioServerSocketChannel的 pipeline中新增了一个处理器 ServerBootstrapAcceptor
// 这个处理器十分特殊, 实际上是来执行客户端连接的一个处理器
// 此时 pipeline:head <-> LoggingHandler <-> ServerBootstrapAcceptor <-> tail
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
怎么去理解 ChannelInitializer呢?
简单理解,ChannelInitializer是一个 压缩包,在一个必要的实际会去进行 解压,然后将自己进行移除
由上可以看出,再其解压时,根据前面的 demo分析可知,其往 pipeline中添加了两个处理器:LoggingHandler、ServerBootstrapAcceptor
那么这个必要的时机是什么?什么时候会去进行 ChannelInitializer的解压操作呢?
需清楚一点,此时 pipeline中:head <-> ChannelInitializer <-> tail
在正式分析解压时机之前,先来看看 addLast()中主要来干了什么
// 对于 ChannelInitializer的 addLast是由 Main线程去执行的
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 这一步将 ChannelInitializer封装成了 ChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) { // 对于 ChannelInitializer, 此时 channel尚未注册
newCtx.setAddPending(); // 设置 handler状态为已经入队状态
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
// 不难看出, 所谓的 addLast其实是去将 handler(可以看到, pipeline中存储着的实际上是封装后的 handler)添加到 tail的前面
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
复制代码
这里实际上还涉及了一个关键方法:callHandlerCakkbackLater
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
// 这一步将 ChannelInitializer封装后的 ChannelHandlerContext再进行了一次封装 - PendingHandlerAddedTask
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
// 这里来执行的便是单向链表的入队操作
if (pending == null) {
pendingHandlerCallbackHead = task; // 可以看到, 这一步 pendingHandlerCallbackHead中存储的最里层所封装着的实际上就是 ChannelInitializer
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
复制代码
由上可以看出,ChannelInitializer被封装成了 ChannelHandlerContext,然后添加到了 pipeline中,即所谓的 addLast()其实就是去将 handler添加到 tailContext的前面
并且,这一步将 ChannelInitializer封装后的 ChannelHandlerContext再进行了一次封装 – PendingHandlerAddedTask
有这其实也可以看出,pipeline中并不是直接去加入了 handler本身,加入的是对 handler封装后的 ChannelHandlerContext,其本身代表着便是 Handler与 Pipeline连接的一个关系
现在开始分析解压时机
其实解压时机便是,在 Channel对应的 EventLoop线程被创建之后,从任务队列中执行的第一个任务中会触发 ChannelInitializer的解压
那么第一个任务是什么时候被提交的呢?
在 AbstractChannel.register()中
// 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 判断当前 Channel是否有注册过
if (isRegistered()) {
// 通过 promise设置失败结果, 并且会去回调监听者, 执行失败的逻辑
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 因为当前方法是在 Unsafe类中, 因此这里通过 AbstractChannel.this去获取外部对象, 这里我们来考虑是:NioServerSocketChannel的场景
// 这里就是来绑定下关系 (Channel & NioEventLoop), 后续 Channel上的事件或者任务都会依赖当前 EventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 这里来判读当前线程是不是与 EventLoop绑定的线程
// 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
// 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
try {
// 须清楚一点, 到这里为止, eventLoop对应的线程尚未被创建, 线程的创建时机是怎样的 ?
// 当往 eventLoop中第一次提交任务时, 会去创建线程!
// 提交了异步任务 ①
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
register0()方法实际上执行者是 EventLoop对应的线程
// 须清楚一点;该方法的执行是由 EventLoop来进行的, 与 Main线程无任何关系
// 参数 promise表示注册结果的, 外部可以向它注册监听器, 以来完成注册后的逻辑
// 这块也好理解, 只有注册成功后才能去执行 bind操作嘛. -即退出该方法后会去执行一个回调, 以此来执行 bind0()
// 现在是来执行第一个任务, 接来下是 init()里添加的另一个异步任务,最后一个才会是 bind0()里提交的任务 ③
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 表示当前 Channel是不是第一次进行注册
boolean firstRegistration = neverRegistered;
doRegister(); // 关键, 在此方法中完成了 channel的注册逻辑, 即将 channel注册到 selector上
// 这里就是来设置下注册相关的状态位
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// invokeHandlerAddIfNeeded来完成对 ChannelInitializer的解压缩, 并且将自己移除了出去
pipeline.invokeHandlerAddedIfNeeded();// invokeHandlerAddIfNeeded中实际上会去添加异步任务 ②
// 这一步会去回调注册相关的 promise上注册的那些 listener, 如 "主线程"在 regFuture上注册的监听者
safeSetSuccess(promise); // 这里回调到 doBind0()方法时又会去添加了一个异步任务 ③
// 这里传递了一个入站事件, 从 pipeline头开始传递, 此时关注的 handler可以来做一些事情
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// 以服务端启动流程来进行分析:
// 这一步实际上是来判断当前 NioServerSocketChannel是否有完成绑定
// false - 当前是在 register0中, 只来完成了底层 ServerSocketChannel的注册而已
// 这块也好理解, 只有注册成功后才能去执行 bind操作嘛. -即退出该方法后会去执行一个回调, 以此来执行 bind0()
// bind()实际上也是由 eventLoop线程去干的是事
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
复制代码
关键方法,invokeHandlerAddedIfNeeded()
invokeHandlerAddIfNeeded来完成对 ChannelInitializer的解压缩, 并且将自己移除了出去
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
// 需清楚, 此方法的执行是由 EventLoop线程来做的
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
// 这里就是来获取存储 task的单向链表 - 以服务端启动分析, 此时获取的便是对 ChannelInitializer最外层包装后的 task
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
// 遍历单向链表, 执行结果
while (task != null) {
task.execute();
task = task.next;
}
}
复制代码
task.execute()执行的是什么,经过前面分析,我们知道实际上是:PenddingHandlerAddedTask.execute(),其最里层所封装的就是 ChannelInitializer
@Override
void execute() {
// 这一步, 来获取到的便是 EventLoop
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx); // 会来执行此处
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
复制代码
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded(); // 执行这
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
// 以服务端启动视角来看, handler()来获取的便是 ChannelInitializer
handler().handlerAdded(this);
}
}
复制代码
可以看到,接着来执行的便是:ChannelInitializer.handlerAdded()方法
# ChannelInitializer.handlerAdded()
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
// 可以看到, 此处来执行了 initChannel
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
复制代码
关键来了,执行了 initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// initChannel这一步真正实现了对接!
initChannel((C) ctx.channel()); // 这一步, 以服务端启动来分析, 来执行的便是我们在 ServerBootStrap添加的 ChannelInitializer里对应的回调方法 initChannel
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 在 try方法中已经完成了对 ChannelInitializer的解压, 现在 结构:head <-> ChannelInitializer <-> LoggingHandler <-> Tail
// 获取 pipeline
ChannelPipeline pipeline = ctx.pipeline();
// 显然当前 pipeline中存在 ChannelInitializer的上下对象的 ChannelHandlerContext
if (pipeline.context(this) != null) {
pipeline.remove(this); // 可以看到, 这一步将 ChannelInitializer自身从 pipeline中移除出去了
}
}
return true;
}
return false;
}
复制代码
由上,便真正实现了真正的对接!initChannel()会去执行 ServerBootstrap.init()中所添加的 ChannelInitializer对应的 initChannel(),即这一步便实现了对 ChannelInitializer解压
并且解压后,执行了 pipeline.remove(this),this此时便是 ChannelInitializer,这又将 ChannelInitializer从 NioServerSocketChannel对应的 pipeline中移除了出去
至此,ChannelInitializer的解读完毕!