这是我参与8月更文挑战的第6天,活动详情查看:8月更文挑战
同步函数死锁
主线程因为同步函数的原因等着先执⾏任务
主队列等着主线程的任务执⾏完毕再执⾏⾃⼰的任务
主队列和主线程相互等待会造成死锁
// Demo: deadlock caused by calling dispatch_sync on the serial queue that is
// currently executing the calling block.
- (void)textDemo2{
// Create a custom SERIAL queue.
dispatch_queue_t queue = dispatch_queue_create("cooci", DISPATCH_QUEUE_SERIAL);
NSLog(@"1");
// Asynchronous submission: the block below runs later on `queue`.
dispatch_async(queue, ^{
NSLog(@"2");
// dispatch_sync targets the SAME serial queue that is running this block:
// the sync call waits for the queue to drain, while the queue waits for
// this block to finish -- a mutual wait, i.e. a deadlock. Note: a serial
// queue plus a sync call does not ALWAYS deadlock; it deadlocks only when
// the sync targets the queue that is currently running the caller.
dispatch_sync(queue, ^{
});
});
NSLog(@"5");
}
复制代码
上面造成死锁之后,bt一下看下调用栈
我们顺着源码也来分析下:
// Public dispatch_sync entry point for Blocks: submits `work` to `dq` and does
// not return until the block has run.
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
// Blocks that carry private data (NOTE(review): presumably QoS/voucher
// metadata -- confirm in _dispatch_block_has_private_data) take a
// dedicated slow path.
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
// Common path: funnel into _dispatch_sync_f with the Block's invoke function.
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
复制代码
这个流程比较熟悉,我们上一篇也讲过, _dispatch_sync_f
-> _dispatch_sync_f_inline
在这个函数中我们知道,串行队列的dq_width = 1
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
复制代码
所以我们定位到了_dispatch_barrier_sync_f
-> _dispatch_barrier_sync_f_inline
我们看下上面的死锁时候的调用栈,发现有个_dispatch_sync_f_slow
,好的那么我们就在_dispatch_barrier_sync_f_inline
中定位到_dispatch_sync_f_slow
发现了报错的时候的函数__DISPATCH_WAIT_FOR_QUEUE__
// Called on the slow sync path while the caller is about to block waiting for
// `dq`. Crashes deliberately when the wait would deadlock (excerpt; the
// article elides the rest of the function).
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
// dsc->dsc_waiter was set to _dispatch_tid_self(), i.e. the waiting
// thread's id. If that same thread already owns the queue's drain lock,
// waiting would never complete -- report the deadlock instead.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
}
复制代码
_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter)
这里的第二个参数 .dsc_waiter = _dispatch_tid_self()
也就是线程的id
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())
复制代码
_dq_state_drain_locked_by
-> _dispatch_lock_is_locked_by
// True when the owner recorded in `lock_value` is the thread `tid`.
// Only the owner bits selected by DLOCK_OWNER_MASK participate in the
// comparison; this is equivalent to ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0.
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
	return (lock_value & DLOCK_OWNER_MASK) == (tid & DLOCK_OWNER_MASK);
}
复制代码
DLOCK_OWNER_MASK
是一个掩码,只保留锁字中记录持有者线程id的那些位。所以只要(lock_value ^ tid) & DLOCK_OWNER_MASK
不为0,条件就不成立。相反如果要成立的话,也就是死锁的话,需要 (lock_value ^ tid) & DLOCK_OWNER_MASK == 0
那么这个表达式什么情况下为0呢,答案是两者被掩码保留下来的位完全相等。也就是说当前等待线程的tid和当前持有队列锁的线程tid相等就会造成死锁
同步函数 + 全局队列
dispatch_sync
+ global_queue
dispatch_queue_t queue1 = dispatch_get_global_queue(0, 0);
dispatch_sync(queue1, ^{
NSLog(@"%@",[NSThread currentThread]);
});
复制代码
同时bt一下
我们可以在第三行的时候下符号断点,在_dispatch_sync_f_inline
函数中
在工程中符号断点这3个方法
发现来到了这个方法结合上面bt的调用栈发现_dispatch_sync_function_invoke
这个方法。接下来的流程就比较简单了。_dispatch_sync_function_invoke
-> _dispatch_sync_function_invoke_inline
->_dispatch_client_callout
异步函数 + 全局队列/并发队列
dispatch_async
+ DISPATCH_QUEUE_CONCURRENT
dispatch_async
+ global_queue
// Public dispatch_async entry point for Blocks: wraps `work` in a
// continuation, derives its QoS, and hands it to the queue's async push path.
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	// _dispatch_continuation_init packages the block and returns the QoS
	// the continuation should be pushed with.
	dispatch_qos_t qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
复制代码
dispatch_async
-> _dispatch_continuation_async
-> dx_push
这个流程比较熟悉上一篇已经讲过的。.dq_push = _dispatch_root_queue_push
// vtable for queues created with DISPATCH_QUEUE_CONCURRENT: async submissions
// enter through .dq_push = _dispatch_lane_concurrent_push.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
// vtable for the global root queues: async submissions enter through
// .dq_push = _dispatch_root_queue_push.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
复制代码
这里并发队列对应的是_dispatch_lane_concurrent_push
,全局队列对应的是_dispatch_root_queue_push
,我们先看复杂一点的并发队列吧
// dq_push implementation for user-created concurrent queues. Fast path:
// redirect the work item straight to the root queue; otherwise fall back to
// the generic lane push.
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
// Nothing queued, not a waiter/barrier, width acquired: redirect
// directly to the underlying root queue.
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
// Slow path shared with serial queues.
_dispatch_lane_push(dq, dou, qos);
}
复制代码
_dispatch_lane_push
这个其实也是串行队列的.dq_push
// Generic lane push -- this is also the .dq_push of serial queues.
// (Excerpt: the article elides parts of the body, including where `flags`
// is computed.)
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
//...
// Sync waiters (e.g. a blocked dispatch_sync) take a dedicated path.
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}
//...
// Otherwise wake the queue up through its vtable's dq_wakeup
// (for a concurrent lane: _dispatch_lane_wakeup).
if (flags) {
return dx_wakeup(dq, qos, flags);
}
}
复制代码
可以看到这里面其实只有两个return
,在demo中下符号断点。dx_wakeup
全局搜索定位到是一个宏
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
复制代码
搜索dq_wakeup
,由于现在研究的是并发队列所以.dq_wakeup = _dispatch_lane_wakeup,
由符号断点可以知道并没有走_dispatch_lane_push_waiter
这个方法,所以异步并发流程链路应该是:dispatch_async
-> _dispatch_continuation_async
-> dx_push
-> _dispatch_lane_concurrent_push
-> _dispatch_lane_push
-> dx_wakeup
-> _dispatch_lane_wakeup
-> _dispatch_queue_wakeup
在_dispatch_queue_wakeup
中一共有四个函数返回,我们分别下符号断点
在_dispatch_lane_class_barrier_complete
中发现已经到了系统级别的函数了。我们知道全局队列的.dq_push = _dispatch_root_queue_push
我们试着下个断点,发现确实走到了这一步。所以可以这么总结:手动创建的并发队列_dispatch_lane_concurrent_push
经过一段复杂的逻辑计算之后合流到了全局并发队列的_dispatch_root_queue_push
这里。这也从侧面验证了全局队列是一种特殊的并发队列。
_dispatch_root_queue_push_inline
-> _dispatch_root_queue_poke
-> _dispatch_root_queue_poke_slow
-> _dispatch_root_queues_init
说到这个_dispatch_root_queues_init
就适当插入另外的一个单例知识
单例底层原理
// One-time initialization of the root queues, guarded by dispatch_once_f so
// _dispatch_root_queues_init_once runs exactly once per process.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
复制代码
这就是单例,我们上层调用的dispatch_once
底层就是dispatch_once_f
// Public dispatch_once entry point for Blocks: forwards to dispatch_once_f,
// passing the Block itself as the context and its invoke function as the
// callback.
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
复制代码
// Core of dispatch_once: runs func(ctxt) exactly once for a given predicate.
// Subsequent callers either return immediately (already done) or block until
// the first caller finishes.
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
// Fast path: the predicate was already marked done, nothing to do.
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
// First caller wins the gate and invokes the callback; everyone else
// blocks in _dispatch_once_wait until the winner broadcasts completion.
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
复制代码
第一次调用的时候会来到_dispatch_once_gate_tryenter(l)
那里,_dispatch_once_gate_tryenter
进行一些原子操作加锁当前的线程,所以单例也是线程安全的_dispatch_lock_value_for_self()
,_dispatch_once_callout
-> _dispatch_once_gate_broadcast
-> _dispatch_once_mark_done
// Atomically stamps the once-gate with DLOCK_ONCE_DONE (releasing prior
// writes) so later callers hit the fast "already done" return; yields the
// previous gate value.
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
复制代码
这里全局标记DLOCK_ONCE_DONE
所以下次进入的时候直接到了上面的likely(v == DLOCK_ONCE_DONE
代码return
异步函数 + 全局队列/并发队列
回到上面的流程,这里意思就是让_dispatch_root_queues_init_once
这个函数只执行一次,那么_dispatch_root_queues_init_once
做了什么?我们在这里发现了_dispatch_worker_thread2
为了方便阅读,把之前异步并发的调用栈再贴一下
可以看出下面就到了libsystem_pthread.dylib
下层api,也就是说GCD就是在pthread之上封装改变而来。回到之前的函数
// Slow path for poking a root queue: ensures the root queues are initialized,
// then reserves up to `n` threads from the queue's pthread pool.
// (Excerpt: the article elides parts of the body.)
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
// For the call site discussed here the caller passes n=1, floor=0.
int remaining = n;
_dispatch_root_queues_init();
// ...
int can_request, t_count;
// Current pool size, read atomically; re-read by the CAS loop below.
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
// Number of threads still available above the floor.
can_request = t_count < floor ? 0 : t_count - floor;
// Requesting more than is available: log and clamp the request.
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
// Pool exhausted: nothing to hand out.
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
// Retry until the pool size is decremented without interference.
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
// ...
}
复制代码
这里面的do while循环围绕dgq_thread_pool_size
做线程池调度,而上一篇提到过线程数量
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0))
复制代码
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX (DISPATCH_QUEUE_WIDTH_FULL - 2)
复制代码
所以这里的DISPATCH_QUEUE_WIDTH_POOL
– DISPATCH_QUEUE_WIDTH_MAX
= 1
// The manager root queue: its width flag uses DISPATCH_QUEUE_WIDTH_POOL and
// its nominal thread-pool size is 1.
struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dgq_thread_pool_size = 1,
};
复制代码
所以从这里可以得出:
全局并发队列的宽度是 DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),而 _dispatch_queue_init
创建的并发队列宽度是 DQF_WIDTH(DISPATCH_QUEUE_WIDTH_MAX),两者相差 1
全局并发队列的 .dgq_thread_pool_size = 1
,而手动创建的并发队列没有显式设置 .dgq_thread_pool_size(默认为 0)
所以上面的do while就是线程池调度的规则。那么到底可以开辟多少线程呢?
dgq_thread_pool_size 线程池大小
// Sets up the pthread pool for a root queue. The article's excerpt keeps only
// the line that caps the pool at DISPATCH_WORKQ_MAX_PTHREAD_COUNT (255).
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_queue_global_t dq,
int pool_size, dispatch_priority_t pri)
{
//...
// Upper bound on the number of worker threads the pool may create.
int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT;
dq->dgq_thread_pool_size = thread_pool_size;
}
复制代码
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
复制代码
也就是线程池的最大数量是255
查阅苹果官方文档对pthread的说明:辅助线程的栈默认大小为512KB,最小可以设置为16KB
我们知道栈的内存是一定的,如果一个线程占的空间越大那么可以开辟的数量就越少,所以当是线程大小是16kb的时候此时开的线程数量是最多的。假如内核态1gb全部用来开线程的话,那么数量就在
- 最大:1024 * 1024 / 16 = 64 * 1024
- 最小:1024 * 1024 /512 = 2048
所以这个线程数量是不固定的。