Android 消息机制

前言

说到Android 的消息机制,首先就会想到Looper、MessageQueue、Handler、Message这四个对象来实现完成的,下面查看一下Androd消息机制原理

Looper

创建Looper (Looper.preare())

流程分析:

  • ActivityThread.main() –>Looper.prepareMainLooper()
  • Looper.prepareMainLooper() –> Looper.prepare(boolean quitAllowed)
  • Looper.prepare(boolean quitAllowed) –> new Looper(quitAllowed)
  • new Looper(quitAllowed) –> new new MessageQueue(quitAllowed)

APP启动过程中通过上述流程最终创建的了Looper和MessageQueue对象

ActivtiyThread.main()

public static void main(String[] args) {
    ...  
    //该方法中调用了Looper.prepare(false)创建Looper
    Looper.prepareMainLooper();
    ...
    //循环去消息
    Looper.loop();
    ...
}
复制代码

Looper.prepareMainLooper()

 //初始化主线程的Looper
 public static void prepareMainLooper() {
        //调用Looper.prepare()方法创建Looper
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            //获取创建的Looper 赋值给sMainLooper
            sMainLooper = myLooper();
        }
    }
复制代码

Looper.prepare()

private static void prepare(boolean quitAllowed) {
    //sThreadLocal获取当前线程中的Looper,判断是否为空,不为空抛出异常
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    //保存当前线程的Looper对象
    sThreadLocal.set(new Looper(quitAllowed));
}
复制代码

Looper(boolean quitAllowed)

private Looper(boolean quitAllowed) {
    //创建消息队列
    mQueue = new MessageQueue(quitAllowed);
    //获取当前线程
    mThread = Thread.currentThread();
}
复制代码

循环消息(Looper.loop())

创建完Looper对象之后,调用Looper.loop()方法,Looper开始真正的工作,一直轮询MessageQueue队列中的Messager消息

public static void loop() {
    //获取当前线程Looper
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    ...
    //开始消息轮询
    for (;;) {
        //从消息队列中获取Message消息,接收消息时可能会阻塞
        Message msg = queue.next(); 
        if (msg == null) {
             //message为null时,退出消息循环
            return;
        }
        ...
        try {
            //根据Message的traget进行消息的分发
            msg.target.dispatchMessage(msg);
            if (observer != null) {
                observer.messageDispatched(token, msg);
            }
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
        } catch (Exception exception) {
            if (observer != null) {
                observer.dispatchingThrewException(token, msg, exception);
            }
            throw exception;
        } finally {
            ThreadLocalWorkSource.restore(origWorkSource);
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        ...
        //消息回收
        msg.recycleUnchecked();
    }
}
复制代码

通过上述源码分析:

  • 首先获取Looper对象,如果为空抛出异常
  • 开始循环获取消息
  • 通过queue.next()从消息队列中获取消息,如果消息为空则退出循环
  • 根据Message的target进行消息的分发,target指的是Handler
  • 调用Message的recycleUnchecked()进行消息回收

ThreadLocal

ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,数据存储以后,只有在指定的线程中才可以访问,其他线程则无法获取。

在Looper当中有一个sThreadLocal属性,它的作用就是对Looper与线程之间进行关联

public final class Looper {
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
}
复制代码

ThreadLocalMap

在ThreadLocalMap对象中有一个Entry[]对象数组,Entry 继承了 WeakReference,那么通过 Entry 对象的 get 方法就可以获取到一个弱引用的 ThreadLocal 对象。扩展了一个 Object 类型的 value 对象,并且在构造方法中进行了初始化赋值。Entry 保存了 ThreadLocal(key) 和 对应的值(value),其中 ThreadLoacl 是通过弱引用的形式,避免了线程池线程复用带来的内存泄露

static class ThreadLocalMap {

    static class Entry extends WeakReference<ThreadLocal<?>> {

        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    ...
    private Entry[] table;
    ...
}
复制代码

ThreadLocalMap.set()

private void set(ThreadLocal<?> key, Object value) {

    Entry[] tab = table;
    int len = tab.length;
    //ThreadLocal的HashCode来计算数组存储的索引位置i
    int i = key.threadLocalHashCode & (len-1);
    //如果i位置已经存储了对象,那么就往后挪一个位置依次类推,直到找到空的位置,再将对象存放
    for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //如果找到的Entry的k与key相等,则直接Entry的value值
        if (k == key) {
            e.value = value;
            return;
        }
        //如果找到的Entry的k为null,则通过replaceStaleEntry()保存key
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    //构建出 Entry 保存进 table 数组中
    tab[i] = new Entry(key, value);
    int sz = ++size;
    //判断一下当前的存储的对象个数是否已经超出了阈值(threshold的值)大小,如果超出了,需要重新扩充并将所有的对象重新计算位置
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
复制代码

ThreadLocal.set()

public void set(T value) {
    //获取当前线程
    Thread t = Thread.currentThread();
    /**获取当前线程的ThreadLocalMap 
     * 通过Thread.threadLocals获取ThreadLocalMap
     * ThreadLocalMap 底层是一个Entry[]数组组成
     * Entry 继承WeakReference 
     * key=ThreadLocal value=Lopper 组成对象
     */
    ThreadLocalMap map = getMap(t);
    if (map != null)
        //设置Entry对象key=ThreadLocal,value=Object,Entry数组
        map.set(this, value);
    else
        //ThreadLocalMap 对象
        createMap(t, value);
}
复制代码

通过上述源码发现 set()方法中做了如下操作

  • 获取当前线程
  • 获取当前线程的 ThreadLocalMap 对象
  • 如果获取到的 map 对象不为空,则设置值,否则创建 map 设置值

ThreadLocalMap getMap(Thread t)

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}
复制代码

获取的是Thread对象中的threadLocals变量,threadLocals的类型是ThreadLocalMap

createMap(Thread t, T firstValue)

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
复制代码

ThreadLocal.get()

public T get() {
    //获取当前先线程
    Thread t = Thread.currentThread();
    /**获取当前线程的ThreadLocalMap 
     * 通过Thread.threadLocals获取ThreadLocalMap
     * ThreadLocalMap 底层是一个Entry[]数组组成
     * Entry 继承WeakReference 
     * key=ThreadLocal value=Lopper 组成对象
     */
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}
复制代码

通过上述源码发现 get()方法做了如下操作

  • 获取当前线程
  • 获取当前线程的 ThreadLocalMap 对象
  • 判断获取的map对象是否为空,为空则返回默认值
  • 不为空,使用当前ThreadLoacl 对象(this)获取 ThreadLocalMap 的 Entry 对象,返回 Entry 保存的 value 值

Message

Message是消息主体,它负责存储消息的各种信息,包括发送消息的Handler对象、消息信息、消息标识等

属性

常用的几个属性

  • what:消息的匹配code,用来区别不同的消息对应着不同的处理结果
  • arg1/arg2:Message携带的int信息,也可以通过setData设置
  • obj:Message携带的Object信息,必须要经过Parcelable ,否则会抛出    java.lang.RuntimeException: Can’t marshal non-Parcelable objects across processes.异常也可以通过setData设置
  • target:该Message对应的Handler,用来进行分发消息
  • callback:处理消息

常量

  • sPoolSync :充当锁的作用,避免多线程争抢资源,导致脏数据
  • sPool: 消息队列的头部的指针
  • sPoolSize :消息队列的长度
  • MAX_POOL_SIZE:消息队列缓存消息的最大的长度

创建Message对象 obtain()

public static Message obtain() {
    //sPoolSync 同步锁,避免多线程争抢资源,导致脏数据
    synchronized (sPoolSync) {
        //判断头部指针是否为空,如果为空则代表没有可复用的Message
        if (sPool != null) {
            //当前节点的Message赋值给m
            Message m = sPool;
            //获取当前Message的下一个节点
            sPool = m.next;
            m.next = null;
            m.flags = 0; // clear in-use flag
            //删除一个插入节点
            sPoolSize--;
            //返回Message
            return m;
        }
    }
    return new Message();
}
复制代码

通过上述代码可以发现,创建Message有如下操作

  • 首先通过同步锁,避免多线程争抢资源,导致脏数据
  • 判断当前队列头部指针是否为空,为空代表当前队列中没有Message,直接new Message 创建返回
  • 当前队列头部指针不为空,代表有可复用的Message
  • 获取当前队列头部节点Message
  • 设置下一个节点数据到sPool
  • 设置取出的Message的next为空
  • 当前队列长度减一
  • 返回去到的想象

Message对象回收 recycle()/recycleUnchecked()

public void recycle() {
    //判断当前message是否可以被回收(是否正在被使用)
    if (isInUse()) {
        if (gCheckRecycle) {
            throw new IllegalStateException("This message cannot be recycled because it "
                    + "is still in use.");
        }
        return;
    }
    //当前message对象可以被回收
    recycleUnchecked();
}

void recycleUnchecked() {
    //清除当前Message对象成员变量信息
    flags = FLAG_IN_USE;
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    replyTo = null;
    sendingUid = UID_NONE;
    workSourceUid = UID_NONE;
    when = 0;
    target = null;
    callback = null;
    data = null;

    synchronized (sPoolSync) {
        //如果对象池中的size小于最大size,则在对象池中提那件当前Message
        //MAX_POOL_SIZE 默认为50
        if (sPoolSize < MAX_POOL_SIZE) {
            //当前Message作为第一个节点赋值给sPool
            next = sPool;
            sPool = this;
            sPoolSize++;
        }
    }
}
复制代码

当message对象被回收的时候,先清除其成员变量信息,然后将该对象作为sPool池的第一个节点添加到池中

MessageQueue

消息队列就是用来存储消息、取出消息和移除消息的;他既不是用List来存储,也不是用Map来存储,他是用Message的内部链表结构来存储的。那么接下来我们就来研究研究,MessageQueue到底是如何做到对消息的存储、取出和移除的

添加消息 enqueueMessage(Message msg, long when)

boolean enqueueMessage(Message msg, long when) {
    //判断msg.target是否为空(Handerl 是否为空)
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }

    synchronized (this) {
        //判断msg是否处于不能使用的状态
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
        //MessageQueue是否调用退出消息
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            //回收消息
            msg.recycle();
            return false;
        }

        msg.markInUse();
        //设置msg处理时间
        msg.when = when;
        //当前队列节点Message赋值给P
        Message p = mMessages;
        boolean needWake;
        //判断当前队列是否为空、或者消息处理时间为零 、或者队列节点Message的时间大于当前msg处理时间,直接加入队列最后一个节点
        if (p == null || when == 0 || when < p.when) {
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                //取出下一个消息
                p = p.next;
                //根据msg处理时间,判断插入到队列哪个节点前面
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            //msg插入队列中
            msg.next = p;
            prev.next = msg;
        }

        
        if (needWake) {
            //唤醒消息循环
            nativeWake(mPtr);
        }
    }
    return true;
}
复制代码

通过上述源码可以发现,插入Message分为如下几个步骤

  • 判断Message.target是否为空,为空抛出一次
  • 判断Message是否被消费,被消费抛出异常
  • 判断当前队列是否有消息或者当前消息处理优先级高于队列中等待处理节点的消息,直接插入到队列当中,并设置优先出来当前消息
  • 通过比较消息的执行时间,将消息插入到相应的位置
  • 判断是否唤醒消息循环

取消息 next()

Message next() {       
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; 
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        //native方法,nextPollTimeoutMillis为-1时进入睡眠状态
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            //获取当前系统时间
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            //把当前队列最新节点消息赋值给msg
            Message msg = mMessages;
            //开启同步屏障,获取异步消息
            if (msg != null && msg.target == null) {
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                //判断当前时间是否小于,msg执行时间
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                      
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    //返回消息
                    return msg;
                }
            } else {
                    
                nextPollTimeoutMillis = -1;
            }
            //退出循环退出
            if (mQuitting) {
                dispose();
                return null;
            }
            //获取空闲IdleHand数量
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            //判断是否空闲IdleHand数量是否大于零,如果小于零,直接跳过本次轮询
            if (pendingIdleHandlerCount <= 0) {
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }
        //非睡眠状态下处理IdleHandler接口
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; 

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        pendingIdleHandlerCount = 0;
        
        nextPollTimeoutMillis = 0;
    }
}
复制代码

通过上述源码可以发现,在next()做了如下操作

  • 首先判断ptr是否为零,为零则是MessageQueue被释放了,直接返回null
  • native方法,nextPollTimeoutMillis为-1时进入睡眠状态
  • msg.target(即handler)== null的时候,则开启了同步屏障,message会在它后面消息中进行循环遍历,直到遍历到队尾或找到一个isAsynchronous()==true的消息,即异步消息,否则取出第一个同步消息来处理
  • 比较当前想象的执行时间是否小于当前系统时间,小于则返回该msg对象,否则就会计算当前时间到when还有多久并保存到nextPollTimeoutMillis中
  • 获取设置IdleHandl的size,如果IdleHandl的size大于零,则会执行每一个IdleHandl的queueIdle()方法否则退出本次循环

退出消息 quit(boolean safe)

void quit(boolean safe) {
    //当前MessageQueue是否允许退出,不允许退出 则抛出异常
    if (!mQuitAllowed) {
        throw new IllegalStateException("Main thread not allowed to quit.");
    }

    synchronized (this) {
        if (mQuitting) {
            return;
        }
        //设置退出状态
        mQuitting = true;

        if (safe) {
            //处理所有延时消息
            removeAllFutureMessagesLocked();
        } else {
            //处理所有消息
            removeAllMessagesLocked();
        }
    
        //唤醒消息循环
        nativeWake(mPtr);
    }
}
//处理延时延时消息
private void removeAllFutureMessagesLocked() {
    //获取当前系统时间
    final long now = SystemClock.uptimeMillis();
    Message p = mMessages;
    if (p != null) {
        //判断当前节点Message执行时间是否大于当前系统时间
        if (p.when > now) {
            //处理所有消息
            removeAllMessagesLocked();
        } else {
            Message n;
            //
            for (;;) {
                n = p.next;
                if (n == null) {
                    return;
                }
                if (n.when > now) {
                    //找出延时消息
                    break;
                }
                p = n;
            }
            p.next = null;
            //处理所有延时消息
            do {
                p = n;
                n = p.next;
                //消息回收
                p.recycleUnchecked();
            } while (n != null);
        }
    }
}
//处理所有消息
private void removeAllMessagesLocked() {
    Message p = mMessages;
    while (p != null) {
        Message n = p.next;
        //消息回收
        p.recycleUnchecked();
        p = n;
    }
    mMessages = null;
}
复制代码

通过上述源码可以发现,quit()主要有如下步骤

  • 设置退出状态,以便在next方法中返回null,使Looper.loop()退出循环
  • 判断处理消息的方式,如果safe为true 则处理所有延时消息,否则处理所有消息
  • 所有消息调用Message.recycleUnchecked()进行消息回收

Handler

Handler 是一个消息分发对象。Handler是Android给我们提供用来更新UI的一套机制,也是一套消息处理机制,我们可以发消息,也可以通过它处理消息。

创建Handler

 public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
     mLooper = looper;
     mQueue = looper.mQueue;
     mCallback = callback;
     mAsynchronous = async;
 }
复制代码

从上述源码发现,Handler在创建的时候绑定了Lopper和MessagerQueue对象,所以每个Handler对象都会持有一个Looper和MessageQueue对象。所以子线程用handler发送消息,发送的消息被送到与主线程相关联的MessageQueue,也是主线程相关联的Looper在循环消息,handler所关联的是主线程的Looper和MessageQueue,所以最后消息的处理逻辑也是在主线程。只有发送消息是在子线程,其它都是在主线程,Handler与哪个线程的Looper相关联,消息处理逻辑就在与之相关的线程中执行,相应的消息的走向也就在相关联的MessageQueue中。所以子线程切换到主线程是很自然的过程。

发送消息

在Handler类中发送消息的方法有很多,最终都会调用sendMessageAtTime()方法

 /**
  * msg 需要传递的消息
  * uptimeMillis 消息执行的绝对时间
  */
 public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    //判断消息队列是否为空,为空抛出一次
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}
/**
 *
 */
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,long uptimeMillis) {
    //Message绑定Handler
    msg.target = this;
    msg.workSourceUid = ThreadLocalWorkSource.getUid();
    //是否异步消息
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    //调用MessageQueue.enqueueMessage把Message放入队列当中
    return queue.enqueueMessage(msg, uptimeMillis);
}
复制代码

从上述源码可以发现,Handler发送消息主要做了如下几个步骤

  • 计算出需要传递消息执行的绝对时间
  • Message 消息绑定Handler 以便在loop()方法中取到消息后进行消息处理
  • 把Message消息添加到MessageQueue当中

消息处理

public void dispatchMessage(@NonNull Message msg) {
    //判断Message的callback 是否为空
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        //判断Handler的mCallback是为空
        if (mCallback != null) {
            //mCallback处理消息,并返回是否向下分发消息
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        //处理消息
        handleMessage(msg);
    }
}
复制代码

通过上述源码可以发现,Handler处理消息做了如下几个步骤

  • 当Message中的callback不为null时,执行Message中的callback中的方法。这个callback时一个Runnable接口
  • 当Handler中的Callback接口不为null时,执行Callback接口中的方法
  • 直接执行Handler中的handleMessage()方法

总结

Android消息机制处理流程如下:

  • 应用启动/主线层启动时,在ActvityThread的main方法中初始化了Looper和MessageQueue
  • 同时在Lopper和MessageQueue初始化完成后,在ActvityThread的main方法中调用了Looper.loop()方法进行消息轮询
  • 使用Handler时,在Handler创建时会绑定一个Lopper和MessageQueue
  • Handler通过sendMessage方法将消息插入MessageQueue中
  • Handler在通过handlerMessage处理消息
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享