1. 等待/通知机制
1.1 不使用等待/通知机制实现线程间通信
/**
 * Shared list wrapper used to demonstrate communication between two threads.
 *
 * <p>Both methods synchronize on this instance, so a size read by one thread
 * reflects every {@code add()} that another thread has completed.
 */
public class MyList {
    // ArrayList itself is not thread-safe; every access goes through the
    // synchronized methods below instead of relying on a volatile reference.
    private final List<String> list = new ArrayList<String>();

    /** Appends the fixed element {@code "item"} to the list. */
    public synchronized void add() {
        list.add("item");
    }

    /**
     * Returns the number of elements added so far.
     *
     * @return current list size
     */
    public synchronized int getSize() {
        return list.size();
    }
}
/** Producer thread: adds ten items to the shared list, pausing 500 ms after each one. */
public class ThreadA extends Thread {
    private final MyList myList;

    /**
     * @param myList the list this thread appends to
     */
    public ThreadA(MyList myList) {
        this.myList = myList;
    }

    /** Adds an item, reports the zero-based iteration number, then sleeps; repeats ten times. */
    @Override
    public void run() {
        int added = 0;
        try {
            while (added < 10) {
                myList.add();
                System.out.println("myList add i=" + added);
                Thread.sleep(500);
                added++;
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Polling thread: spins until the shared list holds exactly five items, then exits. */
public class ThreadB extends Thread {
    private final MyList myList;

    /**
     * @param myList the list whose size is polled
     */
    public ThreadB(MyList myList) {
        this.myList = myList;
    }

    /**
     * Busy-waits on the list size. Once it equals 5, prints this thread's name
     * followed by " out" and leaves via an InterruptedException whose stack
     * trace is printed.
     */
    @Override
    public void run() {
        try {
            while (myList.getSize() != 5) {
                // Intentional busy-wait: this example shows polling without wait/notify.
            }
            String self = Thread.currentThread().getName();
            System.out.println(self + " out");
            throw new InterruptedException();
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Entry point for the polling example: one shared list, one adding thread, one polling thread. */
public class Test {
    public static void main(String[] args) {
        MyList shared = new MyList();
        Thread adder = new ThreadA(shared);
        Thread poller = new ThreadB(shared);
        // The adding thread is started before the polling thread.
        adder.start();
        poller.start();
    }
}
复制代码
- 虽然两个线程间实现了通信,但是 ThreadB 不停通过 while 语句轮询机制检测是否满足条件,浪费 CPU 资源
- 如果轮询时间间隔小,浪费了 CPU 资源;如果轮询时间间隔大,有可能会取不到目标数据
1.2 等待/通知机制的实现
- wait() 方法:调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,直到被再次唤醒
- notify() 方法:随机唤醒等待队列中等待同一共享资源的一个线程,并使该线程退出等待队列,进入可运行状态
- notifyAll() 方法:使等待队列中等待同一共享资源的全部线程从等待队列退出,进入可运行状态。此时,优先级最高的那个线程最先执行,但也有可能随机执行,取决于 JVM 虚拟机的实现
/**
 * Shared list wrapper whose size is used as the wait/notify condition.
 *
 * <p>Both methods synchronize on this instance, so the class is safe to share
 * between threads even when a caller forgets to hold an external lock.
 */
public class MyList {
    // ArrayList itself is not thread-safe; a volatile reference would only
    // publish the reference, not the list contents.
    private final List<String> list = new ArrayList<String>();

    /** Appends the fixed element {@code "item"} to the list. */
    public synchronized void add() {
        list.add("item");
    }

    /**
     * Returns the number of elements added so far.
     *
     * @return current list size
     */
    public synchronized int getSize() {
        return list.size();
    }
}
/**
 * Notifying thread: while holding the lock it adds ten items to the list and
 * calls {@code notify()} once, at the moment the list size reaches five.
 */
public class ThreadA extends Thread {
    private final MyList myList;
    private final Object lock;

    /**
     * @param myList list to append to
     * @param lock   monitor used for synchronization and notification
     */
    public ThreadA(MyList myList, Object lock) {
        this.myList = myList;
        this.lock = lock;
    }

    /**
     * Runs all ten iterations inside a single synchronized block, so the
     * monitor is held (including during each 500 ms sleep) until the loop ends.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                int round = 0;
                while (round < 10) {
                    myList.add();
                    boolean reachedFive = myList.getSize() == 5;
                    if (reachedFive) {
                        System.out.println("notify ThreadB");
                        lock.notify();
                    }
                    System.out.println("myList add i=" + round);
                    Thread.sleep(500);
                    round++;
                }
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Waiting thread: blocks on the lock until the shared list holds at least five items. */
public class ThreadB extends Thread {
    private MyList myList;
    private Object lock;

    /**
     * @param myList list whose size is the wait condition
     * @param lock   monitor to wait on; the notifying thread must use the same object
     */
    public ThreadB(MyList myList, Object lock) {
        this.myList = myList;
        this.lock = lock;
    }

    /**
     * Waits on {@code lock} until the list size is at least 5, printing a
     * timestamped "wait begin" / "wait end" pair around the wait.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                // "< 5" rather than "!= 5": if the list already holds five or
                // more items the notification has been sent (or is not coming),
                // so waiting would block forever.
                if (myList.getSize() < 5) {
                    System.out.println(Thread.currentThread().getName() + " wait begin " + System.currentTimeMillis());
                    // wait() may return without a notify (spurious wake-up),
                    // so the condition is re-checked in a loop.
                    while (myList.getSize() < 5) {
                        lock.wait();
                    }
                    System.out.println(Thread.currentThread().getName() + " wait end " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Entry point for the wait/notify example. */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Object monitor = new Object();
        MyList shared = new MyList();
        Thread notifier = new ThreadA(shared, monitor);
        Thread waiter = new ThreadB(shared, monitor);

        // Start the waiting thread first and give it 50 ms head start
        // before the notifying thread is started.
        waiter.start();
        Thread.sleep(50);
        notifier.start();
    }
}
复制代码
notify() 方法和 notifyAll() 方法执行后并不立即释放锁,需要执行完同步代码块后才释放锁
1.3 当 interrupt 方法遇到 wait 方法
当线程呈 wait() 状态时,调用线程对象的 interrupt() 方法会出现 InterruptedException 异常
/** Demonstrates what happens when a thread blocked in {@code wait()} is interrupted. */
public class Service {
    /**
     * Waits on {@code lock} until notified or interrupted.
     *
     * <p>Prints "begin wait" before waiting and "end wait" if the wait returns
     * normally. If the thread is interrupted, prints
     * "catch InterruptedException" plus the stack trace and restores the
     * thread's interrupt status before returning.
     *
     * @param lock monitor to synchronize and wait on
     */
    public void testMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin wait");
                lock.wait();
                System.out.println("end wait");
            }
        } catch (InterruptedException e) {
            System.out.println("catch InterruptedException");
            e.printStackTrace();
            // wait() clears the interrupt flag when it throws; set it again so
            // the caller can still observe that an interrupt was requested.
            Thread.currentThread().interrupt();
        }
    }
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
ThreadA threadA = new ThreadA(lock);
threadA.start();
Thread.sleep(5000);
threadA.interrupt();
}
}
复制代码
1.4 wait(long) 方法的使用
wait(long)
方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒
/** Runnable that performs a timed wait of up to five seconds on a private monitor. */
public class RunA implements Runnable {
    private final Object lock = new Object();

    /**
     * Prints a timestamp, calls {@code wait(5000)} on the private monitor, then
     * prints a second timestamp. An interrupt during the wait is reported via
     * its stack trace.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                long startedAt = System.currentTimeMillis();
                System.out.println("begin wait time=" + startedAt);
                lock.wait(5000);
                long finishedAt = System.currentTimeMillis();
                System.out.println("end wait time=" + finishedAt);
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Entry point for the timed-wait example. */
public class Test {
    public static void main(String[] args) {
        // Run the timed wait on its own thread.
        Runnable task = new RunA();
        new Thread(task).start();
    }
}
复制代码
1.5 生产者/消费者模式(一对一)
/** Holder for the list shared between the producer and the consumer. */
public class ValueObject {
    /**
     * The shared buffer. The reference is final so it cannot be swapped out
     * from under a waiting thread; the list itself is not thread-safe, so
     * callers must access it while holding their common lock.
     */
    public static final List<Object> myList = new ArrayList<Object>();
}
/** Consumer: removes one item from the shared list each time {@link #get(Object)} is called. */
public class C {
    /**
     * Blocks while the shared list is empty, then removes the first item,
     * prints it with the current thread's name, and notifies one waiter.
     *
     * @param lock monitor shared with the producer
     */
    public void get(Object lock) {
        try {
            synchronized (lock) {
                // Loop, not "if": wait() can return without a matching notify
                // (spurious wake-up), and remove(0) on an empty list would throw.
                while (ValueObject.myList.size() == 0) {
                    lock.wait();
                }
                Object item = ValueObject.myList.remove(0);
                System.out.println(Thread.currentThread().getName() + " get " + item);
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Producer: puts one numbered item into the shared list each time {@link #set(Object)} is called. */
public class P {
    // Sequence number of the last produced item; only touched while holding the lock.
    private static int i;

    /**
     * Blocks while the shared list is non-empty, then adds the next
     * "item N" value, prints it with the current thread's name, and notifies
     * one waiter.
     *
     * @param lock monitor shared with the consumer
     */
    public void set(Object lock) {
        try {
            synchronized (lock) {
                // Loop, not "if": wait() can return without a matching notify
                // (spurious wake-up), which would otherwise let a second item
                // be added before the first one was consumed.
                while (ValueObject.myList.size() != 0) {
                    lock.wait();
                }
                Object item = "item " + (++i);
                ValueObject.myList.add(item);
                System.out.println(Thread.currentThread().getName() + " set " + item);
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Entry point for the one-producer / one-consumer example. */
public class Test {
    public static void main(String[] args) {
        final Object lock = new Object();

        // Consumer thread: takes items forever.
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                C worker = new C();
                for (;;) {
                    worker.get(lock);
                }
            }
        }, "consumer");

        // Producer thread: supplies items forever.
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                P worker = new P();
                for (;;) {
                    worker.set(lock);
                }
            }
        }, "producer");

        consumer.start();
        producer.start();
    }
}
复制代码
1.6 生产者/消费者模式(多对多)
/** Holder for the list shared between all producers and consumers. */
public class ValueObject {
    /**
     * The shared buffer. The reference is final so it cannot be swapped out
     * from under a waiting thread; the list itself is not thread-safe, so
     * callers must access it while holding their common lock.
     */
    public static final List<Object> myList = new ArrayList<Object>();
}
/** Consumer used by several threads at once in the many-to-many example. */
public class C {
    /**
     * Waits until the shared list has an item, removes the first one, prints
     * it together with the current thread's name, and wakes every waiter.
     *
     * @param lock monitor shared by all producers and consumers
     */
    public void get(Object lock) {
        try {
            synchronized (lock) {
                while (ValueObject.myList.isEmpty()) {
                    lock.wait();
                }
                Object taken = ValueObject.myList.remove(0);
                String who = Thread.currentThread().getName();
                System.out.println(who + " get " + taken);
                lock.notifyAll();
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Producer used by several threads at once in the many-to-many example. */
public class P {
    // Sequence number of the most recently produced item.
    volatile private static int i;

    /**
     * Waits until the shared list is empty, adds the next "item N" value,
     * prints it together with the current thread's name, and wakes every waiter.
     *
     * @param lock monitor shared by all producers and consumers
     */
    public void set(Object lock) {
        try {
            synchronized (lock) {
                while (!ValueObject.myList.isEmpty()) {
                    lock.wait();
                }
                int sequence = ++i;
                Object produced = "item " + sequence;
                ValueObject.myList.add(produced);
                String who = Thread.currentThread().getName();
                System.out.println(who + " set " + produced);
                lock.notifyAll();
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Entry point for the five-producer / five-consumer example. */
public class Test {
    public static void main(String[] args) {
        final Object lock = new Object();
        final C c = new C();
        final P p = new P();

        Runnable consumeForever = new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    c.get(lock);
                }
            }
        };
        Runnable produceForever = new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    p.set(lock);
                }
            }
        };

        Thread[] consumers = new Thread[5];
        Thread[] producers = new Thread[5];
        // Create every thread first ...
        for (int n = 0; n < consumers.length; n++) {
            consumers[n] = new Thread(consumeForever, "consumer-" + n);
            producers[n] = new Thread(produceForever, "producer-" + n);
        }
        // ... then start them pairwise, consumer before producer.
        for (int n = 0; n < consumers.length; n++) {
            consumers[n].start();
            producers[n].start();
        }
    }
}
复制代码
1.7 通过管道进行线程间通信:字节流
Java 中可以利用管道流(pipeStream)实现不同线程间的通信
-
PipedOutputStream
和PipedInputStream
public class WriteData { public void writeMethod(PipedOutputStream outputStream) { try { System.out.println("write begin"); for (int i = 0;i < 100;i++) { String data = String.valueOf(i + 1); outputStream.write(data.getBytes()); System.out.println(data); } outputStream.close(); System.out.println("write end"); } catch (IOException e) { e.printStackTrace(); } } } public class ReadData { public void readMethod(PipedInputStream inputStream) { try { System.out.println("read begin"); byte[] byteArray = new byte[20]; int readLength = inputStream.read(byteArray); while (readLength != -1) { String newData = new String(byteArray, 0, readLength); System.out.println(newData); readLength = inputStream.read(byteArray); } inputStream.close(); System.out.println("read end"); } catch (IOException e) { e.printStackTrace(); } } } public class ThreadA extends Thread { private PipedOutputStream outputStream; private WriteData writeData; public ThreadA(PipedOutputStream outputStream, WriteData writeData) { this.outputStream = outputStream; this.writeData = writeData; } @Override public void run() { writeData.writeMethod(outputStream); } } public class ThreadB extends Thread { private PipedInputStream inputStream; private ReadData readData; public ThreadB(PipedInputStream inputStream, ReadData readData) { this.inputStream = inputStream; this.readData = readData; } @Override public void run() { readData.readMethod(inputStream); } } public class Test { public static void main(String[] args) throws IOException, InterruptedException { WriteData writeData = new WriteData(); ReadData readData = new ReadData(); PipedOutputStream outputStream = new PipedOutputStream(); PipedInputStream inputStream = new PipedInputStream(); outputStream.connect(inputStream); ThreadB threadB = new ThreadB(inputStream, readData); threadB.start(); ThreadA threadA = new ThreadA(outputStream, writeData); threadA.start(); } } 复制代码
-
PipedWriter
和PipedReader
public class WriteData { public void writeMethod(PipedWriter pipedWriter) { try { System.out.println("write begin"); for (int i = 0;i < 100;i++) { String data = String.valueOf(i + 1); pipedWriter.write(data); System.out.println(data); } pipedWriter.close(); System.out.println("write end"); } catch (IOException e) { e.printStackTrace(); } } } public class ReadData { public void readMethod(PipedReader pipedReader) { try { System.out.println("read begin"); char[] charArray = new char[20]; int readLength = pipedReader.read(charArray); while (readLength != -1) { String newData = new String(charArray, 0, readLength); System.out.println(newData); readLength = pipedReader.read(charArray); } pipedReader.close(); System.out.println("read end"); } catch (IOException e) { e.printStackTrace(); } } } public class ThreadA extends Thread { private PipedWriter pipedWriter; private WriteData writeData; public ThreadA(PipedWriter pipedWriter, WriteData writeData) { this.pipedWriter = pipedWriter; this.writeData = writeData; } @Override public void run() { writeData.writeMethod(pipedWriter); } } public class ThreadB extends Thread { private PipedReader pipedReader; private ReadData readData; public ThreadB(PipedReader pipedReader, ReadData readData) { this.pipedReader = pipedReader; this.readData = readData; } @Override public void run() { readData.readMethod(pipedReader); } } public class Test { public static void main(String[] args) throws IOException, InterruptedException { WriteData writeData = new WriteData(); ReadData readData = new ReadData(); PipedWriter pipedWriter = new PipedWriter(); PipedReader pipedReader = new PipedReader(); pipedWriter.connect(pipedReader); ThreadB threadB = new ThreadB(pipedReader, readData); threadB.start(); ThreadA threadA = new ThreadA(pipedWriter, writeData); threadA.start(); } } 复制代码
2. join 方法
join()
方法的作用是使所属的线程对象 x 正常执行 run()
方法中的任务,而使当前线程 z 进行无限期阻塞,等待线程 x 销毁后再继续执行线程 z 后面的代码
join
与 synchronized
的区别:join
在内部使用 wait()
方法进行等待,而 synchronized
关键字使用的是“对象监视器”原理做同步
2.1 join() 方法的使用
/** Thread that announces its start, sleeps five seconds, and announces its end. */
public class MyThread extends Thread {
    /** Prints the begin marker, sleeps 5000 ms, prints the end marker; an interrupt is reported via its stack trace. */
    @Override
    public void run() {
        final long pauseMillis = 5000;
        try {
            System.out.println("MyThread run begin");
            Thread.sleep(pauseMillis);
            System.out.println("MyThread run end");
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
myThread.join();
System.out.println("main end");
}
}
复制代码
2.2 join() 方法和异常
/** Thread that announces its start, sleeps five seconds, and announces its end. */
public class ThreadA extends Thread {
    /** Prints the begin marker, sleeps 5000 ms, prints the end marker; an interrupt is reported via its stack trace. */
    @Override
    public void run() {
        final long pauseMillis = 5000;
        try {
            System.out.println("MyThread run begin");
            Thread.sleep(pauseMillis);
            System.out.println("MyThread run end");
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Thread that starts a ThreadA and joins on it. */
public class ThreadB extends Thread {
    /**
     * Starts a new ThreadA, waits for it with join(), then prints a marker.
     * If this thread is interrupted while joining, the stack trace is printed
     * and the marker is skipped.
     */
    @Override
    public void run() {
        ThreadA child = new ThreadA();
        try {
            child.start();
            child.join();
            System.out.println("ThreadB after a.join()");
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
public class Test {
public static void main(String[] args) throws InterruptedException {
ThreadB b = new ThreadB();
b.start();
Thread.sleep(500);
b.interrupt();
System.out.println("main after b.interrupt()");
}
}
复制代码
2.3 join(long) 方法的使用
join(long)
方法中的参数可以设置等待的时间,如果超过这个时间则自动继续执行当前线程后面的代码
/** Thread that prints a timestamp, sleeps five seconds, and prints a second timestamp. */
public class MyThread extends Thread {
    /** Reports begin and end times around a 5000 ms sleep; an interrupt is reported via its stack trace. */
    @Override
    public void run() {
        try {
            long begin = System.currentTimeMillis();
            System.out.println("MyThread run begin time=" + begin);
            Thread.sleep(5000);
            long end = System.currentTimeMillis();
            System.out.println("MyThread run end time=" + end);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
myThread.join(2000);
System.out.println("main end time=" + System.currentTimeMillis());
}
}
复制代码
2.4 join(long) 和 sleep(long) 方法的区别
- Thread.sleep(long) 方法不释放锁
- join(long) 方法内部是使用 wait(long) 方法实现的,所以具有释放锁的特点

public class ThreadA extends Thread {
@Override
public void run() {
try {
System.out.println("ThreadA run begin time=" + System.currentTimeMillis());
Thread.sleep(5000);
System.out.println("ThreadA run end time=" + System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
synchronized public void printMethod() {
System.out.println("ThreadA printMethod enter time="+ System.currentTimeMillis());
}
}
public class ThreadB extends Thread {
private ThreadA threadA;
public ThreadB(ThreadA threadA) {
this.threadA = threadA;
}
@Override
public void run() {
try {
synchronized (threadA) {
threadA.start();
Thread.sleep(6000);
// threadA.join(6000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
ThreadA threadA = new ThreadA();
ThreadB threadB = new ThreadB(threadA);
threadB.start();
Thread.sleep(10);
threadA.printMethod();
}
}
复制代码
3. ThreadLocal 类
ThreadLocal
类可以实现每一个线程都拥有自己的共享变量
3.1 ThreadLocal 类的使用
/** Holder for the ThreadLocal shared by the example threads. */
public class Tools {
    /**
     * Per-thread value slot. Declared final so the holder cannot be replaced,
     * and parameterized (instead of a raw type) so calls to set/get compile
     * without unchecked warnings while still accepting any object.
     */
    public static final ThreadLocal<Object> t1 = new ThreadLocal<Object>();
}
/** Thread that stores and prints its own ThreadLocal value fifty times. */
public class ThreadA extends Thread {
    /** On each of 50 rounds: sets "ThreadA i=N", prints what get() returns, sleeps 200 ms. */
    @Override
    public void run() {
        int round = 0;
        try {
            while (round < 50) {
                Tools.t1.set("ThreadA i=" + round);
                Object mine = Tools.t1.get();
                System.out.println(mine);
                Thread.sleep(200);
                round++;
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Thread that stores and prints its own ThreadLocal value fifty times. */
public class ThreadB extends Thread {
    /** On each of 50 rounds: sets "ThreadB i=N", prints what get() returns, sleeps 200 ms. */
    @Override
    public void run() {
        int round = 0;
        try {
            while (round < 50) {
                Tools.t1.set("ThreadB i=" + round);
                Object mine = Tools.t1.get();
                System.out.println(mine);
                Thread.sleep(200);
                round++;
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Entry point: runs ThreadA, ThreadB and the main thread against the same ThreadLocal. */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread first = new ThreadA();
        Thread second = new ThreadB();
        first.start();
        second.start();

        // The main thread does the same set/print/sleep cycle as the two workers.
        int round = 0;
        while (round < 50) {
            Tools.t1.set("main i=" + round);
            Object mine = Tools.t1.get();
            System.out.println(mine);
            Thread.sleep(200);
            round++;
        }
    }
}
复制代码
3.2 解决 get() 返回 null 的问题
继承 ThreadLocal
类并覆写 initialValue()
方法
/**
 * ThreadLocal whose first {@code get()} in a thread returns a default value
 * instead of {@code null}.
 *
 * <p>Extends the parameterized type rather than the raw one so callers
 * compile without unchecked warnings; any object can still be stored.
 */
public class MyThreadLocal extends ThreadLocal<Object> {
    /**
     * Supplies the value seen by a thread that has not called {@code set()} yet.
     *
     * @return the string "defaultValue"
     */
    @Override
    protected Object initialValue() {
        return "defaultValue";
    }
}
/** Entry point showing that {@code get()} no longer returns null once initialValue() is overridden. */
public class Test {
    public static MyThreadLocal t1 = new MyThreadLocal();

    /**
     * Prints the current thread's value. The null branch only runs when no
     * value is available at all; with MyThreadLocal supplying a default it is
     * skipped and the default is printed.
     */
    public static void main(String[] args) {
        // The check was inverted ("!= null"): it reported "never set value"
        // precisely when a value existed and then overwrote the default.
        if (t1.get() == null) {
            System.out.println("never set value");
            t1.set("setValue");
        }
        System.out.println(t1.get());
    }
}
复制代码
4 InheritableThreadLocal 类
使用 InheritableThreadLocal 类可以在子线程中取得父线程继承下来的值
使用 InheritableThreadLocal 类时需要注意,如果子线程在取值的同时,父线程将 InheritableThreadLocal 中的值进行更改,那么子线程取到的值还是旧值,下面验证
4.1 值继承
/** Holder for the InheritableThreadLocal shared by the example threads. */
public class Tools {
    /**
     * Value slot whose content is handed to threads created by the setting
     * thread. Declared final so the holder cannot be replaced, and
     * parameterized (instead of a raw type) so set/get compile without
     * unchecked warnings while still accepting any object.
     */
    public static final InheritableThreadLocal<Object> t1 = new InheritableThreadLocal<Object>();
}
/** Child thread that prints the value it sees in the inheritable thread-local. */
public class ThreadA extends Thread {
    /** Reads the thread-local once and prints it with a "ThreadA value=" prefix. */
    @Override
    public void run() {
        Object inherited = Tools.t1.get();
        System.out.println("ThreadA value=" + inherited);
    }
}
/** Entry point: sets a value in the main thread, then starts a child thread that reads it. */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Tools.t1.set("mainValue");
        Object own = Tools.t1.get();
        System.out.println("main value=" + own);
        Thread.sleep(500);
        // The child is created after the value was set in this thread.
        new ThreadA().start();
    }
}
复制代码
4.2 值继承再修改
继承 InheritableThreadLocal
类并覆写 childValue()
方法
/**
 * InheritableThreadLocal that decorates the value handed down to a child thread.
 *
 * <p>Extends the parameterized type rather than the raw one so callers
 * compile without unchecked warnings; any object can still be stored.
 */
public class InheritableThreadLocalExt extends InheritableThreadLocal<Object> {
    /**
     * Computes the child thread's initial value from the parent's value.
     *
     * @param parentValue value held by the parent thread
     * @return the parent's value converted to text with " childValue" appended
     */
    @Override
    protected Object childValue(Object parentValue) {
        return parentValue + " childValue";
    }
}
/** Holder for the inheritable thread-local shared by the example threads. */
public class Tools {
    /**
     * Shared value slot. Declared final so no code can replace the holder,
     * which would silently disconnect threads that already stored a value.
     */
    public static final InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}
/** Child thread that prints the value it sees in the inheritable thread-local. */
public class ThreadA extends Thread {
    /** Reads the thread-local once and prints it with a "ThreadA value=" prefix. */
    @Override
    public void run() {
        Object inherited = Tools.t1.get();
        System.out.println("ThreadA value=" + inherited);
    }
}
/** Entry point: sets a value in the main thread, then starts a child thread that reads its derived value. */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Tools.t1.set("mainValue");
        Object own = Tools.t1.get();
        System.out.println("main value=" + own);
        Thread.sleep(500);
        // The child is created after the value was set in this thread.
        new ThreadA().start();
    }
}
复制代码
4.3 验证
/**
 * InheritableThreadLocal whose first {@code get()} returns a default value
 * instead of {@code null}.
 *
 * <p>Extends the parameterized type rather than the raw one so callers
 * compile without unchecked warnings; any object can still be stored.
 */
public class InheritableThreadLocalExt extends InheritableThreadLocal<Object> {
    /**
     * Supplies the value seen by a thread that has no value of its own yet.
     *
     * @return the string "defaultValue"
     */
    @Override
    protected Object initialValue() {
        return "defaultValue";
    }
}
/** Holder for the inheritable thread-local shared by the example threads. */
public class Tools {
    /**
     * Shared value slot. Declared final so no code can replace the holder,
     * which would silently disconnect threads that already stored a value.
     */
    public static final InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}
/** Child thread that prints its view of the thread-local twenty times, 200 ms apart. */
public class ThreadA extends Thread {
    /** Prints "ThreadA value=" plus the current value, then sleeps 200 ms; repeats 20 times. */
    @Override
    public void run() {
        int remaining = 20;
        try {
            while (remaining > 0) {
                Object seen = Tools.t1.get();
                System.out.println("ThreadA value=" + seen);
                Thread.sleep(200);
                remaining--;
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/** Entry point: changes the main thread's value while the child thread is still reading. */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Object before = Tools.t1.get();
        System.out.println("main before set value=" + before);

        Thread reader = new ThreadA();
        reader.start();

        // Two seconds into the child's run, change the value in the main thread.
        Thread.sleep(2000);
        Tools.t1.set("setValue");
        Object after = Tools.t1.get();
        System.out.println("main after set value=" + after);
    }
}
复制代码
学自《Java多线程编程核心技术》