这是我参与更文挑战的第26天,活动详情查看:更文挑战
一、前言
对应,简单 server
的 demo
,如下:
public class NettyServer {
public static void main(String[] args) {
// 线程组: Acceptor线程
EventLoopGroup parentGroup = new NioEventLoopGroup();
// 线程组: Processor / Handler
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
// 相当于Netty的服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(parentGroup, childGroup)
// 监听端口的ServerSocketChannel
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 处理每个连接的SocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
// 针对网络请求的处理逻辑
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
// 同步等待启动服务器监控端口
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
// 同步等待关闭启动服务器的结果
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
复制代码
具体服务端创建,流程如下:
二、直接怼源码
ServerBootstrap
中 bind()
方法实现:
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 1. 创建 socket
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 2. 校验参数
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
// 3. 真正执行,重要:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并绑定
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 若有异常则返回
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 如果没有注册成功:
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加对应的监听器
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}
复制代码
doBind()
方法具体做了哪些事情:
-
调用
initAndRegister()
初始化并注册Channel
,同时返回一个ChannelFuture
实例regFuture
,所以猜测出initAndRegister()
是一个异步的过程。 -
通过
regFuture.cause()
方法判断initAndRegister()
的过程是否发生异常,如果发生异常则直接返回。 -
regFuture.isDone()
表示initAndRegister()
是否执行完毕,如果执行完毕则调用doBind0()
进行Socket
绑定。
如果
initAndRegister()
还没有执行结束,regFuture
会添加一个ChannelFutureListener
回调监听,当initAndRegister()
执行结束后会调用operationComplete()
,同样通过doBind0()
进行端口绑定。
服务端启动的全流程可分为如下四步:
-
创建服务端
Channel
:本质是创建JDK
底层原生的Channel
,并初始化几个重要的属性,包括id
、unsafe
、pipeline
等。 -
初始化服务端
Channel
:设置Socket
参数以及用户自定义属性,并添加两个特殊的处理器ChannelInitializer
和ServerBootstrapAcceptor
。 -
注册服务端
Channel
:调用JDK
底层将Channel
注册到Selector
上。 -
端口绑定:调用
JDK
底层进行端口绑定,并触发channelActive
事件,把OP_ACCEPT
事件注册到Channel
的事件集合中。
(1)创建服务端 Channel
此过程,可分为 3 步:
- 反射创建实例:
ReflectiveChannelFactory
通过反射创建NioServerSocketChannel
实例 - 创建
SelectorProvider
- 设置参数:设置
Channel
为非阻塞模式,为Channel
创建id
、unsafe
、pipeline
三个重要的成员变量
开始点,从这入口:
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 创建 ServerSocketChannel
// 并且配置 blocking 为 false
channel = channelFactory.newChannel();
// 2. 初始化一些配置:端口、网络参数
// 并将这个 channel 注册到 Selector 上,关注 OP_ACCEPT 网络事件,不断轮询
init(channel);
} catch (Throwable t) {
// ... ...
}
// 3. 把 ServerSocketChannel 注册到 EventLoopGroup 上
// 用 selector 轮询各种 channel 的网络事件
// 让 EventLoopGroup 中的独立线程采用一个 Selector 来注册 channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}
复制代码
channel = channelFactory.newChannel();
这句其实对应原生 NIO
就是:
// 主要实现:
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
复制代码
1)反射创建实例
ReflectiveChannelFactory
通过反射创建NioServerSocketChannel
实例。
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 对应 demo 中的:.channel(NioServerSocketChannel.class)
// 监听端口的ServerSocketChannel
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// 入口:创建反射工厂
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
}
复制代码
- 那么,直接进入
ReflectiveChannelFactory
// 定位:io.netty.channel
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
// ... ...
@Override
public T newChannel() {
try {
// 反射方式,对应:NioServerSocketChannel.class
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
复制代码
因为我们设置的 channel
是 NioServerSocketChannel
,所以下一步就是初始化这个。
- 初始化
NioServerSocketChannel
// 定位:io.netty.channel.socket.nio
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
// 1. SelectorProvider 会根据操作系统类型和版本的不同,返回不同的实现类
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER
= SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 2. 主要:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 就是通过原生的 NIO 的API,来创建一个 ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
// 3. 发现这块:设置感兴趣的事件,OP_ACCEPT
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
复制代码
2)创建 SelectorProvider
提示:SelectorProvider
是 JDK NIO
中的抽象类实现,通过 openServerSocketChannel()
方法可以用于创建服务端的 ServerSocketChannel
。
// 定位:java.nio.channels.spi
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
// 主要看这句:
provider = sun.nio.ch.DefaultSelectorProvider.create();
}
}
}
// 定位:sun.nio.ch
// 根据操作系统类型和版本的不同,返回不同的实现类
// 具体可以参考 DefaultSelectorProvider 的源码实现:
public class DefaultSelectorProvider {
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
}
复制代码
3)初始化配置
初始化时,会先初始化父类。
- 设置非阻塞
// 定位:io.netty.channel.nio
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 设置成 非阻塞
ch.configureBlocking(false);
}
// ... ...
}
}
复制代码
- 设置成员变量
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); // Channel 全局唯一 id
unsafe = newUnsafe(); // unsafe 操作底层读写
pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
}
}
复制代码
(2)初始化服务端 Channel
再次从这入手:
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 创建 ServerSocketChannel
// ... ...
// 2. 初始化一些配置:端口、网络参数
// 并将这个 channel 注册到 Selector 上,关注 OP_ACCEPT 网络事件,不断轮询
init(channel);
} catch (Throwable t) {
// ... ...
}
// 3. 注册
// ... ...
return regFuture;
}
}
复制代码
这次主要看init(channel);
:
-
init()
是AbstractBootstrap
抽象方法,需要子类实现 -
因为是服务端(
server
),选择的是ServerBootstrap
中
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// 1. 设置 Socket 参数
setChannelOptions(channel, options, logger);
}
// 2. 保存用户自定义属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 3. 添加特殊的 Handler 处理器
// 对网络请求链路中加入内置的一个处理逻辑
p.addLast(new ChannelInitializer<Channel>() {
// ... ...
});
}
}
复制代码
(3)注册服务端 Channel
入口:
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 创建 ServerSocketChannel
// ... ...
// 2. 初始化一些配置:端口、网络参数
// ... ...
} catch (Throwable t) {
// ... ...
}
// 此时重要:
// 3. 把 ServerSocketChannel 注册到 EventLoopGroup 上
// 用 selector 轮询各种 channel 的网络事件
// 让 EventLoopGroup 中的独立线程采用一个 Selector 来注册 channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}
复制代码
那么直接来看这行代码:ChannelFuture regFuture = config().group().register(channel);
:
- 因为使用
NioEventLoopGroup
,而此类又继承MultithreadEventLoopGroup
- 所以直接到
MultithreadEventLoopGroup
看register()
实现
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
// 选择一个 eventLoop 注册
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
复制代码
之后会走到 SingleThreadEventLoop
。
然后发现,会调用 AbstractChannel
的方法,如下:
// 定位:io.netty.channel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ... ...
// 重要:
register0(promise);
// ... ...
}
private void register0(ChannelPromise promise) {
try {
// ... ...
// 重要:
// 调用 JDK 底层的 register() 进行注册
doRegister();
// ... ...
} catch (Throwable t) {
// ... ...
}
}
}
复制代码
这时候,通过 doRegister()
去找子类的实现方法:
- 因为使用是
NioChannel
,所以自然找到AbstractNioChannel
代码如下:
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 调用 JDK 底层的 register() 进行注册
selectionKey = javaChannel()
.register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
}
复制代码
这个相对于原生 NIO
的方法就是:
channel.register(selector, SelectionKey.OP_CONNECT);
复制代码
(4)端口绑定
接下来继续看 ServerBootstrap
的 bind()
方法:
// 同步等待启动服务器监控端口
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
复制代码
进入源码,最后会调用 doBind()
:
// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
// ... ...
// 重要:
doBind0(regFuture, channel, localAddress, promise);
// ... ...
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}
复制代码
继续走:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
// 省略其他代码
boolean wasActive = isActive();
try {
// 调用 JDK 底层进行端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// ... ...
}
复制代码
最后,首先看下调用 JDK
底层进行端口绑定的 doBind()
方法:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
相对于原生 NIO
,代码:
SocketChannel channel = SocketChannel.open();
channel.register(selector, SelectionKey.OP_CONNECT);
复制代码