ios栅栏函数,调度组,信号量和dispath_source的探索

前情提要:我们在开发过程中,会遇到很多同步异步的问题,例如多网络请求多张图片并且需要回来之后合成一张图片,可变数组的安全存取等等,栅栏函数,信号量和调度组就可以很好的帮我们解决这类问题。

栅栏函数

  • 本质:栅栏函数的本质就是控制任务的执行顺序与同步
  • dispatch_barrier_sync:会阻塞当前影响后面任务并直执行任务块
  • dispatch_barrier_asycn:作用同上,但是不会导致阻塞,后面的任务正常执行
 dispatch_queue_t queue = dispatch_queue_create("queue", nil);
    dispatch_barrier_sync(queue, ^{
        //阻塞在当前,快里面的任务执行完成才会执行后面的任务
    });
    dispatch_barrier_async(queue, ^{
        //不会阻塞任务执行,后面任务正常执行
    });
复制代码

栅栏函数最直接的作用就是控制线程安全,怎么来控制的呢?看下面这段代码

dispatch_queue_t queue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    NSMutableArray *testList = [NSMutableArray array];
    for (NSInteger i = 0; i < 10000; ++i) {
        dispatch_async(queue, ^{
            [testList addObject:@(i)];
        });
    }
复制代码

运行之后就会崩溃,崩溃信息如下:

Learning(1966,0x700004449000) malloc: *** error for object 0x7fb25c604940: pointer being freed was not allocated
Learning(1966,0x700004449000) malloc: *** set a breakpoint in malloc_error_break to debug
复制代码

出现问题的原因就是,多线程访问同一个数组的时候,创建释放添加的过程会冲突,已经混乱,怎么解决这个问题,栅栏函数就可以很好的给我们解决的很好,稍作修改就正常执行

 dispatch_queue_t queue = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
    NSMutableArray *testList = [NSMutableArray array];
    for (NSInteger i = 0; i < 10000; ++i) {
        dispatch_async(queue, ^{
            dispatch_barrier_async(queue, ^{
                [testList addObject:@(i)];
            });
            
        });
    }
复制代码

看到这里好像一切都很美好,但其实隐藏了两个比较严重的问题是导致栅栏函数的使用率并不是特别高,是什么问题呢?

  1. 栅栏函数不受全局并发队列控制dispatch_get_global_queue(0, 0)
  2. 栅栏函数不能控制多队列任务

所以要了解这个本质,我们需要打开libdispatch源码来探索,源码地址:opensource.apple.com/source/libd…

通过观察源码可以看到

static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
		uintptr_t dc_flags)
{
	bool barrier = (dc_flags & DC_FLAG_BARRIER);
	do {
		if (dq == stop_dq) return;
		if (barrier) {
			dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
		} else {
			_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
		}
		dq = dq->do_targetq;
		barrier = (dq->dq_width == 1);
	} while (unlikely(dq->do_targetq));
}
复制代码

dowakeup之前是处理栅栏函数之前的任务,dowakeup做的事情就是去栅栏并且处理任务,自定义并发队列会有栅栏处理:
static void

_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
	dispatch_lane_t dq = dqu._dl;

	if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
		struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
		if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
			if (_dispatch_object_is_waiter(dc)) {
				return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
			}
		} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
			return _dispatch_lane_drain_non_barriers(dq, dc, flags);
		}

		if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
			_dispatch_retain_2(dq);
			flags |= DISPATCH_WAKEUP_CONSUME_2;
		}
		target = DISPATCH_QUEUE_WAKEUP_TARGET;
	}

	uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
			dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
	return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
复制代码

全局并发队列里面没有对栅栏的处理函数,意味着栅栏函数没有任何的作用,就是一个普通的异步函数:

void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
		DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
	if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
		DISPATCH_INTERNAL_CRASH(dq->dq_priority,
				"Don't try to wake up or override a root queue");
	}
	if (flags & DISPATCH_WAKEUP_CONSUME_2) {
		return _dispatch_release_2_tailcall(dq);
	}
}
复制代码

总结:系统全局并发队列中可能有一些系统级别的任务是不能阻塞的,所有在这里不能用系统的全局队列,它本身也没有栅栏函数的处理逻辑,这就解释了我们第一个问题。多队列也是不能用做栅栏函数,因为栅栏函数会去阻塞队列,你有多个队列,你让栅栏函数怎么去阻塞呢?下层是不知道上层的队列个数的,所以这里用起来就感觉很鸡肋,由此产生了调度组dispatch_group,我们先说信号量,因为在以前的版本调度组是封装的信号量。

信号量

  1. dispatch_semaphore_create(n)n控制最大并发数
  2. dispatch_semaphore_wait
  3. dispatch_semaphore_signal

这三个函数里面最重要的就是dispatch_semaphore_wait,打开源码

intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
复制代码

dispatch_semaphore_signal对信号量进行+1,大于0直接返回,大于0之后被wait消费

intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}

复制代码
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
	int ret = 0;
	do {
		ret = sem_wait(sema);
	} while (ret == -1 && errno == EINTR);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

复制代码

dispatch_semaphore_wait会do-while循环等待信号量产生,然后进行处理,如果信号量一直小于等于0,那么会一直等待,等待signal大于0再消费处理任务

调度组

  • 创建组:dispatch_group_create
  • 进组任务:dispatch_group_async
  • 进组任务执行通知完毕:dispatch_group_notify
  • 进组任务执行等待时间:dispatch_group_wait
  • 进组:dispatch_group_enter
  • 出组:dispatch_group_leave
-(void)disGroup{
    dispatch_queue_t globalQuene = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_queue_t selfQuene = dispatch_queue_create("myQuene", 0);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_async(group, globalQuene, ^{
        NSLog(@"run task 1");
    });
    dispatch_group_async(group, selfQuene, ^{
        NSLog(@"run task 2");
    });
    dispatch_group_async(group, selfQuene, ^{
        NSLog(@"run task 3");
    });
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"run task 4");
    });
}
复制代码

底层实现

void
dispatch_group_enter(dispatch_group_t dg)
{
	// The value is decremented on a 32bits wide atomic so that the carry
	// for the 0 -> -1 transition is not propagated to the upper 32bits.
	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
	if (unlikely(old_value == 0)) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
		DISPATCH_CLIENT_CRASH(old_bits,
				"Too many nested calls to dispatch_group_enter()");
	}
}
复制代码

os_atomic_sub_orig2会先减1变成-1

void
dispatch_group_leave(dispatch_group_t dg)
{
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				// If the group was entered again since the atomic_add above,
				// we can't clear the waiters bit anymore as we don't know for
				// which generation the waiters are for
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}
复制代码

注意点:先enter,再leave,只有两个状态-1,和0,不断do-while循环,只有当leave出来打破循环才会weakup-notify执行任务。

dispatch_source

PSBCTimerManger.h

#import <Foundation/Foundation.h>

@protocol PSBCTimerMangerDelegate<NSObject>
- (void)countDown;
@end

@interface PSBCTimerManger : NSObject
@property(nonatomic, weak) id<PSBCTimerMangerDelegate> rootPage;
- (void)startTimer;
- (void)closeTimer;

@end
复制代码

PSBCTimerManger.m

#import "PSBCTimerManger.h"


@interface PSBCTimerManger()

@property(nonatomic, strong) dispatch_source_t timer;
@property(nonatomic, assign) BOOL hasPSBCTimerMangerDelegate;
@end

@implementation PSBCTimerManger

- (instancetype)init{
    if (self = [super init]) {
        dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
        self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
        dispatch_time_t start = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0*NSEC_PER_SEC));
        dispatch_time_t interval = 1.0 * NSEC_PER_SEC;
        dispatch_source_set_timer(self.timer, start, interval, 0);
        WeakSelf(ws)
        dispatch_source_set_event_handler(self.timer, ^{
            dispatch_async(dispatch_get_main_queue(), ^{
                __strong typeof(self) strongSelf = ws;
                if(strongSelf.hasPSBCTimerMangerDelegate){
                    [strongSelf.rootPage countDown];
                }
            });
        });
    }
    return self;
}

- (void)startTimer{
    if(self.timer) {
        dispatch_resume(self.timer);
    }
}

- (void)closeTimer{
    if(self.timer) {
        dispatch_suspend(self.timer);
        dispatch_source_cancel(self.timer);
    }
}

- (void)dealloc{
    [self closeTimer];
}

#pragma mark PSBCTimerMangerDelegate
- (void)setRootPage:(id<PSBCTimerMangerDelegate>)rootPage{
    if (rootPage && [rootPage conformsToProtocol:@protocol(PSBCTimerMangerDelegate)]) {
        _hasPSBCTimerMangerDelegate = YES;
        _rootPage = rootPage;
    }
}
@end

复制代码

dispatch_source的时间准度较之NSTimer更准确,基于系统级别。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享