Java 并发编程—— Lock 和 Condition

这是我参与更文挑战的第 2 天,活动详情查看: 更文挑战

我们知道 AQS 类是所有并发编程锁的核心,那么在实际的使用中,我们就需要认识下 Lock 接口给我们定义了哪些方法。

Lock 用于并发编程中针对共享资源的访问,通常,一个 lock 对象提供独占的方式来访问对象,即独占锁——在同一时间只有一个线程能够获取锁并访问资源。但是也有一些锁提供并发访问共享资源,比如 ReadWriteLock,这类称之为共享锁。

synchronized 和 Lock 对比:

使用 synchronized 同步的方法或者代码段具有隐式的锁监听器(锁)。

public synchronized void doGet(){

}
复制代码

但是 lock 的使用要求是块状成对结构,当不同的 lock 被申请的时候,他们必须依照合适的顺序在相同的作用域进行释放。

Lock l = ...;
l.lock();
try {
  // access the resource protected by this lock
 } finally {
  l.unlock();
}}
复制代码

1. Lock 源码分析

首先来看一下 Lock 的定义。

public interface Lock {

    /**
     * Acquires the lock.
     * 该方法用于获取锁
     * 如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。
     * 如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
     */
    void lock();

    /**
     * 1)如果当前线程未被中断,则获取锁。 
     * 2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。 
     * 3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
     * 4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以
	 *		前,该线程将一直处于休眠状态: 
     *		1)锁由当前线程获得;或者 
     *		2)其他某个线程中断当前线程。 
	 * 5)如果当前线程获得该锁,则将锁保持计数设置为 1。 
   	 *	如果当前线程: 
     *  		1)在进入此方法时已经设置了该线程的中断状态;
     *  		2)在等待获取锁的同时被中断。 
	 *	则抛出 InterruptedException,并且清除当前线程的已中断状态。 
	 * 6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或
	 *	重入获取。
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 尝试获取锁,仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
     */
    boolean tryLock();

    /**
     * 
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 释放锁
     * 只有持有这个lock的线程才能释放锁
     */
    void unlock();

    /**
     * 返回一个绑定到Lock对象上的Condition实例,在获取condition对象前,当前线程
     * 必须持有对应的lock对象。
     */
    Condition newCondition();
}
复制代码

在上面的源码中,我们可以看到Lock 提供了 lock() 方法用于获取锁,unlock() 方法用于释放锁。比如下面的例子:

public class ConcrateDemo {
	private Lock lock = new ReentrantLock();
	
	public static void main(String []args){
		MyRunnable runnable = new ConcrateDemo().new MyRunnable();
		Thread thread1 = new Thread(runnable);
		Thread thread2 = new Thread(runnable);
		thread1.start();
		thread2.start();
	}
	class MyRunnable implements Runnable{

		public void run() {
			lock.lock();
			for(int i=0;i<5;i++){
				System.out.println("currentThread:" + Thread.currentThread().getName()
						+ "==Cnt:" + i);
			}
			lock.unlock();
		}
	}
}
复制代码

2. Condition 源码分析

在上面我们介绍 Lock 类时,有一个 newCondition 方法:

/**
 * 返回一个绑定到Lock对象上的Condition实例,在获取condition对象前,当前线程
 * 必须持有对应的lock对象。
 */
Condition newCondition();
复制代码

从这里可以猜想到一个 Lock 中应该绑定一个 Condition 对象。ConditionJava 提供用来实现等待/通知的类。

我们知道 Object 对象提供了 waitwaitAllnotifynotifyAll 的方法用来实现线程的同步、等待和唤醒。但 Condition 类提供了比 wait/notify 更丰富的功能,Condition 对象由 lock 对象所创建的,同时一个 Lock 可以创建多个 Condition 对象,即创建多个对象监听器,这样就可以指定唤醒具体线程,而 notify 是随机唤醒线程。

public interface Condition {

    /**
     * 造成当前线程在接到信号或被 中断之前一直处于等待状态。
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    void await() throws InterruptedException;

    /**
     * 造成当前线程在接到信号或被 中断之前一直处于等待状态。
     */
    void awaitUninterruptibly();

    /**
     * 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return an estimate of the {@code nanosTimeout} value minus
     *         the time spent waiting upon return from this method.
     *         A positive value may be used as the argument to a
     *         subsequent call to this method to finish waiting out
     *         the desired time.  A value less than or equal to zero
     *         indicates that no time remains.
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
     * @param time the maximum time to wait
     * @param unit the time unit of the {@code time} argument
     * @return {@code false} if the waiting time detectably elapsed
     *         before return from the method, else {@code true}
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 唤醒一个等待线程。
     */
    void signal();

    /**
     * 唤醒所有等待线程。
     */
    void signalAll();
}
复制代码

通过上面的源码注释能看到,Condition 提供了以下方法:

  • void await():造成当前线程在接到信号或被中断之前一直处于等待状态。
  • boolean await(long time, TimeUnit unit):造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • long awaitNanos(long nanosTimeout):造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  • void awaitUninterruptibly():造成当前线程在接到信号之前一直处于等待状态。
  • boolean awaitUntil(Date deadline):造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
  • void signal():唤醒一个等待线程。
  • void signalAll():唤醒所有等待线程。

在实际开发中,通过 await 方法进行线程的等待,signal 进行唤醒。注意,Condition 实例只是一些普通的对象,它们自身可以用作 synchronized 语句中的目标,并且可以调用自己的 waitnotification 监视器方法。

3. 生产者消费者示例

/**
 * 实现生产消费者的例子
 * 要求:
 * 有两股力量:生产和消费
 * 当仓库生产满了的时候就要通知消费者进行消费,并且停止生产
 * 当仓库空的时候,消费者要通知生产者进行生产,并且停止消费
 * 其它情况,正常生产、消费。
 * 
 * 生产者与消费者模型中,要保证以下几点:
 * 1 同一时间内只能有一个生产者生产
 * 2 同一时间内只能有一个消费者消费
 * 3 共享空间空时消费者不能继续消费
 * 4 共享空间满时生产者不能继续生产 
 * @author mr_dsw
 */
public class ConcrateDemo {
	public static void main(String []args){
		Resource resource = new Resource();
		ProduceThread produceThread = new ProduceThread(resource);
		ConsumeThread consumeThread = new ConsumeThread(resource);
		//四个生产者
		new Thread(produceThread).start();
		new Thread(produceThread).start();
		new Thread(produceThread).start();
		new Thread(produceThread).start();
		//四个消费者
		new Thread(consumeThread).start();
		new Thread(consumeThread).start();
		new Thread(consumeThread).start();
		new Thread(consumeThread).start();
	}
}

class Resource{
	private final int MAX_SIZE = 10;
	private LinkedList<Object> list = new LinkedList<Object>();
	private Lock lock = new ReentrantLock();
	private Condition fullCondition = lock.newCondition();
	private Condition emptyCondition = lock.newCondition();
	
	/**
	 * 生产物品,存在多个生产者
	 */
	public void produce(){
		//如果生产满了,则就唤醒消费者
		lock.lock();
		while(list.size() == MAX_SIZE){
			System.out.println("生产满了,暂时无法生产:" + list.size());
			emptyCondition.signal();
			try {
				fullCondition.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		list.add(new Object());
		System.out.println(Thread.currentThread().getName() + "生产新产品,共有:" + list.size());
		lock.unlock();
	}
	
	/**
	 * 消费者,存在多个消费者
	 */
	public void consume(){
		lock.lock();
		while(list.size() == 0){
			System.out.println("没有物品了,需要通知生产了");
			fullCondition.signal();
			try {
				emptyCondition.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + "消费产品,共有:" + list.size());
		list.remove();
		lock.unlock();
	}
}

class ProduceThread implements Runnable{
	private Resource resource;
	
	public ProduceThread(Resource resource){
		this.resource = resource;
	}
	
	public void run() {
		for(;;)
		resource.produce();
	}
}

class ConsumeThread implements Runnable{
	private Resource resource;
	
	public ConsumeThread(Resource resource){
		this.resource = resource;
	}
	
	public void run() {
		for(;;)
		resource.consume();
	}
}
复制代码

4. 总结

前面我们提到 AQS 是所有锁的基础,同样在 AQS 中的 ConditionObject 就是实现Condition 的核心。ConditionObject 的等待队列是一个 FIFO 队列,队列的每个节点都是等待在 Condition 对象上的线程的引用,在调用 Conditionawait() 方法之后,线程释放锁,构造成相应的节点进入等待队列等待。其中节点的定义复用 AQSNode 定义。

以上是 Java 并发编程中的 LockCondition 的基本学习。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享