Java线程通信

线程通信是Java线程部分的重点,我们介绍一下常见的几种线程通信方式。

线程锁与同步

锁机制是线程通信的一种重要方式。当多个线程竞争某一个对象时,一旦某个线程获得对象就会立刻将其上锁,其他线程只能等待锁被释放才可以继续竞争。当我们需要进行多个线程间的同步时,经常利用线程锁完成。

在下面的代码中,两个线程总会有一个执行先后顺序,但后执行的线程必须等待先执行的代码运行结束才可以执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ObjectLock {

private static Object obj = new Object();

static class MyRunnableA implements Runnable {
@Override
public void run() {
synchronized (obj) {
for (int i = 1; i <= 100; ++i) System.out.println("MyRunnableA: " + i);
}
}
}

static class MyRunnableB implements Runnable {
@Override
public void run() {
synchronized (obj) {
for (int i = 1; i <= 100; ++i) System.out.println("MyRunnableB: " + i);
}
}
}

public static void main(String[] args) {
new Thread(new MyRunnableA(), "线程1").start();
new Thread(new MyRunnableB(), "线程2").start();
}
}

结果显示一个线程运行结束了以后才可以运行下一个线程。

等待/通知机制

等待/通知机制同样是线程通信方式中比较重要的方法,这里的等待使用的是Object类中的wait()方法,通知使用的是Object类中的notify()notifyAll()方法,以上三个方法都必须在synchronized中使用。notify()方法一次只能随机唤醒一个等待线程,而notifyAll()会唤醒所有的等待线程。当线程使用wait()方法时,会自动释放占有的锁资源。我们用代码来查看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class WaitAndNotify {

private static Object obj = new Object();

static class MyRunnableA implements Runnable {
@Override
public void run() {
synchronized (obj) {
for (int i = 0; i < 5; ++i) {
System.out.println("MyRunnableA: " + i);
obj.notify();
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
obj.notify();
}
}
}

static class MyRunnableB implements Runnable {
@Override
public void run() {
synchronized (obj) {
for (int i = 0; i < 5; ++i) {
System.out.println("MyRunnableB: " + i);
obj.notify();
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
obj.notify();
}
}
}

public static void main(String[] args) {
new Thread(new MyRunnableA(), "线程1").start();
new Thread(new MyRunnableB(), "线程2").start();
}
}

上面的代码交替等待和唤醒对方从而实现交替打印的效果,结果如下。

volatile共享内存

volatile关键字保证了线程间的内存可见性,本文暂不展开讲,后面会有专门的文章讲volatile和JMM(Java内存模型)相关的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class VolatileDemo {

private static volatile boolean flag = true;

static class MyRunnableA implements Runnable {
@Override
public void run() {
System.out.println("start time: " + System.currentTimeMillis());
while (flag) ;
System.out.println("flag已被置为false");
System.out.println("end time: " + System.currentTimeMillis());
}
}

static class MyRunnableB implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("flag即将被置为false");
flag = false;
}
}

public static void main(String[] args) {
new Thread(new MyRunnableA(), "线程1").start();
new Thread(new MyRunnableB(), "线程2").start();
}
}

以上的代码线程1初始时会阻塞在while循环里,但是线程2休眠1秒后将主内存中的flag改成了false,线程1第一时间获取了主内存的变量值变化并退出循环。具体结果如下:

管道

Java提供了四个类来实现管道,分别是PipedReaderPipedWriterPipedInputStreamPipedOutputStream。前两个是面向字符的,后两个是面向字节的。下面的代码实现了一个简单的管道通信,模拟了C/S结构的通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Pipeline {

private static PipedReader reader = new PipedReader();
private static PipedWriter writer = new PipedWriter();

static class Server implements Runnable {

@Override
public void run() {
try {

int receivedData = 0;
while ((receivedData = reader.read()) != -1) {
System.out.print((char)receivedData);
}
System.out.println();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

static class Client implements Runnable {

@Override
public void run() {
try {
writer.write("test");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) throws InterruptedException, IOException {
writer.connect(reader);
new Thread(new Server()).start();
TimeUnit.SECONDS.sleep(1);
new Thread(new Client()).start();
}
}

信号量

学过操作系统的同学们一定对信号量的概念不陌生,在Java中有一个定义信号量的工具类Semaphore。我们以一个经典的生产者-消费者模型来测试一下这种通信方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class SemaphoreDemo {

static class Producer implements Runnable {

Semaphore mutex;
Semaphore empty;
Semaphore full;

public Producer(Semaphore mutex, Semaphore empty, Semaphore full) {
this.mutex = mutex;
this.empty = empty;
this.full = full;
}

@Override
public void run() {
while (true) {
try {
System.out.println("生产一些东西");
TimeUnit.SECONDS.sleep(1);
empty.acquire();
mutex.acquire();
System.out.println("将数据放入缓冲中");
TimeUnit.SECONDS.sleep(1);
mutex.release();
full.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Consumer implements Runnable {
Semaphore mutex;
Semaphore empty;
Semaphore full;

public Consumer(Semaphore mutex, Semaphore empty, Semaphore full) {
this.mutex = mutex;
this.empty = empty;
this.full = full;
}
@Override
public void run() {
while (true) {
try {
full.acquire();
mutex.acquire();
System.out.println("将数据从缓冲中取出");
TimeUnit.SECONDS.sleep(1);
mutex.release();
empty.release();
System.out.println("消费一些东西");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Semaphore mutex = new Semaphore(1);
Semaphore empty = new Semaphore(10);
Semaphore full = new Semaphore(0);
new Thread(new Producer(mutex, empty, full)).start();
new Thread(new Consumer(mutex, empty, full)).start();
}
}

上面的代码模拟了经典的生产者-消费者模型,生产者一直在生产产品,消费者一直在消费产品,我们可以得到下面的运行结果:

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2015-2022 sky-ng
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信