Asop 之 消息处理机制

葛泽续,2012年2月加入去哪儿网,多次参与 Android 客户端的重构,插件化开发,客户端安全攻防,现负责国际酒店抓取系统架构升级维护。


Android 应用程序是通过消息来驱动的,系统为每一个应用程序维护一个消息队例,应用程序的主线程不断地从这个消息队例中获取消息(Looper),然后对这些消息进行处理(Handler),这样就实现了通过消息来驱动应用程序的执行。先了解一下涉及到的几个概念:

  • Message

消息(Message)代表一个行为(what)或者一串动作(Runnable),每一个消息在加入消息队列时,都有明确的目标(Handler)。

  • MessageQueue

以队列的形式存放消息对象,其内部结构是以链表的形式存储消息。对外提供插入和删除操作。

  • Looper

****Looper 是循环的意思,它负责从 MessageQueue 中循环的取出 Message 然后交给目标(Handler)处理。

  • Handler

****消息的真正处理者,具备获取消息、发送消息、处理消息、移除消息等功能。

  • ThreadLocal

作用是为了线程隔离,内部实现相当于Map以当前线程为key,存入的值作为 value。

Looper 不断从 MessageQueue 中取出一个 Message,然后交给其对应的 Handler 处理。

image.png
我们平时接触到的 Looper、Message、Handler 都是用 JAVA 实现的,Android 是一个基于 Linux 的系统,底层用C、C++实现的,而且还有 NDK 的存在,Android 消息驱动的模型为了消息的及时性、高效性,在 Native 层也设计了 Java 层对应的类如 Looper、MessageQueue 等。

在 ActivityThread 的 main 函数里面调用主线程的 loop 方法开启消息循环监听,这个 loop 方法会一直运行,伴随应用的整个生命周期。

以下是 ActitivyThread 的 main 的实现:

public static void main(String[] args) {    
    Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");
    // CloseGuard defaults to true and can be quite spammy.  We    
    // disable it here, but selectively enable it later (via    
    // StrictMode) on debug builds, but using DropBox, not logs.    
    CloseGuard.setEnabled(false);
    
    Environment.initForCurrentUser();
    
    // Set the reporter for event logging in libcore    
    EventLogger.setReporter(new EventLoggingReporter());
    
    // Make sure TrustedCertificateStore looks in the right place for CA certificates    
    final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());    
    TrustedCertificateStore.setDefaultUserDirectory(configDir);
    
    Process.setArgV0("<pre-initialized>");
    
    Looper.prepareMainLooper();
    
    ActivityThread thread = new ActivityThread();    
    thread.attach(false);
    
    if (sMainThreadHandler == null) {        
        sMainThreadHandler = thread.getHandler();    
    }
    
    if (false) {        
        Looper.myLooper().setMessageLogging(new               
            LogPrinter(Log.DEBUG, "ActivityThread"));   
    }
    
    // End of event ActivityThreadMain.    
    Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);   
    Looper.loop();
    
    throw new RuntimeException("Main thread loop unexpectedly exited");
}
复制代码

prepareMainLooper 做的事情其实就是在线程中创建一个 Looper 对象:

/** 
 * Initialize the current thread as a looper, marking it as an 
 * application's main looper. The main looper for your application 
 * is created by the Android environment, so you should never need 
 * to call this function yourself.  See also: {@link #prepare()} 
 */
public static void prepareMainLooper() {    
    prepare(false);    
    synchronized (Looper.class) {       
        if (sMainLooper != null) {        
            throw new IllegalStateException("The main Looper has already been prepared.");       
        }       
        sMainLooper = myLooper();  
    }
}
复制代码

先调用 prepare 来进行主要成员变量的初始化,传传入参数 false 最终会传到 MessageQueue 的构造函数中。初始化完成后,接着调用 myLooper 方法将返回值赋给成员变量 sMainLooper,它也是一个 Looper 类型的成员变量,接着再来看一下 prepare 方法的实现,源码如下:

private static void prepare(boolean quitAllowed) {    
    if (sThreadLocal.get() != null) {       
        throw new RuntimeException("Only one Looper may be created per thread");    
    }  
    sThreadLocal.set(new Looper(quitAllowed));
}
复制代码

这个 Looper 对象是存放在 sThreadLocal 成员变量里面的。线程创建 Looper 对象的工作是由 prepare 函数来完成的,而在创建 Looper 对象的时候,会同时创建一个消息队列 MessageQueue,保存在 Looper 的成员变量 mQueue 中,后续消息就是存放在这个队列中去。消息队列在 Android 应用程序消息处理机制中最重要的组件,以下是它的创建过程:

public class MessageQueue {  
  ......    
    
   // True if the message queue can be quit.
    
      private final boolean mQuitAllowed;      
      
      private int mPtr; // used by native code   private native void nativeInit();   
      
      MessageQueue(boolean quitAllowed) {        
        mQuitAllowed = quitAllowed;       
        mPtr = nativeInit(); 
    }  
  ......
}
复制代码

它的初始化工作都交给 JNI 方法 nativeInit 实现:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {    
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();    
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");        
        return 0;    
    }
    
    nativeMessageQueue->incStrong(env);   
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
复制代码

在 JNI 中,也相应地创建了一个消息队列 NativeMessageQueue,接着把 C++ 里面的这个指针转成 jlong 类型返回给 java 层,赋值给前面我们在 Java 层创建的 MessageQueue 对象的 mPtr 成员变量。继续看 NativeMessageQueue 的创建过程:

NativeMessageQueue::NativeMessageQueue() :        
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {   
    mLooper = Looper::getForThread();   
    if (mLooper == NULL) {       
        mLooper = new Looper(false);      
        Looper::setForThread(mLooper);  
    }
}
复制代码

它主要就是在内部创建了一个 Looper 对象,这里的 Looper 跟 java 层的是对应的。继续看 Looper 对象的创建过程:

Looper::Looper(bool allowNonCallbacks) :       
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),     
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),      
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {   
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);   
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));
    
    AutoMutex _l(mLock);  
    rebuildEpollLocked();
}
复制代码

该方法中首先调用 eventfd 系统函数,该函数返回一个文件描述符,与打开的其他文件一样,可以进行读写操作。然后调用 rebuildEpollLocked 函数继续进行后续的初始化,继续看 rebuildEpollLocked:

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.   
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS        
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif        
        close(mEpollFd);    
    }
    
    // Allocate the new epoll instance and register the wake pipe.   
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);  
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    
    struct epoll_event eventItem;   
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union     
    eventItem.events = EPOLLIN;   
    eventItem.data.fd = mWakeEventFd;  
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);   
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));
   
    for (size_t i = 0; i < mRequests.size(); i++) {       
        const Request& request = mRequests.valueAt(i);      
        struct epoll_event eventItem;      
        request.initEventItem(&eventItem);
      
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);        
        if (epollResult < 0) {   
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",                
                    request.fd, strerror(errno));        
        }   
    }
}
复制代码

为我们的主线程创建 Epoll 循环的结构体。该方法执行完,我们的 epoll 节点添加进去之后,那么初始化的工作就结束了。framework 中为我们创建好的 java 层的 Looper、MessageQueue 和 native 层的 Looper、NativeMessageQueue 都已经准备好了,epoll 机制相应的节点也注册好了。

下面我们接着来分析 ActivityThread 类的 main 方法中的 Looper.loop()的实现。先调用 myLooper 方法来判断前面的准备工作是否完成,如果准备工作都出错,那就直接抛出运行时异常。接着一个 for (;;) 无限循环取消息。queue.next()取下一个消息,该方法可能会阻塞,如果取到的 msg 为空,则说明消息循环要退出了,则直接 return。取到下一个消息 msg 之后,就调用 msg.target.dispatchMessage(msg) 将它分发给目标进行处理,msg 的成员变量 target 的类型为 Handler,它是在我们往当前的 MessageQueue 消息队列上发送消息时指定的,分发完成后调用 recycleUnchecked() 来将当前的 msg 回收掉。Message 对象的构建也是使用了一个缓存池,因为消息循环是非常频繁的,所以使用缓存池可以有效的减少无用内存的分配,非常必要。接下来重点看一下 queue.next() 是如何取到下一条消息的,该方法的实现在 MessageQueue 类中,方法的源码如下:

Message next() {  
    // Return here if the message loop has already quit and been disposed.   
    // This can happen if the application tries to restart a looper after quit 
    // which is not supported.  
    final long ptr = mPtr;  
    if (ptr == 0) {      
        return null;    
    }
    
    int pendingIdleHandlerCount = -1; // -1 only during first iteration 
    int nextPollTimeoutMillis = 0;  
    for (;;) {      
        if (nextPollTimeoutMillis != 0) {        
            Binder.flushPendingCommands();    
        }
        
        nativePollOnce(ptr, nextPollTimeoutMillis);
       
        synchronized (this) {       
            // Try to retrieve the next message.  Return if found.    
            final long now = SystemClock.uptimeMillis();        
            Message prevMsg = null;    
            Message msg = mMessages;        
            if (msg != null && msg.target == null) {        
                // Stalled by a barrier.  Find the next asynchronous message in the queue.                     
                do {
                    prevMsg = msg;                 
                    msg = msg.next;             
                } while (msg != null && !msg.isAsynchronous());      
            }          
            if (msg != null) {       
                if (now < msg.when) {               
                    // Next message is not ready.  Set a timeout to wake up when it is ready.                    
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);             
                } else {             
                    // Got a message.             
                    mBlocked = false;                  
                    if (prevMsg != null) {             
                        prevMsg.next = msg.next;                  
                    } else {                      
                        mMessages = msg.next;          
                    }                  
                    msg.next = null;                 
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);                    
                    msg.markInUse();                  
                    return msg;     
                }          
            } else {            
                // No more messages.          
                nextPollTimeoutMillis = -1;      
            }
            
            // Process the quit message now that all pending messages have been handled.          
            if (mQuitting) {            
                dispose();               
                return null;        
            }
            
            // If first time idle, then get the number of idlers to run.            
            // Idle handles only run if the queue is empty or if the first message            
            // in the queue (possibly a barrier) is due to be handled in the future.            
            if (pendingIdleHandlerCount < 0                
                    && (mMessages == null || now < mMessages.when)) {                
                pendingIdleHandlerCount = mIdleHandlers.size();         
            }         
            if (pendingIdleHandlerCount <= 0) {             
                // No idle handlers to run.  Loop and wait some more.           
                mBlocked = true;             
                continue;        
            }
           
            if (mPendingIdleHandlers == null) {          
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];          
            }         
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);      
        }
        
        // Run the idle handlers.       
        // We only ever reach this code block during the first iteration.        
        for (int i = 0; i < pendingIdleHandlerCount; i++) {            
            final IdleHandler idler = mPendingIdleHandlers[i];            
            mPendingIdleHandlers[i] = null; // release the reference to the handler
            
            boolean keep = false;           
            try {              
                keep = idler.queueIdle();          
            } catch (Throwable t) {             
                Log.wtf(TAG, "IdleHandler threw exception", t);         
            }
            
            if (!keep) {             
                synchronized (this) {                
                    mIdleHandlers.remove(idler);          
                }         
            }       
        }
        
        // Reset the idle handler count to 0 so we do not run them again.        
        pendingIdleHandlerCount = 0;
        
        // While calling an idle handler, a new message could have been delivered        
        // so go back and look again for a pending message without waiting.        
        nextPollTimeoutMillis = 0;   
    }
}
复制代码

主要是一个 for (;;) 无限循环,所有发送过来的消息最终都会存储在成员变量 mMessages 上,它的类型为 Message,Message 类又有一个类型为 Message 的成员变量 next,相当于 Message 类就是单向链表,所以我们发送过来的消息会不断的往上挂,从 mMessages 上取下一个消息 msg,如果当前消息时间未到,那么就需要休眠,休眠的时间长短取决于 nextPollTimeoutMillis;否则处理该消息,则将该消息返回给 Looper 类的 loop 方法中进行处理。下面看一下 nativePollOnce 的实现:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,        
        jlong ptr, jint timeoutMillis) {  
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);    
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
复制代码

取到在 native 层创建的 NativeMessageQueue,然后调用它的 pollOnce 继续处理,pollOnce 方法的源码如下:

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;    
    mPollObj = pollObj;   
    mLooper->pollOnce(timeoutMillis);  
    mPollObj = NULL;  
    mPollEnv = NULL;
    
    if (mExceptionObj) {        
        env->Throw(mExceptionObj);      
        env->DeleteLocalRef(mExceptionObj);      
        mExceptionObj = NULL;    
    }
}
复制代码

调用 native 层的 Looper 类的 pollOnce 继续处理,源码如下:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 
    int result = 0;   
    for (;;) {      
        while (mResponseIndex < mResponses.size()) {        
            const Response& response = mResponses.itemAt(mResponseIndex++);        
            int ident = response.request.ident;           
            if (ident >= 0) {              
                int fd = response.request.fd;             
                int events = response.events;               
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE                
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "                        
                    "fd=%d, events=0x%x, data=%p",                     
                    this, ident, fd, events, data);
#endif               
                if (outFd != NULL) *outFd = fd;              
                if (outEvents != NULL) *outEvents = events;        
                if (outData != NULL) *outData = data;            
                return ident;           
            }      
        }
        
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE         
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif          
            if (outFd != NULL) *outFd = 0;         
            if (outEvents != NULL) *outEvents = 0;        
            if (outData != NULL) *outData = NULL;       
            return result;      
        }
        
        result = pollInner(timeoutMillis);   
    }
}
复制代码

调用 pollInner 进一步处理,pollInner 方法的源码如下:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
复制代码

该方法的参数 timeoutMillis 就是下一个消息的等待时间,在调用 epollwait 系统函数时,就会将当前的线程休眠。休眠时间到之后,epollwait 就会返回,再次检查消息队列时,就会有符合要求的消息了。

END.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享