GCD 底层源码分析(二)

在这一篇章我们主要分析同步函数与异步函数的底层源码实现,在探索之前我们先了解一下同步函数与异步函数的区别。

  1. 能否开辟线程
  2. 任务的回调是否具备异步性或者同步性
  3. 产生死锁的原因

同步函数 dispatch_sync

  1. dispatch_sync
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
复制代码
  1. _dispatch_sync_f
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)
{
	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
复制代码
  1. _dispatch_sync_f_inline
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	// dq_width == 1 代表是串行队列,串行队列同步函数会产生死锁现象
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	}

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	}

	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
复制代码
  1. _dispatch_barrier_sync_f
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
复制代码
  1. _dispatch_barrier_sync_f_inline

image.png

这里我们重点看一下 _dispatch_sync_f_slow 函数,平时我们遇到死锁的时候会报的错误信息。

  1. 模拟死锁现象
- (void)textDemo1{
    dispatch_queue_t queue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_SERIAL);
    NSLog(@"1");
    dispatch_async(queue, ^{
        NSLog(@"2");
        dispatch_sync(queue, ^{
            NSLog(@"3");
        });
        NSLog(@"4");
    });
    NSLog(@"5");
}
复制代码

这里我们来运行这段代码来模拟一下死锁现象。

image.png

但我们运行起来之后可以看到真正报错的其实是在 __DISPATCH_WAIT_FOR_QUEUE__ 函数中产生的,所以我们具体看下 __DISPATCH_WAIT_FOR_QUEUE__ 函数的实现。

  1. __DISPATCH_WAIT_FOR_QUEUE__

image.png

在这里我们可以看到这些信息跟堆栈报错信息一致,所以说明如果这个判断条件成立就会产生死锁,就会抛出异常。所以我们具体来看一下这个判断条件成立的条件,这里 dsc->dsc_waiter 代表当前线程的 iddq_state 代表等待的状态。

  1. _dq_state_drain_locked_by
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
	return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
	// equivalent to _dispatch_lock_owner(lock_value) == tid
	// 这里的返回结果是 lock_value ^ tid 于上 DLOCK_OWNER_MASK,因为DLOCK_OWNER_MASK是个很大的值
	// 所以最后结果成立的条件就是 lock_value ^ tid 等于 0
	return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
复制代码

通过源码我们可以看到,最后参数死锁的条件就是 lock_value ^ tid == 0,也就是说要等待的状态和线程 id 相同,当前在等待状态,又调起了 dq(队列),又要去执行,执行的时候,又发现是在等待状态,产生了死锁。

同步函数任务同步

这里我们来分析一下在全局并发队列下执行同步函数底层代码的执行流程。

image.png
image.png

我们执行这段代码,并对 _dispatch_barrier_sync_f_inline 函数中我们对这几个地方下符号断点。

image.png

运行之后可以看到会执行 _dispatch_sync_f_slow 函数,所以我们跟到这里函数里面。

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	pthread_priority_t pp = _dispatch_get_priority();
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};

	_dispatch_trace_item_push(top_dq, &dsc);
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	if (dsc.dsc_func == NULL) {
		// dsc_func being cleared means that the block ran on another thread ie.
		// case (2) as listed in _dispatch_async_and_wait_f_slow.
		dispatch_queue_t stop_dq = dsc.dc_other;
		return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
	}

	_dispatch_introspection_sync_begin(top_dq);
	_dispatch_trace_item_pop(top_dq, &dsc);
	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
			DISPATCH_TRACE_ARG(&dsc));
}
复制代码

_dispatch_sync_f_slow 函数中可以看到还是有很多流程,我们继续下符号断点,看看会执行到哪一个函数。

image.png

断点之后可以看到会执行 _dispatch_sync_function_invoke 函数,所以继续跟进这个函数。

static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
复制代码
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}
复制代码

我们在 GCD 底层源码分析(一) 讲过 _dispatch_client_callout,执行到这句的时候就会执行函数调用,所以就会同步执行。

异步函数分析

  1. dispatch_async
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
复制代码

在这里任务会被包装在 qos 中,然后执行 _dispatch_continuation_async 函数。

  1. _dispatch_continuation_async
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
	return dx_push(dqu._dq, dc, qos);
}
复制代码

在前面也讲过 dx_push 是一个宏定义,dx_push 会根据队列类型的不同有所不同。

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
	.do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_concurrent_push,
);
复制代码

这里我们看一下并发队列的情况。

  1. _dispatch_lane_concurrent_push
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
ordering
	if (dq->dq_items_tail == NULL &&
			!_dispatch_object_is_waiter(dou) &&
			!_dispatch_object_is_barrier(dou) &&
			_dispatch_queue_try_acquire_async(dq)) {
		return _dispatch_continuation_redirect_push(dq, dou, qos);
	}

	_dispatch_lane_push(dq, dou, qos);
}
复制代码

这里需要注意的一个细节就是需要判断是否有栅栏的情况。

  1. _dispatch_lane_push
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	dispatch_wakeup_flags_t flags = 0;
	struct dispatch_object_s *prev;

	if (unlikely(_dispatch_object_is_waiter(dou))) {
		return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
	}

	dispatch_assert(!_dispatch_object_is_global(dq));
	qos = _dispatch_queue_push_qos(dq, qos);

	// If we are going to call dx_wakeup(), the queue must be retained before
	// the item we're pushing can be dequeued, which means:
	// - before we exchange the tail if we have to override
	// - before we set the head if we made the queue non empty.
	// Otherwise, if preempted between one of these and the call to dx_wakeup()
	// the blocks submitted to the queue may release the last reference to the
	// queue when invoked by _dispatch_lane_drain. <rdar://problem/6932776>

	prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
	if (unlikely(os_mpsc_push_was_empty(prev))) {
		_dispatch_retain_2_unsafe(dq);
		flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
	} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
		// There's a race here, _dispatch_queue_need_override may read a stale
		// dq_state value.
		//
		// If it's a stale load from the same drain streak, given that
		// the max qos is monotonic, too old a read can only cause an
		// unnecessary attempt at overriding which is harmless.
		//
		// We'll assume here that a stale load from an a previous drain streak
		// never happens in practice.
		_dispatch_retain_2_unsafe(dq);
		flags = DISPATCH_WAKEUP_CONSUME_2;
	}
	os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
	if (flags) {
		return dx_wakeup(dq, qos, flags);
	}
}
复制代码
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
复制代码

_dispatch_lane_push 函数中我们继续下符号断点,这里需要注意的一点是 dx_wakeup 是一个宏定义,而 dq_wakeup 在并发队列时 .dq_wakeup = _dispatch_lane_wakeup,所以这里符号断点需要下 _dispatch_lane_wakeup

image.png

断点之后可以看到会执行 _dispatch_lane_wakeup 函数。

  1. _dispatch_lane_wakeup
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		return _dispatch_lane_barrier_complete(dqu, qos, flags);
	}
	if (_dispatch_queue_class_probe(dqu)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
复制代码

在这里继续下断点会执行 _dispatch_queue_wakeup 函数。

  1. _dispatch_queue_wakeup

_dispatch_queue_wakeup 函数中同样对所有流程下符号断点,然后会执行 _dispatch_lane_class_barrier_complete 函数。

  1. _dispatch_lane_class_barrier_complete

在这里会执行到 _dispatch_root_queue_push 函数。

  1. _dispatch_root_queue_push

image.png

  1. _dispatch_root_queue_push_inline
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}
复制代码
  1. _dispatch_root_queue_poke
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
	if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
	{
		if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
			_dispatch_root_queue_debug("worker thread request still pending "
					"for global queue: %p", dq);
			return;
		}
	}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
	return _dispatch_root_queue_poke_slow(dq, n, floor);
}
复制代码
  1. _dispatch_root_queue_poke_slow

该方法的源码实现,主要有两步操作

  • 通过 _dispatch_root_queues_init 方法注册回调
  • 通过 do-while 循环创建线程,使用 pthread_create 方法
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;

    _dispatch_root_queues_init();//重点
    
    ...
    //do-while循环创建线程
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
    
    ...
}
复制代码

_dispatch_root_queues_init

  • 进入 _dispatch_root_queues_init 源码实现,发现是一个dispatch_once_f 单例(请查看后续单例的底层分析,这里不作说明),其中传入的 func_dispatch_root_queues_init_once
 DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}
复制代码

进入 _dispatch_root_queues_init_once 的源码,其内部不同事务的调用句柄都是 _dispatch_worker_thread2

image.png

block 回调执行的调用路径为:_dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline -> _dispatch_client_callout -> dispatch_call_block_and_release
这个路径可以通过断点,bt 打印堆栈信息得出

image.png

在这里需要说明一点的是,单例的 block 回调和异步函数的 block 回调是不同的

  • 单例中,block 回调中的 func_dispatch_Block_invoke(block)
  • 而异步函数中,block 回调中的 funcdispatch_call_block_and_release

异步函数的底层分析总结如下:

  • 准备工作:首先,将异步任务拷贝并封装,并设置回调函数 func
  • block 回调:底层通过 dx_push 递归,会重定向到根队列,然后通过 pthread_creat 创建线程,最后通过 dx_invoke 执行 block 回调(dx_pushdx_invoke 是成对的)

GCD 单例底层原理

  1. dispatch_once
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    NSLog(@"单例应用");
});
复制代码

进入 dispatch_once 源码实现,底层是通过 dispatch_once_f 实现的

  • 参数1:onceToken,它是一个静态变量,由于不同位置定义的静态变量是不同的,所以静态变量具有唯一性
  • 参数2:block 回调
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
复制代码
  1. dispatch_once_f
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);//load
    if (likely(v == DLOCK_ONCE_DONE)) {//已经执行过了,直接返回
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {//尝试进入
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);//无限次等待
}
复制代码

进入 dispatch_once_f 源码,其中的 val 是外界传入的 onceToken 静态变量,而 func_dispatch_Block_invoke(block),其中单例的底层主要分为以下几步

  • val 也就是静态变量转换为 dispatch_once_gate_t 类型的变量l

  • 通过 os_atomic_load 获取此时的任务的标识符 v

    • 如果 v 等于 DLOCK_ONCE_DONE,表示任务已经执行过了,直接 return
    • 如果 任务执行后,加锁失败了,则走到 _dispatch_once_mark_done_if_quiesced 函数,再次进行存储,将标识符置为 DLOCK_ONCE_DONE
    • 反之,则通过 _dispatch_once_gate_tryenter 尝试进入任务,即解锁,然后执行 _dispatch_once_callout 执行 block 回调
  • 如果此时有任务正在执行,再次进来一个任务 2,则通过_dispatch_once_wait 函数让任务 2 进入无限次等待

  1. _dispatch_once_gate_tryenter
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);//首先对比,然后进行改变
}
复制代码

查看其源码,主要是通过底层 os_atomic_cmpxchg 方法进行对比,如果比较没有问题,则进行加锁,即任务的标识符置为DLOCK_ONCE_UNLOCKED

  1. _dispatch_once_callout
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_client_callout(ctxt, func);//block调用执行
    _dispatch_once_gate_broadcast(l);//进行广播:告诉别人有了归属,不要找我了
复制代码

进入 _dispatch_once_callout 源码,主要就两步

*_dispatch_client_calloutblock 回调执行

  • _dispatch_once_gate_broadcast:进行广播
  1. _dispatch_client_callout
#undef _dispatch_client_callout
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    @try {
        return f(ctxt);
    }
    @catch (...) {
        objc_terminate();
    }
}
复制代码

进入 _dispatch_client_callout 源码,主要就是执行 block 回调,其中的f等于 _dispatch_Block_invoke(block),即异步回调

  1. _dispatch_once_gate_broadcast -> _dispatch_once_mark_done 源码
DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    //如果不相同,直接改为相同,然后上锁 -- DLOCK_ONCE_DONE
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
复制代码

进入 _dispatch_once_gate_broadcast -> _dispatch_once_mark_done 源码,主要就是给 dgo->dgo_once 一个值,然后将任务的标识符为DLOCK_ONCE_DONE,即解锁。

总结
针对单例的底层实现,主要说明如下:

  • 单例只执行一次的原理:GCD单例中,有两个重要参数,onceToken 和

block,其中onceToken是静态变量,具有唯一性,在底层被封装成了dispatch_once_gate_t类型的变量l,l主要是用来获取底层原子封装性的关联,即变量v,通过v来查询任务的状态,如果此时v等于DLOCK_ONCE_DONE,说明任务已经处理过一次了,直接return。

  • block调用时机:如果此时任务没有执行过,则会在底层通过C++函数的比较,将任务进行加锁,即任务状态置为DLOCK_ONCE_UNLOCK,目的是为了保证当前任务执行的唯一性,防止在其他地方有多次定义。加锁之后进行block回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为DLOCK_ONCE_DONE,在下次进来时,就不会在执行,会直接返回。
  • 多线程影响:如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享