Netty源码(九)核心组件概述

前言

本章开始Netty核心源码阅读。

  • 阅读官方案例,了解Netty核心组件使用
  • Netty核心组件概述:Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、EventLoop、EventLoopGroup、BootStrap。这部分做简单概述,后续章节会深入分析
  • 开源框架如何配置Netty:Dubbo、ShardingProxy、RocketMQ

一、案例

首先看一下官方提供的EchoServer和EchoClient配置。

1、EchoServer

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        // boss线程组,线程数为1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // worker线程组,线程数为核数*2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            // 配置主从reactor线程模型
            b.group(bossGroup, workerGroup)
                    // 配置ServerSocket的channel类型
             .channel(NioServerSocketChannel.class)
                    // 配置ServerSocket的channel
             .option(ChannelOption.SO_BACKLOG, 100)
                    // 配置ServerSocketChannel需要执行的Handler
             .handler(new LoggingHandler(LogLevel.INFO))
                    // 配置客户端SocketChannel需要执行的Handler
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            // 关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
复制代码

Server端启动配置分为几步:

  • 线程模型配置:往往Server端使用主从Reactor线程模型。
    • Boss线程组负责处理accept事件接收客户端channel,将客户端channel注册到Worker线程对应的Selector上。
    • Worker线程组负责处理读写事件,可能还需要负责编解码,取决于编解码工作是否很复杂,会阻塞IO线程。
    • 其他业务往往会放到独立的一个线程池中执行,EchoServer没有体现这一点。
  • Channel类型配置:这里是服务端,使用NioServerSocketChannel。
  • 选项参数配置:分为option和childOption。option是服务端Socket配置,childOption是客户端Socket配置。
  • Handler配置:分为handler和childHandler。handler配置服务端Channel需要执行的Handler,childHandler配置客户端Channel需要执行的Handler。

Server端启动,通过调用ServerBootstrap的bind方法启动,bind方法返回一个ChannelFuture,通过sync方法同步等待Server启动完成。最后通过closeFuture.sync阻塞等待ServerSocket关闭。

2、EchoClient

public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            // 配置事件循环组
            b.group(group)
                    // 配置channel类型
             .channel(NioSocketChannel.class)
                    // 配置option
             .option(ChannelOption.TCP_NODELAY, true)
                    // 配置客户端channel的Handler
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     p.addLast(new EchoClientHandler());
                 }
             });
            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
复制代码

Client端启动配置相对简单,分为几步:

  • 事件循环组配置:客户端只需要配置一个事件循环组,用于处理channel上的不同事件。一般业务逻辑会单独放在另外的线程池处理,EchoClient没有体现。
  • channel类型配置:客户端的channel类型是NioSocketChannel。
  • 选项参数配置:客户端只需要配置option。
  • Handler配置:客户端只需要配置一个handler。

客户端通过Bootstrap的connect方法与服务端建立连接,返回一个ChannelFuture,然后通过sync方法同步等待连接建立。连接建立完成后,通过closeFuture.sync阻塞等待连接关闭。

二、Channel

Channel类图.png

Channel译为通道,负责处理底层通讯关联Netty组件

父接口AttributeMap持有属性的能力,通过attr方法获取属性,hasAttr方法判断属性是否存在。

public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> key);
    <T> boolean hasAttr(AttributeKey<T> key);
}
复制代码

父接口ChannelOutboundInvoker,Channel出栈执行器,具备底层通讯能力。可以看到Netty的操作都是异步的,方法调用后都会返回Future或Promise对象,当实际操作完成后会通过Future或Promise收到通知。

public interface ChannelOutboundInvoker {
	// 绑定端口
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    // 关闭当前资源
	ChannelFuture close();
    ChannelFuture close(ChannelPromise promise);
	// 建立连接/关闭连接
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
	ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect();
    ChannelFuture disconnect(ChannelPromise promise);
	// 从EventLoop注销
    ChannelFuture deregister();
    ChannelFuture deregister(ChannelPromise promise);
    // 设置关心的I/O事件为OP_READ
    ChannelOutboundInvoker read();
    // 写
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
	// 构造future/promise
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}
复制代码

Channel接口本身提供了通道状态Netty组件的关联关系

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    /** 通道状态 **/
    // 是否打开
    boolean isOpen();
	// 是否注册到EventLoop上了
    boolean isRegistered();
	// 是否已经激活(建立连接)
    boolean isActive();
    // 返回一个Future,当通道关闭后会收到通知
    ChannelFuture closeFuture();
    // 当io线程立即可以执行写操作,返回true
    boolean isWritable();
    // 当isWritable返回true时,返回可写字节数;否则返回0
    long bytesBeforeUnwritable();
    // 当isWritable返回false时,返回可读字节数;否则返回0
    long bytesBeforeWritable();
    // 通道本地地址
    SocketAddress localAddress();
    // 通道连接远程地址
    SocketAddress remoteAddress();
    
    /** 关联Netty组件 **/
    // Channel的唯一标识
    ChannelId id();
    // 当前Channel注册的EventLoop
    EventLoop eventLoop();
    // 父Channel
    // 被ServerSocketChannel接受的SocketChannel将返回ServerSocketChannel作为其parent
    Channel parent();
    // 配置
    ChannelConfig config();
    // 元数据
    ChannelMetadata metadata();
    // 关联Channel.Unsafe实例,Unsafe实例是真正底层负责处理通讯的类。
    // 用户代码不会直接操作,所以是Unsafe
    Unsafe unsafe();
    // ChannelPipeline
    ChannelPipeline pipeline();
    // ByteBuf分配器
    ByteBufAllocator alloc();
}
复制代码

Channel.Unsafe是Channel接口里声明的内部接口,是Netty自己使用的,不会被用户代码直接调用,它真正负责底层通讯。

interface Unsafe {
    RecvByteBufAllocator.Handle recvBufAllocHandle();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}
复制代码

ServerChannel是一个标记接口,继承Channel接口,通过ServerChannel accept创建Channel,前者是后者的parent,ServerChannel的实现例子就是NioServerSocketChannel。

/**
 * A {@link Channel} that accepts an incoming connection attempt and creates
 * its child {@link Channel}s by accepting them.  {@link ServerSocketChannel} is
 * a good example.
 */
public interface ServerChannel extends Channel {
    // This is a tag interface.
}
复制代码

三、ChannelHandler与ChannelPipeline

1、ChannelHandler

ChannelHandler接口提供了ChannelHandler被ChannelHandlerContext加入或移除时的钩子。

public interface ChannelHandler {
    // 当ChannelHandler被加入上下文触发
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    // 当ChannelHandler被从上下文移除触发
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}
复制代码

ChannelHandler又分为入栈和出栈。

ChannelInboundHandler

ChannelInboundHandler处理入栈,主要是在Channel上发生事件时触发。

public interface ChannelInboundHandler extends ChannelHandler {
    // 当Channel注册到EventLoop上后触发
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    // 当Channel从EventLoop注销后触发
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    // Channel激活后触发(如连接建立)
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    // Channel未激活后触发(如连接关闭)
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    // 当Channel读取到msg数据后触发
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    // channelRead结束后触发
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    // 用户自定义事件触发
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    // Channel的isWritable发生变化后触发
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
	// 发生异常时触发
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码

可以看到ChannelInboundHandler定义了多个钩子方法,其中对于业务开发最重要的是channelRead方法,这一步可以拿到channel通道里的数据。但是如果要实现ChannelInboundHandler,就要实现这里面所有的方法,尽管什么事都不做。

为了解决这个问题,Netty提供了ChannelInboundHandlerAdapter,用户代码可以继承ChannelInboundHandlerAdapter,这样就不需要实现所有的钩子方法,只需要实现自己关心的方法即可。其中@Skip注解的方法不会被调用,除非子类重写这个方法,原理后续再说。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
	// ... 省略其他实现
}
复制代码

ChannelOutboundHandler

ChannelOutboundHandler处理出栈,当用户使用channel读写时会触发这里的钩子方法,这里的方法与Channel继承的ChannelOutboundInvoker接口基本一一对应。

public interface ChannelOutboundHandler extends ChannelHandler {
    // 当bind操作触发时
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
	// 当connect操作触发时
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    // 当disconnect操作触发时
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
	// 当close操作触发时
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
	// 当deregister操作触发时
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    // 当read操作触发时
    void read(ChannelHandlerContext ctx) throws Exception;
    // 当write操作触发时
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    // 当flush操作触发时
    void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码

ChannelOutboundHandler和ChannelInboundHandler一样,Netty提供了ChannelOutboundHandlerAdapter适配,代码不贴了,和ChannelInboundHandlerAdapter的实现方式一致。

除此之外,如果要同时处理出栈和入栈,Netty提供了一个ChannelDuplexHandler

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
                     ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
    // ... 省略其他ChannelOutboundHandler方法实现
}
复制代码

2、ChannelPipeline

有了ChannelHandler之后存在以下几个问题:

  • 单个ChannelHandler无法处理整个网络请求,往往一个ChannelHandler只会处理一次请求的部分业务。比如编解码和业务处理往往是不同的Handler。
  • 当存在多个ChannelHandler的情况下,如何编排ChannelHandler。
  • Channel接口继承了ChannelOutboundInvoker接口,可以触发ChannelOutboundHandler出栈Handler执行,如何触发入栈Handler执行。

ChannelPipeline就是为了解决这些问题而生的,他不但继承了ChannelOutboundInvoker接口可以触发出栈事件,同时继承了ChannelInboundInvoker可以触发入栈事件。这些fireXXX与ChannelInboundHandler的方法一一对应,不做解释。

public interface ChannelInboundInvoker {
    ChannelInboundInvoker fireChannelRegistered();
    ChannelInboundInvoker fireChannelUnregistered();
    ChannelInboundInvoker fireChannelActive();
    ChannelInboundInvoker fireChannelInactive();
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    ChannelInboundInvoker fireChannelRead(Object msg);
    ChannelInboundInvoker fireChannelReadComplete();
    ChannelInboundInvoker fireChannelWritabilityChanged();
}
复制代码

同时ChannelPipeline维护了多个ChannelHandler,由Pipeline传播Inbound和Outbound事件。

ChannelPipeline.png

ChannelPipeline接口提供了链表的增删改查操作,这些链表节点看起来是一个一个的ChannelHandler,实际上是ChannelHandlerContext

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
    // 增
    ChannelPipeline addLast(ChannelHandler... handlers);
    // 带线程组的增
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
    // 删
    ChannelPipeline remove(ChannelHandler handler);
    // 改
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    // 查
    ChannelHandler get(String name);
    // ... 省略其他类似方法
}
复制代码

假设我们创建了以下管道:

ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
复制代码

在上面的示例中,其名称以Inbound开头的类表示它是一个入站处理程序。 名称以Outbound开头的类表示它是一个出站处理程序,InboundOutboundHandlerX同时处理出入站。
事件进入时,处理程序评估顺序为1、2、3、4、5。 事件出站时,顺序为5、4、3、2、1 。ChannelPipeline跳过对某些处理程序的求值,以缩短堆栈深度:

  • 3和4没有实现ChannelInboundHandler ,因此入站事件的实际评估顺序为:1、2和5。
  • 1和2没有实现ChannelOutboundHandler ,因此出站事件的实际评估顺序为:5、4和3。

3、ChannelHandlerContext

ChannelHandlerContext是一个上下文对象,持有Channel/ChannelHandler/ChannelPipeline重要组件。同时它也继承了ChannelInboundInvoker和ChannelOutboundInvoker,可以触发入栈和出栈事件。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // 绑定这个Context的Channel
    Channel channel();
    // 使用的线程池服务
    EventExecutor executor();
	// 唯一名称
    String name();
	// 绑定这个Context的ChannelHandler
    ChannelHandler handler();
    // 绑定的ChannelHandler是否移除
    boolean isRemoved();
    // 这个Context属于哪个Pipeline
    ChannelPipeline pipeline();
	// ByteBuf分配器
    ByteBufAllocator alloc();
}
复制代码

ChannelHandlerContext的链表结构,是在抽象类AbstractChannelHandlerContext中体现的。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}
复制代码

AbstractChannelHandlerContext在触发入栈事件时会选择下一个需要执行的链表节点。比如fireChannelRegistered时,会查询链表后面对channelRegistered方法没有@Skip注解的Handler实现类(注意Context与Handler是一一绑定的,找到Context就找到了Handler)。

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
        // 判断是否要跳过下一个AbstractChannelHandlerContext
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    }
    // ...
}
private void invokeChannelRegistered() {
    // ...
    // 执行当前上下文绑定的ChannelInboundHandler的channelRegistered方法
    ((ChannelInboundHandler) handler()).channelRegistered(this);
}
复制代码

四、EventLoop与EventLoopGroup

1、EventLoop

EventLoop类图.png

EventLoop译为事件循环,事件驱动的编程模型都有EventLoop这个概念(如javascript)。Netty的EventLoop可以认为是EventLoopGroup的一个特殊实现,它是一个线程构成的线程池服务。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
复制代码

NioEventLoop是对于JDK NIO的事件循环处理,每个NioEventLoop都持有一个Selector,处理Selector上的SelectionKey事件。

public final class NioEventLoop extends SingleThreadEventLoop {
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
}
复制代码

NioEventLoop除了处理事件以外,还会根据实际策略执行一些任务,简化流程如下。

@Override
protected void run() {
    for (;;) {
        // 轮询是否有事件发生
        select();
        // 处理事件
	processSelectedKeys();
        // 执行任务
        runAllTasks();
    }
}
复制代码

NioEventLoop的父类SingleThreadEventExecutor绑定线程,并持有任务队列。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // 任务队列
    private final Queue<Runnable> taskQueue;
	// EventLoop绑定的线程
    private volatile Thread thread;
    // JDK Executor 单线程提供服务
    private final Executor executor;
}
复制代码

Netty的串行化设计避免了线程竞争问题,核心在于EventLoop的设计。将每个需要同步的对象,都关联一个对应的EventLoop线程,专门用于处理这个对象产生的任务,这也是为什么EventLoop除了事件循环以外,还需要执行任务。所以netty的源码中经常能看到如下代码:

// 将任务放入channel对象关联的EventLoop执行
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        // ...
    }
});
复制代码

或这样的代码:

// 判断ctx对象关联的EventLoop是不是当前线程
if (ctx.executor().inEventLoop()) {
    // 如果是的话直接执行
    closeOutbound0(promise);
} else {
    // 否则放入对应线程的EventLoop的任务队列
    ctx.executor().execute(new Runnable() {
        @Override
        public void run() {
            closeOutbound0(promise);
        }
    });
}

// io.netty.util.concurrent.AbstractEventExecutor#inEventLoop
@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}
// io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
@Override
public boolean inEventLoop(Thread thread) {
    // 判断EventLoop绑定线程,与入参线程是否一致
    return thread == this.thread;
}
// io.netty.util.concurrent.SingleThreadEventExecutor#execute
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 将task加入任务队列
    addTask(task);
    // 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
    if (!inEventLoop) {
        startThread();
    }
	  // 唤醒EventLoop
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
// NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
  // 如果是另外一个线程唤醒,且之前并非唤醒状态,唤醒selector
  if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
    selector.wakeup();
  }
}
复制代码

2、EventLoopGroup

EventLoopGroup类图.png

EventLoopGroup管理多个EventLoop,主要负责选择EventLoop并注册Channel到对应的EventLoop上。接口作用和方法职责在javadoc上很清楚,两个register方法的目的都是一样的,将Channe注册到EventLoop上。

/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    EventLoop next();
    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);
    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(ChannelPromise promise);
}
复制代码

五、BootStrap与ServerBootStrap

1、AbstractBootstrap

BootStrap译为引导程序,方便客户端配置和启动Netty。AbstractBootStrap是客户端引导程序BootStrap和服务端引导程序ServerBootStrap的公共父类,主要提供几个能力:

配置

handler、option、attr、group、channel类型等通用配置的配置能力。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // EventLoopGroup 理解为线程池 每个线程持有一个Selector
    volatile EventLoopGroup group;
    // 构建Channel实例的工厂,根据配置channel类型不同,创建不同Channel实例
    private volatile ChannelFactory<? extends C> channelFactory;
    // 选项
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    // 属性
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    // 处理器
    private volatile ChannelHandler handler;
}
复制代码

创建并注册Channel

根据配置的Channel类型通过ChannelFactory创建Channel,根据配置的handler/option/attr初始化Channel,最后根据配置的group使用EventLoopGroup的register方法注册Channel。

final ChannelFuture initAndRegister() {
    // 1. 创建Channel
    Channel channel = channelFactory.newChannel();
    // 2. 初始化Channel
    init(channel);
    // 3. 注册Channel
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}
复制代码

创建Channel并绑定端口

AbstractBootStrap是客户端和服务端的抽象父类,创建Channel并绑定端口这个能力往往只有服务端使用。

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. 创建并注册Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 2. 绑定端口
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // ...
    }
}
复制代码

需要子类实现的方法

AbstractBootStrap需要子类实现几个方法。

init:子类需要提供初始化channel的实现,往往子类会通过父类的辅助方法设置channel的option/attr,并添加handler。

abstract void init(Channel channel) throws Exception;
复制代码

config:返回配置信息。AbstractBootstrapConfig是服务端和客户端配置的抽象父类。

public abstract AbstractBootstrapConfig<B, C> config();
复制代码

clone:子类需要支持克隆。

public abstract B clone();
复制代码

2、BootStrap

BootStrap是客户端引导类。

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    // 构造AbstractBootstrapConfig
    private final BootstrapConfig config = new BootstrapConfig(this);
}
复制代码

BootStrap的config和clone方法都很简单直接贴一下。

public Bootstrap clone() {
    return new Bootstrap(this);
}
public final BootstrapConfig config() {
    return config;
}
复制代码

重点在于BootStrap的init方法,这里将配置的handler加入ChannelPipeline,将配置的attr和option设置到Channel上。

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    // 将配置的handler加入ChannelPipeline
    p.addLast(config.handler());
    // 父类方法设置Channel的option
    setChannelOptions(channel, newOptionsArray(), logger);
    // 父类方法设置Channel的attr
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
复制代码

除了父类的抽象方法实现以外,BootStrap还提供了connect方法用于连接服务端。这里不细说,后续讲启动流程时再说。

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
复制代码

3、ServerBootStrap

ServerBootStrap是服务端启动引导类。因为Server端往往基于主从Reactor模型,所以除了Server端的group/handler/option/attr配置以外,ServerBootStrap还可以配置childGroup/childHandler/childOption/childAttr。主Reactor负责处理连接事件,辅Reactor负责处理读写事件。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // 客户端Channel的相关配置
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;
    // 服务端配置
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
复制代码

ServerBootStrap的config和clone方法,实现方式与BootStrap一致。

public final ServerBootstrapConfig config() {
    return config;
}
public ServerBootstrap clone() {
    return new ServerBootstrap(this);
}
复制代码

重点看一下ServerBootStrap的init方法。主要是设置ServerChannel的option/attr/handler。值得注意的是,child相关配置传入了负责处理ServerChannel的ServerBootstrapAcceptor这个Handler中,后续会用到。

@Override
void init(Channel channel) {
    // 设置ServerChannel的option和attr
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    // 设置ServerChannel的handler
    ChannelPipeline p = channel.pipeline();
	// 组装child相关属性
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // 针对ServerChannel的Handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // 除了配置的Handler,额外加入了Netty自己的ServerBootstrapAcceptor负责处理连接事件
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 构造ServerBootstrapAcceptor时,将child相关配置都传入了
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
复制代码

六、开源框架使用Netty的配置案例

1、Dubbo

Dubbo默认使用Netty作为底层通讯框架。服务端配置入口NettyServer#doOpen

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    // Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // ...
                }
            });
}
复制代码
  • bossGroup线程数:1。
  • workerGroup线程数:优先取URL定义的,否则取核数 + 1,最大不超过32。Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)
  • option
    • SO_REUSEADDR:true,Socket参数,地址复用,默认值False。有四种情况可以使用:
      • (1).当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你希望启动的程序的socket2要占用该地址和端口,比如重启服务且保持先前端口。
      • (2).有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。
      • (3).单个进程绑定相同的端口到多个socket上,但每个socket绑定的ip地址不同。
      • (4).完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。
    • TCP_NODELAY:true,TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用(false),该算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。
    • ALLOCATOR:PooledByteBufAllocator.DEFAULT。指定ByteBuf的分配器。

客户端配置入口NettyClient#doOpen

// Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
private static final EventLoopGroup NIO_EVENT_LOOP_GROUP = eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");

protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(socketChannelClass());

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // ...
        }
    });
}
复制代码
  • IO线程数:核数 + 1,最大不超过32,Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)

  • option

    • SO_KEEPALIVE:true,Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。这也是为什么Dubbo叫默认单一长连接。
    • TCP_NODELAY:true,TCP参数,立即发送数据,关闭Nagle算法。
    • ALLOCATOR:PooledByteBufAllocator.DEFAULT。
    • CONNECT_TIMEOUT_MILLIS:连接超时时间,取配置的超时事件,最少3000毫秒。

2、ShardingProxy

ShardingProxy服务端使用Netty,入口在ShardingProxy#start

public void start(final int port) {
    ServerBootstrap bootstrap = new ServerBootstrap();
    // bossGroup 线程数量 1
    bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
    // workerGroup 线程数量 核数*2
    groupsNio(bootstrap);
    // ...
}

private void groupsNio(final ServerBootstrap bootstrap) {
    // 使用NettyEventLoopGroup的默认线程数 = 核数*2
    workerGroup = new NioEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ServerHandlerInitializer());
}
复制代码
  • bossGroup:线程数1。
  • workerGroup:使用NettyEventLoopGroup的默认线程数,核数*2。
  • option
    • SO_BACKLOG:128。Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
    • WRITE_BUFFER_WATER_MARK:8M-16M。Netty写缓冲区的水位线配置,默认32K-64K。WriteBufferWaterMark用于设置写缓冲区的低水位标记和高水位标记。如果写入缓冲区中排队的字节数超过了高水位线,则Channel.isWritable()将开始返回false 。如果在写缓冲区中排队的字节数超过了高水位线,然后降到了低水位线以下,则Channel.isWritable()将再次开始返回true 。
    • ALLOCATOR:PooledByteBufAllocator.DEFAULT。
  • childOption
    • ALLOCATOR:PooledByteBufAllocator.DEFAULT。
    • TCP_NODELAY:true,关闭Nagle算法。

3、RocketMQ

RocketMQ的NameServer和Broker都使用了Netty作为底层通讯框架。

NameServer和Broker的Server端配置入口NettyRemotingServer#start

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
        //...
    });
    this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
        // ...
    });
}
public void start() {
    // 业务线程池 配置=8
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
			// ...
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
}
复制代码
  • bossGroup:线程数为1。
  • workerGroup:线程数取配置的,默认是3。
  • 业务线程池:线程数取配置的,默认是8。这里把ChannelInitializer的实现都贴出来了,因为这里可以明显看出,往往boss和worker只做IO操作,耗时的业务操作都在另外一个业务线程池处理(defaultEventExecutorGroup)。其实其他框架也是一样的,只不过这个线程池没有外露到Netty的配置里,而是在某个业务Handler里将业务处理封装为Task放入自定义的业务线程池中。
  • option
    • SO_BACKLOG:1024。服务端接受连接的队列长度。
    • SO_REUSEADDR:true。地址复用。
    • SO_KEEPALIVE:false。连接保活。
  • childOption
    • TCP_NODELAY:true。关闭Nagle算法。
    • SO_SNDBUF:取配置的。Socket参数,TCP数据发送缓冲区大小,该缓冲区即TCP发送滑动窗口。
    • SO_RCVBUF:取配置的。Socket参数,TCP数据接收缓冲区大小,该缓冲区即TCP接收滑动窗口。

总结

  • 如何使用Netty:服务端配置ServerBootstrap,客户端配置BootStrap。在ChannelInitializer的initPipeline方法中,加入ChannelHandler做业务逻辑。
  • Netty核心组件:
    • Channel:通道,负责处理底层通讯关联Netty组件
    • ChannelHandler:ChannelInboundHandler处理入栈,当Channel发生事件时触发;ChannelOutboundHandler处理出栈,当用户使用Channel读写时触发。
    • ChannelPipeline:管道。继承ChannelOutboundInvoker/ChannelInboundInvoker接口,负责触发出入栈事件;ChannelHandler编排,通过ChannelHandlerContex的前后指针,实现出入栈事件在管道中传播。
    • ChannelHandlerContext:上下文对象。持有Channel/ChannelHandler/ChannelPipeline重要组件。同时继承ChannelInboundInvoker/ChannelOutboundInvoker,可以触发入栈和出栈事件。此外ChannelHandlerContext有前后指针指向前后ChannelHandlerContext,由ChannelPipeline控制实现事件传播。
    • EventLoop:事件循环。针对NioEventLoop来说,每个事件循环对应一个线程,持有一个Selector。事件循环处理分为三步:轮询事件、处理事件、执行任务。Netty串行化设计避免了线程竞争问题,核心在于EventLoop的设计,将每个需要同步的对象,都关联一个对应的EventLoop线程,专门用于处理这个对象产生的任务
    • EventLoopGroup:事件循环组。EventLoopGroup管理多个EventLoop,主要负责接入Channel后,选择一个EventLoop,将Channel注册到对应EventLoop中(也注册到了JDK的Selector中)。
    • BootStrap:引导程序。方便客户端配置和启动Netty。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享