GCD Internals, Part 2


Deadlock with dispatch_sync

The main thread waits for the task submitted by the synchronous call to run first, while the main queue waits for the main thread's current task to finish before executing its own. The main queue and the main thread wait on each other, and that mutual wait is a deadlock.

- (void)textDemo2 {
    dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_SERIAL);
    NSLog(@"1");
    // asynchronous dispatch
    dispatch_async(queue, ^{
        NSLog(@"2");
        // dispatch_sync targeting the serial queue it is already running on
        // deadlocks here. Note that a serial queue plus dispatch_sync does
        // not always deadlock -- only this self-targeting case does.
        dispatch_sync(queue, ^{
        });
    });
    NSLog(@"5");
}

After the deadlock occurs, run `bt` to inspect the call stack:
(screenshot: the deadlock call stack)
Let's trace the same path through the libdispatch source:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

This flow should be familiar from Part 1: _dispatch_sync_f -> _dispatch_sync_f_inline. Inside that function we know a serial queue has dq_width = 1:

if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}

So we land in _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline.
The deadlock call stack above contains _dispatch_sync_f_slow. Following _dispatch_barrier_sync_f_inline into _dispatch_sync_f_slow, we find the function that raises the crash: __DISPATCH_WAIT_FOR_QUEUE__.

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
	uint64_t dq_state = _dispatch_wait_prepare(dq);
    // second argument: .dsc_waiter = _dispatch_tid_self(), i.e. the current thread's id
	if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
				"dispatch_sync called on queue "
				"already owned by current thread");
	}
}

In _dq_state_drain_locked_by(dq_state, dsc->dsc_waiter), the second argument is .dsc_waiter = _dispatch_tid_self(), i.e. the current thread's id:

#define _dispatch_tid_self()		((dispatch_tid)_dispatch_thread_port())

_dq_state_drain_locked_by -> _dispatch_lock_is_locked_by

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
	// equivalent to _dispatch_lock_owner(lock_value) == tid
	return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

DLOCK_OWNER_MASK covers the owner bits of the lock word, so the expression is nonzero whenever lock_value ^ tid differs in those bits. For the check to succeed -- that is, for the deadlock crash to fire -- we need lock_value ^ tid to be 0 in the owner bits, and XOR of two values is zero exactly when they are equal. In other words, the deadlock is detected when the tid of the waiting thread equals the tid of the thread that currently owns the queue.

Synchronous dispatch + global queue

dispatch_sync + global_queue

    dispatch_queue_t queue1 = dispatch_get_global_queue(0, 0);
    dispatch_sync(queue1, ^{
        NSLog(@"%@", [NSThread currentThread]);
    });

Run `bt` again:
(screenshot: the call stack)
Setting a symbolic breakpoint on the third frame puts us in _dispatch_sync_f_inline:
(screenshot: _dispatch_sync_f_inline)
Set symbolic breakpoints on these three candidate functions in the demo project:
(screenshot: symbolic breakpoints)
Execution stops in _dispatch_sync_function_invoke, which matches the `bt` call stack above. The rest of the flow is straightforward: _dispatch_sync_function_invoke -> _dispatch_sync_function_invoke_inline -> _dispatch_client_callout.

Asynchronous dispatch + global/concurrent queue

dispatch_async + DISPATCH_QUEUE_CONCURRENT
(screenshot: demo output)
dispatch_async + global_queue
(screenshot: demo output)

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

dispatch_async -> _dispatch_continuation_async -> dx_push is a flow Part 1 already covered. Which implementation dx_push resolves to depends on the queue's vtable; for the global queue, .dq_push = _dispatch_root_queue_push:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
	.do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_concurrent_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);

So the global queue's push is _dispatch_root_queue_push and the custom concurrent queue's is _dispatch_lane_concurrent_push. Let's start with the more involved one, the concurrent queue:

void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	// <rdar://problem/24738102&24743140> reserving non barrier width
	// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
	// width equivalent), so we have to check that this thread hasn't
	// enqueued anything ahead of this call or we can break ordering
	if (dq->dq_items_tail == NULL &&
			!_dispatch_object_is_waiter(dou) &&
			!_dispatch_object_is_barrier(dou) &&
			_dispatch_queue_try_acquire_async(dq)) {
		return _dispatch_continuation_redirect_push(dq, dou, qos);
	}

	_dispatch_lane_push(dq, dou, qos);
}

_dispatch_lane_push is, incidentally, also the .dq_push of a serial queue:

void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	//...
	if (unlikely(_dispatch_object_is_waiter(dou))) {
		return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
	}

	//...
	if (flags) {
		return dx_wakeup(dq, qos, flags);
	}
}

There are really only two returns in there; set symbolic breakpoints on both in the demo. A global search shows that dx_wakeup is a macro:

#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)

Searching for dq_wakeup: since we are studying a concurrent queue here, .dq_wakeup = _dispatch_lane_wakeup.
(screenshot: dq_wakeup implementations)
The symbolic breakpoints show that _dispatch_lane_push_waiter is never hit, so the async + concurrent path is: dispatch_async -> _dispatch_continuation_async -> dx_push -> _dispatch_lane_concurrent_push -> _dispatch_lane_push -> dx_wakeup -> _dispatch_lane_wakeup -> _dispatch_queue_wakeup.
_dispatch_queue_wakeup has four possible returns; set a symbolic breakpoint on each:
(screenshot: symbolic breakpoints)
In _dispatch_lane_class_barrier_complete we have already reached system-level functions. We know the global queue's .dq_push = _dispatch_root_queue_push; a breakpoint there confirms execution does reach it. So we can summarize: a manually created concurrent queue enters through _dispatch_lane_concurrent_push and, after some involved bookkeeping, merges into the global concurrent queue's path at _dispatch_root_queue_push. This is indirect evidence that the global queue is a special kind of concurrent queue.
_dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow -> _dispatch_root_queues_init
_dispatch_root_queues_init is a good excuse for a detour into the singleton machinery:

How dispatch_once (the singleton) works under the hood

static inline void
_dispatch_root_queues_init(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}

This is the singleton: the dispatch_once we call from the app layer is backed by dispatch_once_f:

void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
	dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
#endif
	if (_dispatch_once_gate_tryenter(l)) {
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}

On the first call we reach _dispatch_once_gate_tryenter(l), which uses an atomic operation to lock the gate to the current thread (_dispatch_lock_value_for_self()); this is why dispatch_once is thread-safe. From there: _dispatch_once_callout -> _dispatch_once_gate_broadcast -> _dispatch_once_mark_done:

static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
	return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

This atomically stores the global marker DLOCK_ONCE_DONE, so the next call returns early at the likely(v == DLOCK_ONCE_DONE) check above.

Back to asynchronous dispatch + global/concurrent queue

Back to the flow above: the point is that _dispatch_root_queues_init_once executes only once. What does _dispatch_root_queues_init_once do? Inside it we find _dispatch_worker_thread2. For easier reading, here is the async + concurrent call stack again:
(screenshot: the call stack)
The frames below it are in libsystem_pthread.dylib, the lower-level API; in other words, GCD is a layer built on top of pthreads. Back to the earlier function:

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
	// from the call site we know n = 1, floor = 0
	int remaining = n;
	_dispatch_root_queues_init();
 	// ...
	int can_request, t_count;
	t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
	do {
		// number of threads still available from the pool
		can_request = t_count < floor ? 0 : t_count - floor;
		// requested > available: log and clamp the request
		if (remaining > can_request) {
			_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
					remaining, can_request);
			os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
			remaining = can_request;
		}
		if (remaining == 0) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
	} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
			t_count - remaining, &t_count, acquire));
	// ...
}

Note the do-while loop over dgq_thread_pool_size. Part 1 showed how a queue's width is set at creation:

  _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
      DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
      (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0))
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2)

So DISPATCH_QUEUE_WIDTH_POOL - DISPATCH_QUEUE_WIDTH_MAX = 1, and the global root queues use the POOL width:

struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
	.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
	.dgq_thread_pool_size = 1,
};

From this we can conclude:

  • The global concurrent queue's dq_atomic_flags width is DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), one more than the DISPATCH_QUEUE_WIDTH_MAX that _dispatch_queue_init gives a created concurrent queue.
  • The global concurrent queue has .dgq_thread_pool_size = 1, while a created concurrent queue leaves it at 0.

So the do-while above is the thread pool's scheduling rule. How many threads can actually be spawned, then?

dgq_thread_pool_size: the thread pool size

static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_queue_global_t dq,
		int pool_size, dispatch_priority_t pri)
{ 
    //...
	int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT;
	dq->dgq_thread_pool_size = thread_pool_size;
}
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255

So the maximum thread pool size is 255.
Apple's pthread documentation says a thread stack is 512 KB by default, with a 16 KB minimum:
(screenshot: Apple documentation on thread stack sizes)
Stack memory is finite: the larger each thread's stack, the fewer threads can be created, so the 16 KB minimum stack yields the largest possible thread count. If, say, 1 GB of kernel address space were devoted entirely to thread stacks, the counts would be:

  • maximum: 1024 * 1024 KB / 16 KB = 64 * 1024
  • minimum: 1024 * 1024 KB / 512 KB = 2048

So the number of threads is not fixed.
