Android Message Mechanism 3 – Native Layer Source Code Analysis

1. Overview

The Android message mechanism is essentially split into a Java layer and a Native layer. The Java layer centers on the Message, MessageQueue, Looper, and Handler classes, and the Native layer mirrors the same architecture.
However, the Java layer's message-extraction loop (queue.next()) ties into the Native layer's epoll, so this article follows the Java-layer call path to analyze the Native-layer code.

2. nativeInit

2.1 new MessageQueue()

[-> MessageQueue.java]

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        // See 2.2
        mPtr = nativeInit();
    }

When Looper.prepare runs at the Java layer, it creates a MessageQueue, whose constructor calls nativeInit to initialize the Native-layer message queue.

2.2 nativeInit()

[-> android_os_MessageQueue.cpp]

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // See 2.3
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

The JNI layer creates the NativeMessageQueue and converts the object pointer into a plain memory address via reinterpret_cast<jlong>, which is returned to the Java layer and stored in mPtr.
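To make the pattern concrete, here is a hypothetical minimal sketch (the names are illustrative, not AOSP's) of how a native object's address round-trips through a Java long field:

#include <jni.h>

class NativeQueue { public: void poll() { /* ... */ } };

extern "C" jlong Java_Example_nativeInit(JNIEnv*, jclass) {
    // The address travels to Java as a plain 64-bit integer (mPtr)
    return reinterpret_cast<jlong>(new NativeQueue());
}

extern "C" void Java_Example_nativePoll(JNIEnv*, jclass, jlong ptr) {
    // Cast the saved address back to the object on each native call
    reinterpret_cast<NativeQueue*>(ptr)->poll();
}

extern "C" void Java_Example_nativeDestroy(JNIEnv*, jclass, jlong ptr) {
    delete reinterpret_cast<NativeQueue*>(ptr);  // must be freed explicitly
}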

2.3 NativeMessageQueue::NativeMessageQueue

[-> android_os_MessageQueue.cpp]

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // See 2.3.1
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        // See 2.3.2
        mLooper = new Looper(false);
        // See 2.3.4
        Looper::setForThread(mLooper);
    }
}

This initializes the Native-layer NativeMessageQueue: it looks for a Looper already stored in the current thread's TLS, and creates and stores one if none exists.

2.3.1 Looper::getForThread()

[-> Looper.cpp]

sp<Looper> Looper::getForThread() {
    int result = pthread_once(& gTLSOnce, initTLSKey);
    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");

    return (Looper*)pthread_getspecific(gTLSKey);
}

pthread_once uses the gTLSOnce flag so that initTLSKey executes exactly once; think of it as one-time initialization.

[-> Looper.cpp]

void Looper::initTLSKey() {
    int error = pthread_key_create(&gTLSKey, threadDestructor);
    LOG_ALWAYS_FATAL_IF(error != 0, "Could not allocate TLS key: %s", strerror(error));
}

pthread_key_create creates the TLS key gTLSKey; each thread then gets its own value slot under that key, and threadDestructor runs on a thread's stored value when that thread exits.
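The pattern is easier to see in isolation. Below is a minimal sketch (not AOSP code) of the same pthread_once + pthread_key combination that getForThread/setForThread rely on:

#include <pthread.h>
#include <cstdio>

static pthread_once_t gOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gKey;

static void destructor(void* value) {
    // Called at thread exit for each thread whose slot is non-null;
    // Looper uses this hook to drop its strong reference.
    printf("destroying %p\n", value);
}

static void initKey() {
    pthread_key_create(&gKey, destructor);  // runs exactly once process-wide
}

void* getForThread() {
    pthread_once(&gOnce, initKey);
    return pthread_getspecific(gKey);       // this thread's value, or null
}

void setForThread(void* value) {
    pthread_once(&gOnce, initKey);
    pthread_setspecific(gKey, value);       // one slot per thread
}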

2.3.2 new Looper

[-> Looper.cpp]

Looper::Looper(bool allowNonCallbacks)
    : mAllowNonCallbacks(allowNonCallbacks),
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(0),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    // Create an eventfd and store it in mWakeEventFd
    // (newer Looper versions use an eventfd instead of a pipe)
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));

    AutoMutex _l(mLock);
    //见2.3.3
    rebuildEpollLocked();
}


This creates the Native-layer Looper. The part worth dwelling on is eventfd, a Linux facility for creating a pipe-like file descriptor. Later, once a Java-layer message has been enqueued, nativeWake writes the 64-bit integer 1 to this fd, and epoll_wait wakes up to process it.
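A minimal standalone sketch (plain Linux; error handling omitted) of eventfd's semantics: each write() adds to an internal 64-bit counter, and a single read() drains it, which is why several wake() calls before the looper runs again coalesce into one wakeup.

#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main() {
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);  // same flags Looper uses
    uint64_t one = 1;
    write(efd, &one, sizeof(one));    // wake()
    write(efd, &one, sizeof(one));    // wake() again before anyone reads

    uint64_t value = 0;
    read(efd, &value, sizeof(value)); // awoken(): drains the counter
    printf("drained %llu\n", (unsigned long long)value);  // prints 2

    close(efd);
    return 0;
}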

2.3.3 rebuildEpollLocked

[-> Looper.cpp]

void Looper::rebuildEpollLocked() {
    ......
    // Allocate the new epoll instance and register the wake pipe.
    // Create the epoll instance via epoll_create1 and store it in mEpollFd
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    // Fill in the epoll_event struct describing the events we care about
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    // Watch for readable events
    eventItem.events = EPOLLIN;
    // The fd to watch is the wake eventfd created above
    eventItem.data.fd = mWakeEventFd.get();
    // Register via epoll_ctl; when the watched fd sees an event we care about
    // (it becomes readable), epoll_wait wakes up.
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));
    // Re-register the fds previously added via addFd (Requests)
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}

This uses epoll, Linux's I/O multiplexing facility, to watch the wake fd: when data is written to it, epoll_wait wakes up. (Pipes are a classic inter-process communication (IPC) mechanism; here the eventfd serves purely as an in-process wakeup.) Requests are covered in Section 5.
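Putting the pieces together, here is a minimal sketch (not AOSP code; error handling omitted) of the exact setup rebuildEpollLocked performs: an epoll instance watching an eventfd for EPOLLIN, with a wait that wakes as soon as the fd is written.

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main() {
    int wakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epollFd = epoll_create1(EPOLL_CLOEXEC);

    struct epoll_event item = {};
    item.events = EPOLLIN;        // readable: someone wrote to the eventfd
    item.data.fd = wakeFd;
    epoll_ctl(epollFd, EPOLL_CTL_ADD, wakeFd, &item);

    uint64_t one = 1;
    write(wakeFd, &one, sizeof(one));   // simulate wake()

    struct epoll_event events[16];
    int n = epoll_wait(epollFd, events, 16, -1);  // -1: block until an event
    printf("epoll_wait returned %d event(s), fd=%d\n", n, events[0].data.fd);

    close(epollFd);
    close(wakeFd);
    return 0;
}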

2.3.4 Looper::setForThread(mLooper)

[-> Looper.cpp]

void Looper::setForThread(const sp<Looper>& looper) {
    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS

    if (looper != nullptr) {
        looper->incStrong((void*)threadDestructor);
    }
    // Store the newly created Looper as this thread's TLS value
    pthread_setspecific(gTLSKey, looper.get());

    if (old != nullptr) {
        old->decStrong((void*)threadDestructor);
    }
}

Mirroring the Java-layer logic, the TLS value for the current thread is set to the newly created Looper.

Summary

When Looper.prepare runs at the Java layer, the Native layer correspondingly creates a NativeMessageQueue, initializes the Native Looper, creates a wake eventfd, and watches it with epoll, so that once an event is enqueued at the Java layer, wake() (a write to that fd) can rouse the NativeMessageQueue to process it.

3. queue.next() -> nativePollOnce

Following the Java-layer route: once Looper.prepare has run and a Handler is bound to the Looper, the thread calls Looper.loop and enters an endless loop, repeatedly pulling messages via queue.next().

3.1 queue.next()

[-> MessageQueue.java]

Message next() {
    // ptr is the address of the Native-layer NativeMessageQueue
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        // This is where the thread actually blocks
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            ......
        }
    }
}

As you can see, the Java layer's real blocking happens in nativePollOnce. Note the nextPollTimeoutMillis argument: 0 means return immediately when there is no message, while -1 means wait indefinitely.
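These timeout semantics come straight from epoll_wait. A small sketch (plain Linux, no Android dependencies) demonstrating the three cases:

#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

int main() {
    int epollFd = epoll_create1(EPOLL_CLOEXEC);   // nothing registered
    struct epoll_event events[1];

    int n = epoll_wait(epollFd, events, 1, 0);    // 0: poll and return at once
    printf("timeout=0   -> %d events\n", n);      // prints 0 immediately

    n = epoll_wait(epollFd, events, 1, 500);      // 500: waits at most 0.5s
    printf("timeout=500 -> %d events\n", n);

    // epoll_wait(epollFd, events, 1, -1) would block forever here,
    // since no fd is registered that could wake it up.
    close(epollFd);
    return 0;
}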

[-> android_os_MessageQueue.cpp]

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // See 3.2
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

nativePollOnce crosses into JNI, casts the saved NativeMessageQueue pointer mPtr back to the object, and proceeds down into the Native layer's pollOnce.

3.2 pollOnce

[-> Looper.cpp]

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // Related to Requests (see Section 5)
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }
        // See 3.3
        result = pollInner(timeoutMillis);
    }
}

A few notes on the parameters:

  • timeoutMillis is the wait timeout: 0 means return immediately without waiting, -1 means wait indefinitely.
  • outFd receives the file descriptor on which the event occurred.
  • outEvents receives the events that occurred on outFd: input (readable), output (writable), error, or hangup.
  • outData receives the context pointer supplied when the fd was registered; when epoll_wait returns, it is handed back unchanged, so the caller can recover its per-fd state.

Return values:

  • ALOOPER_POLL_WAKE: the return was triggered by the wake() function, i.e. the wake-fd event.
  • ALOOPER_POLL_CALLBACK: one of the watched descriptors was triggered; related to Requests.
  • ALOOPER_POLL_TIMEOUT and ALOOPER_POLL_ERROR cover the timeout and error cases, visible in pollInner below. A usage sketch of this out-parameter protocol follows.
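The same protocol is exposed publicly through the NDK's <android/looper.h> (ALooper_pollOnce has the identical signature). A hedged usage sketch, assuming linkage against -landroid: an fd added without a callback but with a positive ident comes back through the out parameters described above.

#include <android/looper.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

void pollExample() {
    ALooper* looper = ALooper_prepare(ALOOPER_PREPARE_ALLOW_NON_CALLBACKS);

    int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    const int MY_IDENT = 1;   // must be >= 0 when no callback is supplied
    ALooper_addFd(looper, fd, MY_IDENT, ALOOPER_EVENT_INPUT,
                  nullptr /* callback */, nullptr /* data */);

    uint64_t one = 1;
    write(fd, &one, sizeof(one));             // make the fd readable

    int outFd, outEvents;
    void* outData;
    int ident = ALooper_pollOnce(-1, &outFd, &outEvents, &outData);
    if (ident == MY_IDENT) {
        // outFd == fd, outEvents has ALOOPER_EVENT_INPUT set,
        // outData is the pointer passed to ALooper_addFd
    }
    ALooper_removeFd(looper, fd);
    close(fd);
}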

3.3 pollInner

[-> Looper.cpp]

int Looper::pollInner(int timeoutMillis) {
    ......
    // Compute messageTimeoutMillis from mNextMessageUptime, the time the next
    // Native message is due, and use whichever of it and timeoutMillis is sooner.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
        ......
    }
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    mPolling = true;
    // Block in epoll_wait until an event arrives or the timeout expires
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // Execution continues here once epoll_wait returns

    mPolling = false;

    mLock.lock();

    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // epoll error
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Timed out
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Use each event's fd to tell wake-fd events apart from callback (Request) events
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                // Drain the eventfd
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Process Native messages that are due
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                ......
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Handle Request callbacks (see Section 5)
   ......
    return result;
}

After the Java layer enqueues a message via enqueueMessage, it calls wake(), which writes the 64-bit value 1 to the eventfd; epoll_wait wakes up to handle it, result becomes non-zero, the pollOnce loop is broken, and control returns to the Java layer to process the message.

For the Native layer, sendMessage likewise writes to the eventfd. If the Message at the head of the queue is not yet due, mNextMessageUptime is set; pollOnce's result is non-zero, so it returns to the Java layer's queue.next(), the loop comes back down into Native code, pollInner computes the next timeout from mNextMessageUptime, and epoll_wait blocks again.
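A simplified model (an illustration of the logic at the top of pollInner, not the actual AOSP helper) of how the epoll_wait timeout is chosen: epoll_wait sleeps until whichever comes first, the caller's timeout or the next due Native message.

#include <algorithm>
#include <climits>
#include <cstdint>

int chooseEpollTimeout(int timeoutMillis, int64_t nowMs, int64_t nextMessageUptimeMs) {
    // INT64_MAX plays the role of LLONG_MAX in mNextMessageUptime: no message pending
    if (timeoutMillis == 0 || nextMessageUptimeMs == INT64_MAX) {
        return timeoutMillis;
    }
    int64_t delayMs = std::max<int64_t>(0, nextMessageUptimeMs - nowMs);
    int messageTimeoutMillis = delayMs > INT_MAX ? INT_MAX : static_cast<int>(delayMs);
    // -1 (wait forever) always loses to a concrete message deadline
    if (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis) {
        return messageTimeoutMillis;
    }
    return timeoutMillis;
}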

Also note that the Native-layer message queue is a genuine list: messages live in mMessageEnvelopes, kept sorted by delivery time.
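For completeness, a hedged sketch of posting a Native-layer message through the libutils Looper API (utils/Looper.h); it assumes an AOSP build environment. handleMessage is dispatched from the message-processing block of pollInner shown above.

#include <utils/Looper.h>

using android::Looper;
using android::Message;
using android::MessageHandler;
using android::sp;

struct MyHandler : public MessageHandler {
    void handleMessage(const Message& message) override {
        // message.what identifies the message, just like on the Java side
    }
};

void postExample() {
    sp<Looper> looper = Looper::getForThread();  // the Looper set up in 2.3
    sp<MessageHandler> handler = new MyHandler();
    looper->sendMessage(handler, Message(0 /* what */));
    // Delayed messages are inserted into mMessageEnvelopes sorted by uptime
    looper->sendMessageDelayed(100 * 1000000LL /* 100ms in ns */, handler, Message(1));
}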

Summary

In the Java layer's Looper.loop, queue.next() is called over and over to fetch messages; underneath, nativePollOnce blocks in epoll_wait watching whether the wake fd has been written. Both Java events and Native events wake epoll_wait by writing to that fd via wake(), after which the pending events get processed.

4. enqueueMessage

Whether a message is enqueued from the Java layer or the Native layer, both paths eventually reach wake().

[-> Looper.cpp]

void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                             mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}

As noted earlier, this simply writes the 64-bit integer 1 to the eventfd created when the NativeMessageQueue was constructed; epoll_wait observes the event and wakes up to handle it.

5. Looper.addFd

5.1 How it works

Request and Response have kept appearing throughout the source analysis; both trace back to Looper.addFd.

Looper.addFd shows up in many scenarios. The classic one is the Input subsystem: it is how a client App receives and handles input events.

[-> Looper.cpp]

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
   ..... // elided: mainly null-checks the callback
    { // acquire lock
        AutoMutex _l(mLock);

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                    if (epollResult < 0) {
                        ALOGE("Error modifying or adding epoll events for fd %d: %s",
                                fd, strerror(errno));
                        return -1;
                    }
                    scheduleEpollRebuildLocked();
                } else {
                    ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

Looper maintains mRequests, a KeyedVector<int, Request> that maps each fd to its corresponding Request.

**Looper.addFd essentially registers the given fd with epoll via epoll_ctl, and packages the fd's associated information (such as its callback) into a Request stored in the mRequests KeyedVector, so that when epoll_wait later wakes up, the fd can be mapped back to its Request and the callback invoked.**
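The same mechanism is reachable from application code through the NDK's <android/looper.h>. A hedged usage sketch (assuming -landroid) of a callback registration; note how the callback's return value controls deregistration, matching the removeFd call in the pollInner excerpt below.

#include <android/looper.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

static int onReadable(int fd, int events, void* data) {
    uint64_t value;
    read(fd, &value, sizeof(value));  // drain, like awoken()
    return 1;                         // 1 = keep watching, 0 = unregister
}

void addFdExample() {
    ALooper* looper = ALooper_prepare(0);
    int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    // The ident is ignored for callback registrations (it becomes POLL_CALLBACK)
    ALooper_addFd(looper, fd, 0, ALOOPER_EVENT_INPUT, onReadable, nullptr);

    uint64_t one = 1;
    write(fd, &one, sizeof(one));
    // Returns ALOOPER_POLL_CALLBACK after onReadable has run
    ALooper_pollOnce(-1, nullptr, nullptr, nullptr);
}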

The actual callback handling still happens in pollInner.

[-> Looper.cpp]

int Looper::pollInner(int timeoutMillis) {
    ......
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ......
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // Not the wake fd, so it was registered via Looper.addFd:
            // look up the callback's Request by fd and wrap it into a Response.
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    // Native message processing (elided)
    ......
    // Process the Responses, i.e. run the Request callbacks
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            // If the callback returns 0, stop watching the fd
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    ......
    return result;
}

5.2 Use in the Input subsystem

In the Input subsystem, the Server and Client communicate over sockets: the Server creates a pair of mutually connected fds with socketpair, wraps each in an InputChannel, and the two channels then flow to the Client and the Server respectively. The Client receives input events via Looper.addFd, while the Server watches for notifications that events have been consumed.
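A minimal sketch (not the InputChannel implementation; error handling omitted) of the socketpair mechanism itself: two connected fds, where bytes written to one end become readable on the other.

#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
    int fds[2];
    socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fds);  // InputChannel uses SOCK_SEQPACKET
    // fds[0] stays on the server, fds[1] is handed to the client app

    const char msg[] = "input event";
    write(fds[0], msg, sizeof(msg));              // server publishes an event

    char buf[64];
    ssize_t n = read(fds[1], buf, sizeof(buf));   // client's Looper sees EPOLLIN
    printf("client read %zd bytes: %s\n", n, buf);

    close(fds[0]);
    close(fds[1]);
    return 0;
}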

Client (App)

[-> android_view_InputEventReceiver.cpp]

static jlong nativeInit(JNIEnv* env, jclass clazz, jobject receiverWeak,
        jobject inputChannelObj, jobject messageQueueObj) {
    sp<InputChannel> inputChannel = android_view_InputChannel_getInputChannel(env,
            inputChannelObj);
    // Obtain the Native MessageQueue backing the Java MessageQueue object
    sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
    // Create a NativeInputEventReceiver around the InputChannel
    sp<NativeInputEventReceiver> receiver = new NativeInputEventReceiver(env,
            receiverWeak, inputChannel, messageQueue);
    // initialize() internally calls setFdEvents
    status_t status = receiver->initialize();
     ......
    receiver->incStrong(gInputEventReceiverClassInfo.clazz); // retain a reference for the object
    return reinterpret_cast<jlong>(receiver.get());
}

void NativeInputEventReceiver::setFdEvents(int events) {
    if (mFdEvents != events) {
        mFdEvents = events;
        int fd = mInputConsumer.getChannel()->getFd();
        if (events) {
            // The key call!
            mMessageQueue->getLooper()->addFd(fd, 0, events, this, nullptr);
        } else {
            mMessageQueue->getLooper()->removeFd(fd);
        }
    }
}

When the Client registers its InputChannel, it watches the channel's fd via Looper.addFd, so that when an InputMessage arrives, the registered Request yields a Response that is handled in Looper's pollInner.

Server (System_Server)

[-> InputDispatcher.cpp]

status_t InputDispatcher::registerInputChannel(const sp<InputChannel>& inputChannel) {
    { // acquire lock
        std::scoped_lock _l(mLock);
        // Reject double registration, then wrap the incoming inputChannel in a Connection object
        sp<Connection> existingConnection = getConnectionLocked(inputChannel->getConnectionToken());
        if (existingConnection != nullptr) {
            ALOGW("Attempted to register already registered input channel '%s'",
                  inputChannel->getName().c_str());
            return BAD_VALUE;
        }
        sp<Connection> connection = new Connection(inputChannel, false /*monitor*/, mIdGenerator);
        // Get the inputChannel's fd and store its Connection under that fd
        int fd = inputChannel->getFd();
        mConnectionsByFd[fd] = connection;
        mInputChannelsByToken[inputChannel->getConnectionToken()] = inputChannel;
        // epoll_wait again: the Looper watches this descriptor, which gives it the ability to
        // observe readable events on a file descriptor. The InputChannel's readable events are
        // registered here, and handleReceiveCallback runs when they arrive.
        mLooper->addFd(fd, 0, ALOOPER_EVENT_INPUT, handleReceiveCallback, this);
    } // release lock
    // Wake the looper because some connections have changed.
    mLooper->wake();
    return OK;
}

The Server takes the fd of its InputChannel end (from the socketpair) and watches it via Looper.addFd for readable events; when one arrives, handleReceiveCallback runs and, among other things, removes the finished event from the waitQueue.

6. Further reading

  • Linux Fd
  • C++ pthread_key_create
