Exploring OC Internals: GCD Source Code Analysis, the Final Chapter

Barrier Functions

The most direct purpose of a barrier function is to control task execution order, producing a synchronization effect.

  • dispatch_barrier_async: tasks submitted before the barrier must finish before it runs.
  • dispatch_barrier_sync: same ordering effect, but it also blocks the current thread, so the code after the call cannot proceed until the barrier runs.

Example

- (void)demo1 {
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
    // This combination works!
    /* 1. async */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"123");
    });
    /* 2. async */
    dispatch_async(concurrentQueue, ^{
        sleep(1);
        NSLog(@"456");
    });
    /* 3. barrier */ // or dispatch_barrier_sync
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@"----%@-----", [NSThread currentThread]);
    });
    /* 4. async */
    dispatch_async(concurrentQueue, ^{
        NSLog(@"That was a lot of loading -- take a breather!!!");
    });
    // 5
    NSLog(@"**********Back to work!!");
}

Here we dispatch asynchronously onto a custom concurrent queue. The barrier runs only after the tasks ahead of it complete, but it does not block the thread that submitted it. So step 3 (the barrier) always runs after steps 1 and 2; steps 1 and 2 finish in no fixed order, and likewise the relative order of steps 4 and 5 is not fixed (though step 4 still waits for the barrier).
Note: what happens if we swap the custom concurrent queue for the global concurrent queue?

dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);

With this change the barrier no longer has any effect: a barrier submitted to the global concurrent queue simply does not act as a barrier. Why? As usual, let's pull up the libdispatch source to find out.

dispatch_barrier_sync

void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

This looks much like the synchronous dispatch functions covered earlier. The call chain is:
dispatch_barrier_sync -> _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline
Setting symbolic breakpoints, we land in _dispatch_sync_f_slow -> _dispatch_sync_invoke_and_complete_recurse -> _dispatch_sync_complete_recurse

static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
		uintptr_t dc_flags)
{
	bool barrier = (dc_flags & DC_FLAG_BARRIER);
	do {
		if (dq == stop_dq) return;
        // barrier present: wake the queue so everything queued ahead completes
		if (barrier) {
			dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
		} else {
            // no barrier: complete as an ordinary (non-barrier) sync
			_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
		}
		dq = dq->do_targetq;
		barrier = (dq->dq_width == 1);
	} while (unlikely(dq->do_targetq));
}

If a barrier is present, we go through dx_wakeup:

#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
	.do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_concurrent_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);

dx_wakeup is resolved through the vtable: for a custom concurrent queue, dq_wakeup = _dispatch_lane_wakeup, while for the global concurrent queue, dq_wakeup = _dispatch_root_queue_wakeup.

queue_concurrent vs. queue_global

void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		return _dispatch_lane_barrier_complete(dqu, qos, flags);
	}
	if (_dispatch_queue_class_probe(dqu)) {
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}
	return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
	if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
		DISPATCH_INTERNAL_CRASH(dq->dq_priority,
				"Don't try to wake up or override a root queue");
	}
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}

The difference is plain in the source: _dispatch_root_queue_wakeup contains no barrier handling at all, while the custom concurrent queue's _dispatch_lane_wakeup checks the barrier flag and calls _dispatch_lane_barrier_complete.
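The mechanism behind this difference is plain C vtable dispatch: the same dx_wakeup call site resolves to a different dq_wakeup function per queue class. A minimal sketch of that dispatch (the names and flag values here are ours, not libdispatch's):

```c
#include <assert.h>
#include <stdbool.h>

#define WAKEUP_BARRIER_COMPLETE 0x1  /* stands in for DISPATCH_WAKEUP_BARRIER_COMPLETE */

typedef struct queue_vtable {
    /* returns true if the wakeup handled a barrier completion */
    bool (*dq_wakeup)(int flags);
} queue_vtable;

/* custom concurrent queues (_dispatch_lane_wakeup) inspect the barrier flag */
static bool lane_wakeup(int flags) {
    return (flags & WAKEUP_BARRIER_COMPLETE) != 0;
}

/* global root queues (_dispatch_root_queue_wakeup) never look at it */
static bool root_wakeup(int flags) {
    (void)flags;
    return false;
}

static const queue_vtable queue_concurrent_vtable = { lane_wakeup };
static const queue_vtable queue_global_vtable     = { root_wakeup };

/* the dx_wakeup macro: one call site, per-class behavior */
static bool dx_wakeup(const queue_vtable *vt, int flags) {
    return vt->dq_wakeup(flags);
}
```

Same call, different outcome, purely because the vtable slot differs -- which is exactly why a barrier silently degrades to a plain task on the global queue.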

static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_lane_t dq = dqu._dl;

	if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        // serial queue (width == 1), or a barrier sits at the head
		if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
			if (_dispatch_object_is_waiter(dc)) {
				return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
			}
		} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
			return _dispatch_lane_drain_non_barriers(dq, dc, flags);
		}
		// ...
	}
	//...
	return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}

If a waiter sits at the head (a sync barrier, or any item on a serial queue), it drains that waiter; if non-barrier items sit ahead, those are drained first. Only then does _dispatch_lane_class_barrier_complete run -- in other words, every task ahead of the barrier must finish before the barrier completes.

This confirms the conclusion above: the global concurrent queue has no barrier handling, so barriers are ineffective on it. The design makes sense: the system itself dispatches work onto the global queues, and a barrier would stall everything queued behind it, hurting overall efficiency. A barrier also only orders tasks within a single queue -- with a framework like AFNetworking we cannot get at its internal queue -- so in practice barriers see limited use, and dispatch groups are used far more often.

Semaphores: dispatch_semaphore_t

dispatch_semaphore_create: creates a semaphore; the value passed in caps the number of concurrent accesses
dispatch_semaphore_wait: waits on the semaphore (decrements by 1)
dispatch_semaphore_signal: releases the semaphore (increments by 1)

- (void)test {
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(0);
    // Task 1
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // wait
        NSLog(@"running task 1");
        NSLog(@"task 1 done");
    });

    // Task 2
    dispatch_async(queue, ^{
        sleep(2);
        NSLog(@"running task 2");
        NSLog(@"task 2 done");
        dispatch_semaphore_signal(sem); // signal, +1
    });

    // Task 3
    dispatch_async(queue, ^{
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        sleep(2);
        NSLog(@"running task 3");
        NSLog(@"task 3 done");
        dispatch_semaphore_signal(sem);
    });
}

In this run, task 3 never executes -- it waits forever. The semaphore starts at 0, so task 1 and task 3 both block in wait; task 2 posts only a single signal, which here woke task 1, and since task 1 never signals, task 3 stays blocked.

dispatch_semaphore_create

 * @param value
 * The starting value for the semaphore. Passing a value less than zero will
 * cause NULL to be returned.

dispatch_semaphore_signal

intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema) // value was 0 in our example
{
	long value = os_atomic_inc2o(dsema, dsema_value, release); // atomic +1, so value = 1 here
	if (likely(value > 0)) {
		return 0;   // nobody is waiting; return right away
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

dispatch_semaphore_wait

intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire); // atomic -1: 0 - 1 = -1
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout); // our example takes this path
}
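The fast-path arithmetic of these two functions can be captured in a few lines of plain C -- a toy model of the bookkeeping, not the real implementation:

```c
#include <assert.h>

typedef struct { long value; } toy_semaphore;

/* dispatch_semaphore_wait fast path: returns 1 if the caller may proceed
 * immediately, 0 if it must block in the slow path */
static int toy_wait(toy_semaphore *s) {
    long v = --s->value;              /* os_atomic_dec2o in the real code */
    return v >= 0;                    /* negative -> _dispatch_semaphore_wait_slow */
}

/* dispatch_semaphore_signal fast path: returns 1 if a blocked waiter
 * must be woken via the slow path, 0 if nobody was waiting */
static int toy_signal(toy_semaphore *s) {
    long v = ++s->value;              /* os_atomic_inc2o in the real code */
    return v <= 0;                    /* <= 0 -> _dispatch_semaphore_signal_slow */
}
```

A semaphore created with 0 makes the very first wait go negative and block, which is exactly the deadlock in the demo above.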

In the example above, the semaphore was created with a value of 0, so wait computes 0 - 1 = -1 and drops straight into _dispatch_semaphore_wait_slow:

static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout) // here: a semaphore at 0, timeout = DISPATCH_TIME_FOREVER
{

	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
	// ...
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
	int ret = 0;
	do {
		ret = sem_wait(sema);
	} while (ret == -1 && errno == EINTR);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

sem_wait is a pthread-level wrapper over kernel primitives. The part to notice is the do-while loop: sem_wait blocks until the semaphore value becomes positive, and the loop retries whenever the call is interrupted by a signal (EINTR), so an interruption never counts as a successful acquire.
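The same retry-on-EINTR pattern can be reproduced against a POSIX semaphore directly -- a sketch of the pattern, not libdispatch code:

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Retry sem_wait whenever a signal interrupts it, just like
 * _dispatch_sema4_wait: an interruption must not count as an acquire. */
static int wait_uninterrupted(sem_t *sema) {
    int ret;
    do {
        ret = sem_wait(sema);
    } while (ret == -1 && errno == EINTR);
    return ret;
}
```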

Dispatch Groups: dispatch_group

Most direct purpose: controlling task execution order.

  • dispatch_group_create: creates a group
  • dispatch_group_async: submits a task to the group
  • dispatch_group_notify: notifies once all tasks in the group have finished
  • dispatch_group_wait: waits (with a timeout) for the group's tasks to finish
  • dispatch_group_enter: enters the group
  • dispatch_group_leave: leaves the group

Approach 1: using dispatch_group_async:

- (void)groupDemo{
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    dispatch_group_async(group, queue, ^{
    });
    
    dispatch_group_async(group, queue, ^{
        
    });
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{

    });
    
}

Approach 2: pairing dispatch_group_enter with dispatch_group_leave:

- (void)groupDemo{
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    dispatch_group_async(group, queue, ^{
    });
    
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
       dispatch_group_leave(group);
    });
    
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{

    });
}

The two approaches behave identically.
So why does dispatch_group_async equal dispatch_group_enter plus dispatch_group_leave?

dispatch_group_create

dispatch_group_t
dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

_dispatch_group_create_with_count is quite similar to the semaphore's create function.

dispatch_group_enter

Looking at the source, enter performs an atomic decrement (--) on the group's state.

dispatch_group_leave

void
dispatch_group_leave(dispatch_group_t dg)
{
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release); // atomic ++: -1 -> 0
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK); // mask out the value bits
		// DISPATCH_GROUP_VALUE_1 == DISPATCH_GROUP_VALUE_MASK,
		// so this branch fires when the old value was -1 (one outstanding enter)
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}

As the comments show, when the group's value is -1 (one outstanding enter), leave brings it back to 0, runs the do-while loop to clear the waiter/notify bits, and calls _dispatch_group_wake -- which is what fires dispatch_group_notify. Tying this back to the example: enter decrements the state to -1, and the matching leave increments it back and, once the block has run, wakes the notify.
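A toy model of that counting (the names here are ours, not libdispatch's): enter decrements, leave increments, and only the leave that rebalances the count fires the wake -- our woke flag stands in for _dispatch_group_wake.

```c
#include <assert.h>

typedef struct { long count; int woke; } toy_group;

static void toy_enter(toy_group *g) { g->count -= 1; }  /* dispatch_group_enter */

static void toy_leave(toy_group *g) {                   /* dispatch_group_leave */
    g->count += 1;
    if (g->count == 0) g->woke = 1;  /* the last leave wakes the notify blocks */
}
```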

dispatch_group_notify

static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
    //...
	if ((uint32_t)old_state == 0) {
			os_atomic_rmw_loop_give_up({
					return _dispatch_group_wake(dg, new_state, false);
			});
	}
}

When old_state == 0 the group is already balanced, so _dispatch_group_wake runs right away and the block goes through the normal sync/async call-out flow. If notify is reached while tasks are still in flight, the block is instead attached to the group, and the final leave's _dispatch_group_wake(dg, old_state, true) fires it later. That is why _dispatch_group_wake is called from two places: it resolves the timing race inherent in asynchronous work -- a rather nice design!
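The two paths can be sketched as a toy notify: run now if the group is balanced, otherwise park the block for the final leave to fire (all names here are ours):

```c
#include <assert.h>
#include <stddef.h>

typedef void (*notify_fn)(void);

typedef struct {
    long count;         /* 0 means balanced: every enter has its leave */
    notify_fn pending;  /* block parked on the group, awaiting the last leave */
    int fired;
} toy_group;

/* dispatch_group_notify's two paths */
static void toy_notify(toy_group *g, notify_fn fn) {
    if (g->count == 0) {      /* old_state == 0: wake immediately */
        g->fired = 1;
        fn();
    } else {                  /* tasks in flight: park until the last leave */
        g->pending = fn;
    }
}

static int toy_ran = 0;
static void toy_mark(void) { toy_ran = 1; }
```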

dispatch_group_async = dispatch_group_enter + dispatch_group_leave

We saw above that the two are equivalent -- so how does dispatch_group_async wrap the enter and the leave?

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

Clearly, _dispatch_continuation_group_async performs the enter: dispatch_group_enter(dg). From there the chain runs _dispatch_continuation_async -> dx_push -> _dispatch_root_queue_push -> _dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow -> _dispatch_root_queues_init -> _dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline.
We've walked this chain before, so no need to repeat it; inside the invoke we find:

		if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
			_dispatch_continuation_with_group_invoke(dc);
		} else {
			_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
			_dispatch_trace_item_complete(dc);
		}

_dispatch_continuation_with_group_invoke

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
	struct dispatch_object_s *dou = dc->dc_data;
	unsigned long type = dx_type(dou);
	if (type == DISPATCH_GROUP_TYPE) {
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		_dispatch_trace_item_complete(dc);
		dispatch_group_leave((dispatch_group_t)dou);
	} else {
		DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
	}
}

Right after _dispatch_client_callout runs the block, we find the leave: dispatch_group_leave. That makes sense -- only after the task has actually executed on its queue can the group be left and the notify be triggered.
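So in spirit, dispatch_group_async is a wrapper that enters before the work is queued and leaves after the block returns. A synchronous toy of that wrapper (names are ours; the real code runs the block asynchronously on its queue):

```c
#include <assert.h>

typedef void (*work_fn)(void *ctx);
typedef struct { long count; int notified; } toy_group;

/* what dispatch_group_async boils down to */
static void toy_group_async(toy_group *g, work_fn fn, void *ctx) {
    g->count -= 1;                       /* dispatch_group_enter in _dispatch_continuation_group_async */
    fn(ctx);                             /* _dispatch_client_callout (run inline here, for the sketch) */
    g->count += 1;                       /* dispatch_group_leave in _dispatch_continuation_with_group_invoke */
    if (g->count == 0) g->notified = 1;  /* balanced again: the notify can fire */
}

static void toy_bump(void *ctx) { *(int *)ctx += 1; }
```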

dispatch_source

GCD and RunLoop are peers; neither sits below the other. A dispatch_source essentially uses a condition to control when a block executes. Its CPU overhead is very small, and it tries not to occupy resources. Calling dispatch_source_merge_data on any thread triggers the source's pre-installed handler (think of the handler as a block); this mechanism is called a Custom Event.

  • dispatch_source_create: creates the source
  • dispatch_source_set_event_handler: sets the source's event handler
  • dispatch_source_merge_data: merges data into the source's pending event
  • dispatch_source_get_data: reads the source's pending event data
  • dispatch_resume: resumes the source
  • dispatch_suspend: suspends the source

Usage is fairly simple:

- (void)demo {
    // 1. Create a queue
    self.queue = dispatch_queue_create("hb.com", NULL);
    // 2. Create the source
    self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
    // 3. Set the event handler
    dispatch_source_set_event_handler(self.source, ^{

        NSLog(@"%@", [NSThread currentThread]);

        NSUInteger value = dispatch_source_get_data(self.source);
        self.totalComplete += value;
        NSLog(@"progress: %.2f", self.totalComplete / 100.0);
        self.progressView.progress = self.totalComplete / 100.0;
    });

    self.isRunning = YES;
    dispatch_resume(self.source);
}
// 4. Wherever needed, call dispatch_source_merge_data to feed the source
// 5. dispatch_resume to continue after a dispatch_suspend

This is not driven by the RunLoop; it runs on a workloop, which is ultimately a low-level wrapper over pthreads.
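One detail worth noting: with DISPATCH_SOURCE_TYPE_DATA_ADD, values merged before the handler gets a chance to run are coalesced by addition, and the handler then reads the accumulated sum. A toy of that coalescing (a simplification; names are ours):

```c
#include <assert.h>

typedef struct { unsigned long pending; } toy_source;

/* dispatch_source_merge_data with DATA_ADD: pending values are summed */
static void toy_merge_data(toy_source *s, unsigned long value) {
    s->pending += value;
}

/* dispatch_source_get_data inside the handler: consumes the sum */
static unsigned long toy_get_data(toy_source *s) {
    unsigned long v = s->pending;
    s->pending = 0;
    return v;
}
```

This is why the progress demo above can add dispatch_source_get_data's return value to a running total even when several merges land between handler invocations.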

Bonus: Are Mutable Arrays Thread-Safe?

Mutating the same array from multiple threads is not safe, because two threads can write at once -- that is, operate on the same memory at the same time. An atomic property only makes the getter and setter themselves safe; it does not make compound operations on the array safe. One fix is to funnel all writes through a barrier on a private concurrent queue, which acts like a lock: reads run concurrently, writes run exclusively.
