Inter-thread communication

1. The wait/notify mechanism

1.1 Inter-thread communication without wait/notify

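import java.util.ArrayList;
import java.util.List;

public class MyList {

    // volatile only covers the list reference; ArrayList itself is not thread-safe
    private volatile List<String> list = new ArrayList<>();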

    public void add() {
        list.add("item");
    }

    public int getSize() {
        return list.size();
    }
}

public class ThreadA extends Thread {

    private MyList myList;

    public ThreadA(MyList myList) {
        this.myList = myList;
    }

    @Override
    public void run() {
        try {
            for (int i = 0;i < 10;i++) {
                myList.add();
                System.out.println("myList add i=" + i);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {

    private MyList myList;

    public ThreadB(MyList myList) {
        this.myList = myList;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (myList.getSize() == 5) {
                    System.out.println(Thread.currentThread().getName() + " out");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) {
        MyList myList = new MyList();
        ThreadA threadA = new ThreadA(myList);
        ThreadB threadB = new ThreadB(myList);

        threadA.start();
        threadB.start();
    }
}
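  • The two threads do communicate, but ThreadB checks the condition by polling in a tight while loop, which wastes CPU
  • Polling forces a trade-off: a short polling interval wastes CPU, while a long one may miss the target state

1.2 Implementing the wait/notify mechanism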

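  • wait(): the calling thread releases the lock on the shared resource, leaves the running state, and enters the wait queue until it is woken up again
  • notify(): wakes up one arbitrarily chosen thread that is waiting on the same shared resource; that thread leaves the wait queue and becomes runnable
  • notifyAll(): moves every thread waiting on the same shared resource out of the wait queue and into the runnable state. Which thread runs first depends on the JVM implementation: often the one with the highest priority, but the order may also be arbitrary

import java.util.ArrayList;
import java.util.List;

public class MyList {

    // volatile only covers the list reference; ArrayList itself is not thread-safe
    private volatile List<String> list = new ArrayList<>();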

    public void add() {
        list.add("item");
    }

    public int getSize() {
        return list.size();
    }
}

public class ThreadA extends Thread {

    private MyList myList;

    private Object lock;

    public ThreadA(MyList myList, Object lock) {
        this.myList = myList;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0;i < 10;i++) {
                    myList.add();
                    if (myList.getSize() == 5) {
                        System.out.println("notify ThreadB");
                        lock.notify();
                    }
                    System.out.println("myList add i=" + i);
                    Thread.sleep(500);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {

    private MyList myList;

    private Object lock;

    public ThreadB(MyList myList, Object lock) {
        this.myList = myList;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (myList.getSize() != 5) {
                    System.out.println(Thread.currentThread().getName() + " wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + " wait end " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        MyList myList = new MyList();
        Object lock = new Object();
        ThreadA threadA = new ThreadA(myList, lock);
        ThreadB threadB = new ThreadB(myList, lock);

        threadB.start();
        Thread.sleep(50);
        threadA.start();
    }
}

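notify() and notifyAll() do not release the lock immediately. The lock is released only when the synchronized block containing the call finishes, which is why ThreadB in the example above prints "wait end" only after ThreadA has added all 10 items.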
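
The rule can also be shown in isolation. The following is a minimal sketch, not part of the original example: the class NotifyKeepsLockDemo, the method runDemo, and the event strings are all made up for illustration. The notifier calls notify() and then keeps working inside its synchronized block for a while; the waiter can only resume once that block has been left.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class NotifyKeepsLockDemo {

    // Runs one waiter and one notifier and returns the events in the order they happened.
    static List<String> runDemo() {
        final Object lock = new Object();
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        final boolean[] notified = {false};       // guarded by lock

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (!notified[0]) {
                        lock.wait();              // releases the lock while waiting
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("waiter resumed");
            }
        });

        Thread notifier = new Thread(() -> {
            synchronized (lock) {
                notified[0] = true;
                lock.notify();                    // does NOT release the lock
                events.add("notify called");
                try {
                    Thread.sleep(300);            // still holding the lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("notifier leaving synchronized block");
            }                                     // the lock is released here
        });

        waiter.start();
        notifier.start();
        try {
            waiter.join();
            notifier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Whichever thread gets the lock first, "waiter resumed" is recorded last, because the waiter needs the lock back before it can continue.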

1.3 When interrupt() meets wait()

When a thread is blocked in wait(), calling interrupt() on that thread object makes wait() throw an InterruptedException

public class Service {

    public void testMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin wait");
                lock.wait();
                System.out.println("end wait");
            }
        } catch (InterruptedException e) {
            System.out.println("catch InterruptedException");
            e.printStackTrace();
        }
    }
}

public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        ThreadA threadA = new ThreadA(lock);
        threadA.start();
        Thread.sleep(5000);
        threadA.interrupt();
    }
}

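1.4 Using wait(long)

wait(long) waits up to the given number of milliseconds for another thread to notify on the lock; if that time elapses without a notification, the thread wakes up on its own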

public class RunA implements Runnable {

    private Object lock = new Object();

    @Override
    public void run() {
        try {
            synchronized (lock) {
                System.out.println("begin wait time=" + System.currentTimeMillis());
                lock.wait(5000);
                System.out.println("end wait time=" + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) {
        Thread thread = new Thread(new RunA());
        thread.start();
    }
}
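
The example above only exercises the timeout path. As a complement, here is a small sketch that measures both paths: woken early by notify(), and waking on its own after the timeout. The class TimedWaitDemo and the method waitedMillis are hypothetical names, the 2000 ms timeout is arbitrary, and the short polling loop on Thread.getState() is there only to make the demo deterministic. The sketch also ignores spurious wake-ups, which production code would handle with a guarded loop.

```java
class TimedWaitDemo {

    // Returns roughly how long (in ms) a thread spent inside lock.wait(2000).
    static long waitedMillis(boolean sendNotify) {
        final Object lock = new Object();
        final long[] elapsed = new long[1];

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                long start = System.nanoTime();
                try {
                    lock.wait(2000);              // gives up after about 2 seconds
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                elapsed[0] = (System.nanoTime() - start) / 1_000_000;
            }
        });
        waiter.start();

        try {
            if (sendNotify) {
                // Poll until the waiter is really inside wait(2000), then wake it early.
                while (waiter.isAlive() && waiter.getState() != Thread.State.TIMED_WAITING) {
                    Thread.sleep(10);
                }
                synchronized (lock) {
                    lock.notify();
                }
            }
            waiter.join();                        // also makes elapsed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsed[0];
    }

    public static void main(String[] args) {
        System.out.println("notified:     " + waitedMillis(true) + " ms");
        System.out.println("not notified: " + waitedMillis(false) + " ms");
    }
}
```

The notified run should finish well before the timeout, while the other run should take about the full two seconds.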

1.5 Producer/consumer pattern (one to one)

public class ValueObject {

    volatile public static List myList = new ArrayList();
}

public class C {
    
    public void get(Object lock) {
        try {
            synchronized (lock) {
                if (ValueObject.myList.size() == 0) {
                    lock.wait();
                }
                Object item = ValueObject.myList.remove(0);
                System.out.println(Thread.currentThread().getName() + " get " + item);
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class P {

    private static int i;

    public void set(Object lock) {
        try {
            synchronized (lock) {
                if (ValueObject.myList.size() != 0) {
                    lock.wait();
                }
                Object item = "item " + (++i);
                ValueObject.myList.add(item);
                System.out.println(Thread.currentThread().getName() + " set " + item);
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) {
        final Object lock = new Object();

        Runnable runnable1 = new Runnable() {
            @Override
            public void run() {
                C c = new C();
                while (true) {
                    c.get(lock);
                }
            }
        };

        Runnable runnable2 = new Runnable() {
            @Override
            public void run() {
                P p = new P();
                while (true) {
                    p.set(lock);
                }
            }
        };

        Thread c = new Thread(runnable1, "consumer");
        Thread p = new Thread(runnable2, "producer");
        c.start();
        p.start();
    }
}

1.6 Producer/consumer pattern (many to many)

public class ValueObject {

    volatile public static List myList = new ArrayList();
}

public class C {

    public void get(Object lock) {
        try {
            synchronized (lock) {
                while (ValueObject.myList.size() == 0) {
                    lock.wait();
                }
                Object item = ValueObject.myList.remove(0);
                System.out.println(Thread.currentThread().getName() + " get " + item);
                lock.notifyAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class P {

    volatile private static int i;

    public void set(Object lock) {

        try {
            synchronized (lock) {
                while (ValueObject.myList.size() != 0) {
                    lock.wait();
                }
                Object item = "item " + (++i);
                ValueObject.myList.add(item);
                System.out.println(Thread.currentThread().getName() + " set " + item);
                lock.notifyAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) {
        final Object lock = new Object();
        final C c = new C();
        final P p = new P();

        Runnable runnable1 = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    c.get(lock);
                }
            }
        };

        Runnable runnable2 = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    p.set(lock);
                }
            }
        };

        Thread[] cArrays = new Thread[5];
        Thread[] pArrays = new Thread[5];
        for (int i = 0;i < 5;i++) {
            cArrays[i] = new Thread(runnable1, "consumer-" + i);
            pArrays[i] = new Thread(runnable2, "producer-" + i);
        }
        for (int i = 0;i < 5;i++) {
            cArrays[i].start();
            pArrays[i].start();
        }
    }
}

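1.7 Communicating between threads through pipes: byte and character streams

Java's piped streams can be used to pass data between threads

  • PipedOutputStream and PipedInputStream (byte streams)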

    public class WriteData {
    
        public void writeMethod(PipedOutputStream outputStream) {
            try {
                System.out.println("write begin");
                for (int i = 0;i < 100;i++) {
                    String data = String.valueOf(i + 1);
                    outputStream.write(data.getBytes());
                    System.out.println(data);
                }
                outputStream.close();
                System.out.println("write end");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ReadData {
    
        public void readMethod(PipedInputStream inputStream) {
            try {
                System.out.println("read begin");
                byte[] byteArray = new byte[20];
                int readLength = inputStream.read(byteArray);
                while (readLength != -1) {
                    String newData = new String(byteArray, 0, readLength);
                    System.out.println(newData);
                    readLength = inputStream.read(byteArray);
                }
                inputStream.close();
                System.out.println("read end");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ThreadA extends Thread {
    
        private PipedOutputStream outputStream;
    
        private WriteData writeData;
    
        public ThreadA(PipedOutputStream outputStream, WriteData writeData) {
            this.outputStream = outputStream;
            this.writeData = writeData;
        }
    
        @Override
        public void run() {
            writeData.writeMethod(outputStream);
        }
    }
    
    public class ThreadB extends Thread {
    
        private PipedInputStream inputStream;
    
        private ReadData readData;
    
        public ThreadB(PipedInputStream inputStream, ReadData readData) {
            this.inputStream = inputStream;
            this.readData = readData;
        }
    
        @Override
        public void run() {
            readData.readMethod(inputStream);
        }
    }
    
    public class Test {
    
        public static void main(String[] args) throws IOException, InterruptedException {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();
    
            PipedOutputStream outputStream = new PipedOutputStream();
            PipedInputStream inputStream = new PipedInputStream();
            outputStream.connect(inputStream);
    
            ThreadB threadB = new ThreadB(inputStream, readData);
            threadB.start();
            ThreadA threadA = new ThreadA(outputStream, writeData);
            threadA.start();
        }
    }

  • PipedWriter and PipedReader (character streams)

    public class WriteData {
    
        public void writeMethod(PipedWriter pipedWriter) {
            try {
                System.out.println("write begin");
                for (int i = 0;i < 100;i++) {
                    String data = String.valueOf(i + 1);
                    pipedWriter.write(data);
                    System.out.println(data);
                }
                pipedWriter.close();
                System.out.println("write end");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ReadData {
    
        public void readMethod(PipedReader pipedReader) {
            try {
                System.out.println("read begin");
                char[] charArray = new char[20];
                int readLength = pipedReader.read(charArray);
                while (readLength != -1) {
                    String newData = new String(charArray, 0, readLength);
                    System.out.println(newData);
                    readLength = pipedReader.read(charArray);
                }
                pipedReader.close();
                System.out.println("read end");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ThreadA extends Thread {
    
        private PipedWriter pipedWriter;
    
        private WriteData writeData;
    
        public ThreadA(PipedWriter pipedWriter, WriteData writeData) {
            this.pipedWriter = pipedWriter;
            this.writeData = writeData;
        }
    
        @Override
        public void run() {
            writeData.writeMethod(pipedWriter);
        }
    }
    
    public class ThreadB extends Thread {
    
        private PipedReader pipedReader;
    
        private ReadData readData;
    
        public ThreadB(PipedReader pipedReader, ReadData readData) {
            this.pipedReader = pipedReader;
            this.readData = readData;
        }
    
        @Override
        public void run() {
            readData.readMethod(pipedReader);
        }
    }
    
    public class Test {
    
        public static void main(String[] args) throws IOException, InterruptedException {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();
    
            PipedWriter pipedWriter = new PipedWriter();
            PipedReader pipedReader = new PipedReader();
            pipedWriter.connect(pipedReader);
    
            ThreadB threadB = new ThreadB(pipedReader, readData);
            threadB.start();
            ThreadA threadA = new ThreadA(pipedWriter, writeData);
            threadA.start();
        }
    }

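2. The join method

Calling x.join() lets thread x run its run() method normally while the calling thread z blocks indefinitely; z continues with the code after the call only once x has terminated

Difference between join and synchronized: join waits by calling wait() internally, whereas the synchronized keyword synchronizes through the object monitor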
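
To make the first half of that comparison concrete, here is a minimal sketch of a join-like wait built on wait()/notifyAll(). The names JoinSketch, Task, awaitFinished, and runDemo are made up for illustration, and the task uses its own lock object. The real Thread.join() waits on the Thread object itself: its Javadoc describes a loop of this.wait calls conditioned on this.isAlive, with notifyAll invoked as the thread terminates.

```java
class JoinSketch {

    // Hypothetical task with a join-like method built on wait()/notifyAll().
    static class Task implements Runnable {
        private final Object lock = new Object();
        private boolean finished = false;      // guarded by lock

        @Override
        public void run() {
            try {
                Thread.sleep(200);             // stand-in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                synchronized (lock) {
                    finished = true;
                    lock.notifyAll();          // wake every thread blocked in awaitFinished()
                }
            }
        }

        // Blocks the caller until run() has completed, much as join() does.
        void awaitFinished() throws InterruptedException {
            synchronized (lock) {
                while (!finished) {
                    lock.wait();               // releases the lock while waiting
                }
            }
        }

        boolean isFinished() {
            synchronized (lock) {
                return finished;
            }
        }
    }

    // Starts a task, waits for it the join-like way, and reports whether it had finished.
    static boolean runDemo() {
        Task task = new Task();
        new Thread(task).start();
        try {
            task.awaitFinished();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return task.isFinished();
    }

    public static void main(String[] args) {
        System.out.println("finished after awaitFinished(): " + runDemo());
    }
}
```

Because the waiting is done with wait(), the lock is released for the duration, which is the property section 2.4 relies on.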

2.1 Using join()

public class MyThread extends Thread {

    @Override
    public void run() {
        try {
            System.out.println("MyThread run begin");
            Thread.sleep(5000);
            System.out.println("MyThread run end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();
        myThread.join();
        System.out.println("main end");
    }
}

2.2 join() and exceptions

public class ThreadA extends Thread {

    @Override
    public void run() {
        try {
            System.out.println("MyThread run begin");
            Thread.sleep(5000);
            System.out.println("MyThread run end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {

    @Override
    public void run() {
        try {
            ThreadA a = new ThreadA();
            a.start();
            a.join();
            System.out.println("ThreadB after a.join()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        ThreadB b = new ThreadB();
        b.start();
        Thread.sleep(500);
        b.interrupt();
        System.out.println("main after b.interrupt()");
    }
}

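2.3 Using join(long)

The argument to join(long) sets the maximum time to wait, in milliseconds; once it elapses, the current thread continues with the code after the call even if the target thread is still running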

public class MyThread extends Thread {

    @Override
    public void run() {
        try {
            System.out.println("MyThread run begin time=" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println("MyThread run end time=" + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();
        myThread.join(2000);
        System.out.println("main end time=" + System.currentTimeMillis());
    }
}

2.4 Difference between join(long) and sleep(long)

  • Thread.sleep(long) does not release the lock

  • join(long) is implemented internally with wait(long), so it does release the lock

    public class ThreadA extends Thread {
    
        @Override
        public void run() {
            try {
                System.out.println("ThreadA run begin time=" + System.currentTimeMillis());
                Thread.sleep(5000);
                System.out.println("ThreadA run end time=" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        synchronized public void printMethod() {
            System.out.println("ThreadA printMethod enter time="+ System.currentTimeMillis());
        }
    }
    
    public class ThreadB extends Thread {
    
        private ThreadA threadA;
    
        public ThreadB(ThreadA threadA) {
            this.threadA = threadA;
        }
    
        @Override
        public void run() {
            try {
                synchronized (threadA) {
                    threadA.start();
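                    // sleep(long) keeps the lock on threadA, so printMethod() in main blocks for about 6 seconds
                    Thread.sleep(6000);
                    // Swap the line above for the one below: join(long) calls wait(long) and releases the lock
                    // threadA.join(6000);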
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
            ThreadA threadA = new ThreadA();
            ThreadB threadB = new ThreadB(threadA);
            threadB.start();
            Thread.sleep(10);
            threadA.printMethod();
        }
    }

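3. The ThreadLocal class

ThreadLocal gives every thread its own independent copy of a variable, even though all threads access it through the same ThreadLocal object

3.1 Using ThreadLocal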

public class Tools {

    public static ThreadLocal t1 = new ThreadLocal();
}

public class ThreadA extends Thread {

    @Override
    public void run() {
        try {
            for (int i = 0;i < 50;i++) {
                Tools.t1.set("ThreadA i=" + i);
                System.out.println(Tools.t1.get());
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {

    @Override
    public void run() {
        try {
            for (int i = 0;i < 50;i++) {
                Tools.t1.set("ThreadB i=" + i);
                System.out.println(Tools.t1.get());
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();
        threadA.start();
        threadB.start();

        for (int i = 0;i < 50;i++) {
            Tools.t1.set("main i=" + i);
            System.out.println(Tools.t1.get());
            Thread.sleep(200);
        }
    }
}

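3.2 Fixing get() returning null

By default, get() returns null in a thread that has never called set(). To supply a default value instead, extend ThreadLocal and override initialValue()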

public class MyThreadLocal extends ThreadLocal {

    @Override
    protected Object initialValue() {
        return "defaultValue";
    }
}

public class Test {

    public static MyThreadLocal t1 = new MyThreadLocal();

    public static void main(String[] args) {
        // initialValue() supplies "defaultValue", so get() is never null and this branch is skipped
        if (t1.get() == null) {
            System.out.println("never set value");
            t1.set("setValue");
        }
        System.out.println(t1.get());
    }
}
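
On Java 8 and later, the same default can be supplied without a subclass by using ThreadLocal.withInitial(Supplier). The sketch below is an alternative to the subclass shown above, not the approach from the original example; WithInitialDemo, T1, and runDemo are names invented for illustration.

```java
class WithInitialDemo {

    // Typed ThreadLocal whose first get() in each thread yields "defaultValue".
    static final ThreadLocal<String> T1 = ThreadLocal.withInitial(() -> "defaultValue");

    // Returns: [value before set, value after set, value seen by another thread].
    static String[] runDemo() {
        final String[] seen = new String[3];

        seen[0] = T1.get();                 // no set() yet in this thread: the initial value
        T1.set("setValue");
        seen[1] = T1.get();                 // this thread's own value

        // A different thread has its own copy and still sees the initial value.
        Thread other = new Thread(() -> seen[2] = T1.get());
        other.start();
        try {
            other.join();                   // join() makes seen[2] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        T1.remove();                        // clear this thread's value so the demo can be rerun
        return seen;
    }

    public static void main(String[] args) {
        for (String value : runDemo()) {
            System.out.println(value);
        }
    }
}
```

Using a type parameter (ThreadLocal<String>) also avoids the casts that a raw ThreadLocal would require when reading the value.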

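4. The InheritableThreadLocal class

With InheritableThreadLocal, a child thread can read the value inherited from its parent thread

One caveat: the child receives its copy of the value when it is created. If the parent thread later changes the value in the InheritableThreadLocal, the child still sees the old value. Section 4.3 verifies this

4.1 Value inheritance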

public class Tools {

    public static InheritableThreadLocal t1 = new InheritableThreadLocal();
}

public class ThreadA extends Thread {

    @Override
    public void run() {
        System.out.println("ThreadA value=" + Tools.t1.get());
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Tools.t1.set("mainValue");
        System.out.println("main value=" + Tools.t1.get());

        Thread.sleep(500);

        ThreadA threadA = new ThreadA();
        threadA.start();
    }
}

4.2 Inheriting and then modifying the value

Extend InheritableThreadLocal and override childValue() to derive the child's value from the parent's

public class InheritableThreadLocalExt extends InheritableThreadLocal {

    @Override
    protected Object childValue(Object parentValue) {
        return parentValue + " childValue";
    }
}

public class Tools {

    public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}

public class ThreadA extends Thread {

    @Override
    public void run() {
        System.out.println("ThreadA value=" + Tools.t1.get());
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Tools.t1.set("mainValue");
        System.out.println("main value=" + Tools.t1.get());

        Thread.sleep(500);

        ThreadA threadA = new ThreadA();
        threadA.start();
    }
}

4.3 Verification

public class InheritableThreadLocalExt extends InheritableThreadLocal {

    @Override
    protected Object initialValue() {
        return "defaultValue";
    }
}

public class Tools {

    public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}

public class ThreadA extends Thread {

    @Override
    public void run() {
        try {
            for (int i = 1;i <= 20;i++) {
                System.out.println("ThreadA value=" + Tools.t1.get());
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("main before set value=" + Tools.t1.get());

        ThreadA threadA = new ThreadA();
        threadA.start();

        Thread.sleep(2000);
        Tools.t1.set("setValue");
        System.out.println("main after set value=" + Tools.t1.get());
    }
}

Based on the book 《Java多线程编程核心技术》 (Core Techniques of Java Multithreaded Programming)
