iOS Internals: Multithreading with GCD (Singletons, the Thread Pool, Barrier Functions, and Semaphores)


1. The Singleton Pattern

Using a singleton with GCD:

static dispatch_once_t onceToken; 
dispatch_once(&onceToken, ^{ 
    NSLog(@"block---:%@",[NSThread currentThread]); 
});

In the source, find the implementation of dispatch_once:

void dispatch_once(dispatch_once_t *val, dispatch_block_t block) {
    dispatch_once_f(val, block, _dispatch_Block_invoke(block)); 
}

Step into dispatch_once_f:

void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) { 
    dispatch_once_gate_t l = (dispatch_once_gate_t)val; 
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER 
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire); 
    if (likely(v == DLOCK_ONCE_DONE)) { 
        return;
    } 
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER 
    if (likely(DISPATCH_ONCE_IS_GEN(v))) { 
        return _dispatch_once_mark_done_if_quiesced(l, v); 
    } 
#endif 
#endif 
    if (_dispatch_once_gate_tryenter(l)) { 
        return _dispatch_once_callout(l, ctxt, func);
    } 
    return _dispatch_once_wait(l); 
}
  • val is cast to dispatch_once_gate_t, treating the token like a gate (similar in spirit to a barrier)

Three branches follow:

  • If the task has already completed, return immediately
  • If this is the first time through, run _dispatch_once_callout
  • If the task is currently being executed by another thread, enter _dispatch_once_wait and wait

1.1 Lock handling

Step into _dispatch_once_gate_tryenter:

static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l) { 
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed); 
}
  • An atomic compare-and-swap acquires the gate, preventing other threads from entering; a sketch of that semantics follows below
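
The gate acquisition is just a compare-and-swap: the one thread that swaps the token from DLOCK_ONCE_UNLOCKED to its own lock value wins and runs the block. A minimal sketch of that semantics with C11 atomics (gate and try_enter_gate are illustrative names, not libdispatch symbols):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static _Atomic uintptr_t gate = 0;                 // 0 plays the role of DLOCK_ONCE_UNLOCKED

static bool try_enter_gate(uintptr_t self_lock_value) {
    uintptr_t expected = 0;
    // Exactly one thread succeeds in swapping 0 -> its own lock value; the rest fail and must wait.
    return atomic_compare_exchange_strong(&gate, &expected, self_lock_value);
}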

1.2 Executing the task

Step into _dispatch_once_callout:

static void _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func) { 
    _dispatch_client_callout(ctxt, func); 
    _dispatch_once_gate_broadcast(l); 
} 
void _dispatch_client_callout(void *ctxt, dispatch_function_t f) { 
    _dispatch_get_tsd_base(); 
    void *u = _dispatch_get_unwind_tsd(); 
    if (likely(!u)) return f(ctxt); 
    _dispatch_set_unwind_tsd(NULL); 
    f(ctxt); 
    _dispatch_free_unwind_tsd(); 
    _dispatch_set_unwind_tsd(u); 
}
  • The task's callback is invoked via f(ctxt)

Step into _dispatch_once_gate_broadcast:

static inline void _dispatch_once_gate_broadcast(dispatch_once_gate_t l) { 
    dispatch_lock value_self = _dispatch_lock_value_for_self(); 
    uintptr_t v; 
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER 
    v = _dispatch_once_mark_quiescing(l); 
#else 
    v = _dispatch_once_mark_done(l); 
#endif 
    if (likely((dispatch_lock)v == value_self)) return; 
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
  • Handles the lock state and marks the once token as done, waking any waiters if needed

How the singleton guarantee works:

  • dispatch_once is called with onceToken and a block. onceToken is a static variable, so it is unique for that call site; internally it is cast to a dispatch_once_gate_t variable l. os_atomic_load reads the token's atomic state into v; if v equals DLOCK_ONCE_DONE, the task has already run once and the function returns immediately
  • If the task is running for the first time, the gate is locked atomically (the token is swapped away from DLOCK_ONCE_UNLOCKED) to guarantee thread safety. After locking, the block callback is executed; once it finishes, the gate is unlocked and the token is set to DLOCK_ONCE_DONE, so the next call returns immediately
  • If other callers arrive while the task is still executing, they wait indefinitely, because the executing thread already holds the lock and nobody else can acquire it. A typical sharedInstance wrapper built on this is sketched below
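
For reference, a minimal sketch of the usual sharedInstance wrapper built on dispatch_once (the class name LGPerson is illustrative):

#import <Foundation/Foundation.h>

@interface LGPerson : NSObject
+ (instancetype)sharedInstance;
@end

@implementation LGPerson
+ (instancetype)sharedInstance {
    static LGPerson *instance = nil;
    static dispatch_once_t onceToken;            // static token: unique, starts at 0
    dispatch_once(&onceToken, ^{
        instance = [[LGPerson alloc] init];      // runs exactly once, even under contention
    });
    return instance;
}
@end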

2. The Thread Pool

2.1 Creating threads

For asynchronous work, thread creation happens in _dispatch_root_queue_poke_slow. If the queue is a global root queue, _pthread_workqueue_addthreads is used to request worker threads:

#if !DISPATCH_USE_INTERNAL_WORKQUEUE 
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES 
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) 
#endif 
    { 
        _dispatch_root_queue_debug("requesting new worker thread for global " "queue: %p", dq); 
        r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
        (void)dispatch_assume_zero(r); 
        return; 
    } 
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

For an ordinary (pthread root) queue, a do...while loop drives thread creation, and the state of the thread pool is checked first:

int can_request, t_count; 
// seq_cst with atomic store to tail <rdar://problem/16932833> 
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered); 
do { 
    can_request = t_count < floor ? 0 : t_count - floor; 
    if (remaining > can_request) { 
        _dispatch_root_queue_debug("pthread pool reducing request from %d to %d", remaining, can_request); 
        os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed); 
        remaining = can_request; 
    } 
    if (remaining == 0) { 
        _dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq); 
        return; 
    } 
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire));
  • dgq_thread_pool_size is read; in the source it starts out marked as 1
  • dgq_thread_pool_size grows as the logic requires, up to its maximum value
  • remaining and floor are parameters, passed in as 1 and 0
  • can_request is the number of threads that may be requested: 0 if t_count is less than floor, otherwise t_count minus floor
  • If remaining is greater than can_request, the pthread pool reduces the request and uses can_request instead
  • If remaining is 0, the pthread pool for this root queue is full; a worked sketch of this bookkeeping follows below
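
To make the bookkeeping concrete, a hypothetical walk-through in plain C (the values are illustrative, not taken from a real run):

// Illustrative values mirroring the loop above.
int t_count   = 255;   // current dgq_thread_pool_size
int floor     = 0;     // passed in as 0
int remaining = 1;     // threads being requested

int can_request = (t_count < floor) ? 0 : t_count - floor;   // 255
if (remaining > can_request) remaining = can_request;        // no reduction in this case
if (remaining == 0) { /* pool full: nothing to create */ }
// otherwise dgq_thread_pool_size is CAS'd from t_count to t_count - remaining (255 -> 254)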

pthread_create is then used to create the threads:

do { 
    _dispatch_retain(dq); // released in _dispatch_worker_thread 
    while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) { 
        if (r != EAGAIN) { 
            (void)dispatch_assume_zero(r); 
        } 
        _dispatch_temporary_resource_shortage(); 
    } 
} while (--remaining);

2.2 Maximum number of threads

The thread pool's maximum thread count is defined as:

int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT; 
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
  • The maximum is set to 255, but the number of threads an app actually spawns does not necessarily reach it

According to the official documentation, a secondary thread gets a 512 KB stack by default, the minimum allowed stack size is 16 KB, and the stack size must be a multiple of 4 KB.
At launch the process gets 4 GB of virtual memory: 3 GB for user space and 1 GB for kernel space. The kernel's 1 GB cannot all be used for spawning threads, so the real maximum thread count is not a fixed number.

do { 
    _dispatch_retain(dq); // released in _dispatch_worker_thread 
#if DISPATCH_DEBUG 
    unsigned dwStackSize = 0; 
#else 
    unsigned dwStackSize = 64 * 1024; 
#endif 
    uintptr_t hThread = 0; 
    while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) { 
        if (errno != EAGAIN) { 
            (void)dispatch_assume(hThread); 
        } 
        _dispatch_temporary_resource_shortage(); 
    } 
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES 
    if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
        (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE); 
    } 
#endif 
    CloseHandle((HANDLE)hThread); 
} while (--remaining);
  • Treating the kernel's 1 GB as fully available: with the minimum 16 KB stack, up to 64 * 1024 threads could be spawned; with the 512 KB secondary-thread stack, up to 2048. The arithmetic is spelled out below
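
The bounds quoted above are simple division; a back-of-the-envelope sketch (illustrative arithmetic only, assuming the entire 1 GB could be spent on stacks):

unsigned long long oneGB = 1024ULL * 1024 * 1024;
unsigned long long maxWithMinStack     = oneGB / (16 * 1024);    // 65536 == 64 * 1024
unsigned long long maxWithDefaultStack = oneGB / (512 * 1024);   // 2048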

3. Barrier Functions

iOS provides two barrier functions; both control the order in which tasks execute:

  • dispatch_barrier_async: the asynchronous barrier; the tasks submitted before it must finish before the barrier block runs
  • dispatch_barrier_sync: the synchronous barrier; it has the same ordering effect, but it also blocks the current thread and therefore delays the tasks that follow it

Things to keep in mind when using barriers:

  • A barrier only controls tasks on the same custom concurrent queue
  • When a synchronous barrier is submitted, the current thread is blocked until the tasks ahead of the barrier and the barrier task itself have all finished
  • Global concurrent queues do not support barriers, because a barrier there could interfere with system-level work
  • On a serial queue a barrier is equivalent to a plain synchronous call and is pointless; an ordering example on a concurrent queue follows below
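
A minimal ordering sketch on a custom concurrent queue (the queue label and log text are illustrative):

dispatch_queue_t queue = dispatch_queue_create("com.example.concurrent", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(queue, ^{ NSLog(@"task 1"); });
dispatch_async(queue, ^{ NSLog(@"task 2"); });

// The barrier runs only after task 1 and task 2 finish, and it holds back task 3.
dispatch_barrier_async(queue, ^{ NSLog(@"barrier"); });

dispatch_async(queue, ^{ NSLog(@"task 3"); });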

Barriers can also be used for thread safety, playing a role similar to a lock:

dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT); 
for (int i = 0; i<10000; i++) { 
    dispatch_async(concurrentQueue, ^{ 
        dispatch_barrier_async(concurrentQueue, ^{ 
            [self.mArray addObject:[NSString stringWithFormat:@"%d",i]]; 
        }); 
//      @synchronized (self) { 
//           [self.mArray addObject:[NSString stringWithFormat:@"%d",i]]; 
//       } 
    }); 
}
  • In this example, without the barrier (or a mutex), letting many threads call addObject: on the same array from a concurrent queue is very likely to crash
  • That is because writing a value essentially releases the old value and retains the new one
  • With values constantly being released and retained across threads, a release can start before the retain has finished, which amounts to inserting and then releasing garbage data. A reader/writer sketch built on the same barrier idea follows below
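
Building on this, a common pattern is "many readers, one writer": reads go through dispatch_sync and writes through dispatch_barrier_async on the same concurrent queue, so writes are exclusive while reads stay concurrent. A minimal sketch (the class, queue label, and method names are illustrative):

#import <Foundation/Foundation.h>

@interface SafeStore : NSObject
- (void)setObject:(id)obj forKey:(NSString *)key;
- (id)objectForKey:(NSString *)key;
@end

@implementation SafeStore {
    dispatch_queue_t _queue;
    NSMutableDictionary *_dict;
}

- (instancetype)init {
    if ((self = [super init])) {
        _queue = dispatch_queue_create("com.example.safestore", DISPATCH_QUEUE_CONCURRENT);
        _dict  = [NSMutableDictionary dictionary];
    }
    return self;
}

- (void)setObject:(id)obj forKey:(NSString *)key {
    dispatch_barrier_async(_queue, ^{ self->_dict[key] = obj; });   // exclusive write
}

- (id)objectForKey:(NSString *)key {
    __block id obj = nil;
    dispatch_sync(_queue, ^{ obj = self->_dict[key]; });            // concurrent read
    return obj;
}
@end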

3.1 Analysis of the synchronous barrier

In the source, find the implementation of dispatch_barrier_sync:

void dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) { 
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK; 
    if (unlikely(_dispatch_block_has_private_data(work))) { 
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags); 
    } 
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags); 
}

_dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline:

static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { 
    dispatch_tid tid = _dispatch_tid_self(); 
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { 
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); 
    } 
    dispatch_lane_t dl = upcast(dq)._dl; 
    // The more correct thing to do would be to merge the qos of the thread 
    // that just acquired the barrier lock into the queue state. 
    // 
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread 
    // contends with this fast path, this thread may receive a useless override. 
    // 
    // Global concurrent queues and queues bound to non-dispatch threads 
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE 
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) { 
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags); 
    } 
    if (unlikely(dl->do_targetq->do_targetq)) { 
        return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags); 
    } 
    _dispatch_introspection_sync_begin(dl); 
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop( dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); 
}
  • The branch into _dispatch_sync_f_slow shows that a synchronous barrier, like dispatch_sync, can also deadlock; a sketch of the classic case follows below
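
The classic way to hit that slow path is to submit a synchronous barrier to the queue the current block is already running on: the barrier waits for the running block to finish, while the running block is itself blocked waiting for the barrier. A minimal sketch, assuming a custom concurrent queue (the queue label is illustrative):

dispatch_queue_t queue = dispatch_queue_create("com.example.concurrent", DISPATCH_QUEUE_CONCURRENT);

dispatch_async(queue, ^{
    // This block is still executing on `queue` when the barrier is submitted,
    // so the barrier and the block end up waiting on each other: a deadlock.
    dispatch_barrier_sync(queue, ^{
        NSLog(@"never reached");
    });
});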

_dispatch_sync_recurse -> _dispatch_sync_invoke_and_complete_recurse -> _dispatch_sync_complete_recurse:

static void _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq, uintptr_t dc_flags) {
    bool barrier = (dc_flags & DC_FLAG_BARRIER);
    do {
        if (dq == stop_dq) return;
        if (barrier) {
            dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
        } else {
            _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
        }
        dq = dq->do_targetq;
        barrier = (dq->dq_width == 1);
    } while (unlikely(dq->do_targetq));
}
  • It walks the do_targetq chain; when a barrier is involved it calls dx_wakeup with DISPATCH_WAKEUP_BARRIER_COMPLETE
  • Otherwise it calls _dispatch_lane_non_barrier_complete

The dx_wakeup implementation for a custom concurrent queue:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane, 
    .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE, 
    .do_dispose = _dispatch_lane_dispose, 
    .do_debug = _dispatch_queue_debug, 
    .do_invoke = _dispatch_lane_invoke, 
    .dq_activate = _dispatch_lane_activate, 
    .dq_wakeup = _dispatch_lane_wakeup, 
    .dq_push = _dispatch_lane_concurrent_push,
);

Step into _dispatch_lane_wakeup:

void _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) { 
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE; 
    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) { 
        return _dispatch_lane_barrier_complete(dqu, qos, flags); 
    } 
    if (_dispatch_queue_class_probe(dqu)) { 
        target = DISPATCH_QUEUE_WAKEUP_TARGET; 
    } 
    return _dispatch_queue_wakeup(dqu, qos, flags, target); 
}
  • The barrier-complete flag is checked, leading into _dispatch_lane_barrier_complete

Step into _dispatch_lane_barrier_complete:

static void _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) { 
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    dispatch_lane_t dq = dqu._dl; 
    if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) { 
        struct dispatch_object_s *dc = _dispatch_queue_get_head(dq); 
        if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) { 
            if (_dispatch_object_is_waiter(dc)) { 
                return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0); 
            }
        } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) { 
            return _dispatch_lane_drain_non_barriers(dq, dc, flags); 
        } 
        if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq); 
            flags |= DISPATCH_WAKEUP_CONSUME_2; 
        } 
        target = DISPATCH_QUEUE_WAKEUP_TARGET; 
    } 
    uint64_t owned = DISPATCH_QUEUE_IN_BARRIER + dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL; 
    return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned); 
}
  • If the queue is serial (dq_width == 1), the barrier behaves like a plain synchronous call and _dispatch_lane_drain_barrier_waiter is invoked
  • If the queue is concurrent, _dispatch_lane_drain_non_barriers is invoked to handle the barrier-related draining
  • Once all tasks ahead of the barrier are finished, _dispatch_lane_class_barrier_complete is called

3.2 Barriers on the global queue

The dx_wakeup implementation for a global queue:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane, 
    .do_type         = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE, 
    .do_dispose      = _dispatch_object_no_dispose, 
    .do_debug        = _dispatch_queue_debug, 
    .do_invoke       = _dispatch_object_no_invoke, 
    .dq_activate     = _dispatch_queue_no_activate, 
    .dq_wakeup       = _dispatch_root_queue_wakeup, 
    .dq_push         = _dispatch_root_queue_push, 
);

Step into _dispatch_root_queue_wakeup:

void _dispatch_root_queue_wakeup(dispatch_queue_global_t dq, DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
    if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) { 
        DISPATCH_INTERNAL_CRASH(dq->dq_priority, "Don't try to wake up or override a root queue"); 
    } 
    if (flags & DISPATCH_WAKEUP_CONSUME_2) { 
        return _dispatch_release_2_tailcall(dq); 
    } 
}
  • For a global queue there is no barrier-specific check or handling at all, so on a global queue a barrier function behaves no differently from an ordinary synchronous or asynchronous call; a quick demonstration follows below
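
A quick way to see this behavior (illustrative sketch; the exact interleaving of the logs is nondeterministic, but the barrier block does not wait for task 1):

dispatch_queue_t global = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_async(global, ^{ sleep(1); NSLog(@"task 1"); });

// On a global queue this is treated like a normal async task: it will not wait for task 1.
dispatch_barrier_async(global, ^{ NSLog(@"barrier"); });

dispatch_async(global, ^{ NSLog(@"task 2"); });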

4. Semaphores

A semaphore can make asynchronous tasks run synchronously, can serve as a lock, and can cap GCD's maximum concurrency:

dispatch_semaphore_t sem = dispatch_semaphore_create(0); 
for (int i = 0; i < 10; i++) { 
    dispatch_async(queue, ^{ 
        sleep(1); 
        NSLog(@"当前 - %d, 线程 - %@", i, [NSThread currentThread]);
        dispatch_semaphore_signal(sem); 
    }); 
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
}

dispatch_semaphore_create: initializes the semaphore and sets the maximum GCD concurrency; the value must be >= 0
dispatch_semaphore_wait: waits, decrementing the semaphore by 1, roughly like acquiring a lock
dispatch_semaphore_signal: signals, incrementing the semaphore by 1, roughly like releasing a lock. A concurrency-capping sketch follows below
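
For instance, creating the semaphore with a value of 2 lets at most two of the submitted tasks run at the same time. A minimal sketch (the queue choice and log text are illustrative):

dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(2);     // allow 2 tasks in flight

for (int i = 0; i < 6; i++) {
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);     // -1; blocks once the value would drop below 0
    dispatch_async(queue, ^{
        NSLog(@"task %d running", i);
        sleep(1);
        dispatch_semaphore_signal(sem);                       // +1; lets the next task in
    });
}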

4.1 Create

Step into dispatch_semaphore_create:

dispatch_semaphore_t dispatch_semaphore_create(intptr_t value) { 
    dispatch_semaphore_t dsema; 
    // If the internal value is negative, then the absolute of the value is 
    // equal to the number of waiting threads. Therefore it is bogus to 
    // initialize the semaphore with a negative value. 
    if (value < 0) { 
        return DISPATCH_BAD_INPUT; 
    } 
    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); 
    dsema->do_next = DISPATCH_OBJECT_LISTLESS; 
    dsema->do_targetq = _dispatch_get_default_queue(false); 
    dsema->dsema_value = value; 
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); 
    dsema->dsema_orig = value; 
    return dsema; 
}
  • Initializes the semaphore, which sets the maximum GCD concurrency
  • The initial value must be >= 0; a negative value returns DISPATCH_BAD_INPUT

4.2 Wait

Step into dispatch_semaphore_wait:

intptr_t dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { 
    long value = os_atomic_dec2o(dsema, dsema_value, acquire); 
    if (likely(value >= 0)) { 
        return 0; 
    } 
    return _dispatch_semaphore_wait_slow(dsema, timeout); 
}
  • The os_atomic_dec2o macro decrements the value by 1
  • If the result is >= 0, 0 is returned immediately and the code after the wait runs
  • If the result is < 0, the current thread blocks and _dispatch_semaphore_wait_slow is entered

Step into _dispatch_semaphore_wait_slow:

static intptr_t _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { 
    long orig; 
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); 
    switch (timeout) { 
    default: 
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { 
            break; 
        } 
        // Fall through and try to undo what the fast path did to 
        // dsema->dsema_value 
    case DISPATCH_TIME_NOW: 
        orig = dsema->dsema_value; 
        while (orig < 0) { 
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { 
                return _DSEMA4_TIMEOUT(); 
            } 
        } 
        // Another thread called semaphore_signal(). 
        // Fall through and drain the wakeup. 
    case DISPATCH_TIME_FOREVER: 
        _dispatch_sema4_wait(&dsema->dsema_sema); 
        break; 
    } 
    return 0; 
}
  • The logic branches on the value of timeout: DISPATCH_TIME_NOW undoes the fast-path decrement and reports a timeout, DISPATCH_TIME_FOREVER blocks in _dispatch_sema4_wait, and any other deadline performs a timed wait; a timed-wait usage sketch follows below
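
From the caller's side, such a deadline is built with dispatch_time. A minimal sketch of a timed wait (the 2-second deadline is illustrative):

dispatch_semaphore_t sem = dispatch_semaphore_create(0);

// Wait at most 2 seconds for someone to signal the semaphore.
dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC));
long result = dispatch_semaphore_wait(sem, deadline);

if (result == 0) {
    NSLog(@"signalled in time");
} else {
    NSLog(@"timed out");    // the slow path undid the decrement and returned non-zero
}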

For DISPATCH_TIME_FOREVER, it enters _dispatch_sema4_wait:

void _dispatch_sema4_wait(_dispatch_sema4_t *sema) { 
    int ret = 0; 
    do { 
        ret = sem_wait(sema); 
    } while (ret == -1 && errno == EINTR); 
    DISPATCH_SEMAPHORE_VERIFY_RET(ret); 
}
  • The core is the do...while loop: the thread stays parked in sem_wait (retrying on EINTR), so the code after the wait cannot run until the semaphore is signalled

4.3 Signal

Step into dispatch_semaphore_signal:

intptr_t dispatch_semaphore_signal(dispatch_semaphore_t dsema) { 
    long value = os_atomic_inc2o(dsema, dsema_value, release); 
    if (likely(value > 0)) {
        return 0; 
    } 
    if (unlikely(value == LONG_MIN)) { 
        DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()"); 
    } 
    return _dispatch_semaphore_signal_slow(dsema); 
}
  • The os_atomic_inc2o macro increments the value by 1
  • If the result is > 0, 0 is returned immediately and execution continues
  • If the value equals LONG_MIN, a crash is raised ("Unbalanced call to dispatch_semaphore_signal()"), meaning signals and waits no longer match up. Otherwise _dispatch_semaphore_signal_slow is called to wake up a waiting thread