OC Internals, Part 15: GCD Functions and Queues in Multithreading

It's recommended to read the multithreading fundamentals article first to get familiar with the basic concepts.

What this chapter covers

  1. Functions and queues
  2. How the task blocks of synchronous and asynchronous functions are executed
  3. How queues are created

Goals of this chapter

  1. Understand synchronous/asynchronous functions and serial/concurrent queues
  2. Understand how function tasks are invoked and executed (the important part)
  3. Get familiar with serial and concurrent queues

Note: the code in this chapter is quite long-winded, but it is mostly there to show the flow. Skimming it is enough; don't worry about every detail. We study the internals so that we can develop better afterwards.

Functions and queues

Here, "function" refers to synchronous or asynchronous execution. Task execution is inseparable from queues; you can think of a queue as the ordering in which tasks are executed.

Functions

GCD executes tasks either synchronously (tasks run one step at a time; the caller waits for each to finish) or asynchronously (execution may move on before the task has run). For example:

Synchronous

The output prints 1111 first, then 2222.
[screenshot: the dispatch_sync example]

Asynchronous

The output is 2222 then 1111.
[screenshot: the dispatch_async example]

Queues

There are really only two kinds of queues: serial and concurrent.
A serial queue has a narrow mouth, so tasks execute one at a time, FIFO (first in, first out); picture a single-file line. A concurrent queue has a wide mouth, so tasks can run together; picture multiple lines.

  1. Serial queue: dispatch_queue_t queue = dispatch_queue_create("testCurrent", DISPATCH_QUEUE_SERIAL)

  2. Concurrent queue: dispatch_queue_t queue = dispatch_queue_create("testCurrent", DISPATCH_QUEUE_CONCURRENT)

  3. Global queue: dispatch_queue_t queue = dispatch_get_global_queue(0, 0), which is a concurrent queue

  4. Main queue: dispatch_queue_t queue = dispatch_get_main_queue(), which is a serial queue

The four combinations

Note: synchronous functions never spawn a thread; I'll point this out in the source below. The main-queue and global-queue cases follow once you recall which kind of queue each is. In the source, the different execution paths grow more and more alike, so there is no need to memorize this.

  1. Sync + serial: no new thread; executes in order.
  2. Sync + concurrent: no new thread; executes in order.
  3. Async + serial: spawns a thread (but only one); executes in order.
  4. Async + concurrent: spawns threads; executes out of order.

A few quiz questions

What does this print?
    __block int num = 0;
    while (num < 5) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            num++;
        });
    }
    NSLog(@"end : %d",num);
Answer: a value greater than or equal to 5. The while loop keeps dispatching until num reaches 5, but blocks already in flight can still bump it before the log runs.
What does this print?
    __block int num = 0;
    for (int i= 0; i<10000; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            num++;
        });
    }
    NSLog(@"end : %d",num);
Answer: a value less than or equal to 10000. num++ is a non-atomic read-modify-write, so concurrent increments can be lost, and some blocks may not have run yet when the log fires.
What does this print? And what if the queue is made concurrent instead?
dispatch_queue_t queue = dispatch_queue_create("com.lg.cooci.cn", DISPATCH_QUEUE_SERIAL);

    dispatch_async(queue, ^{
        sleep(0.1); // note: sleep() takes whole seconds, so 0.1 truncates to 0
        NSLog(@"1");
    });
    
    dispatch_async(queue, ^{
        NSLog(@"2");
    });

    dispatch_sync(queue, ^{ NSLog(@"3"); });
    
    NSLog(@"0");

    dispatch_async(queue, ^{
        NSLog(@"7");
    });
    dispatch_async(queue, ^{
        NSLog(@"8");
    });
    dispatch_async(queue, ^{
        NSLog(@"9");
    });
Output:
1. Serial queue: 1, 2, 3, 0, 7, 8, 9.
2. Concurrent queue: 1 tends to come later because of the sleep (though sleep() takes whole seconds, so sleep(0.1) actually sleeps 0); the relative order of 2 and 3 is not fixed; 0 comes after 3 because dispatch_sync blocks; the order of 7, 8, 9 is not fixed.

Queue creation

Under the hood, dispatch_queue_t is essentially a struct containing an isa pointer and other fields.

From the outside in: dispatch_queue_t -> dispatch_queue_s -> dispatch_object_s -> _os_object_s -> dispatch_object_t (the last is a union that includes the preceding types; you can treat it as the root type of GCD's object model).

You can think of this chain as analogous to NSObject -> objc_class -> objc_object.

In other words, you can think of this as "struct inheritance".

typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_channel_s *_dch;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
The union itself isn't the point; it comes from following DISPATCH_DECL(dispatch_queue), which is what you land on when you jump to the definition of dispatch_queue_t. Below are the macros, shown with the argument already substituted in.
// dispatch_queue_t
#define DISPATCH_DECL(dispatch_queue) OS_OBJECT_DECL_SUBCLASS(dispatch_queue, dispatch_object)

// OS_dispatch_object; in these macro definitions, ## is the token-pasting
// (concatenation) operator, which you'll see throughout the source
#define OS_OBJECT_DECL_SUBCLASS(dispatch_queue, dispatch_object) \
		OS_OBJECT_DECL_IMPL(dispatch_queue, NSObject, <OS_dispatch_object>)

#define OS_OBJECT_DECL_IMPL(dispatch_queue, NSObject, ...) \
		OS_OBJECT_DECL_PROTOCOL(dispatch_queue, __VA_ARGS__) \
		typedef NSObject<OS_dispatch_queue> \
				* __attribute__((objc_independent_class)) dispatch_queue_t

#define OS_OBJECT_DECL_PROTOCOL(dispatch_queue, ...) \
		@protocol OS_OBJECT_CLASS(dispatch_queue) __VA_ARGS__ \
		@end

#define OS_OBJECT_CLASS(dispatch_queue) OS_dispatch_queue
  1. There are two basic kinds of queue, and GCD adds the global and main queues on top of them. For now, remember the function _dispatch_lane_create_with_target:
    // Nothing interesting here; dispatch_queue_create is just the public API wrapper.
    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
        return _dispatch_lane_create_with_target(label, attr,
                DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
  2. _dispatch_lane_create_with_target does two main things: it normalizes the priority and the attr that was passed in, and it creates the queue. The first step just prepares for the second, so the core is _dispatch_queue_init; it isn't shown here because it is essentially just assigning struct members.
    static dispatch_queue_t
    _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa, dispatch_queue_t tq, bool legacy)
    {
            // dqai is a struct; here it wraps the incoming dqa. When we pass NULL
            // (a plain serial queue) this returns the empty value right away.
            // In short: it packages up the priority and related attributes
            dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    
            //
            // Step 1: Normalize arguments (qos, overcommit, tq)
            //
            // Priority handling: process the dispatch_queue_attr_t for serial vs concurrent queues
            dispatch_qos_t qos = dqai.dqai_qos;
        #if !HAVE_PTHREAD_WORKQUEUE_QOS
            if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
                    dqai.dqai_qos = qos = DISPATCH_QOS_USER_INITIATED;
            }
            if (qos == DISPATCH_QOS_MAINTENANCE) {
                    dqai.dqai_qos = qos = DISPATCH_QOS_BACKGROUND;
            }
        #endif // !HAVE_PTHREAD_WORKQUEUE_QOS
    
            _dispatch_queue_attr_overcommit_t overcommit = dqai.dqai_overcommit;
            if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
                    if (tq->do_targetq) {
                            DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                                            "a non-global target queue");
                    }
            }
    
            if (tq && dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
                    // Handle discrepancies between attr and target queue, attributes win
                    if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
                            if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
                                    overcommit = _dispatch_queue_attr_overcommit_enabled;
                            } else {
                                    overcommit = _dispatch_queue_attr_overcommit_disabled;
                            }
                    }
                    if (qos == DISPATCH_QOS_UNSPECIFIED) {
                            qos = _dispatch_priority_qos(tq->dq_priority);
                    }
                    tq = NULL;
            } else if (tq && !tq->do_targetq) {
                    // target is a pthread or runloop root queue, setting QoS or overcommit
                    // is disallowed
                    if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
                            DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
                                            "and use this kind of target queue");
                    }
            } else {
                    if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
                            // Serial queues default to overcommit!
                            overcommit = dqai.dqai_concurrent ?
                                            _dispatch_queue_attr_overcommit_disabled :
                                            _dispatch_queue_attr_overcommit_enabled;
                    }
            }
            if (!tq) {
                    tq = _dispatch_get_root_queue(
    			qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
    			overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
                    if (unlikely(!tq)) {
                            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
                    }
            }
    
            //
            // Step 2: Initialize the queue
            //
    
            if (legacy) {
                    // if any of these attributes is specified, use non legacy classes
                    if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
                            legacy = false;
                    }
            }
    
            const void *vtable;
            dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
            if (dqai.dqai_concurrent) {
                    vtable = DISPATCH_VTABLE(queue_concurrent);
            } else {
                    vtable = DISPATCH_VTABLE(queue_serial);
            }
            switch (dqai.dqai_autorelease_frequency) {
            case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
                    dqf |= DQF_AUTORELEASE_NEVER;
                    break;
            case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
                    dqf |= DQF_AUTORELEASE_ALWAYS;
                    break;
            }
            if (label) {
                    const char *tmp = _dispatch_strdup_if_mutable(label);
                    if (tmp != label) {
                            dqf |= DQF_LABEL_NEEDS_FREE;
                            label = tmp;
                    }
            }
            // Note: if you trace the source from here, you'll find another
            // inheritance chain: dispatch_lane_t -> dispatch_lane_s ->
            
            // Allocate the memory for dq
            dispatch_lane_t dq = _dispatch_object_alloc(vtable,
                            sizeof(struct dispatch_lane_s));
                            
            // Initialize: pass DISPATCH_QUEUE_WIDTH_MAX if concurrent, otherwise 1.
            // In other words, this is where the queue's type (serial or concurrent) is fixed
            _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
                                DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
                            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
    
            dq->dq_label = label;
            dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
                            dqai.dqai_relpri);
            if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
                    dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
            }
            if (!dqai.dqai_inactive) {
                    _dispatch_queue_priority_inherit_from_target(dq, tq);
                    _dispatch_lane_inherit_wlh_from_target(dq, tq);
            }
            _dispatch_retain(tq);
            dq->do_targetq = tq;
            _dispatch_object_debug(dq, "%s", __func__);
            return _dispatch_trace_queue_create(dq)._dq;
        }

The main queue (a serial queue)

For the main queue, the API to study is dispatch_get_main_queue:

// This macro varies by build configuration; only one variant is shown here,
// and in this configuration OS_OBJECT_BRIDGE expands to nothing.
#define DISPATCH_GLOBAL_OBJECT(type, object) ((OS_OBJECT_BRIDGE type)&(object))

dispatch_queue_main_t
dispatch_get_main_queue(void)
{
// The result is effectively: (dispatch_queue_main_t)&_dispatch_main_q
// dispatch_queue_main_t is just the type, so what matters is _dispatch_main_q
    return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}

A note on serialnum:

// skip zero: this value is never 0
// 1  - main_q (the main queue)
// 2  - mgr_q
// 3  - mgr_root_q
// 4-15 - global queues
// 17 - workloop_fallback_q

struct dispatch_queue_static_s _dispatch_main_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
	.do_targetq = _dispatch_get_default_queue(true),
#endif
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
        // You can think of this label as the thread's name
	.dq_label = "com.apple.main-thread",
        // DQF_WIDTH(1) is what marks the main queue as serial
	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
        // Don't read dq_serialnum = 1 as meaning "serial"; it is just an identifier
	.dq_serialnum = 1,
};

Source-level analysis of the functions

Everything above is what we already use in day-to-day development. But how are the tasks of a synchronous or asynchronous function actually executed? To find out, we dig into libdispatch, starting with the execution path of a task.

Synchronous functions

dispatch_sync is our entry point. Reading the source shows that a synchronous function never creates a thread. First, note that this API's first parameter is the queue and the second is the task, i.e. the block. The functions below are shown in source order; the path analyzed here is sync onto the global concurrent queue.

dispatch_sync

// dq is the queue, work is the task
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
        // just a flag; ignore it for now
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
        // Similar to how blocks work internally: a wrapper that extracts the block's invoke pointer
        // #define _dispatch_Block_invoke(bb)  ((dispatch_function_t)((struct Block_layout *)bb)->invoke)
        
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

_dispatch_sync_f

dq is the queue; ctxt is work, i.e. the task block; func is the block's wrapped invoke function; dc_flags is the flag.

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)
{
	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

_dispatch_sync_f_inline

On a first read of this function, you only need to care about two cases: _dispatch_barrier_sync_f and _dispatch_sync_f_slow.

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
        // Serial queues (width == 1) take this path; the barrier functions we study later share it
        // sync + serial
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	}
        // Below: concurrent queues and the remaining cases
        // unsupported queue types crash here
	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}
        // recall the inheritance chain we studied above
	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
        
        // Note: the deadlock case also goes through here; remember this function.
        // The global concurrent queue also takes this path
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	}
        // Not important for now; it contains a do-while loop, explained later
	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

_dispatch_sync_f_slow

This function handles the deadlock case as well as the other synchronous paths.
Keep in mind that the task travels through it as the two parameters ctxt and func.

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
        // The global concurrent case takes this branch, and this is where the block is invoked:
        // _dispatch_sync_function_invoke leads down to
        // _dispatch_sync_function_invoke_inline (shown below)

	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	pthread_priority_t pp = _dispatch_get_priority();
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};

	_dispatch_trace_item_push(top_dq, &dsc);
        // A deadlock is detected and handled here
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	if (dsc.dsc_func == NULL) {
		// dsc_func being cleared means that the block ran on another thread ie.
		// case (2) as listed in _dispatch_async_and_wait_f_slow.
		dispatch_queue_t stop_dq = dsc.dc_other;
		return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
	}

	_dispatch_introspection_sync_begin(top_dq);
	_dispatch_trace_item_pop(top_dq, &dsc);
	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
			DISPATCH_TRACE_ARG(&dsc));
}

_dispatch_sync_function_invoke_inline

This function finally executes the task; _dispatch_client_callout is what runs the block.

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
        // execute the task; internally this is essentially func(ctxt)
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}

Asynchronous functions

Asynchronous functions are a bit more involved than synchronous ones, and in some cases they create threads. We start with dispatch_async.

dispatch_async

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
        // a flag, as in the sync path
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;
        // Unlike the sync path, this wraps up the queue and the task for later execution
	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

_dispatch_continuation_init

This is where the async-specific bookkeeping happens: the priority, the task, and so on are saved so the scheduler later has something to go on, along with the callback invoked when the task eventually runs.

static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, dispatch_block_t work,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	void *ctxt = _dispatch_Block_copy(work);

	dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		dc->dc_flags = dc_flags;
		dc->dc_ctxt = ctxt;
		// will initialize all fields but requires dc_flags & dc_ctxt to be set
		return _dispatch_continuation_init_slow(dc, dqu, flags);
	}

	dispatch_function_t func = _dispatch_Block_invoke(work);
	if (dc_flags & DC_FLAG_CONSUME) {
		func = _dispatch_call_block_and_release;
	}
        // Package up the priority and the task here.
        // Why? An async task runs later, so it must be wrapped now, allowing it to be
        // scheduled according to its priority; hence everything is saved at this point.
	return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

_dispatch_continuation_async

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
        // introspection bookkeeping; not relevant to our path
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
    // Here is the key: dx_push actually calls dq_push.
    // Remember the task travels in dc and the priority in qos.
    // #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    // dx_vtable resolves which queue class's dq_push implementation runs
	return dx_push(dqu._dq, dc, qos);
}

dq_push

From the vtable definitions you can see there are many dq_push implementations; here we only follow _dispatch_root_queue_push, i.e. the global concurrent queue.

Note: for an ordinary concurrent queue, _dispatch_lane_concurrent_push may in fact end up executing the serial _dispatch_lane_push; but if a barrier function is involved, it may execute _dispatch_continuation_redirect_push instead.

[screenshots: the dq_push entries in the queue vtable definitions]

_dispatch_root_queue_push

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	if (unlikely(ddi && ddi->ddi_can_stash)) {
		dispatch_object_t old_dou = ddi->ddi_stashed_dou;
		dispatch_priority_t rq_overcommit;
		rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

		if (likely(!old_dou._do || rq_overcommit)) {
			dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
			dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
			ddi->ddi_stashed_rq = rq;
			ddi->ddi_stashed_dou = dou;
			ddi->ddi_stashed_qos = qos;
			_dispatch_debug("deferring item %p, rq %p, qos %d",
					dou._do, rq, qos);
			if (rq_overcommit) {
				ddi->ddi_can_stash = false;
			}
			if (likely(!old_dou._do)) {
				return;
			}
			// push the previously stashed item
			qos = old_qos;
			rq = old_rq;
			dou = old_dou;
		}
	}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#else
	(void)qos;
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

_dispatch_root_queue_push_inline

static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}

_dispatch_root_queue_poke

void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
	if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
	{
		if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
			_dispatch_root_queue_debug("worker thread request still pending "
					"for global queue: %p", dq);
			return;
		}
	}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
	return _dispatch_root_queue_poke_slow(dq, n, floor);
}

_dispatch_root_queue_poke_slow

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
	int remaining = n;
#if !defined(_WIN32)
	int r = ENOSYS;
#endif
        // _dispatch_root_queues_init is a one-time (singleton-style) call. Inside its
        // _dispatch_root_queues_init_once, the worker callback _dispatch_worker_thread2 is
        // registered; it performs the thread-pool checks and is where tasks are actually invoked.
	_dispatch_root_queues_init();
	_dispatch_debug_root_queue(dq, __func__);
	_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
		r = _pthread_workqueue_addthreads(remaining,
				_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
		(void)dispatch_assume_zero(r);
		return;
	}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
	if (likely(pqc->dpq_thread_mediator.do_vtable)) {
		while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
			_dispatch_root_queue_debug("signaled sleeping worker for "
					"global queue: %p", dq);
			if (!--remaining) {
				return;
			}
		}
	}

	bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	if (overcommit) {
		os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
	} else {
		if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
			_dispatch_root_queue_debug("worker thread request still pending for "
					"global queue: %p", dq);
			return;
		}
	}

	int can_request, t_count;
	// seq_cst with atomic store to tail <rdar://problem/16932833>
	t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
	do {
		can_request = t_count < floor ? 0 : t_count - floor;
		if (remaining > can_request) {
			_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
					remaining, can_request);
			os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
			remaining = can_request;
		}
		if (remaining == 0) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
	} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
			t_count - remaining, &t_count, acquire));

#if !defined(_WIN32)
	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
	if (unlikely(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq); // released in _dispatch_worker_thread
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
	} while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
	if (unlikely(dq == &_dispatch_mgr_root_queue)) {
		_dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
		unsigned dwStackSize = 0;
#else
		unsigned dwStackSize = 64 * 1024;
#endif
		uintptr_t hThread = 0;
		while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
			if (errno != EAGAIN) {
				(void)dispatch_assume(hThread);
			}
			_dispatch_temporary_resource_shortage();
		}
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
		if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
			(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
		}
#endif
		CloseHandle((HANDLE)hThread);
	} while (--remaining);
#endif // defined(_WIN32)
#else
	(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}