这是我参与8月更文挑战的第18天,活动详情查看:[8月更文挑战]
1.单例模式
GCD中使用单例
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
NSLog(@"block---:%@",[NSThread currentThread]);
});
复制代码
来到源码,找到dispatch_once
函数的实现
void dispatch_once(dispatch_once_t *val, dispatch_block_t block) {
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
复制代码
进入dispatch_once_f
函数
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
复制代码
- 将val强转为dispatch_once_gate_t类型,类似于栅栏的使用
三个条件分支:
- 如果执行完成,直接返回
- 如果第一次,执行_dispatch_once_callout函数
- 吐过正在执行,进入_dispatch_once_wait等待
1.1 锁的处理
进入_dispatch_once_gate_tryenter函数
static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l) {
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
复制代码
- 进行原子锁的处理,防止多线程
1.2 执行任务
进入_dispatch_once_callout函数
static void _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func) {
_dispatch_client_callout(ctxt, func);
_dispatch_once_gate_broadcast(l);
}
void _dispatch_client_callout(void *ctxt, dispatch_function_t f) {
_dispatch_get_tsd_base();
void *u = _dispatch_get_unwind_tsd();
if (likely(!u)) return f(ctxt);
_dispatch_set_unwind_tsd(NULL);
f(ctxt);
_dispatch_free_unwind_tsd();
_dispatch_set_unwind_tsd(u);
}
复制代码
- 通过f(ctxt)执行任务的回调
进入_dispatch_once_gate_broadcast函数
static inline void _dispatch_once_gate_broadcast(dispatch_once_gate_t l) {
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);
#endif
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
复制代码
- 锁的处理,并标记为完成
单例模式的原理:
- 调用
dispatch_once
函数,传入onceToken
和block
。其中onceToken
为静态变量,具有唯一性
,在底层被强转为dispatch_once_gate_t
类型的变量l
,l
通过os_atomic_load
函数获取底层原子封装性的关联,得到变量v
,通过v来查询任务的状态,如果此时v等于BLOCK_ONCE_DONE
,说明任务已经处理过一次了,直接return
- 如果任务首次执行,将任务进行加锁,任务状态置为
DLOCK_ONCE_UNLOCK
,目的保证线程安全。加锁后进行block回调函数的执行,执行完毕后,将当前任务解锁,将当前任务状态置为DLOCK_ONCE_DONE
,在下次进来时就不会再执行,会直接return
- 如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的
2. 线程池
2.1 创建线程
异步函数在_dispatch_root_queue_poke_slow
中,如果是全局队列,使用_pthread_workqueue_addthreads
函数创建并执行
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global " "queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
复制代码
如果是普通队列,使用do...while
进行线程池的创建,在创建之前,还要对线程池的状态进行判断
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d", remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq);
return;
}
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire));
复制代码
- 判断
dgq_thread_pool_size
,源码中标记为1
dgq_thread_pool_size
会根据逻辑自增,加到最大值为止remaining
和floor
为入参,传入1和0- 计算
can_request
线程数,如果t_count
小于floor
返回0,否则返回t_count
减去floor
的差值 - 如果
remaining
线程数大于can_request
,pthread
线程池减少请求,以can_request
线程数为准 - 如果
remaining
为0 ,表示根队列的pthread
线程池已满
使用pthread_creat
函数,创建线程
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
复制代码
2.2 最大线程数
线程池的最大线程数的设定
int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT;
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
复制代码
- 最大线程数设置255,但实际程序中开辟的线程数,不一定能达到这个最大值
官方文档中,辅助线程为512KB,辅助线程允许的最小堆栈大小为16KB,并且堆栈大小必须是4KB的倍数
程序启动,系统给出的虚拟内存4GB,用户态占3GB,内核态占1GB。但内核态的1GB并不能全部用来开辟线程,所以最大线程数是未知的
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
#endif
CloseHandle((HANDLE)hThread);
} while (--remaining);
复制代码
- 按照内核态
1GB
满载,最小堆栈大小为16KB
计算,最大线程数可开辟64*1024
。按照辅助线程512KB
计算,最大线程数可开辟2048
3.栅栏函数
iOS中有两种栅栏函数,都是用于控制任务的执行顺序
- dispatch_barrier_async:异步栅栏函数,前面的任务执行完毕才会来到这里
- dispatch_barrier_sync:同步栅栏函数,和异步栅栏函数的作用相同,但是同步栅栏函数会堵塞线程,影响后面的任务执行
使用栅栏函数的注意事项:
- 栅栏函数只能控制同一并发队列
- 同步栅栏函数添加队列,当前线程会被锁死,直到栅栏函数之前的任务和栅栏本身的任务执行完毕,当前线程才会继续执行
- 全局并发队列不支持栅栏函数,因为可能会干扰系统级的任务执行
- 如果是串行队列,使用栅栏函数的作用等同于一个同步函数,没有任何意义
栅栏函数还可以用于线程安全,类似于锁的作用
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i<10000; i++) {
dispatch_async(concurrentQueue, ^{
dispatch_barrier_async(concurrentQueue, ^{
[self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
});
// @synchronized (self) {
// [self.mArray addObject:[NSString stringWithFormat:@"%d",i]];
// }
});
}
复制代码
- 此案例,如果不加栅栏函数,也不加互斥锁,使用并发队列多线程对同一数组进行
addObject
,很有可能会发生崩溃 - 因为数据的写入,本质是对旧值的
release
,对新值的retain
- 当数据不断
release
和retain
时,多线程会造成数据还没有retain
完毕,就开始进行release
,相当于加入空数据,进行release
3.1 同步栅栏函数分析
源码中,找到dispatch_barrier_sync
函数的实现
void dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) {
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}}
复制代码
_dispatch_barrier_sync_f
–> _dispatch_barrier_sync_f_inline
static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) {
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop( dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
复制代码
- 逻辑中存在进入
_dispatch_sync_f_slow
函数的代码,证明同步栅栏函数也可能出现死锁的情况
_dispatch_sync_recurse
→_dispatch_sync_invoke_and_complete_recurse
→_dispatch_sync_complete_recurse
static void _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq, uintptr_t dc_flags) {
bool barrier = (dc_flags & DC_FLAG_BARRIER);
do {
if (dq == stop_dq) return;
if (barrier) {
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
} else {
_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
}
dq = dq->do_targetq;
barrier = (dq->dq_width == 1);
} while (unlikely(dq->do_targetq)); }
复制代码
- 判断
targetq
,存在栅栏调用dx_wakeup
等待 - 否则,调用
_dispatch_lane_non_barrier_complete
函数
并发队列的dx_wakeup
实现
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
复制代码
进入_dispatch_lane_wakeup
函数
void _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
复制代码
- 针对栅栏函数进行判断,进入
_dispatch_lane_barrier_complete
函数
进入_dispatch_lane_barrier_complete
函数
static void _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
dispatch_lane_t dq = dqu._dl;
if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
if (_dispatch_object_is_waiter(dc)) {
return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
}
} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
return _dispatch_lane_drain_non_barriers(dq, dc, flags);
}
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER + dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
复制代码
- 如果是串行队列,栅栏相当于同步函数,调用
_dispatch_lane_drain_barrier_waiter
函数 - 如果是并发队列,调用
_dispatch_lane_drain_non_barriers
函数,进行栅栏相关处理 - 栅栏之前的任务全部完成,调用
_dispatch_lane_class_barrier_complete
函数
3.2 全局队列中的栅栏函数
全局队列的dx_wakeup实现
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
复制代码
进入_dispatch_root_queue_wakeup函数
void _dispatch_root_queue_wakeup(dispatch_queue_global_t dq, DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
DISPATCH_INTERNAL_CRASH(dq->dq_priority, "Don't try to wake up or override a root queue");
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
}
复制代码
- 全局队列中,没有对栅栏函数的任何判断和处理。所以,栅栏函数在全局队列中,和普通的同步或者异步函数别无二致
4. 信号量
信号量可以让异步任务同步执行,可以当锁使用,并且能够控制GCD最大并发数
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
for (int i = 0; i < 10; i++) {
dispatch_async(queue, ^{
sleep(1);
NSLog(@"当前 - %d, 线程 - %@", i, [NSThread currentThread]);
dispatch_semaphore_signal(sem);
});
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
}
复制代码
–dispatch_semaphore_creat
:初始化信号量,设置GCD最大并发数,必须>=0
–dispatch_semaphore_wait
:等待,对信号量减1,相当于加锁
–dispatch_semaphore_signal
:释放,对信号量加1,相当于解锁
4.1 创建
进入dispatch_semaphore_creat
函数
dispatch_semaphore_t dispatch_semaphore_create(intptr_t value) {
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
复制代码
- 初始化信号量,设置GCD最大并发数
- 最大并发数必须 >= 0
4.2 等待
进入dispatch_semaphore_wait
函数
intptr_t dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
复制代码
- o
s_atomic_dec2o
宏,进行减1操作 - 若信号量
>= 0
,直接返回0,执行wait
之后的代码 - 若信号量
< 0
,将阻塞当前状态,进入_dispatch_semaphore_wait_slow
函数
进入_dispatch_semaphore_wait_slow
函数
static intptr_t _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
复制代码
- 根据
timeout
的值进行不同的逻辑处理
如果为DISPATCH_TIME_FOREVER
类型,进入_dispatch_sema4_wait
函数
void _dispatch_sema4_wait(_dispatch_sema4_t *sema) {
int ret = 0;
do {
ret = sem_wait(sema);
} while (ret == -1 && errno == EINTR);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
复制代码
- 核心代码
do...while
,通过循环使得下面的代码无法执行
4.3 释放
进入dispatch_semaphore_signal
函数
intptr_t dispatch_semaphore_signal(dispatch_semaphore_t dsema) {
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
复制代码
os_atomic_inc2o
宏,进行加1操作- 若信号量
> 0
,直接返回0,继续执行后续代码 - 若信号量等于
LONG_MIN
,抛出异常。这种情况表示wait
操作过多,二者之间无法匹配。之后会调用_dispatch_semaphore_signal_slow
函数,进入延迟等待。