Handle消息机制解析
概述
Handler消息机制(由Handler/Looper/MessageQueue等构成),Android有大量的消息驱动方法来进行交互,就像Android的四大组件(Activity、Service、Broadcast、ContentProvider)的启动过程交互,都离不开Handler的消息机制,所以Android系统某种意义上说也是一种以消息驱动的系统。
架构图
相应的类(基于Android 8.0源码)
Handler
Handler概述
- Handler是一个可以通过关联一个消息队列来发送和处理消息,发送或处理Runnable对象的一个处理程序,每个Handler都关联一个单独的线程和消息队列,当你创建一个新的Handler的时候它就将绑定到一个线程或线程上的消息队列,从那时起,这个Handler就将为这个消息队列提供消息或Runnable对象,处理消息队列分发出来的消息或Runnable对象。
handler有什么用
-
发送消息到 MesageQueue
-
处理 looper 分发的消息
构造函数
public Handler(Callback callback, boolean async) {
//如果是匿名类、内部类、局不类(方法内部),且没有声明为STATIC,则存在内存泄露风险,所以要打印日志提醒开发者
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
//从ThreadLocal中获取当前线程的Looper
mLooper = Looper.myLooper();
//如果当前线程没有Looper,则说明没有调用Looper.prepare(),抛异常
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
//关联MessageQueue
mQueue = mLooper.mQueue;
//mCallback赋值,如果不为null,使用该callback处理消息
mCallback = callback;
//表示该Handler是否为异步的,如果为true,则由该handler发送的消息都是异步消息。异步消息是不受到通过MessageQueue.enqueueSyncBarrier(long)(后方法签名改为postSyncBarrier(long))发送的同步障碍影响。
mAsynchronous = async;
}
复制代码
Looper
looper的构造函数是private的,通过
public static void prepare() {
prepare(true);
}
//quiteAllowed - 是否允许退出消息队列(MessageQueue)
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
复制代码
来创建Looper对象,并通过ThreadLoacl将其和线程进行绑定。
还有一个比较重要的方法
public static void prepareMainLooper() {
//不能quit
prepare(false);
synchronized (Looper.class) {
//不能手动调用该方法,因为在应用进程启动时(ActivityThread.main()方法中)已经调用过
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
复制代码
在Android虚拟机的入口-ActivityThread的main函数中
public static void main(String[] args) {
//prepare mainLooper
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}
// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
复制代码
初始化主线程的Looper,这也间接的说明Android系统在某种程度上可以说是事件驱动的系统。
ThreadLocal
概述
ThreadLocal类用来提供线程内部的局部变量,适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。ThreadLocal实例通常来说都是 private static 类型,用于关联线程。
核心方法
1.set()-获取当前线程Thread对象中的ThreadLocalMap对象实例threadLocals,然后再将需要保存的值保存到threadLocals里面
//返回此线程局部变量的当前线程副本中的值。 如果该变量对于当前线程没有值,则首先将其初始化为调用initialValue方法返回的值。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
//返回当前thread对象中的threadLocals
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
复制代码
2.get()-获取当前线程Thread对象中的 ThreadLocalMap,然后从中读取 key 为 ThreadLocal 对象的值
//将此线程局部变量的当前线程副本设置为指定值。 大多数子类将不需要覆盖此方法,仅依赖于initialValue方法来设置线程initialValue的值。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
复制代码
map是ThreadLocal的静态内部类ThreadLocalMap,存、取等操作基本都是对这个map做处理。
ThreadLocalMap
和 HashMap 类似,底层是一个Entry<K,V>数组,继承 WeakReference ,key是弱引用类型,value是强引用类型,所以在数组中会存在陈旧条目(key == null),在get(),set()操作中,会通过expungeStaleEntry()删除陈旧的条目。
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
//key(threadLocal对象)是弱引用类型
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
//存储 Entry 对象。当发生 hash 冲突时,线性探测的开放地址法解决hash冲突。
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
//计算数组下标,与 HashMap 类似
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//存在key,替换值
if (k == key) {
e.value = value;
return;
}
//当前是陈旧条目,替换陈旧条目(key == null)
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
//新建entry,插入
tab[i] = new Entry(key, value);
int sz = ++size;
//判断是否有陈旧条目被移除 && size >= 阈值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
// 首先扫描整个表,删除陈旧的条目。 如果size 大于 阈值的3/4 ,扩1倍容
private void rehash() {
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
if (size >= threshold - threshold / 4)
resize();
}
//获取 Entry 对象。
private Entry getEntry(ThreadLocal<?> key) {
//计算数组下标
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
//不为null直接返回
if (e != null && e.get() == key)
return e;
else
//1.可能当前是陈旧条目
//2.存在hash冲突
return getEntryAfterMiss(key, i, e);
}
//遍历数组查找对应的 Entry 对象
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
复制代码
ThreadLocal整体模型
ThreadLocal与内存泄漏
造成泄漏的行为
-
使用static的ThreadLocal,延长了ThreadLocal的生命周期
-
当使用线程池时,即当前线程不一定会退出(比如固定大小的线程池),这样将一些大对象设置到ThreadLocal中,可能会导致系统出现内存泄露(当对象不再使用时,因为引用存在,无法被回收)
内存泄漏的根源
- ThreadLocal 本身的设计是不会导致内存泄漏。ThreadLocalMap 对象被当前 Thread 对象持有,当线程退出时,会将 ThreadLocalMap 对象置 null。
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
复制代码
-
原因 :由于 Entry 的key是弱引用,当没有强引用存在时,当 JVM GC 时,会被回收,但 Entry 对象和对应的 value 不会被回收。当Entry和value未被主动回收时,除非当前线程死亡,否则线程对于Entry的强引用会一直存在,从而导致内存泄露。
-
建议:当希望回收对象时,可以调用remove方法。
/**
* Remove the entry for key.
*/
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
复制代码
ThreadLocal总结
-
ThreadLocal 并不解决线程间共享数据的问题
-
ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题
-
每个线程持有一个 Map 并维护了 ThreadLocal 对象与具体实例的映射,该 Map 由于只被持有它的线程访问,故不存在线程安全以及锁的问题
-
ThreadLocalMap 的 Entry 对 ThreadLocal 的引用为弱引用,避免了 ThreadLocal 对象无法被回收的问题
-
ThreadLocalMap 的 set 方法通过调用 replaceStaleEntry 方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏
-
ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景
MessageQueue
概述
MessageQueue内部维护了一个 Message 构成的单向链表。Handler将Message发送到消息队列中,消息队列会按照一定的规则取出要执行的Message。
Looper创建时,会创建关联的MessageQueu对象
Message
Message 是一个链表,MessageQueue中的所有消息根据时间组成一个单向的链表。同时内部维护了一个回收对象池来复用。
获得message
public static Message obtain() {
// 保证线程安全
synchronized (sPoolSync) {
//优先从回收池中获取message
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
// 清除使用标志
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
复制代码
回收msg
/**
* Recycles a Message that may be in-use.
* Used internally by the MessageQueue and Looper when disposing of queued Messages.
*/
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
//加入回收池
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
复制代码
如何使用Handler
下面将通过使用Handler发送消息将整个流程穿起来
如何发送消息
先来讲一下同步消息屏障。
同步消息屏障通过
//向 Looper 的消息队列发布一个同步屏障。 消息处理照常进行,直到消息队列遇到已发布的同步障碍。 当遇到屏障时,队列中稍后的同步消息将被暂停(阻止执行),直到通过调用removeSyncBarrier并指定标识同步屏障的令牌来释放屏障。 此方法用于立即推迟所有后续发布的同步消息的执行,直到满足释放屏障的条件。 异步消息(参见Message.isAsynchronous免除屏障并继续照常处理。此调用必须始终与具有相同令牌的removeSyncBarrier调用匹配,以确保消息队列恢复正常操作。否则应用程序可能会悬挂
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
复制代码
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
//找插入位置
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
//插入消息队列
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
复制代码
插入消息队列,可以看到同步屏障的target是null。比如说在ViewRootImpl,在进行绘制流程之前
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
if (!mUnbufferedInputDispatch) {
scheduleConsumeBatchedInput();
}
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}
复制代码
先放入了一个消息屏障,从而使得界面绘制的消息会比其他消息优先执行,避免了因为 MessageQueue 中消息太多导致绘制消息被阻塞导致画面卡顿,当绘制完成后,就会将消息屏障移除
同步消息
同步消息无论使用send(sendMessageAtFrontOfQueue、sendMessageAtTime等)还是post(postAtFrontOfQueue、postAtTime等),最终会调用
sendMessageAtTime
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
复制代码
Handler.enqueueMessage
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
//msg 和 handler绑定
msg.target = this;
//设置是否为异步消息,异步消息不受同步屏障的影响
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
复制代码
MessageQueue.enqueueMessage
boolean enqueueMessage(Message msg, long when) {
//判断msg的target变量是否为null,如果为null,则为同步屏障,而同步屏障入队则是通过postSyncBarrier()方法入队,所以msg的target一定有值
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
//判断msg的标志位,因为此时的msg应该是要入队,意味着msg的标志位应该显示还未被使用。如果显示已使用,明显有问题,直接抛异常
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
//同步锁
synchronized (this) {
//如果消息队列正在退出,回收消息
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
//设置msg的when并且修改msg的标志位
msg.markInUse();
msg.when = when;
Message p = mMessages;
//是否需要唤醒
boolean needWake;
//p==null则说明消息队列中没有消息;when == 0 表示插入到队列的头部;when < p.when 表示 msg的执行时间早于链表中的头部元素的执行时间
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
//如果消息队列处于阻塞状态,需要唤醒
needWake = mBlocked;
} else {
//在消息队列中寻找插入位置
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
//如果当前是阻塞状态并且队列的头是同步屏障并且当前是异步消息,需要唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
//到达队列的尾部或者根据when找到插入位置,跳出循环
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//插入消息队列
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
//如果需要唤醒,则唤醒,具体请看后面的Handler中的Native详解
if (needWake) {
nativeWake(mPtr);
}
}
//告知消息入队成功
return true;
}
复制代码
消息的取出
消息的取出主要是通过Looper.loop(),这是一个死循环,不断的遍历消息队列,取出消息后分发执行消息,最后回收消息。
public static void loop() {
//获取looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
//从消息队列中获取msg对象
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
//分发消息
msg.target.dispatchMessage(msg);
//消息回收
msg.recycleUnchecked();
}
}
复制代码
再看 next() 方法,这个是对消息队列的遍历过程,从消息队列中取出可用的消息。
在for循环内,在1处调用的nativePollOnce方法,它与 Handler 的阻塞唤醒机制有关,我们后面再说。
2处就是通过通过同步屏障取出后续的异步消息。在3处判断消息是否到达发送的时间,若到了应该发送的时间,直接将 msg return,否则仅仅是将 nextPollTimeoutMillis 置为了剩余的时间。在4处判断若消息队列否为null或者没有到达发送的时间,去执行 idleHanlder,并且执行结束后通过将 pendingIdleHandlerCount 置为0来避免多次执行。
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
//空闲时处理的IdlerHandler的数量
int pendingIdleHandlerCount = -1; // -1 only during first iteration
//native层用到的变量 ,和阻塞欢迎机制有关,后面讲
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//1. 调用native层进行消息标示,nextPollTimeoutMillis 为0立即返回,为-1则阻塞等待。
nativePollOnce(ptr, nextPollTimeoutMillis); //android_os_MessageQueue.cpp L188
//同步锁
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//2. 判断Message是否是同步屏障,如果是则执行循环,拦截所有同步消息,直到取到第一个异步消息为止
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// 有可执行的Message
if (msg != null) {
//3
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//从队列中取出该Message
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
// 因为要取出msg,所以msg的next不能指向链表的任何元素,所以next要置为null
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
//标记msg状态
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
//4
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
//没有要执行的 idleHandler
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// idleHandler 只执行一次,直到下一次调用MessageQueue.next() 方法。
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
复制代码
分发消息
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
//当Message存在回调方法,回调msg.callback.run()方法;
handleCallback(msg);
} else {
if (mCallback != null) {
//当Handler存在Callback成员变量时,回调方法handleMessage();
if (mCallback.handleMessage(msg)) {
return;
}
}
//Handler自身的回调方法handleMessage()
handleMessage(msg);
}
}
复制代码
消息的移除
移除单个消息
void removeMessages(Handler h, Runnable r, Object object) {
if (h == null || r == null) {
return;
}
synchronized (this) {
Message p = mMessages;
// Remove all messages at front.
while (p != null && p.target == h && p.callback == r
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.callback == r
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
复制代码
移除所有消息
private void removeAllFutureMessagesLocked() {
final long now = SystemClock.uptimeMillis();
Message p = mMessages;
if (p != null) {
if (p.when > now) {
removeAllMessagesLocked();
} else {
Message n;
for (;;) {
n = p.next;
if (n == null) {
return;
}
if (n.when > now) {
break;
}
p = n;
}
p.next = null;
do {
p = n;
n = p.next;
p.recycleUnchecked();
} while (n != null);
}
}
}
复制代码
阻塞唤醒机制
Handler 通过阻塞唤醒机制,来避免无消息可执行时的循环操作。当无消息可执行时,通过阻塞唤醒机制,将 Looper 的 loop 过程阻塞,直到到达指定的时间或者被唤醒,避免资源的浪费。
文件描述符
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
epoll
epoll是Linux内核为处理大批量文件描述符而设计的IO多路复用机制,它能显著提高程序在存在大量并发连接而只有少部分活跃连接情况下的系统CPU利用率。epoll之所以可以做到如此高的效率是因为它在获取就绪事件的时候,并不会遍历所有被监听的文件描述符集,而只会遍历那些被设备IO事件异步唤醒(CPU中断机制)而加入就绪链表的文件描述符集。
epoll 主要有三个方法,分别是 epoll_create、epoll_ctl、epoll_wait。
epoll_create
int epoll_create(int size);
复制代码
创建一个epoll的句柄。参数size是内核保证能够正确处理的最大文件描述符数目。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
复制代码
操作上面建立的epoll fd,例如,将刚建立的socket fd加入到epoll中让其监控,或者把 epoll正在监控的某个socket fd移出epoll,不再监控它等。这个函数用于向epoll注册一个事件,而且明确监听的事件类型。
第一个参数为epoll句柄。
第二个参数表示对这个fd监听事件操作。
-
EPOLL_CTL_ADD:注册 fd 到 epfd
-
EPOLL_CTL_DEL:从 epfd 中删除 fd
-
EPOLL_CTL_MOD:修改已注册的 fd 的监听事件
第三个参数是需要监听的fd。
第四个参数是告诉内核需要监听什么事。epoll_event 是一个结构体,里面的 events 代表了对应文件操作符的操作。
-
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
-
EPOLLOUT:表示对应的文件描述符可以写;
-
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
-
EPOLLERR:表示对应的文件描述符发生错误;
-
EPOLLHUP:表示对应的文件描述符被挂断;
-
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
-
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
复制代码
第一个参数为epoll句柄。
第二个参数为从内核中得到的事件集合。
第三个参数为events 数量,不能超过 create 时的 size。
第四个参数超时时间。
当调用了该方法后,会进入阻塞状态,等待 epfd 上的 IO 事件,若 epfd 监听的某个文件描述符发生前面指定的 event 时,就会进行回调,从而使得 epoll 被唤醒并返回需要处理的事件个数。若超过了设定的超时时间,同样也会被唤醒并返回 0 避免一直阻塞。
nativeInit
MessageQueue在创建时,会调用nativeInit()方法,在native层创建 NativeMessageQueue并返回其地址给java层mPtr,之后通过这个地址与该 NativeMessageQueue 进行通信。而在 NativeMessageQueue 创建时又会创建 Native 层下的 Looper。
android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//与java层类似,先从当前线程获取 looper
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//创建 looper
mLooper = new Looper(false); //Looper.cpp L63
//存储到当前线程中
Looper::setForThread(mLooper);
}
}
复制代码
Looper.cpp
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
//构造唤醒事件的fd
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();//L134
}
复制代码
首先构造唤醒事件fd,然后在rebuildEpollLocked()中初始化epoll。首先构造唤醒事件fd,然后在rebuildEpollLocked()中初始化epoll。
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
// 关闭老的epoll实例
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
// 创建一个epoll实例
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
// 清空,把未使用的数据区域进行置0操作
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
//注册EPOLLIN事件
eventItem.events = EPOLLIN;
//设置fd为唤醒事件fd
eventItem.data.fd = mWakeEventFd;
//将唤醒事件(mWakeEventFd)添加到epoll实例(mEpollFd)
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// 这里主要添加的是Input事件,如键盘、传感器输入,这里基本上是由系统负责。
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
复制代码
首先关闭了旧的 epoll 描述符,之后又调用了 epoll_create 创建了新的 epoll 描述符,然后进行了一些初始化后,将 mWakeEventFd 及 mRequests 中的 fd 都注册到了 epoll 的描述符中,注册的事件都是 EPOLLIN。
这就意味着当这些文件描述符其中一个发生了 IO 时,就会通知 epoll_wait 使其唤醒,Handler 的阻塞就是通过 epoll_wait 实现的。
nativePollOnce
在 MessageQueue 的 next 方法中,调用 nativePollOnce 方法将返回的过程阻塞到指定的时间。这是一个 native 方法,最终调用的是 Looper.cpp 的 pollOnce(timeoutMillis) 方法。
Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 先处理没有Callback的Response事件
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
// ...
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 1
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
return result;
}
复制代码
最终调用 epoll_wait 方法,并传入了我们之前在 natviePollOnce 方法传入的 timeoutMillis 。这就是我们的阻塞功能的核心实现了,调用该方法后,会一直阻塞,直到到达我们设定的时间或之前我们在 epoll 的 fd 中注册的几个 fd 发生了 IO。
nativeWake
android_os_messageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();//Looper.cpp
}
复制代码
调用到了 Native 层 Looper 的 wake() 方法。
Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
// 向mWakeEventFd写入字符1
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}
复制代码
调用了 write 方法,对 mWakeEventFd 中写入了 1,从而使得监听该 fd 的 pollOnce 方法被唤醒,从而使得 Java 中的 next 方法继续执行。
唤醒的时机
-
调用 MessageQueue 的 quit 方法进行退出时,会进行唤醒
-
消息入队时,若插入的消息在链表最前端(最早将执行)或者有同步屏障时插入的是最前端的异步消息(最早被执行的异步消息)
-
移除同步屏障时,若消息列表为空或者同步屏障后面不是异步消息时
退出消息队列
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
//如果该方法的变量safe为true,则删除以当前时间为分界线,删除未来的所有消息,如果该方法的变量safe为false,则删除当前消息队列的所有消息
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
复制代码