源码基于 Android S AOSP,源码在线查看地址: cs.android.com/android/pla…
线程间通信在 Android 系统中应用十分广泛,本文是一个系列文章,主要梳理了Android 中 Java、native 层的线程间通信机制。
- Android 消息机制 – Java 层
- Android 消息机制 – Java 层 – MessagePool
- Android 消息机制 – Native 层实现 – 以 InputReader 与 InputDispatcher 间通信为背景
- Android 消息机制 – Java 层依赖 native 层实现 wait
前两篇文章主要总结了一下Java层是如何实现Android线程间通信的,本篇文章主要梳理一下native层的实现
本文是以system_server中的InputReader与InputDispatcher线程间的通信为背景。通过分析源码了解实现的逻辑。
InputReader 与 InputDispatcher 初始化工作
IMS启动过程中,会调用frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cpp
中nativeInit
方法初始化native层
InputManagerService.InputManagerService
frameworks/base/services/core/java/com/android/server/input/InputManagerService.java
/**
 * Constructs the Java-side input manager service.
 *
 * Creates a handler on the DisplayThread looper, reads configuration resources, and
 * initializes the native layer through nativeInit, storing the returned value in mPtr.
 *
 * @param context context used to read resources and passed on to the native layer
 */
public InputManagerService(Context context) {
this.mContext = context;
// Create the Handler on the DisplayThread looper.
this.mHandler = new InputManagerHandler(DisplayThread.get().getLooper());
mStaticAssociations = loadStaticInputPortAssociations();
mUseDevInputEventForAudioJack =
context.getResources().getBoolean(R.bool.config_useDevInputEventForAudioJack);
Slog.i(TAG, "Initializing input manager, mUseDevInputEventForAudioJack="
+ mUseDevInputEventForAudioJack);
// Initialize the native layer, handing it the MessageQueue of the handler's looper.
mPtr = nativeInit(this, mContext, mHandler.getLooper().getQueue()/* the looper's MessageQueue */);
String doubleTouchGestureEnablePath = context.getResources().getString(
R.string.config_doubleTouchGestureEnableFile);
// A null file means no double-touch gesture enable path is configured.
mDoubleTouchGestureEnableFile = TextUtils.isEmpty(doubleTouchGestureEnablePath) ? null :
new File(doubleTouchGestureEnablePath);
LocalServices.addService(InputManagerInternal.class, new LocalService());
}
复制代码
jni.nativeInit
frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cpp
// JNI entry point for InputManagerService.nativeInit.
// Resolves the native message queue behind messageQueueObj, builds a NativeInputManager
// around that queue's Looper and returns the manager pointer encoded as a jlong.
// Reports the failure via jniThrowRuntimeException and returns 0 when no native queue is found.
static jlong nativeInit(JNIEnv* env, jclass /* clazz */,
jobject serviceObj, jobject contextObj, jobject messageQueueObj) {
// Look up the native MessageQueue that corresponds to the Java messageQueueObj.
sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
if (messageQueue == nullptr) {
jniThrowRuntimeException(env, "MessageQueue is not initialized.");
return 0;
}
// Create the NativeInputManager, passing in the Looper owned by the native queue.
NativeInputManager* im = new NativeInputManager(contextObj, serviceObj,
messageQueue->getLooper());
// Take a strong reference; presumably this keeps the object alive while Java holds the raw pointer.
im->incStrong(0);
return reinterpret_cast<jlong>(im);
}
复制代码
NativeInputManager.NativeInputManager
frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cpp
// Constructs the native input manager: stores the looper, takes a global reference to the
// Java service object, resets the lock-protected state to defaults, then creates the
// InputManager and registers it with the service manager under the name "inputflinger".
NativeInputManager::NativeInputManager(jobject contextObj,
jobject serviceObj, const sp<Looper>& looper) :
mLooper(looper), mInteractive(true) { // keep the looper in a member variable
JNIEnv* env = jniEnv();
mServiceObj = env->NewGlobalRef(serviceObj);
{
// Default values for the state guarded by mLock.
AutoMutex _l(mLock);
mLocked.systemUiLightsOut = false;
mLocked.pointerSpeed = 0;
mLocked.pointerGesturesEnabled = true;
mLocked.showTouches = false;
mLocked.pointerDisplayId = ADISPLAY_ID_DEFAULT;
}
// Repeats the value already set in the member initializer list.
mInteractive = true;
// Create the InputManager; `this` is passed as both constructor arguments.
InputManager* im = new InputManager(this, this);
mInputManager = im;
defaultServiceManager()->addService(String16("inputflinger"), im);
}
复制代码
InputManager.InputManager
frameworks/native/services/inputflinger/InputManager.cpp
// Wires up the input pipeline objects: dispatcher first, then a classifier constructed
// around the dispatcher, then a reader whose listener is the classifier.
InputManager::InputManager(
const sp<InputReaderPolicyInterface>& readerPolicy,
const sp<InputDispatcherPolicyInterface>& dispatcherPolicy) {
// Create the InputDispatcher.
mDispatcher = createInputDispatcher(dispatcherPolicy);
// The classifier receives the dispatcher; presumably it forwards events on to it.
mClassifier = new InputClassifier(mDispatcher);
// Create the InputReader. Its listener is mClassifier, so the dispatcher is reached
// only indirectly through the classifier.
mReader = createInputReader(readerPolicy, mClassifier);
}
复制代码
createInputDispatcher
frameworks/native/services/inputflinger/dispatcher/InputDispatcherFactory.cpp
// Factory function: returns a newly allocated InputDispatcher built with the given policy.
sp<InputDispatcherInterface> createInputDispatcher(
const sp<InputDispatcherPolicyInterface>& policy) {
return new android::inputdispatcher::InputDispatcher(policy);
}
复制代码
InputDispatcher.InputDispatcher
frameworks/native/services/inputflinger/dispatcher/InputDispatcher.cpp
// Constructs the dispatcher: initializes its state members, creates the Looper that
// dispatchOnce() waits on and notifyKey() wakes, and reads the dispatcher configuration
// from the policy.
InputDispatcher::InputDispatcher(const sp<InputDispatcherPolicyInterface>& policy)
: mPolicy(policy),
mPendingEvent(nullptr),
mLastDropReason(DropReason::NOT_DROPPED),
mIdGenerator(IdGenerator::Source::INPUT_DISPATCHER),
mAppSwitchSawKeyDown(false),
mAppSwitchDueTime(LONG_LONG_MAX),
mNextUnblockedEvent(nullptr),
mDispatchEnabled(false),
mDispatchFrozen(false),
mInputFilterEnabled(false),
// mInTouchMode will be initialized by the WindowManager to the default device config.
// To avoid leaking stack in case that call never comes, and for tests,
// initialize it here anyways.
mInTouchMode(true),
mMaximumObscuringOpacityForTouch(1.0f),
mFocusedDisplayId(ADISPLAY_ID_DEFAULT),
mWindowTokenWithPointerCapture(nullptr),
mLatencyAggregator(),
mLatencyTracker(&mLatencyAggregator),
mCompatService(getCompatService()) {
// Create the dispatcher's own Looper (allowNonCallbacks = false).
mLooper = new Looper(false);
mReporter = createInputReporter();
mKeyRepeatState.lastKeyEntry = nullptr;
policy->getDispatcherConfiguration(&mConfig);
}
复制代码
createInputReader
frameworks/native/services/inputflinger/reader/InputReaderFactory.cpp
// Factory function: returns a newly allocated InputReader that gets a fresh EventHub,
// the given policy, and the listener that will receive the events it produces.
sp<InputReaderInterface> createInputReader(const sp<InputReaderPolicyInterface>& policy,
const sp<InputListenerInterface>& listener) {
return new InputReader(std::make_unique<EventHub>(), policy, listener);
}
复制代码
InputReader.InputReader
frameworks/native/services/inputflinger/reader/InputReader.cpp
// Constructs the reader: stores the event hub and policy, wraps the downstream listener
// in a QueuedInputListener, and performs an initial configuration refresh under mLock.
InputReader::InputReader(std::shared_ptr<EventHubInterface> eventHub,
const sp<InputReaderPolicyInterface>& policy,
const sp<InputListenerInterface>& listener)
: mContext(this),
mEventHub(eventHub),
mPolicy(policy),
mGlobalMetaState(0),
mLedMetaState(AMETA_NUM_LOCK_ON),
mGeneration(1),
mNextInputDeviceId(END_RESERVED_ID),
mDisableVirtualKeysTimeout(LLONG_MIN),
mNextTimeout(LLONG_MAX),
mConfigurationChangesToRefresh(0) {
// Create the QueuedInputListener around `listener`. Per the InputManager constructor the
// listener is the InputClassifier, so the InputDispatcher is reached only indirectly.
mQueuedListener = new QueuedInputListener(listener);
{ // acquire lock
std::scoped_lock _l(mLock);
refreshConfigurationLocked(0);
updateGlobalMetaStateLocked();
} // release lock
}
复制代码
QueuedInputListener.QueuedInputListener
frameworks/native/services/inputflinger/InputListener.cpp
// Stores the downstream listener that flush() will later deliver queued events to.
QueuedInputListener::QueuedInputListener(const sp<InputListenerInterface>& innerListener) :
mInnerListener(innerListener) { // innerListener is whatever InputReader was given (the InputClassifier per InputManager's constructor)
}
复制代码
InputReader.start
frameworks/native/services/inputflinger/reader/InputReader.cpp
在IMS调用start方法后,会经过NativeInputManager到InputManager,再到InputReader中,调用InputReader.start
方法
// Starts the reader thread. Returns ALREADY_EXISTS if a thread was already created,
// otherwise creates the "InputReader" InputThread and returns OK.
status_t InputReader::start() {
if (mThread) {
return ALREADY_EXISTS;
}
// First callback runs loopOnce(); second callback wakes the EventHub.
// NOTE(review): presumably InputThread calls the first repeatedly and uses the second
// to unblock the thread on shutdown; confirm against InputThread.
mThread = std::make_unique<InputThread>(
"InputReader", [this]() { loopOnce(); }, [this]() { mEventHub->wake(); });
return OK;
}
复制代码
InputReader.loopOnce
// One iteration of the reader thread (excerpt): fetch events from the EventHub, process
// them under mLock, then flush the queued listener after the lock scope has ended.
void InputReader::loopOnce() {
// ...... (elided in this excerpt)
// Fetch raw events from the EventHub into mEventBuffer.
size_t count = mEventHub->getEvents(timeoutMillis, mEventBuffer, EVENT_BUFFER_SIZE);
{ // acquire lock
std::scoped_lock _l(mLock);
mReaderIsAliveCondition.notify_all();
// Only process when at least one event was returned.
if (count) {
processEventsLocked(mEventBuffer, count);
}
// ...... (elided in this excerpt)
} // release lock
// ...... (elided in this excerpt)
// Deliver everything queued during processing; this runs outside the mLock scope above.
mQueuedListener->flush();
}
复制代码
InputReader读取到Event事件后,调用processEventsLocked方法,该方法根据Event的类型调用QueuedInputListener中对应的方法,例如键盘Event,会调用QueuedInputListener::notifyKey
方法
QueuedInputListener.notifyKey
// Queues a key notification: a heap copy of *args is appended to mArgsQueue.
// Nothing is delivered here; delivery happens in flush().
void QueuedInputListener::notifyKey(const NotifyKeyArgs* args) {
traceEvent(__func__, args->id);
// Push a copy onto mArgsQueue.
mArgsQueue.push_back(new NotifyKeyArgs(*args));
}
复制代码
在InputReader.loopOnce
方法中,就调用QueuedInputListener.flush
方法,唤醒InputDispatcher处理Event
QueuedInputListener.flush
// Delivers every queued notification to mInnerListener in queue order, deleting each
// entry after it has been delivered, then empties the queue.
void QueuedInputListener::flush() {
size_t count = mArgsQueue.size();
// Walk the entries that were in mArgsQueue when flush() started.
for (size_t i = 0; i < count; i++) {
NotifyArgs* args = mArgsQueue[i];
// Dispatch through the entry's notify() method.
args->notify(mInnerListener);
delete args;
}
mArgsQueue.clear();
}
复制代码
NotifyKeyArgs.notify
// Delivers this key notification to the given listener by calling its notifyKey().
void NotifyKeyArgs::notify(const sp<InputListenerInterface>& listener) const {
// When called from QueuedInputListener::flush(), listener is mInnerListener. Per the
// InputManager constructor that is the InputClassifier, which presumably forwards the
// call to InputDispatcher::notifyKey; confirm in InputClassifier.
listener->notifyKey(this);
}
复制代码
InputDispatcher.notifyKey
frameworks/native/services/inputflinger/dispatcher/InputDispatcher.cpp
// Receives a key notification (excerpt): builds a KeyEvent for the policy's pre-queue
// interception, enqueues a KeyEntry on the inbound queue, and wakes the dispatcher's
// Looper when enqueueInboundEventLocked() asks for it.
void InputDispatcher::notifyKey(const NotifyKeyArgs* args) {
// ...... (elided in this excerpt; metaState, flags, policyFlags and repeatCount are set up here)
int32_t keyCode = args->keyCode;
accelerateMetaShortcuts(args->deviceId, args->action, keyCode, metaState);
KeyEvent event;
// Initialize the KeyEvent from the notification arguments.
event.initialize(args->id, args->deviceId, args->source, args->displayId, INVALID_HMAC,
args->action, flags, keyCode, args->scanCode, metaState, repeatCount,
args->downTime, args->eventTime);
// Time the policy callback and warn when it exceeds SLOW_INTERCEPTION_THRESHOLD.
android::base::Timer t;
mPolicy->interceptKeyBeforeQueueing(&event, /*byref*/ policyFlags);
if (t.duration() > SLOW_INTERCEPTION_THRESHOLD) {
ALOGW("Excessive delay in interceptKeyBeforeQueueing; took %s ms",
std::to_string(t.duration().count()).c_str());
}
bool needWake;
{ // acquire lock
// ...... (elided in this excerpt; presumably mLock is acquired here, matching the unlock below)
std::unique_ptr<KeyEntry> newEntry =
std::make_unique<KeyEntry>(args->id, args->eventTime, args->deviceId, args->source,
args->displayId, policyFlags, args->action, flags,
keyCode, args->scanCode, metaState, repeatCount,
args->downTime);
needWake = enqueueInboundEventLocked(std::move(newEntry));
mLock.unlock();
} // release lock
if (needWake) {
// Wake the InputDispatcher thread that is blocked in dispatchOnce() on mLooper->pollOnce().
mLooper->wake();
}
}
复制代码
InputDispatcher.enqueueInboundEventLocked
// Appends newEntry to mInboundQueue and returns whether the dispatcher should be woken
// (excerpt). The result starts as "queue was empty before the push" and is forced to true
// for an app-switch key UP that follows a recorded app-switch key DOWN.
bool InputDispatcher::enqueueInboundEventLocked(std::unique_ptr<EventEntry> newEntry) {
bool needWake = mInboundQueue.empty();
// Push the event onto the inbound queue.
mInboundQueue.push_back(std::move(newEntry));
EventEntry& entry = *(mInboundQueue.back());
traceInboundQueueLengthLocked();
switch (entry.type) {
case EventEntry::Type::KEY: {
const KeyEntry& keyEntry = static_cast<const KeyEntry&>(entry);
if (isAppSwitchKeyEvent(keyEntry)) {
// DOWN: only record it; needWake keeps the value computed from mInboundQueue.empty().
if (keyEntry.action == AKEY_EVENT_ACTION_DOWN) {
mAppSwitchSawKeyDown = true;
} else if (keyEntry.action == AKEY_EVENT_ACTION_UP) { // UP: needWake is forced to true, but only when a DOWN was recorded
if (mAppSwitchSawKeyDown) {
#if DEBUG_APP_SWITCH
ALOGD("App switch is pending!");
#endif
mAppSwitchDueTime = keyEntry.eventTime + APP_SWITCH_TIMEOUT;
mAppSwitchSawKeyDown = false;
needWake = true;
}
}
}
break;
}
// ...... (other entry types elided in this excerpt)
}
return needWake;
}
复制代码
InputDispatcher.start
在IMS调用start方法后,会经过NativeInputManager到InputManager,再到InputDispatcher中,调用InputDispatcher的start方法
// Starts the dispatcher thread. Returns ALREADY_EXISTS if a thread was already created,
// otherwise creates the "InputDispatcher" InputThread and returns OK.
status_t InputDispatcher::start() {
if (mThread) {
return ALREADY_EXISTS;
}
// First callback runs dispatchOnce(); second callback wakes mLooper.
// NOTE(review): presumably InputThread calls the first repeatedly and uses the second
// to unblock the thread on shutdown; confirm against InputThread.
mThread = std::make_unique<InputThread>(
"InputDispatcher", [this]() { dispatchOnce(); }, [this]() { mLooper->wake(); });
return OK;
}
复制代码
InputDispatcher.dispatchOnce
// One iteration of the dispatcher thread (excerpt): after the elided dispatch work,
// blocks on the Looper until it is woken or the computed timeout elapses.
void InputDispatcher::dispatchOnce() {
// ...... (elided in this excerpt; nextWakeupTime is computed here)
// Wait for callback or timeout or wake. (make sure we round up, not down)
nsecs_t currentTime = now();
int timeoutMillis = toMillisecondTimeoutDelay(currentTime, nextWakeupTime);
// Block here; per Looper::pollInner this ends up waiting in epoll_wait.
mLooper->pollOnce(timeoutMillis);
}
复制代码
上面的一整套InputReader与InputDispatcher初始化的流程有点长,但是又是不可省略的分析,现在我们将分析延展到Looper的pollOnce中,看看pollOnce是如何实现wait的。
在InputDispatcher构造函数中,创建了一个Looper对象,Looper的构造函数如下
Looper.Looper
// Constructs a Looper: creates the wake eventfd and builds the initial epoll set.
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// Create the eventfd used as the wake-up channel (non-blocking, close-on-exec);
// failing to create it is fatal.
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
// Build the epoll set while holding mLock.
rebuildEpollLocked();
}
复制代码
Looper.rebuildEpollLocked
// (Re)creates the epoll instance: closes any existing one, creates a new one, registers
// the wake eventfd for EPOLLIN, then re-registers every fd recorded in mRequests.
// Failure to create the instance or to add the wake fd is fatal; a failure to add a
// request fd is only logged.
void Looper::rebuildEpollLocked() {
// Close the old epoll instance if there is one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
mEpollFd.reset();
}
// Create the epoll instance.
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// Note: this watches mWakeEventFd for readability (EPOLLIN), tagged with WAKE_EVENT_FD_SEQ.
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// Re-add every registered request fd, tagged with its sequence number.
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
复制代码
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
- 第一个参数 为epoll_create的返回值
- 第二个参数 表示动作,用三个宏表示
- EPOLL_CTL_ADD 添加新的fd到epfd中
- EPOLL_CTL_MOD 修改已经注册fd的监听事件
- EPOLL_CTL_DEL 从epfd中删除一个fd
- 第三个参数 表示要监听的fd
- 第四个参数 表示监听什么事,可以用已经定义好的宏表示
- EPOLLIN 表示对应的文件描述符可读
- EPOLLOUT 表示对应的文件描述符可写
- EPOLLPRI 表示对应的文件描述符有紧急的数据可读
- EPOLLERR 表示对应的文件描述符发生错误
- EPOLLHUP 表示对应的文件描述符被挂断
- EPOLLET 将epoll设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
- EPOLLONESHOT 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
关于epoll_ctl
返回值
- 返回 0,代表设置成功
- 返回 -1,设置错误
Looper.pollOnce
// Polls until pollInner() produces a non-zero result and returns that result (excerpt).
// On this return path any non-null outFd / outEvents / outData is cleared to 0 / 0 / nullptr.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// ...... (elided in this excerpt)
// A non-zero result from the previous pollInner() call ends the loop.
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
result = pollInner(timeoutMillis);
}
}
复制代码
Looper.pollInner
// Performs one epoll_wait on the Looper's epoll set and reports why it returned (excerpt).
// The result defaults to POLL_WAKE and becomes POLL_ERROR on an epoll failure other than
// EINTR, or POLL_TIMEOUT when no event arrived; the elided code may change it further.
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust timeoutMillis: use the delay until mNextMessageUptime when that is shorter than
// the caller's timeout (or the caller asked to wait indefinitely with a negative timeout).
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// Blocks here until a registered fd is ready (e.g. mWakeEventFd becomes readable after
// wake() writes to it) or timeoutMillis expires.
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild the epoll set if that was requested, then skip event handling for this pass.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// epoll_wait failed: EINTR jumps to Done with result unchanged; anything else is POLL_ERROR.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Timed out with no events: jump to Done.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
// Report the timeout to the caller.
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (seq == WAKE_EVENT_FD_SEQ) {
// The wake eventfd fired; EPOLLIN means it is readable.
if (epollEvents & EPOLLIN) {
// Read the eventfd.
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// ...... (handling of other registered fds elided in this excerpt)
}
}
Done: ;
// ...... (elided in this excerpt)
return result;
}
复制代码
result的返回值有如下几个:
- POLL_WAKE = -1,被wake()唤醒(mWakeEventFd变为可读)
- POLL_CALLBACK = -2,Callback类型事件
- POLL_TIMEOUT = -3,epoll_wait等待超时
- POLL_ERROR = -4,epoll发生错误
上面的函数被阻塞在了epoll_wait中,当mWakeEventFd这个fd有数据写入、变为可读(EPOLLIN)的时候,会唤醒epoll_wait。
InputDispatcher.notifyKey函数中,会调用mLooper->wake()
唤醒此epoll_wait
Looper.wake
// Wakes a thread blocked in pollInner() by writing to the wake eventfd, which makes it
// readable for epoll_wait. A short write is tolerated only when errno is EAGAIN;
// any other failure is fatal.
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// Write the 8-byte value 1 to mWakeEventFd. TEMP_FAILURE_RETRY repeats the write only
// while it is interrupted by a signal (EINTR); other failures are not retried.
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
复制代码
epoll_wait被唤醒后,执行到awoken中
Looper.awoken
// Called from pollInner() when the wake eventfd is readable: reads the 8-byte value from
// mWakeEventFd. The value read and the read's return value are both ignored.
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
// Read the eventfd; TEMP_FAILURE_RETRY retries only when the read is interrupted (EINTR).
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
复制代码
awoken会读取mWakeEventFd中的值(read的返回值被忽略);TEMP_FAILURE_RETRY只会在read被信号中断(errno为EINTR)时重试,其他失败不会重试。
最终Looper.pollInner
会返回POLL_WAKE(-1),并return掉pollOnce函数,接着在InputDispatcher.dispatchOnce
处理Event