1. Overview
The Android message mechanism is essentially split into a Java layer and a Native layer. The Java layer centers on the Message, MessageQueue, Looper, and Handler classes, and the Native layer mirrors that architecture.
The Java layer's message-extraction loop (queue.next) does, however, tie into epoll on the Native side, so this article follows the Java-layer call path while analyzing the Native-layer code.
2. nativeInit
2.1 new MessageQueue()
[-> MessageQueue.java]
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    // see 2.2
    mPtr = nativeInit();
}
When Looper.prepare runs on the Java side, it creates a MessageQueue, which calls nativeInit to initialize the Native-layer MessageQueue.
2.2 nativeInit()
[-> android_os_MessageQueue.cpp]
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // see 2.3
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
The JNI layer constructs a NativeMessageQueue and, via reinterpret_cast<jlong>, hands the object's address back to the Java layer, where it is stored in mPtr.
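The pointer round-trip can be sketched outside of JNI (a minimal illustration; the struct and helper names are hypothetical, and int64_t stands in for jlong):

```cpp
#include <cstdint>
#include <cassert>

// Hypothetical stand-in for the native object whose address crosses into Java.
struct NativeMessageQueue { int dummy; };

// The native object's address is handed to Java as a 64-bit integer...
int64_t to_java_handle(NativeMessageQueue* q) {
    return reinterpret_cast<int64_t>(q);
}

// ...and cast back to a pointer on every subsequent JNI call.
NativeMessageQueue* from_java_handle(int64_t ptr) {
    return reinterpret_cast<NativeMessageQueue*>(ptr);
}
```

The handle is opaque to Java; only the native side ever dereferences it.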
2.3 NativeMessageQueue::NativeMessageQueue
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // see 2.3.1
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        // see 2.3.2
        mLooper = new Looper(false);
        // see 2.3.4
        Looper::setForThread(mLooper);
    }
}
This initializes the Native-layer NativeMessageQueue.
2.3.1 Looper::getForThread()
[-> Looper.cpp]
sp<Looper> Looper::getForThread() {
    int result = pthread_once(&gTLSOnce, initTLSKey);
    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
    return (Looper*)pthread_getspecific(gTLSKey);
}
pthread_once uses the flag gTLSOnce to guarantee that initTLSKey runs exactly once; think of it as one-time initialization.
[-> Looper.cpp]
void Looper::initTLSKey() {
    int error = pthread_key_create(&gTLSKey, threadDestructor);
    LOG_ALWAYS_FATAL_IF(error != 0, "Could not allocate TLS key: %s", strerror(error));
}
pthread_key_create allocates a thread-local storage (TLS) key, gTLSKey, and registers threadDestructor to run on a thread's stored value when that thread exits.
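The getForThread/setForThread pattern boils down to pthread_once plus a pthread key. A minimal standalone sketch (function names are hypothetical, and the destructor is omitted for brevity):

```cpp
#include <pthread.h>
#include <cassert>

static pthread_once_t g_tls_once = PTHREAD_ONCE_INIT;
static pthread_key_t g_tls_key;

// Runs exactly once across all threads, like Looper::initTLSKey.
static void init_tls_key() {
    pthread_key_create(&g_tls_key, nullptr);  // destructor omitted in this sketch
}

// Store a per-thread value, as Looper::setForThread does.
void set_for_thread(void* value) {
    pthread_once(&g_tls_once, init_tls_key);
    pthread_setspecific(g_tls_key, value);
}

// Read the per-thread value back; NULL if this thread never set one.
void* get_for_thread() {
    pthread_once(&g_tls_once, init_tls_key);
    return pthread_getspecific(g_tls_key);
}
```

Each thread sees only its own value, which is exactly why one Looper can be bound per thread.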
2.3.2 new Looper
[-> Looper.cpp]
Looper::Looper(bool allowNonCallbacks)
    : mAllowNonCallbacks(allowNonCallbacks),
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(0),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    // create the wake eventfd and store it in mWakeEventFd
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
    AutoMutex _l(mLock);
    // see 2.3.3
    rebuildEpollLocked();
}
This constructs the Native-layer Looper. The piece worth noting is eventfd, a Linux facility that creates a pipe-like fd backed by a 64-bit counter. Later, after a message is enqueued on the Java side, nativeWake writes a 64-bit 1 to this fd, and epoll_wait wakes up to handle it.
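The wake/drain pair over an eventfd can be demonstrated in isolation (a Linux-only sketch; wake and drain are illustrative names, not the real Looper API):

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cassert>

// Create a non-blocking eventfd, mimicking mWakeEventFd.
int make_wake_fd() {
    return eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

// "wake": add 1 to the eventfd counter, like Looper::wake().
bool wake(int fd) {
    uint64_t inc = 1;
    return write(fd, &inc, sizeof(inc)) == sizeof(inc);
}

// "drain": a read returns the accumulated counter and resets it to zero,
// like Looper::awoken(); returns 0 if nothing was pending.
uint64_t drain(int fd) {
    uint64_t value = 0;
    ssize_t n = read(fd, &value, sizeof(value));
    return n == sizeof(value) ? value : 0;
}
```

Note the counter semantics: multiple wakes coalesce into a single readable event, which is all a message loop needs.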
2.3.3 rebuildEpollLocked
[-> Looper.cpp]
void Looper::rebuildEpollLocked() {
    .....
    // Allocate the new epoll instance and register the wake pipe.
    // create the epoll fd with epoll_create1
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    // fill in the epoll_event struct with the events we care about
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    // watch for readable events
    eventItem.events = EPOLLIN;
    // the fd to watch is the wake eventfd created above
    eventItem.data.fd = mWakeEventFd.get();
    // register with epoll_ctl: when a watched fd sees an event we care
    // about (here, readable), epoll_wait wakes up
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
            strerror(errno));
    // re-register every fd previously added via addFd (Requests)
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                    request.fd, strerror(errno));
        }
    }
}
This uses Linux's I/O multiplexing facility epoll to watch the wake fd: when data is written to it, epoll_wait wakes up. (Pipes are commonly used for inter-process communication; here the eventfd plays the same wakeup role within a single process.) Requests are covered in Section 5.
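The watch-then-wake cycle can be reproduced with just epoll and an eventfd (an illustrative, Linux-only sketch; watch_readable and poll_once are made-up helper names):

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstring>
#include <cassert>

// Register fd for EPOLLIN on a fresh epoll instance, as rebuildEpollLocked does.
// Returns the epoll fd, or -1 on failure.
int watch_readable(int fd) {
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd < 0) return -1;
    epoll_event item;
    memset(&item, 0, sizeof(item));  // zero the unused data-union members
    item.events = EPOLLIN;
    item.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &item) != 0) return -1;
    return epfd;
}

// Poll once with the given timeout; returns the number of ready fds
// (0 on timeout), like the epoll_wait call inside pollInner.
int poll_once(int epfd, int timeout_ms) {
    epoll_event events[8];
    return epoll_wait(epfd, events, 8, timeout_ms);
}
```

With timeout 0 the poll returns immediately; after a write to the eventfd the watched fd becomes readable and the poll reports one event.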
2.3.4 Looper::setForThread(mLooper)
[-> Looper.cpp]
void Looper::setForThread(const sp<Looper>& looper) {
    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
    if (looper != nullptr) {
        looper->incStrong((void*)threadDestructor);
    }
    // store the newly created Looper as this thread's TLS value
    pthread_setspecific(gTLSKey, looper.get());
    if (old != nullptr) {
        old->decStrong((void*)threadDestructor);
    }
}
Mirroring the Java-layer logic, this stores the newly created Looper as the current thread's TLS value.
Summary
When Looper.prepare runs on the Java side, the Native side creates a NativeMessageQueue, initializes the Native-layer Looper, creates an eventfd, and watches it with epoll, so that once an event is enqueued on the Java side, wake (writing to the eventfd) can rouse the NativeMessageQueue to handle it.
3. queue.next() -> nativePollOnce
Following the Java-layer path: after Looper.prepare binds a Looper to the thread and Handlers attach to it, the thread calls Looper.loop, which loops forever, pulling messages via queue.next().
3.1 queue.next()
[-> MessageQueue.java]
Message next() {
    // ptr is the address of the Native-layer NativeMessageQueue
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        // this is where the thread actually blocks
        nativePollOnce(ptr, nextPollTimeoutMillis);
        synchronized (this) {
            ......
        }
    }
}
So the real blocking on the Java side happens in nativePollOnce. Note the nextPollTimeoutMillis argument: 0 means return immediately when no message is ready, while -1 means wait indefinitely.
[-> android_os_MessageQueue_nativePollOnce.cpp]
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // see 3.2
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
nativePollOnce crosses into JNI, casts the saved mPtr back to the NativeMessageQueue pointer, and calls down into the Native-layer pollOnce.
3.2 pollOnce
[-> Looper.cpp]
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // related to Requests (see Section 5)
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }
        // see 3.3
        result = pollInner(timeoutMillis);
    }
}
A note on the parameters:
- timeoutMillis: the wait timeout; 0 means return immediately without waiting.
- outFd: receives the fd on which an event occurred.
- outEvents: receives which events occurred on outFd; there are four kinds: readable, writable, error, and hangup.
- outData: receives the context data supplied at registration time; it is handed back unchanged when the event fires, so the caller can tell events apart.
Return values:
- ALOOPER_POLL_WAKE: the return was triggered by wake(), i.e. the wake-fd event.
- ALOOPER_POLL_CALLBACK: one of the watched fds was triggered; related to Requests.
3.3 pollInner
[-> Looper.cpp]
int Looper::pollInner(int timeoutMillis) {
    ......
    // derive messageTimeoutMillis from mNextMessageUptime (when the next
    // Native message is due) and use whichever of it and timeoutMillis is sooner
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
        ......
    }
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true;
    // block in epoll_wait for events on the watched fds
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // execution continues here once epoll_wait returns
    mPolling = false;
    mLock.lock();
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }
    // epoll error
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }
    // timed out
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }
    // use the fd to tell wake events from callback events (Requests)
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                // drain the eventfd counter
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
    // deliver Native messages that are due
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                ......
                handler->handleMessage(message);
            } // release handler
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
    // Release lock.
    mLock.unlock();
    // handle Request callbacks
    ......
    return result;
}
After the Java side enqueues via enqueueMessage, it calls wake(), which writes a 64-bit 1 to the eventfd. epoll_wait wakes to handle it, result becomes non-zero, the pollOnce loop exits, and control returns to Java to process the message.
On the Native side, sendMessage likewise writes to the eventfd via wake(). If the head message is not yet due, pollInner records its delivery time in mNextMessageUptime; pollOnce still returns (result != 0) to queue.next() on the Java side, the loop comes back down into Native, pollInner derives the next timeout from mNextMessageUptime, and epoll_wait blocks again.
Note that the Native-layer MessageQueue really is a genuine list: mMessageEnvelopes, ordered by delivery time.
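The due-message loop at the Done label can be modeled with a plain sorted vector (a simplified sketch; Envelope, enqueue, and deliver_due are illustrative stand-ins for MessageEnvelope, sendMessageAtTime, and the Done block):

```cpp
#include <vector>
#include <algorithm>
#include <climits>
#include <cstdint>
#include <cassert>

// Hypothetical stand-in for Looper's mMessageEnvelopes: sorted by uptime.
struct Envelope { int64_t uptime; int what; };

// Insert keeping the list sorted by delivery time; among equal times,
// later inserts go after earlier ones (FIFO), as sendMessageAtTime does.
void enqueue(std::vector<Envelope>& q, Envelope e) {
    auto it = std::upper_bound(q.begin(), q.end(), e,
        [](const Envelope& a, const Envelope& b) { return a.uptime < b.uptime; });
    q.insert(it, e);
}

// Deliver everything due at `now`; return the uptime of the next pending
// message (LLONG_MAX when the queue is empty), like mNextMessageUptime.
int64_t deliver_due(std::vector<Envelope>& q, int64_t now, std::vector<int>& delivered) {
    while (!q.empty() && q.front().uptime <= now) {
        delivered.push_back(q.front().what);
        q.erase(q.begin());
    }
    return q.empty() ? LLONG_MAX : q.front().uptime;
}
```

The returned "next uptime" is exactly what pollInner feeds back into the epoll_wait timeout on the next pass.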
Summary
In the Java-layer Looper.loop, messages are pulled repeatedly via queue.next(); under the hood, nativePollOnce blocks in epoll_wait watching the wake fd. Whether an event originates in Java or in Native, wake() writes a 1 to the eventfd to rouse epoll_wait so the event gets handled.
4. enqueueMessage
Whether a message is enqueued from the Java layer or the Native layer, the path ends in wake():
[-> Looper.cpp]
void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                    mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}
As noted above, wake() simply writes a 64-bit 1 to the eventfd created when the NativeMessageQueue was set up; epoll_wait sees the fd become readable, wakes up, and the event is handled.
5. Looper.addFd
5.1 How it works
The Request and Response types that kept appearing in the source above all trace back to Looper.addFd.
Looper.addFd is used in many scenarios; the classic one is the Input subsystem, where the client App receives and handles input events.
[-> Looper.cpp]
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    ..... // mostly null checks on callback
    { // acquire lock
        AutoMutex _l(mLock);
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                    if (epollResult < 0) {
                        ALOGE("Error modifying or adding epoll events for fd %d: %s",
                                fd, strerror(errno));
                        return -1;
                    }
                    scheduleEpollRebuildLocked();
                } else {
                    ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}
Looper maintains mRequests, a KeyedVector<int, Request> mapping each fd to its corresponding Request.
**Looper.addFd essentially registers the given fd with epoll_ctl and wraps the fd's details (such as the callback) into a Request stored in mRequests, so that when epoll_wait later wakes up, the fd can be looked up and its Request's callback invoked.**
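The bookkeeping can be sketched with a std::map standing in for KeyedVector (the trimmed-down Request struct and helper names are illustrative, not the real API):

```cpp
#include <map>
#include <functional>
#include <cassert>

// A pared-down Request: just the fd and the callback to run when it fires.
// In the real Looper the callback's return value of 0 means "unregister me".
struct Request {
    int fd;
    std::function<int(int fd, int events)> callback;
};

using RequestMap = std::map<int, Request>;

// Like Looper::addFd: remember which callback belongs to which fd
// (an existing entry is replaced, mirroring the EPOLL_CTL_MOD path).
void add_fd(RequestMap& requests, int fd, std::function<int(int, int)> cb) {
    requests[fd] = Request{fd, std::move(cb)};
}

// When epoll_wait reports `fd` ready, look up its Request and invoke the
// callback; a 0 return removes the fd, as pollInner's response handling does.
bool dispatch(RequestMap& requests, int fd, int events) {
    auto it = requests.find(fd);
    if (it == requests.end()) return false;
    if (it->second.callback(fd, events) == 0) requests.erase(it);
    return true;
}
```

This is why the real code keys mRequests by fd: epoll only tells you *which fd* fired, and the map recovers *what to do about it*.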
The callback handling itself still lives in pollInner.
[-> Looper.cpp]
int Looper::pollInner(int timeoutMillis) {
    ......
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ......
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // not the wake fd, so this event came from a fd registered via
            // Looper.addFd: look up its Request and push a Response for it
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    // Native message handling
    ......
    // Response handling, i.e. running the Request callbacks
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            // a callback return value of 0 unregisters the fd
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    ......
    return result;
}
5.2 Use in the Input subsystem
In the Input subsystem, Server and Client communicate over sockets: the Server creates a pair of connected fds with socketpair(), wraps each in an InputChannel, and sends one to the Client while keeping the other. The Client uses Looper.addFd to receive input events, while the Server watches for notifications that events have been consumed.
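The socketpair transport itself is easy to demonstrate (a minimal sketch; the helper names are made up, and real InputChannels carry structured InputMessages rather than raw strings):

```cpp
#include <sys/socket.h>
#include <unistd.h>
#include <cstring>
#include <cassert>

// socketpair() yields two already-connected fds; bytes written to one end
// become readable on the other. SOCK_SEQPACKET preserves message boundaries.
bool make_channel(int fds[2]) {
    return socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fds) == 0;
}

// One end plays the Server, sending an event...
ssize_t send_event(int fd, const char* msg, size_t len) {
    return write(fd, msg, len);
}

// ...and the other plays the Client, receiving it.
ssize_t receive_event(int fd, char* buf, size_t cap) {
    return read(fd, buf, cap);
}
```

In the real system, each end's fd is what gets handed to Looper.addFd, so an incoming message makes the fd readable and wakes that side's epoll_wait.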
Client (App)
[-> android_view_InputEventReceiver.cpp]
static jlong nativeInit(JNIEnv* env, jclass clazz, jobject receiverWeak,
        jobject inputChannelObj, jobject messageQueueObj) {
    sp<InputChannel> inputChannel = android_view_InputChannel_getInputChannel(env,
            inputChannelObj);
    // get the Native MessageQueue backing the Java MessageQueue
    sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
    // create a NativeInputEventReceiver around the InputChannel
    sp<NativeInputEventReceiver> receiver = new NativeInputEventReceiver(env,
            receiverWeak, inputChannel, messageQueue);
    // initialize() ends up calling setFdEvents
    status_t status = receiver->initialize();
    ......
    receiver->incStrong(gInputEventReceiverClassInfo.clazz); // retain a
    return reinterpret_cast<jlong>(receiver.get());
}

void NativeInputEventReceiver::setFdEvents(int events) {
    if (mFdEvents != events) {
        mFdEvents = events;
        int fd = mInputConsumer.getChannel()->getFd();
        if (events) {
            // the key call!
            mMessageQueue->getLooper()->addFd(fd, 0, events, this, nullptr);
        } else {
            mMessageQueue->getLooper()->removeFd(fd);
        }
    }
}
When registering its InputChannel, the Client calls Looper.addFd to watch the channel's fd, so that when an InputMessage arrives it is wrapped into a Request/Response and handled in the Looper's pollInner.
Server (system_server)
[-> InputDispatcher.cpp]
status_t InputDispatcher::registerInputChannel(const sp<InputChannel>& inputChannel) {
    { // acquire lock
        std::scoped_lock _l(mLock);
        // reject channels that are already registered
        sp<Connection> existingConnection = getConnectionLocked(inputChannel->getConnectionToken());
        if (existingConnection != nullptr) {
            ALOGW("Attempted to register already registered input channel '%s'",
                    inputChannel->getName().c_str());
            return BAD_VALUE;
        }
        // wrap the incoming inputChannel in a Connection object
        sp<Connection> connection = new Connection(inputChannel, false /*monitor*/, mIdGenerator);
        // key the Connection by the inputChannel's fd and save it
        int fd = inputChannel->getFd();
        mConnectionsByFd[fd] = connection;
        mInputChannelsByToken[inputChannel->getConnectionToken()] = inputChannel;
        // epoll again: the Looper can watch fds for readability, so register
        // the InputChannel's fd here and run handleReceiveCallback when it fires
        mLooper->addFd(fd, 0, ALOOPER_EVENT_INPUT, handleReceiveCallback, this);
    } // release lock
    // Wake the looper because some connections have changed.
    mLooper->wake();
    return OK;
}
The Server takes the fd of its end of the InputChannel (one half of the socketpair) and uses Looper.addFd to watch it for readable events; when one arrives, handleReceiveCallback runs, which removes the finished event from the waitQueue.