【Netty】服务端启动源码解析

这是我参与更文挑战的第26天,活动详情查看:更文挑战

一、前言

对应,简单 serverdemo,如下:

public class NettyServer {

    public static void main(String[] args) {
        // 线程组: Acceptor线程
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        // 线程组: Processor / Handler
        EventLoopGroup childGroup = new NioEventLoopGroup(); 

        try {
            // 相当于Netty的服务器
            ServerBootstrap serverBootstrap = new ServerBootstrap(); 
            serverBootstrap
                    .group(parentGroup, childGroup)
                	// 监听端口的ServerSocketChannel
                    .channel(NioServerSocketChannel.class)  
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 处理每个连接的SocketChannel
                    .childHandler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        protected void initChannel(SocketChannel socketChannel) 
                            throws Exception {
                            // 针对网络请求的处理逻辑
                            socketChannel.pipeline().addLast(new NettyServerHandler()); 
                        }

                    });
            // 同步等待启动服务器监控端口
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); 
            // 同步等待关闭启动服务器的结果
            channelFuture.channel().closeFuture().sync(); 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
复制代码

具体服务端创建,流程如下:

服务端启动.png

二、直接怼源码

ServerBootstrapbind() 方法实现:

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // 1. 创建 socket 
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    // 2. 校验参数
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }
    
    // 3. 真正执行,重要:
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化并绑定
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        
        // 若有异常则返回
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // 如果没有注册成功:
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 添加对应的监听器
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                       
                        promise.setFailure(cause);
                    } else {
                        
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
} 
复制代码

doBind() 方法具体做了哪些事情:

  1. 调用 initAndRegister() 初始化并注册 Channel,同时返回一个 ChannelFuture 实例 regFuture,所以猜测出 initAndRegister() 是一个异步的过程。

  2. 通过 regFuture.cause() 方法判断 initAndRegister() 的过程是否发生异常,如果发生异常则直接返回。

  3. regFuture.isDone() 表示 initAndRegister() 是否执行完毕,如果执行完毕则调用 doBind0() 进行 Socket 绑定。

如果 initAndRegister() 还没有执行结束,regFuture 会添加一个 ChannelFutureListener 回调监听,当 initAndRegister() 执行结束后会调用 operationComplete(),同样通过 doBind0() 进行端口绑定。

服务端启动的全流程可分为如下四步:

  1. 创建服务端 Channel:本质是创建 JDK 底层原生的 Channel,并初始化几个重要的属性,包括 idunsafepipeline 等。

  2. 初始化服务端 Channel:设置 Socket 参数以及用户自定义属性,并添加两个特殊的处理器 ChannelInitializerServerBootstrapAcceptor

  3. 注册服务端 Channel:调用 JDK 底层将 Channel 注册到 Selector 上。

  4. 端口绑定:调用 JDK 底层进行端口绑定,并触发 channelActive 事件,把 OP_ACCEPT 事件注册到 Channel 的事件集合中。

(1)创建服务端 Channel

此过程,可分为 3 步:

  1. 反射创建实例:ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例
  2. 创建 SelectorProvider
  3. 设置参数:设置 Channel 为非阻塞模式,为 Channel 创建 idunsafepipeline 三个重要的成员变量

开始点,从这入口:

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. 创建 ServerSocketChannel
            // 并且配置 blocking 为 false
            channel = channelFactory.newChannel();
            
            // 2. 初始化一些配置:端口、网络参数
            // 并将这个 channel 注册到 Selector 上,关注 OP_ACCEPT 网络事件,不断轮询
            init(channel);
        } catch (Throwable t) {
            // ... ...
        }
        // 3. 把 ServerSocketChannel 注册到 EventLoopGroup 上
        // 用 selector 轮询各种 channel 的网络事件
        // 让 EventLoopGroup 中的独立线程采用一个 Selector 来注册 channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
}
复制代码

channel = channelFactory.newChannel(); 这句其实对应原生 NIO 就是:

// 主要实现:
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
复制代码

1)反射创建实例

ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例。

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // 对应 demo 中的:.channel(NioServerSocketChannel.class)  
    // 监听端口的ServerSocketChannel
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 入口:创建反射工厂
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
}
复制代码
  1. 那么,直接进入 ReflectiveChannelFactory
// 定位:io.netty.channel
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    // ... ...
    @Override
    public T newChannel() {
        try {
            // 反射方式,对应:NioServerSocketChannel.class
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}
复制代码

因为我们设置的 channelNioServerSocketChannel,所以下一步就是初始化这个。

  1. 初始化 NioServerSocketChannel
// 定位:io.netty.channel.socket.nio
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    // 1. SelectorProvider 会根据操作系统类型和版本的不同,返回不同的实现类
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER 
        = SelectorProvider.provider();
    
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    // 2. 主要:
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // 就是通过原生的 NIO 的API,来创建一个 ServerSocketChannel
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    // 3. 发现这块:设置感兴趣的事件,OP_ACCEPT
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
复制代码

2)创建 SelectorProvider

提示:SelectorProviderJDK NIO 中的抽象类实现,通过 openServerSocketChannel() 方法可以用于创建服务端的 ServerSocketChannel

// 定位:java.nio.channels.spi
public abstract class SelectorProvider {

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                // 主要看这句:
                provider = sun.nio.ch.DefaultSelectorProvider.create();
            
        }
    }
}


// 定位:sun.nio.ch
// 根据操作系统类型和版本的不同,返回不同的实现类
// 具体可以参考 DefaultSelectorProvider 的源码实现:
public class DefaultSelectorProvider {
    public static SelectorProvider create() {
        String osname = AccessController
            .doPrivileged(new GetPropertyAction("os.name"));
        if (osname.equals("SunOS"))
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        if (osname.equals("Linux"))
            return createProvider("sun.nio.ch.EPollSelectorProvider");
        return new sun.nio.ch.PollSelectorProvider();
    }
}
复制代码

3)初始化配置

初始化时,会先初始化父类。

  1. 设置非阻塞
// 定位:io.netty.channel.nio
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // 设置成 非阻塞
            ch.configureBlocking(false);
        } 
        // ... ...
    }
}
复制代码
  1. 设置成员变量
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();  // Channel 全局唯一 id 
        unsafe = newUnsafe(); // unsafe 操作底层读写
        pipeline = newChannelPipeline();  // pipeline 负责业务处理器编排
    }
}
复制代码

(2)初始化服务端 Channel

再次从这入手:

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. 创建 ServerSocketChannel
            // ... ...
            
            // 2. 初始化一些配置:端口、网络参数
            // 并将这个 channel 注册到 Selector 上,关注 OP_ACCEPT 网络事件,不断轮询
            init(channel);
        } catch (Throwable t) {
            // ... ...
        }
        // 3. 注册
        // ... ...
        return regFuture;
    }
}
复制代码

这次主要看init(channel);

  • init()AbstractBootstrap 抽象方法,需要子类实现

  • 因为是服务端(server),选择的是 ServerBootstrap

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            // 1. 设置 Socket 参数
            setChannelOptions(channel, options, logger);
        }

        // 2. 保存用户自定义属性
        final Map<AttributeKey<?>, Object> attrs = attrs0();
       
      
        // 3. 添加特殊的 Handler 处理器
        // 对网络请求链路中加入内置的一个处理逻辑
        p.addLast(new ChannelInitializer<Channel>() {
           // ... ...
        });
    }
}
复制代码

(3)注册服务端 Channel

入口:

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // 方法名:初始化 ServerSocketChannel,注册到 Selector 多路复用轮询组件上
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. 创建 ServerSocketChannel
            // ... ...
            
            // 2. 初始化一些配置:端口、网络参数
            // ... ... 
        } catch (Throwable t) {
            // ... ...
        }
        // 此时重要:
        // 3. 把 ServerSocketChannel 注册到 EventLoopGroup 上
        // 用 selector 轮询各种 channel 的网络事件
        // 让 EventLoopGroup 中的独立线程采用一个 Selector 来注册 channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
}
复制代码

那么直接来看这行代码:ChannelFuture regFuture = config().group().register(channel);

  • 因为使用 NioEventLoopGroup ,而此类又继承 MultithreadEventLoopGroup
  • 所以直接到 MultithreadEventLoopGroupregister() 实现
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
     // 选择一个 eventLoop 注册
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}
复制代码

之后会走到 SingleThreadEventLoop

然后发现,会调用 AbstractChannel 的方法,如下:

// 定位:io.netty.channel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // ... ... 
        // 重要:
        register0(promise);
        // ... ... 
    }
    
    private void register0(ChannelPromise promise) {
        try {
            // ... ...
            // 重要:
            // 调用 JDK 底层的 register() 进行注册
            doRegister(); 
            // ... ...
        } catch (Throwable t) {
            // ... ...
        }
    }
}
复制代码

这时候,通过 doRegister() 去找子类的实现方法:

  • 因为使用是 NioChannel ,所以自然找到 AbstractNioChannel

代码如下:

public abstract class AbstractNioChannel extends AbstractChannel {
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 调用 JDK 底层的 register() 进行注册
                selectionKey = javaChannel()
                    .register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
}
复制代码

这个相对于原生 NIO 的方法就是:

channel.register(selector, SelectionKey.OP_CONNECT);
复制代码

(4)端口绑定

接下来继续看 ServerBootstrapbind() 方法:

// 同步等待启动服务器监控端口
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
复制代码

进入源码,最后会调用 doBind()

// 定位:io.netty.bootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // ... ...
        
        // 重要:
        doBind0(regFuture, channel, localAddress, promise);
        
        // ... ... 
    }
    
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
}
复制代码

继续走:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    // 省略其他代码
    boolean wasActive = isActive();
    try {
        // 调用 JDK 底层进行端口绑定
        doBind(localAddress); 
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // ... ...
}
复制代码

最后,首先看下调用 JDK 底层进行端口绑定的 doBind() 方法:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
复制代码

相对于原生 NIO ,代码:

SocketChannel channel = SocketChannel.open();
channel.register(selector, SelectionKey.OP_CONNECT);
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享