Android 消息机制 – Native 层实现 – 以 InputReader 与 InputDispatcher 间通信为背景

源码基于 Android S AOSP,源码在线查看地址: cs.android.com/android/pla…

线程间通信在 Android 系统中应用十分广泛,本文是一个系列文章,主要梳理了Android 中 Java、native 层的线程间通信机制。

前两篇文章主要总结了一下Java层是如何实现Android线程间通信的,本篇文章主要梳理一下native层的实现

本文是以system_server中的InputReader与InputDispatcher线程间的通信为背景。通过分析源码了解实现的逻辑。

InputReader 与 InputDispatcher 初始化工作

IMS启动过程中,会调用frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cppnativeInit方法初始化native层

InputManagerService.InputManagerService

frameworks/base/services/core/java/com/android/server/input/InputManagerService.java

public InputManagerService(Context context) {
    this.mContext = context;
    // 创建 Handler
    this.mHandler = new InputManagerHandler(DisplayThread.get().getLooper());

    mStaticAssociations = loadStaticInputPortAssociations();
    mUseDevInputEventForAudioJack =
        context.getResources().getBoolean(R.bool.config_useDevInputEventForAudioJack);
    Slog.i(TAG, "Initializing input manager, mUseDevInputEventForAudioJack="
           + mUseDevInputEventForAudioJack);
    
    // 初始化 native 层,携带 MessageQueue
    mPtr = nativeInit(this, mContext, mHandler.getLooper().getQueue()/* 拿到 MessageQueue */);

    String doubleTouchGestureEnablePath = context.getResources().getString(
        R.string.config_doubleTouchGestureEnableFile);
    mDoubleTouchGestureEnableFile = TextUtils.isEmpty(doubleTouchGestureEnablePath) ? null :
    new File(doubleTouchGestureEnablePath);

    LocalServices.addService(InputManagerInternal.class, new LocalService());
}
复制代码

jni.nativeInit

frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cpp

static jlong nativeInit(JNIEnv* env, jclass /* clazz */,
        jobject serviceObj, jobject contextObj, jobject messageQueueObj) {
    // 根据传入的 messageQueueObj 参数获取到 NativeMessageQueue
    sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
    if (messageQueue == nullptr) {
        jniThrowRuntimeException(env, "MessageQueue is not initialized.");
        return 0;
    }
	
    // 创建 NativeInputManager,同时传入 NativeMessageQueue 中的 looper
    NativeInputManager* im = new NativeInputManager(contextObj, serviceObj,
            messageQueue->getLooper());
    im->incStrong(0);
    return reinterpret_cast<jlong>(im);
}
复制代码

NativeInputManager.NativeInputManager

frameworks/base/services/core/jni/com_android_server_input_InputManagerService.cpp

NativeInputManager::NativeInputManager(jobject contextObj,
        jobject serviceObj, const sp<Looper>& looper) :
        mLooper(looper), mInteractive(true) { // 将 looper 保存到成员变量
    JNIEnv* env = jniEnv();

    mServiceObj = env->NewGlobalRef(serviceObj);

    {
        AutoMutex _l(mLock);
        mLocked.systemUiLightsOut = false;
        mLocked.pointerSpeed = 0;
        mLocked.pointerGesturesEnabled = true;
        mLocked.showTouches = false;
        mLocked.pointerDisplayId = ADISPLAY_ID_DEFAULT;
    }
    mInteractive = true;
	// 创建 InputManager
    InputManager* im = new InputManager(this, this);
    mInputManager = im;
    defaultServiceManager()->addService(String16("inputflinger"), im);
}
复制代码

InputManager.InputManager

frameworks/native/services/inputflinger/InputManager.cpp

InputManager::InputManager(
        const sp<InputReaderPolicyInterface>& readerPolicy,
        const sp<InputDispatcherPolicyInterface>& dispatcherPolicy) {
    // 创建 InputDispatcher
    mDispatcher = createInputDispatcher(dispatcherPolicy);
    mClassifier = new InputClassifier(mDispatcher);
    // 创建 InputReader,将  InputDispatcher 对象传入
    mReader = createInputReader(readerPolicy, mClassifier);
}
复制代码

createInputDispatcher

frameworks/native/services/inputflinger/dispatcher/InputDispatcherFactory.cpp

sp<InputDispatcherInterface> createInputDispatcher(
        const sp<InputDispatcherPolicyInterface>& policy) {
    return new android::inputdispatcher::InputDispatcher(policy);
}
复制代码

InputDispatcher.InputDispatcher

frameworks/native/services/inputflinger/dispatcher/InputDispatcher.cpp

InputDispatcher::InputDispatcher(const sp<InputDispatcherPolicyInterface>& policy)
      : mPolicy(policy),
        mPendingEvent(nullptr),
        mLastDropReason(DropReason::NOT_DROPPED),
        mIdGenerator(IdGenerator::Source::INPUT_DISPATCHER),
        mAppSwitchSawKeyDown(false),
        mAppSwitchDueTime(LONG_LONG_MAX),
        mNextUnblockedEvent(nullptr),
        mDispatchEnabled(false),
        mDispatchFrozen(false),
        mInputFilterEnabled(false),
        // mInTouchMode will be initialized by the WindowManager to the default device config.
        // To avoid leaking stack in case that call never comes, and for tests,
        // initialize it here anyways.
        mInTouchMode(true),
        mMaximumObscuringOpacityForTouch(1.0f),
        mFocusedDisplayId(ADISPLAY_ID_DEFAULT),
        mWindowTokenWithPointerCapture(nullptr),
        mLatencyAggregator(),
        mLatencyTracker(&mLatencyAggregator),
        mCompatService(getCompatService()) {
    // 创建 Looper
    mLooper = new Looper(false);
    mReporter = createInputReporter();

    mKeyRepeatState.lastKeyEntry = nullptr;

    policy->getDispatcherConfiguration(&mConfig);
}
复制代码

createInputReader.createInputReader

frameworks/native/services/inputflinger/reader/InputReaderFactory.cpp

sp<InputReaderInterface> createInputReader(const sp<InputReaderPolicyInterface>& policy,
                                           const sp<InputListenerInterface>& listener) {
    return new InputReader(std::make_unique<EventHub>(), policy, listener);
}
复制代码

InputReader.InputReader

frameworks/native/services/inputflinger/reader/InputReader.cpp

InputReader::InputReader(std::shared_ptr<EventHubInterface> eventHub,
                         const sp<InputReaderPolicyInterface>& policy,
                         const sp<InputListenerInterface>& listener)
      : mContext(this),
        mEventHub(eventHub),
        mPolicy(policy),
        mGlobalMetaState(0),
        mLedMetaState(AMETA_NUM_LOCK_ON),
        mGeneration(1),
        mNextInputDeviceId(END_RESERVED_ID),
        mDisableVirtualKeysTimeout(LLONG_MIN),
        mNextTimeout(LLONG_MAX),
        mConfigurationChangesToRefresh(0) {
    // 创建 QueuedInputListener,将间接传入的 InputDispatcher 对象传入
    mQueuedListener = new QueuedInputListener(listener);

    { // acquire lock
        std::scoped_lock _l(mLock);

        refreshConfigurationLocked(0);
        updateGlobalMetaStateLocked();
    } // release lock
}
复制代码

QueuedInputListener.QueuedInputListener

frameworks/native/services/inputflinger/InputListener.cpp

QueuedInputListener::QueuedInputListener(const sp<InputListenerInterface>& innerListener) :
        mInnerListener(innerListener) { // 将 InputDispatcher 存到成员变量 mInnerListener 中
}
复制代码

InputReader.start

frameworks/native/services/inputflinger/reader/InputReader.cpp

在IMS调用start方法后,会经过NativeInputManager到InputManager,再到InputReader中,调用InputReader.start方法

status_t InputReader::start() {
    if (mThread) {
        return ALREADY_EXISTS;
    }
    mThread = std::make_unique<InputThread>(
            "InputReader", [this]() { loopOnce(); }, [this]() { mEventHub->wake(); });
    return OK;
}
复制代码

InputReader.loopOnce

void InputReader::loopOnce() {
    // ......
	
    // 获取Event
    size_t count = mEventHub->getEvents(timeoutMillis, mEventBuffer, EVENT_BUFFER_SIZE);

    { // acquire lock
        std::scoped_lock _l(mLock);
        mReaderIsAliveCondition.notify_all();

        if (count) {
            processEventsLocked(mEventBuffer, count);
        }

        // ......
        
    } // release lock
    // ......
    
    mQueuedListener->flush();
}
复制代码

InputReader读取到Event事件后,调用processEventsLocked方法,该方法根据Event的类型调用QueuedInputListener中对应的方法,例如键盘Event,会调用QueuedInputListener::notifyKey方法

QueuedInputListener.notifyKey

void QueuedInputListener::notifyKey(const NotifyKeyArgs* args) {
    traceEvent(__func__, args->id);
    // push 到 mArgsQueue 中
    mArgsQueue.push_back(new NotifyKeyArgs(*args));
}
复制代码

InputReader.loopOnce方法中,就调用QueuedInputListener.flush方法,唤醒InputDispatcher处理Event

QueuedInputListener.flush

void QueuedInputListener::flush() {
    size_t count = mArgsQueue.size();
    // 拿到 mArgsQueue 中的 Event
    for (size_t i = 0; i < count; i++) {
        NotifyArgs* args = mArgsQueue[i];
        // 执行 notify 方法
        args->notify(mInnerListener);
        delete args;
    }
    mArgsQueue.clear();
}
复制代码

NotifyKeyArgs.notify

void NotifyKeyArgs::notify(const sp<InputListenerInterface>& listener) const {
    // listener 就是 InputDispatcher
    listener->notifyKey(this);
}
复制代码

InputDispatcher.notifyKey

frameworks/native/services/inputflinger/dispatcher/InputDispatcher.cpp

void InputDispatcher::notifyKey(const NotifyKeyArgs* args) {
	// ......

    int32_t keyCode = args->keyCode;
    accelerateMetaShortcuts(args->deviceId, args->action, keyCode, metaState);

    KeyEvent event;
    // Event 初始化
    event.initialize(args->id, args->deviceId, args->source, args->displayId, INVALID_HMAC,
                     args->action, flags, keyCode, args->scanCode, metaState, repeatCount,
                     args->downTime, args->eventTime);

    android::base::Timer t;
    mPolicy->interceptKeyBeforeQueueing(&event, /*byref*/ policyFlags);
    if (t.duration() > SLOW_INTERCEPTION_THRESHOLD) {
        ALOGW("Excessive delay in interceptKeyBeforeQueueing; took %s ms",
              std::to_string(t.duration().count()).c_str());
    }

    bool needWake;
    { // acquire lock
        // ......

        std::unique_ptr<KeyEntry> newEntry =
                std::make_unique<KeyEntry>(args->id, args->eventTime, args->deviceId, args->source,
                                           args->displayId, policyFlags, args->action, flags,
                                           keyCode, args->scanCode, metaState, repeatCount,
                                           args->downTime);

        needWake = enqueueInboundEventLocked(std::move(newEntry));
        mLock.unlock();
    } // release lock

    if (needWake) {
        // 唤醒 InputReader的dispatchOnce方法
        mLooper->wake();
    }
}
复制代码

InputDispatcher.enqueueInboundEventLocked

bool InputDispatcher::enqueueInboundEventLocked(std::unique_ptr<EventEntry> newEntry) {
    bool needWake = mInboundQueue.empty();
    // 将Event push 到 iq 中
    mInboundQueue.push_back(std::move(newEntry));
    EventEntry& entry = *(mInboundQueue.back());
    traceInboundQueueLengthLocked();

    switch (entry.type) {
        case EventEntry::Type::KEY: {
            const KeyEntry& keyEntry = static_cast<const KeyEntry&>(entry);
            if (isAppSwitchKeyEvent(keyEntry)) {
                // AKEY_EVENT_ACTION_DOWN needWake走mInboundQueue.empty();
                if (keyEntry.action == AKEY_EVENT_ACTION_DOWN) {
                    mAppSwitchSawKeyDown = true;
                } else if (keyEntry.action == AKEY_EVENT_ACTION_UP) { // AKEY_EVENT_ACTION_UP 则needWake = true
                    if (mAppSwitchSawKeyDown) {
#if DEBUG_APP_SWITCH
                        ALOGD("App switch is pending!");
#endif
                        mAppSwitchDueTime = keyEntry.eventTime + APP_SWITCH_TIMEOUT;
                        mAppSwitchSawKeyDown = false;
                        needWake = true;
                    }
                }
            }
            break;
        }
            
        // ......
    }

    return needWake;
}
复制代码

InputDispatcher.start

在IMS调用start方法后,会经过NativeInputManager到InputManager,再到InputDispatcher中,调用InputDispatcher的start方法

status_t InputDispatcher::start() {
    if (mThread) {
        return ALREADY_EXISTS;
    }
    mThread = std::make_unique<InputThread>(
            "InputDispatcher", [this]() { dispatchOnce(); }, [this]() { mLooper->wake(); });
    return OK;
}
复制代码

InputDispatcher.dispatchOnce

void InputDispatcher::dispatchOnce() {
    // ......

    // Wait for callback or timeout or wake.  (make sure we round up, not down)
    nsecs_t currentTime = now();
    int timeoutMillis = toMillisecondTimeoutDelay(currentTime, nextWakeupTime);
    // epoll_wait 等待被唤醒
    mLooper->pollOnce(timeoutMillis);
}
复制代码

上面的一整套InputReader与InputDispatcher初始化的流程有点长,但是又是不可省略的分析,现在我们将分析延展到Looper的pollOnce中,看看poolOnce是如何实现wait的。

在InputReader构造函数中,创建了一个Looper对象,Looper的构造函数如下

Looper.Looper

Looper::Looper(bool allowNonCallbacks)
    : mAllowNonCallbacks(allowNonCallbacks),
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));

    AutoMutex _l(mLock);
    // 构建 epoll 机制
    rebuildEpollLocked();
}
复制代码

Looper.rebuildEpollLocked

void Looper::rebuildEpollLocked() {
    // 如果存在旧的epoll则关闭
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        mEpollFd.reset();
    }

    // 创建epoll
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
    
    // 注意,此处是对mWakeEventFd进行监听,监听可读事件
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));

    
    for (const auto& [seq, request] : mRequests) {
        epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);

        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}
复制代码

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

  • 第一个参数 为epoll_create的返回值
  • 第二个参数 表示动作,用三个宏表示
    • EPOLL_CTL_ADD 添加新的fd到epfd中
    • EPOLL_CTL_MOD 修改已经注册fd的监听事件
    • EPOLL_CTL_DEL 从epfd中删除一个fd
  • 第三个参数 表示要监听的fd
  • 第四个参数 表示监听什么事,可以用已经定义好的宏表示
    • EPOLLIN 表示对应的文件描述符可读
    • EPOLLOUT 表示对应的文件描述符可写
    • EPOLLPRI 表示对应的文件描述符有紧急的数据可写
    • EPOLLERR 表示对应的文件描述符发生错误
    • EPOLLHUP 表示对应的文件描述符被挂断
    • EPOLLET 将epoll设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
    • EPOLLONESHOT 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

关于epoll_ctl返回值

  • 返回 0,代表设置成功
  • 返回 -1,设置错误

Looper.pollOnce

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // ......

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}		
复制代码

Looper.pollInner

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // 调整 timeoutMillis 时间
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 会在此处阻塞,当对mWakeEventFd文件fd有写入事件时,会唤醒此处进行读取,或等待timeoutMillis超时
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // rebuild epoll
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // epoll发生错误,直接跳转到Done标签
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // 超时,跳转到Done标签
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        // 令 result = POLL_TIMEOUT
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        const SequenceNumber seq = eventItems[i].data.u64;
        uint32_t epollEvents = eventItems[i].events;
        if (seq == WAKE_EVENT_FD_SEQ) {
            // 判断是输入类型的事件
            if (epollEvents & EPOLLIN) {
                // 读出事件
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // ......
        }
    }
Done: ;

    // ......
    
    return result;
}
复制代码

result的返回值有如下几个:

  • POLL_WAKE = -1,有输入事件被唤醒
  • POLL_CALLBACK = -2,Callback类型事件
  • POLL_TIMEOUT = -3,epoll_wait等待超时
  • POLL_ERROR = -4,epoll发生错误

上面的函数被阻塞在了epoll_wait中,当对mWakeEventFd这个fd有输入事件的时候,会唤醒epoll_wait。

InputDispatcher.notifyKey函数中,会调用mLooper->wake()唤醒此epoll_wait

Looper.wake

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    // 向 mWakeEventFd 写入数字1,如果失败则不断尝试,直到写入成功
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                             mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}
复制代码

epoll_wait被唤醒后,执行到awoken中

Looper.awoken

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif

    uint64_t counter;
    // 读取事件
    TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
复制代码

awoken会读取事件,失败则不断尝试读取,直到读取成功。

最终Looper.poolInner会返回POLL_WAKE(-1),并return掉poolOnce函数,接着在InputDispatcher.dispatchOnce处理Event

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享