Java并发编程—— Semaphore 应用

这是我参与更文挑战的第 7 天,活动详情查看: 更文挑战

1. 简介

前面我们针对 CountDownLatchCyclicBarrier 进行了学习,CountDownLatch 用于帮助我们实现“倒计时”的功能,当 count=0 的时候则触发 await的线程进行执行。而CyclicBarrier 则适用于到达某一状态后接着执行下一步操作的情况。本次介绍的 Semaphore 是一个计信号数量。

Semaphore 通过 acquire() 方法申请信号量,如果当前无信号量可用,则线程处于阻塞状态,如果有可用信号量,线程正常进行。release 方法用于释放一个“锁定占用”的信号量。

Semaphore 通常用于限制访问某些资源(物理或逻辑的)的线程数目。

下面看一个 jdk 中带的例子,该例子中展示了使用 Semaphone 控制数据的访问。

class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
    }

    public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
    }

    // Not a particularly efficient data structure; just for demo

    protected Object[] items = ... whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;          
          return items[i];
       }
     }
     return null; // not reached
    }

    protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
    }
}
复制代码

2、简单案例

Semaphore 提供了两个关键的方法 acquire()release() 方法。分别用于申请信号量和释放信号量。

如下面示例代码所示:

public class FirstDemo {

	static Semaphore semaphore = new Semaphore(2);
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		for(int i=0;i<4;i++){
			new Thread(){
				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + " ready");
					try {
						semaphore.acquire();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " go");
					semaphore.release();
				}
			}.start();
		}
	}
}
复制代码

执行结果:

Thread-0 ready
Thread-0 go
Thread-2 ready
Thread-2 go
Thread-1 ready
Thread-3 ready
Thread-1 go
Thread-3 go
复制代码

从执行结果来看,最多只能连续执行 go 两次,间接的印证最多同时只有两个信号量在工作。补充一个特殊情况,当信号量 Semaphore = 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态,当 =1 时表示其他线程可以获取,当 =0 时,排他,即其他线程必须要等待。

3. 源码解析

在源码中,Semaphore 同样包含一个继承 AQS 的子类 Sync

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    ....
}
复制代码

通过使用 AQS 类的 state 值来存储信号量,原理类似于 CountDownLatch,都是通过 AQSstate 值进行计数。

创建一个 Semaphore 对象可以通过构造方法:

/**指定信号量个数*/
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

/**
 * Creates a {@code Semaphore} with the given number of
 * permits and the given fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 * @param fair {@code true} if this semaphore will guarantee
 *        first-in first-out granting of permits under contention,
 *        else {@code false}
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码

通过上面的构造函数,可以看到默认情况下 Semaphore 使用的非公平同步状态,同样也可以通过构造函数进行指定状态。

3.1 acquire() 方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
复制代码

acquire 方法中,调用 syncacquireSharedInterruptibly(1) 方法,如果线程被 interrupted 则抛出 InterruptedException 异常。通过 tryAcquireShared(arg) 获取状态值,最关键的代码是:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

获取当前的 state 值,如果可用 state 值为 -1 或小于 0,即当前信号量被占满,则返回一个负数,然后执行 doAcquireSharedInterruptibly() 方法添加到等待队列。

4. 总结

无论 ReentrantLockCyclicBarrier 内部都是基于 AQS 进行实现,实现的原理有所差异而已。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享