前言
Netty框架的原理是Reactor模型中基于多个反应器的多线程模式,本篇文章主要介绍Netty较为重要的几个概念,编写思路借鉴了参考资料中的文章
ChannelFuture
我们先来了解了解Netty中几个较为重要的接口
public interface Future<V> extends java.util.concurrent.Future<V> {
// I/O操作是否成功,成功返回true
boolean isSuccess();
// 是否可取消
boolean isCancellable();
// 抛出I/O操作失败原因
Throwable cause();
// 此处使用了观察者模式,该Future任务完成,这个Listener会立刻收到通知
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待Future任务完成,如果任务失败会抛出异常
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
// 等待Future任务完成,如果任务失败不会抛出异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立即获取Future任务结果,如果Future任务未完成会返回null,所以在使用这个方法之前最好先判断这个Future任务是否完成
V getNow();
}
复制代码
Netty的Future接口继承了jdk1.5的Future接口,在本身已经有Future接口的情况下为什么要重复造轮子?
这是因为jdk1.5的Future接口不满足Netty的需求,jdk的Future接口可以获取异步计算的结果,并且提供了多种方法,可查看任务是否取消是否完成,但是使用者无法知道方法什么时候完成,比如某用户提交了一个Future任务,什么时候才能去调用get()方法获取结果,总不能循环调用isDone()方法吧,这样太消耗cpu资源。Netty的Future接口一定程度上弥补了这个缺陷,通过新增监听器,可以得知该任务是否完成以及任务完成后该做的事情,isSuccess可以得知任务是否成功,netty的Future接口更加智能
public interface ChannelFuture extends Future<Void> {
// 返回ChannelFuture关联的Channel
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
}
复制代码
ChannelFuture继承了Netty的Future接口,由于Netty中所有的I/O操作都是异步了,所以当方法返回时,不代表I/O操作已经完成,所以ChannelFuture封装了异步I/O操作的结果,接口定义的方法与Netty的Future接口相似,并没有什么新鲜的,值得一提的是ChannelFuture关联了Channel
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
// 此operation关联的Future任务完成时,这个方法会被调用
void operationComplete(F future) throws Exception;
}
复制代码
GenericFutureListener接口中定义了Listener的回调方法,当Future任务完成时,会回调此类中的方法
public interface Promise<V> extends Future<V> {
// 标记该Future任务成功,并且会通知所有的Listener
// 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)
Promise<V> setSuccess(V result);
// 标记该Future任务成功,并且会通知所有的Listener
// 操作失败不抛出异常,返回false
boolean trySuccess(V result);
// 标记该Future任务失败,并且会通知所有的Listener
// 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)
Promise<V> setFailure(Throwable cause);
// 标记该Future任务失败,并且会通知所有的Listener
// 操作失败不抛出异常,返回false
boolean tryFailure(Throwable cause);
// 标记该Future任务不可取消
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
复制代码
Promise同样也继承了Netty的Future接口,由于Netty的Future接口中没有写操作相关的接口,所以Netty通过Promise进行扩展,用于设置I/O操作的结果,接口中的setSuccess()、setFailure()方法会在任务完成后调用,然后回调Listener中的方法,经过这些操作后,await() 或 sync() 的线程就会从等待中返回。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
ChannelPromise unvoid();
}
复制代码
ChannelPromise接口同时继承了ChannelFuture, Promise,拥有双方的特性,接口中的方法同样跟之前的接口非常相似,只是返回值变成了ChannelPromise
看完以上的接口后,我们来看看Netty中对于这些接口的实现
观察这张类图,可以发现,DefaultPromise实现了Promise,DefaultChannelPromise实现了ChannelPromise并且继承了DefaultPromise,DefaultPromise由于没有实现ChannelFuture,所以没有ChannelFuture相关的特性,所以要看Netty中关于以上接口的实现,应该去看DefaultChannelPromise这个类
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = checkNotNull(channel, "channel");
}
@Override
public ChannelPromise setSuccess(Void result) {
super.setSuccess(result);
return this;
}
@Override
public ChannelPromise setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
}
复制代码
观察DefaultChannelPromise的代码,可以发现,很多方法都调用了DefaultPromise父类中的方法,所以我们转移一下战场,去看DefaultPromise的代码
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
// 唤醒Listeners
notifyListeners();
}
return true;
}
return false;
}
复制代码
可以看到setSuccess的步骤就是设置值,然后唤醒所有的Listeners,如果这个操作失败,会抛出异常,trySuccess也是同样的步骤,但是不会抛出异常
ChannelPipeline
ChannelPipeline本身是一个与Channel关联的容器对象,这个容器中存放了多个ChannelHandlerContext,ChannelHandlerContext中存放的是我们编写的ChannelHandler对象,多个ChannelHandlerContext使用链表串联,I/O事件按照顺序经过ChannelPipeline中的一个个ChannelHandler
如上图
Netty中的事件分为Inbound事件和Outbound事件,Inbound事件通常由I/O线程触发,例如TCP链路建立事件、读事件,Outbound事件通常是用户主动发起的网络I/O事件,例如连接事件、读事件
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
复制代码
以上是一段添加ChannelHandler对象的代码,以Inbound开头的类意味着它是一个入站Handler,以Outbound开头的类表示它是一个出站Handler,我们猜测一下这些ChannelHandler的执行顺序。
3、4没有实现ChannelnboundHandler,1、2没有实现ChannelOutboundHandler,5既实现了ChannelnboundHandler又实现了ChannelOutboundHandler,按照先执行Inbound事件,再执行Outbound事件的规则的话,执行顺序应该是1->2->5->3->4->5。
实际上不是的,Inbound事件的执行顺序是从前往后,Outbound事件的执行顺序是从后往前,所以执行顺序是1->2->5->5->4->3
ChannelPipeline的创建时机
前面讲过ChannelPipeline和Channel是一一搭配的,所以Channel创建的时候ChannelPipeline也会随之创建
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 此处会调用下面的方法
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
// 创建DefaultChannelPipeline
return new DefaultChannelPipeline(this);
}
复制代码
// 此方法就是创建两个链表节点,并且让头节点和尾节点双向连接
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 观察TailContext,可以发现TailContext继承了AbstractChannelHandlerContext
// 说明ChannelPipeline中的节点是ChannelHandlerContext,不是ChannelHandler
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
NioEventLoopGroup
可以看到NioEventLoopGroup最顶层继承的接口是Executor,说明NioEventLoopGroup就是一个线程池,NioEventLoop是其创建出来的一个线程
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 注意,这里executor赋值为null
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 看到这里,我们可以知道Netty默认的线程数是2 * CPU 个
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
复制代码
构造器的代码一路追,终于找到干正事的方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
// 因为构造器中赋值为null,所以此处executor为ThreadPerTaskExecutor()
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 这里很关键,下面细说
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 新建线程失败,将线程优雅关闭
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 选择合适的轮训机制
chooser = chooserFactory.newChooser(children);
// 新建一个监听器,用于监听是否所有线程都terminated了
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 给所有的EventExecutor都设置上这个监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码
// executors的数量是2的幂次方和非2的幂次方使用不同的轮训方式
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 判断executors是否是2的幂次方
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
复制代码
上面说到,NioEventLoopGroup是一个线程池,NioEventLoop是其创建出来的一个个线程,上面的newChild()方法便是创建一个个NioEventLoop,我们来看看NioEventLoop的构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
// 这里可以看到selector跟NioEventLoop进行了绑定
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
// 这里是一个关键点,对一个任务队列进行了赋值,这里的任务队列有什么用呢?
// NioEventLoop一部分时间会执行I/O任务,一部分时间执行非I/O任务,在执行I/O任务时,如果有任务过来,会先把任务放
// 到任务队列中
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
复制代码
到这里为止,NioEventLoop便跟selector关联起来了
未完待续…..