Netty 源码解读
bind启动源码
我们的服务是通过下面代码进行绑定端口并启动的,所以我们从bind()方法开始分析
ChannelFuture f = b.bind(port).sync();
复制代码
一直跟进到io.netty.bootstrap.AbstractBootstrap#doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
//1 初始化和注册逻辑
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone())
ChannelPromise promise = channel.newPromise();
//2 端口绑定逻辑
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
1 初始化和注册逻辑
继续跟进initAndRegister()方法,根绝下面的注释,你类比一下Java NIO的代码,你会不会有莫名的亲切感?
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//(1)创建ServerSocketChannel
channel = channelFactory.newChannel();
//初始化Channel,不展开说明了
init(channel);
} catch (Throwable t) {
if (channel != null)
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//(2)注册selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
复制代码
(1)创建ServerSocketChannel
回看一下抛弃服务的代码
.channel(NioServerSocketChannel.class)
复制代码
这里就是把这个NioServerSocketChannel实例化,见io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
//这里这个clazz指的就是NioServerSocketChannel.class,具体怎么传进来的,大家可以自己看代码,这里不介绍了。
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
复制代码
通过反射进行实例化我们就直接看NioServerSocketChannel的构造函数即可看看都干了什么,见io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
/**
* Create a new instance
*/
public NioServerSocketChannel() {
这里主要干了两件事情:
(1)newSocket(DEFAULT_SELECTOR_PROVIDER),其实这里就是创建了Java NIO中的ServerSocketChannel
(2)this方法继续调用构造函数
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//(2)处会调用这个构造函数
public NioServerSocketChannel(ServerSocketChannel channel) {
//我们主要关心这个调用父类的构造函数干了什么,看见SelectionKey.OP_ACCEPT有没有很熟悉的样子
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
见io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
//我们仍然继续往里跟进
super(parent);
this.ch = ch;
//这个地方把SelectionKey.OP_ACCEPT事件保存到成员变量中了哦
this.readInterestOp = readInterestOp;
try {
//设置成非阻塞模式,有没有感觉又很熟悉了?
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
复制代码
见io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
//这里把Java NIO的ServerSocketChannel保存进了成员变量
this.parent = parent;
//给这个Channel一个id
id = newId();
//这个先忽略,以后介绍
unsafe = newUnsafe();
//看看这个地方,是不是生成了一个Pipeline和一个Channel关联了?
pipeline = newChannelPipeline();
}
复制代码
我们来心中回顾一下创建一个NioServerSocketChannel都干了哪些事情:
- (1)创建了一个Java NIO底层的SeverSocketChannel,作为自己的成员变量;
- (2)给自己弄了个id;
- (3)创建了一个Pipeline作为自己的成员变量;
- (4)设置成了非阻塞模式。
(2)注册selector
config().group()
拿到的就是我们的bossGroup(具体怎么区分的,可以自己通过丢弃服务代码b.group(bossGroup, workerGroup)跟进去看一下),那整个config().group().register(channel);
我们就可以理解为在EventLoop上注册我们的Channel。
接下来我们先忽略Netty如何选择bossGroup哪个EventLoop直接跳到在EventLoop上注册的方法,见io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//用NioServerSocketChannel的unsafe注册,注意这个地方传参是把当前的EventLoop和NioServerSocketChannel都传进去了
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
继续跟进,见io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
//现在咱们再mian主线程里,所以会走else
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//这个地方涉及到eventLoop的启用,我们下一章讲
eventLoop.execute(new Runnable() {
@Override
public void run() {
//所以会在当前eventLoop进行注册
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
复制代码
往register0里继续跟,见io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//其他不关心继续往里跟这个方法
doRegister();
...
}
复制代码
跟进doRegister(),见io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//看到这段代码是不是又有莫名的亲切感,让我们回想一下《从Java NIO到Netty(二)》的内容
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
复制代码
在上述代码中,javaChannel()拿到的就是Java NIO的ServerSocketChannel,这个register方法也是Java NIO的那个register方法,把这个ServerSocketChannel注册到当前eventLoop的selector上面去。
那可能有人问了,ServerSocketChannel应该把OP_ACCEPT事件注册到selector上面去,这里怎么体现的呢?别着急,往下看。
2 端口绑定逻辑
当创建完NioServerSocketChannel后,会调用doBind0把创建好的NioServerSocketChannel传进来,见io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//在NioServerSocketChannel的eventLoop上执行bind
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
复制代码
继续跟进,见io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
try {
//(1)端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
(2)在pipeline传播ChannelActive事件:主要关心如何把OP_ACCEPT事件注册到selector上面的
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
复制代码
(1)端口绑定
继续往里跟,见io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//这个地方是不是又感觉很熟悉?就是调用Java NIO的bind进行端口绑定的,见《从Java NIO到Netty(一)》
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
(2)在pipeline传播ChannelActive事件
往里跟,见io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//这个地方就是把事件继续向下一个链表节点传播
ctx.fireChannelActive();
//主要看这个
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//往这里面走
channel.read();
}
}
复制代码
继续往里走直到走到io.netty.channel.nio.AbstractNioChannel#doBeginRead(如果不知道怎么走的建议debug模式自己往里跟)
@Override
protected void doBeginRead() throws Exception {
//这个官方注释也表述很清楚了,这两个方法最终会调用到这个地方
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//还记得我们之前创建NioServerSocketChannel时把SelectionKey.OP_ACCEPT事件赋值给了这个readInterestOp吗?看这个地方就把这个事件加到这个selectionKey中了
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
综上,我们bind源码就算讲解完了,有没有心中有数都干了什么?可以对比《从Java NIO到Netty(四)》中的这块代码看,只不过多了selector在eventloop上而已。那么我们下一章就讲一下两个EventLoopGroup的源码。
//打开ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//监听端口9999
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//创建Selector
Selector selector = Selector.open();
//将选serverSocketChannel注册到selector,并在注册过程中指出该serverSocketChannel可以进行Accept操作
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码