java多线程基础

java多线程基础笔记

这应该是这个学期唯一一次记的java笔记。

之所以要单独拿出来记记,主要还是因为多线程这部分相关知识点比较庞杂,但也很重要。我们从下面几个部分来介绍多线程模型。

[TOC]

线程的基本概念

线程的创建

先来代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadTest {

public static final int DELAY = 10;
public static final int STEPS = 100;
public static final double MAX_AMOUNT = 1000;

public static void main(String[] args) {
Runnable task1 = () -> {
try {
for (int i = 1; i <= STEPS; ++i) {
double amount = MAX_AMOUNT * Math.random();
System.out.println("thread1" + i + ": " + amount);
Thread.sleep(DELAY);
}

} catch (InterruptedException e) {
}
};
Runnable task2 = () -> {
try {
for (int i = 1; i <= STEPS; ++i) {
double amount = MAX_AMOUNT * Math.random();
System.out.println("thread2" + i + ": " + amount);
Thread.sleep(DELAY);
}
} catch (InterruptedException e) {

}
};
var t = new Thread(task1);
t.setName("mName");
t.start();
new Thread(task2).start();
}
}

Thread构造器接受一个Runnable接口,start函数调用。非常简单!

线程的状态

线程有六种状态:

新建,运行,阻塞,等待,计时等待,终止

  • 新建 使用 new Thread(r)
  • 运行 调用start()方法就处于可运行状态。抢占式调度系统会给不同线程以时间块,用完后会剥夺线程运行权。
    可以用yield方法强制交出运行权。
  • 阻塞/等待 需要由线程调度器重新激活线程。
    • 获取一个锁,但是锁被其他线程占有,则阻塞到其他线程释放锁,称为阻塞状态
    • 调用等待状态,通过判断等待条件实现,称为等待状态
    • 超时参数方法让进程进入计时等待,直到超时期满,称为计时等待状态。
  • 终止 在run方法正常退出或有异常终止run方法时,会退出线程。

线程的属性

对于这些不同的线程,有许多属性。这里我们进行介绍。

(1)中断线程

在Java中,没有方法可以强制的中断某个线程,除去某个遗臭万年的stop以外。但是我们可以通过interrupt对线程发出一个停止的请求。

可以采用isInterrupted 查看是否设置了中断。常见的结构为:

1
2
3
4
5
6
7
8
9
Runnable r = () -> {
try {
while (!Thread.currentThread().isInterrupted()) {
...
}
} catch (InterruptedException e) {

} finally {}
}

这样,当当前线程被设置为中断时,就不能继续执行后面的语句了。

interrupt语句用来设置中断状态,发送一个中断请求。

这里要注意一点,假如当前的线程已经被sleep阻塞,那么这种检查是没有意义的,只会抛出一个中断异常。

(2)守护线程

可以通过setDaemon 设置某个线程为守护线程。

对于计时器一类的线程,用该方法非常方便。

(3)线程名

setName 起个好名字

同步和锁

竞态条件

先来考虑下面的这样一个例子。

我们建立一个银行的Bank类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.Arrays;
import java.util.concurrent.locks.*;

public class Bank {

private final double[] accounts;

public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
}
public void transfer(int from, int to, double amount) {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf("Total Balance: %10.2f%n", getTotalBalance());

}

public double getTotalBalance() {
double sum = 0;
for (var i : accounts) sum += i;
return sum;
}

public int size() {
return accounts.length;
}

}

接下来我们调用他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class UnsynchBankTest {
public static final int NACCOUNTS = 10;
public static final double INITIAL_BALANCE = 1000;
public static final double MAX_AMOUNT = 1000;
public static final int DELAY = 10;

public static void main(String[] args) {
var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);
for (int i = 0; i < NACCOUNTS; ++i) {
int fromAccount = i;
new Thread(() -> {
try {
while(true) {
int toAccount = (int)(bank.size() * Math.random());
double amount = MAX_AMOUNT * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
} catch (InterruptedException e) {

}
}).start();
}
}
}

这段代码的含义很好解释。我们随机新建了10个账户,然后在其中任意挑两个进行转账操作。但是这样会有一些问题。

假如我们去运行的时候,会发现在某些输出中,银行的总额不是10000了!

这无疑对于金融机构是毁灭性的——我们什么都没做,钱自己变少了。

可是这是为什么呢?考虑下面这个语句:

accounts[to] += amount

其过程大致如下:

  1. 将accounts[to]加载到寄存器

  2. 与amount累加

  3. 将结果写回

现在假设我们的CPU不讲武德,在第二步的时候线程被中断了。现在另一个线程,好巧不巧的,也去更新这个东西了。

假设原来accounts[to]有500块钱,然后第一个线程拿着1145.14元去更新,得到了1645.14块钱,正打算放回去的时候被鲨了;第二个线程拿着19198.10块钱去更新,得到了19698.10块钱,accounts[to]的值变成了19698.10块钱。这个时候第一个线程醒了,发现自己还有个东西在寄存器里扔着,连忙回来执行第三步;于是呢,accounts[to]变成了1145.14块钱。

钱就是这么少的。

锁对象

为此,我们必须给他加锁——保证让被保护的区域中只有一段代码会被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Bank {

private final double[] accounts;
private ReentrantLock bankLock = new ReentrantLock();

//...

public void transfer(int from, int to, double amount) {
bankLock.lock();
// 加锁
try {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf("Total Balance: %10.2f%n", getTotalBalance());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bankLock.unlock();
// 释放锁
}

}

//...

}

ReentrantLock即是锁对象,假如一个线程调用transfer,就给这部分加了锁,那么第二个线程就会在lock处阻塞。直到第一个线程释放了锁,第二个线程才能获取到锁。

我们称这种锁叫做 重入锁 ,为什么叫重入锁呢?我们 观察这个锁包含的代码片段,会发现里面涉及到了一些方法的调用 。如果这个锁不可重用,由于已经锁了,程序就不能继续执行了。所以,这就进入了一种“死锁”状态。

其原理大致是这样的:锁有一个持有计数来跟踪方法调用,每次调用方法A时,让这个计数+1,退出时-1.当这个持有计数变成0,表明线程可以释放锁了。

如果我们构造的时候传入参数true ,就建立一个 公平锁

在公平的锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中(此时和公平锁是一样的)。所以,它们的差别在于非公平锁会有更多的机会去抢占锁。

https://www.jianshu.com/p/eaea337c5e5b

但是这种公平策略带来的后果就是效率的降低。

条件对象

假如我们在这里希望做一件事情:如果钱不够转账,就先等等,直到能够转账,那么应该怎么做?

我们可以新增一个条件对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Bank {

private final double[] accounts;
private ReentrantLock bankLock = new ReentrantLock();
private Condition suffientCondition;

public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
suffientCondition = bankLock.newCondition();
}
public void transfer(int from, int to, double amount) {
bankLock.lock();
try {
while (accounts[from] < amount) {
suffientCondition.await();
// await
}
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf("Total Balance: %10.2f%n", getTotalBalance());
suffientCondition.signalAll();
// signal
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bankLock.unlock();
}
}

这一过程有两个核心:一个await,一个signal

await外面的while会让你在不满足条件的时候一直等待,直到满足条件。而signalAll可以通知所有处于await状态的线程检查条件。

除去signalAll方法,还有一个signal方法可以任意通知某个await中的线程。可是在这种环境下调用signal方法有可能进入死锁:假如某个账户还是没人打钱,这种signal就相当于没用,其他的线程如果正好锁着,那么这个锁就出不去了。

synchronized

上面的过程略显繁琐。如果某个锁只需要一个条件,就可以用java的synchronized关键字直接实现了。

1
2
3
4
5
6
7
8
9
10
public synchronized void transfer2(int from, int to, int amount)
throws InterruptedException{
while (accounts[from] < amount) wait();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf("Total Balance: %10.2f%n", getTotalBalance());
notifyAll();
}

wait() notifyAll()notify() 方法分别对应条件对象的await() signalAll()signal() 方法。

所有的对象都有一个内部锁,指定为synchronized方法可以加一个内部锁上去。

但是无论是使用锁对象还是内部锁,这种锁都是很重的锁,所以除非有必要,可以不使用他。除非有多于1个条件的需求,推荐使用synchronized,方便简洁。

同步块

如果只是某一个部分需要锁,可以使用同步块。例如:

1
2
3
4
5
6
public void transfer(Vector<Double> accounts, int from, int to, int amount) {
synchronized(accounts) {
accounts.set(from, accounts.get(from)-amount);
accounts.set(to, accounts.get(to)+amount);
}
}

这样截获一个锁可以方便的完成这种操作。

线程组

在java中,可以新建一个线程组对线程进行管控。

完成这样操作的线程组我们称为ThreadGroup。

在现在的开发场景中,线程组一般被线程池替代,但是我们还是有必要介绍其使用办法。

新建一个线程组可以用

1
2
ThreadGroup parent = new ThreadGroup("parent");
ThreadGroup child = new ThreadGroup(parent,"child");

每一个线程组有一个固定的名字,如果多传递一个参数则成为一个子组。

向线程组中添加新线程可以用

1
Thread t1 = new Thread(parent, "t1")

来执行。线程组可以调用stop suspend resume来控制线程,但是这些方法 都千万不要使用

之前我们在属性部分,有一个属性没有提及,在这里进行介绍。对于某个线程,如果出现错误,可以设置其unCaughtException方法;

而如果没有单另设定,则直接调用线程组的该方法。

我们可以对一个线程组设置一个统一 的处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadGroupTest {
public static void main(String[] args) {
ThreadGroup group = new ThreadGroup("father") {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName() + ":" + e.getMessage());
}
};
Thread thread1 = new Thread(group, () -> {
throw new RuntimeException("qwq");
}, "haha");

thread1.start(); // haha:qwq

}
}

原子性

我们之前提到过,一个普通的赋值语句a=a+1 可能分成了好多条指令实现,而在此过程中如果中断就会出现问题。

那么就有这样的解决思路:假如这个语句并不是很多条指令组合起来的,而是一条 原子指令 ,那么这种赋值就不会出现问题。

java.util.concurrent.atomic 中给出了许多实现。这里简单介绍其中几个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.atomic.*;

public class AtomicTest {

private static AtomicInteger atomicInteger = new AtomicInteger();
private static LongAdder longAdder = new LongAdder();
private static LongAccumulator longAccumulator = new LongAccumulator(Long::max, -10);

public static void main(String[] args) {
atomicInteger.set(5);
long id = atomicInteger.incrementAndGet();
System.out.println(id);
atomicInteger.set(10);
System.out.println(atomicInteger.get());
// atomicInteger.set(Math.max(3,4));
atomicInteger.updateAndGet(x -> Math.max(x,5));
atomicInteger.accumulateAndGet(5,Math::max);
System.out.println(atomicInteger.get());

longAdder.reset();
for (int i = 1; i <= 10; ++i) {
if (i % 3 == 0)
longAdder.increment();
}
long total = longAdder.sum();
System.out.println(total);

for (int i = -3; i <= 5; ++i) {
longAccumulator.accumulate(i);
}
System.out.println(longAccumulator.get());

}
}

atomicInteger是包装类的原子实现。对其执行自增、set、get等操作都是原子的。

但是考虑下面这句话:atomicInteger.set(Math.max(3,4)); ,由于max操作不是原子的,所以本身并不是原子的。

可以用updateAndGet函数,传入一个lambda表达式实现其原子性。accumulateAndGet原理也类似。

但是atomicInteger是一种乐观锁,这就导致,如果有大量线程访问相同原子值,这种乐观更新尝试代价会很大。解决方法是Adder和Accumulator。这里以Long为例。

LongAdder 方法名和atomicLong比较类似,也是累加之后进行sum。LongAccumulator则是在构造器中直接传入处理的函数和初值,调用accumulate进行处理。假设传入的运算是$\oplus$ ,那么get到的结果就是$a_1 \oplus a_2 \oplus \cdots \oplus a_n$ 。

volatile

作用

这个关键字是多线程的难点所在。

这里要稍微涉及一点jvm虚拟机的知识。

volatile关键字的特性在于:

  • 保证不同线程的变量操作的内存可见性
  • 禁止指令重排序

jvm虚拟机做了一种内存模型叫做jmm模型。jmm模型中对于并发过程的核心在于三个特性:原子性、可见性和有序性。原子性我们之前已经介绍过;下面主要介绍后两种。

对于线程来说,每个线程都有一个私有的本地内存,储存了线程所使用主内存的副本拷贝。而线程在本地内存中将某个局部变量进行了修改,可是这个变量可能已经被其他线程的本地内存所缓存,此时得到的是旧值。这就是为什么竞态条件会产生。

由此,引出了重要的可见性概念:

可见性 对于一个变量如果进行修改,之后会立即刷新到主存中,这种特性叫做可见性。

被volatile修饰的变量,在修改后会强制刷新到主存中,并导致其他线程的变量缓存无效。

除此之外,编译器为了优化程序性能会对指令进行一些排序。其排序遵循下面两个原则:

  • 不会对数据依赖关系的操作进行重排序
  • 单线程下执行结果不发生改变。

例如,a=1;b=2;c=a+b;中,我们可以交换前两条指令的位置,但是绝不会将前两条指令放到第三条之后。

可是在多线程环境下,这种依赖关系可能出现问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class test {
int a = 1;
boolean f = false;

public void changeStatus () {
a = 2;
f = true;
}

public void run() {
if (f) {
int b = a + 1;
System.out.println(b);
}
}

}

可是,考虑changeStatus中的两条语句,假如我们先执行了f=true ,然后CPU又不讲武德的中断了当前线程,那么这个时候,f=truea=1 。所以最后的结果可能会变成2.

由此引入了有序性性的概念:

有序性 代码按照顺序执行、不会因为指令重排而改变次序的特性叫做有序性。

JMM同时规定了一系列先天有序性,我们称为happens-before原则。这一原则大致如下:

程序次序规则: 在一个单独的线程中,按照程序代码的执行流顺序,(时间上)先执行的操作happen—before(时间上)后执行的操作
同一个线程中前面的所有写操作对后面的操作可见

管理锁定规则:一个unlock操作happen—before后面(时间上的先后顺序)对同一个锁的lock操作。
如果线程1解锁了monitor a,接着线程2锁定了a,那么,线程1解锁a之前的写操作都对线程2可见(线程1和线程2可以是同一个线程)

volatile变量规则:对一个volatile变量的写操作happen—before后面(时间上)对该变量的读操作。
如果线程1写入了volatile变量v(临界资源),接着线程2读取了v,那么,线程1写入v及之前的写操作都对线程2可见(线程1和线程2可以是同一个线程)

线程启动规则:Thread.start()方法happen—before调用用start的线程前的每一个操作。
假定线程A在执行过程中,通过执行ThreadB.start()来启动线程B,那么线程A对共享变量的修改在接下来线程B开始执行前对线程B可见。注意:线程B启动之后,线程A在对变量修改线程B未必可见。

线程终止规则:线程的所有操作都happen—before对此线程的终止检测,可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。
(线程t1写入的所有变量,在任意其它线程t2调用t1.join(),或者t1.isAlive() 成功返回后,都对t2可见。)

线程中断规则:对线程interrupt()的调用 happen—before 发生于被中断线程的代码检测到中断时事件的发生。
(线程t1写入的所有变量,调用Thread.interrupt(),被打断的线程t2,可以看到t1的全部操作)

对象终结规则:一个对象的初始化完成(构造函数执行结束)happen—before它的finalize()方法的开始。
(对象调用finalize()方法时,对象初始化完成的任意操作,同步到全部主存同步到全部cache。)

传递性:如果操作A happen—before操作B,操作B happen—before操作C,那么可以得出A happen—before操作C。
A h-b B , B h-b C 那么可以得到 A h-b C

https://www.jianshu.com/p/b9186dbebe8e 摘自【并发重要原则】 happens-before的理解和应用

其他的暂且不管,但有一点很关键:

被volatile修饰的变量,不会因为指令重排而改变读取顺序。

但是应该注意的一点是, volatile 并不会带来原子性。

不过无论如何,volatile是一种轻量级对字段 ”上锁“ 的方式,当然实际上底层并没有用锁来实现。

应用

一个非常经典的应用是用在单例模式的双重锁上。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestInstance{
private volatile static TestInstance instance;

public static TestInstance getInstance(){
if(instance == null){
synchronized(TestInstance.class){
if(instance == null){
instance = new TestInstance();
}
}
}
return instance;
}

如果不使用volatile的话,instance = new TestInstance() 会分解成三部分:

1
2
3
memory = allocate() //分配内存
ctorInstanc(memory) //初始化对象
instance = memory //设置instance指向刚分配的地址

假如交换了2和3,并在3处中断,其他线程就会错误的认为已经创建了单例对象,而返回一个未初始化对象。

线程安全数据结构和算法

BlockingQueue

阻塞队列是一种非常实用的数据结构。传统队列在队列满的塞或者空的时候取时候会抛异常,但是很多多线程场景确实需要实现这样的需求。为此,我们引入阻塞队列,来维护这一特性。

阻塞队列有三种用途:

  • 用于单线程环境下的存取,调用add remove element
  • 用于多线程环境下的存取,调用offer poll peek ,此时只会返回null
  • 用于线程管理,调用put take

阻塞队列可以用于协调多个线程协作。我们可以让工作线程将中间结果存在阻塞队列中,其他工作队列会移除中间结果,然后进一步修改。如果第一组线程较慢,队列会阻塞来等待。如果第一组线程较快,队列会填满,等待第二组线程追上来。

下面展示一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BlockingQueueTest {
// 维护线程使用put / take
// 插入/队头/删除 offer poll peek

public static final int FILE_QUEUE_SIZE = 10;
public static final int SEARCH_THREADS = 100;
public static final Path DUMMY = Path.of("");
public static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

public static void main(String[] args) {
try (var in = new Scanner(System.in)) {
System.out.print("Enter Directory");
String directory = in.nextLine();
System.out.print("Enter Keywords");
String keywords = in.nextLine();
Runnable enumerator = () -> {
try {
enumerate(Path.of(directory));
queue.put(DUMMY);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; ++i) {
Runnable searcher = () -> {
try {
var done = false;
while (!done) {
Path file = queue.take();
if (file == DUMMY) {
queue.put(file);
done = true;
}
else search(file, keywords);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {

}
};
new Thread(searcher).start();
}
}
}

public static void enumerate(Path directory) throws IOException, InterruptedException {
try (Stream<Path> children = Files.list(directory)) {
for (Path child : children.collect(Collectors.toList())) {
if (Files.isDirectory(child))
enumerate(child);
else
queue.put(child);
}
}
}

public static void search(Path file, String keyword) throws IOException {
try (var in = new Scanner(file, StandardCharsets.UTF_8)) {
int lineNumber = 0;
while (in.hasNextLine()) {
++lineNumber;
String line = in.nextLine();
if (line.contains(keyword))
System.out.printf("%s:%d:%s%n", file, lineNumber, line);
}
}
}

}

这个类可以查找某个目录下所有文件里是否出现字符串。

首先我们创建了一个生产者线程,将所有文件递归的放到阻塞队列中,最后再扔一个DUMMY进去。

接下来我们创建大量搜索线程。这些搜索线程取出阻塞队列的元素,对文件进行查找,直到查找到DUMMY为止。

由于阻塞队列大小有限,所以我们自然而然的实现了线程的同步。

BlockingQueue有一些子类,LinkedBlockingQueue、ArrrayBlockingQueue等。值得一提的是PriorityBlockingQueue,可模拟实现线程的优先级。TransferQueue允许生产者进行线程等待,直到消费者准备就绪。这些具体实现这里不再展开。

ConcurrentHashMap

这是一种特殊的hashmap,可以高效的实现并发。

先看下面这个例子,可以统计目录下所有.java文件的单词词频。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CHMDemo {
public static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

public static void process(Path file) {
try (var in = new Scanner(file)) {
while (in.hasNext()) {
String word = in.next();
map.merge(word, 1L, Long::sum);
}
} catch (IOException e) {

}
}

public static Set<Path> descendants(Path rootDir) throws IOException {
try (Stream<Path> entries = Files.walk(rootDir)) {
return entries.collect(Collectors.toSet());
}
}

public static void main(String[] args) throws IOException, InterruptedException {
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processors);
Path pathtoRoot = Path.of(".");
for (Path p : descendants(pathtoRoot)) {
if (p.getFileName().toString().endsWith(".java"))
executor.execute(() -> process(p));
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
map.forEach((k,v) -> {
if (v >= 10)
System.out.println(k + " occurs " + v + " times");
});
}
}

这里用了线程池,之后再进行讨论。值得注意的是这里:

1
map.merge(word, 1L, Long::sum);

我们已经见过很多merge了,这里的也不例外:如果word有过初值,就进行sum运算,否则赋为1.并且,这一操作是 原子的 。有一个类似的方法叫做compute ,也是原子的更新。

并发数组算法

Arrays类中有一些常见的并行算法。

  • parallelSort 并行排序,例如 Arrays.parallelSort(words, Comparator.comparing(String::length))
  • parallelSetAll 相当于对数组进行一个map操作,例如Arrays.parallelSetAll(values, i -> i % 10)
  • parallelPrefix 相当于对数组进行前缀运算,例如Arrays.parallelPrefix(values, (x, y) -> x * y)

线程池

如果程序中创建了大量生命周期很短的线程,可以用线程池管理。

Callable与Future

Callable与Runnable类似,是一个返回T的函数式接口:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

Future可以保存异步计算结果。将Future给某个线程,再计算结束之后就会获得结果。

一个Future接口有如下方法:

  • V get 阻塞调用至计算完成
  • V get(long timeout, TimeUnit unit) 阻塞调用至计算完成,在计算完成前超时抛出TimeoutException异常
  • void cancel(boolean mayInterrupt) 未计算则不启动计算,启动之后如果参数是true就中断
  • boolean isCancelled()boolean isDone() 获取状态

可以用FutureTask 组织:

1
2
3
4
5
Callable<Integer> task = ..
var futureTask = new FutureTask<Integer>(task);
var t = new Thread(futureTask);
t.start();
Integer res = task.get();

FutureTask 可以构造一个既是Future又是Runnable的对象。

Executors

方法 描述
newCachedThreadPool 构造线程池并立即执行各个任务,有空闲线程就使用现有线程执行任务。必要时创建新线程,空闲线程保留60s。
newFixedThreadPool 池中包含固定数目线程,空闲线程一直保留。
newWorkStealingPool fork-join型任务线程池
newSingleThreadExecutor 退化的只有一个线程的池,顺序执行任务
newScheduledThreadPool 调度执行的线程池
newSingleThreadScheduleExecutor 调度执行的单线程池

如果需要创建生存期很短的线程,可以使用缓存线程池。

如果要获得最优运行速度,并发线程数等于处理器内核数,此时应该使用固定线程池。

单线程执行器可以用来分析不并发情况下的效率变化情况。

可以用submit方法提交对象

  • Future<T> submit(Callable<T> task) 提交task并获得返回值
  • Future<?> submit(Runnable task) get完成时返回Null
  • Future<T> submit(Runnanle task, T result) get方法完成返回result

在线程池所有操作结束之后,调用shutdown结束操作。

控制任务组

invokeAny 可以取出任意一个元素,通常的最快得出的。

可以用invokeAll 方法提交一个集合到线程池中并返回其中某个任务的结果。例如

1
2
3
4
5
6
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results) {
var a = result.get();
...
}

这样的问题是,由于get的阻塞性会让效率降低。可以构造一个ExecutorCompletionService来提交任务到此:

1
2
3
4
5
var service = new ExecutorCompletionService<T>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < task.size(); ++i) {
var a = service.take().get();
}

其实际上是托管了一个阻塞队列。

下面举一个例子 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExecutorDemo {
public static long occurance(String word, Path path) {
try (var in = new Scanner(path)) {
int count = 0;
while (in.hasNext())
if (in.next().equals(word)) ++count;
return count;
} catch (IOException e) {
return 0;
}
}

public static Set<Path> descendants (Path rootDir) throws IOException {
try (Stream<Path> entries = Files.walk(rootDir)) {
return entries.filter(Files::isRegularFile)
.collect(Collectors.toSet());
}
}

public static Callable<Path> searchForTask(String word, Path path) {
return () -> {
try (var in = new Scanner(path)) {
while (in.hasNext()) {
if (in.next().equals(word)) return path;
if (Thread.currentThread().isInterrupted()) {
System.out.println("Search in " + path + "canceled");
return null;
}
}
throw new NoSuchElementException();
}
};
}

public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
try (var in = new Scanner(System.in)) {
System.out.print("Enter base directory:");
String start = in.nextLine();
System.out.print("Keyword");
String word = in.nextLine();

Set<Path> files = descendants(Path.of(start));
var tasks = new ArrayList<Callable<Long>>();
for (Path file : files) {
Callable<Long> task = () -> occurance(word, file);
tasks.add(task);
}
ExecutorService executor = Executors.newCachedThreadPool();

Instant startTime = Instant.now();
List<Future<Long>> results = executor.invokeAll(tasks);
long total = 0;
for (Future<Long> result : results) {
total += result.get();
}
Instant endTime = Instant.now();
System.out.println("Occurrences of " + word + " : " + total);
System.out.println("Time elapsed: " + Duration.between(startTime, endTime).toMillis() + "ms");

var searchTasks = new ArrayList<Callable<Path>>();
for (Path file : files) {
searchTasks.add(searchForTask(word,file));
}
Path found = executor.invokeAny(searchTasks);
System.out.println(word + " occurs in: " + found);

if (executor instanceof ThreadPoolExecutor)
System.out.println("Largest pool size: " + ((ThreadPoolExecutor) executor).getLargestPoolSize());
executor.shutdown();

}
}

}

这个例子可以用在文件搜索中。

fork-join框架

下面的需求用来确定数组有多少元素符合某个条件。

RecursiveTask可以看成一种递归类,我们将计算结果进行合并,得到最终结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;

public class ForkJoinTest {
public static void main(String[] args) {
final int SIZE = 10000000;
var numbers = new double[SIZE];
for (int i = 0; i < SIZE; ++i) {
numbers[i] = Math.random();
}
var counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
var pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}

class Counter extends RecursiveTask<Integer> {
public static final int THERESHOLD = 1000;
private double[] values;
private int from;
private int to;
private DoublePredicate filter;

public Counter(double[] values, int from, int to, DoublePredicate filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}

protected Integer compute() {
if (to - from < THERESHOLD) {
int count = 0;
for (int i = from; i < to; ++i)
if (filter.test(values[i]))
++count;
return count;
} else {
int mid = (from + to) / 2;
var first = new Counter(values, from, mid, filter);
var second = new Counter(values, mid + 1, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}

}

}

线程同步工具

代码直接贴金老师的代码了。

Semaphore

Semaphore是一个信号灯系统。具体来说支持下面的操作:

  • acquire 发出调用请求
  • release 释放调用请求

假如我们设置了一个路口最多通过三辆车的信号灯,那么可以借此来协调这些调用,保证同时通过路口的车不超过3辆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
public static void main(String[] args) {
//ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release();
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
}
};
Thread th=new Thread(runnable);
th.start();

}
}

}

CyclicBarrier

CyclicBarrier支持如下操作:

  • await

CyclicBarrier可以看作一个可重用的栅栏,假如有5个线程调用速度不一样,可以通过这一栅栏把他拦下来,进入await状态。当等待的线程数量达到预设的阈值,栅栏就会跳闸,然后继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.edu.bit.cs.util;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

public static void main(String[] args) {

final CyclicBarrier cb = new CyclicBarrier(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();

Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
System.out.println("线程" + Thread.currentThread().getName() + "工作结束");
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread th=new Thread(runnable);
th.start();
}

}
}

CountDownLatch

CountDownLatch主要解决的是前后顺序问题。其支持下面的操作:

  • await
  • countDown

考虑下面的场景:

  • 太空人A,B,C准备就绪
  • 指挥中心向太空人发送请求
  • 太空人出舱
  • 太空人返回
  • 指挥中心收到太空人成功返回的消息

那么就可以通过两个CountDownLatch实现:

  • 太空人A,B,C线程启动,await指挥中心命令
  • 指挥中心线程启动,给太空人发送countDown,开始await
  • 太空人A、B、C同时出舱并返回,分别向指挥中心发送countDown
  • 所有countDown都接收到后,指挥中心继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.CountDownLatch;

public class CountdownLatchTest {

public static void main(String[] args) {

final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
System.out.println("线程" + Thread.currentThread().getName() +
"正准备接受命令");
cdOrder.await();
System.out.println("线程" + Thread.currentThread().getName() +
"已接受命令");
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"回应命令处理结果");
cdAnswer.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread th=new Thread(runnable);
th.start();
}
try {
Thread.sleep((long)(Math.random()*10000));

System.out.println("线程" + Thread.currentThread().getName() +
"即将发布命令");
cdOrder.countDown();
System.out.println("线程" + Thread.currentThread().getName() +
"已发送命令,正在等待结果");
cdAnswer.await();
System.out.println("线程" + Thread.currentThread().getName() +
"已收到所有响应结果");
} catch (Exception e) {
e.printStackTrace();
}


}
}

Exchanger

假如有两个格子,Exchanger在格子的信息都填满之后就会进行交换操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable(){
public void run() {
try {

String data1 = "aaa";
System.out.println("线程" + Thread.currentThread().getName() +
"正在等待把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("完成交易,线程" + Thread.currentThread().getName() +
"换回的数据为" + data2);
service.shutdown();
}catch(Exception e){

}
}
});
service.execute(new Runnable(){
public void run() {
try {

String data1 = "bbb";
System.out.println("线程" + Thread.currentThread().getName() +
"正在等待把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("完成交易:线程" + Thread.currentThread().getName() +
"换回的数据为" + data2);
service.shutdown();
}catch(Exception e){

}
}
});
}
}

Phaser

Phaser提供了比较灵活的控制流程。其核心可以分成三个部分:

  • register() 注册一个任务
  • arriveAndAwaitAdvance() 到达某个阶段并等待其他任务
  • arriveAndDeregister() 记录到达某个阶段

每一个Phaser都会维护一个phaser number,表示某个阶段。当所有任务都到达这一阶段,phaser number会累加。

下面举一个经典的例子:考生到达考场

假设一个考试分成三部分,并且每个阶段都需要所有考生做完才能进入下一个阶段,就可以使用这样的结构。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Main {

public static void main(String[] args) {

MyPhaser phaser=new MyPhaser();

Student students[]=new Student[5];
// 注册
for (int i=0; i<students.length; i++){
students[i]=new Student(phaser);
phaser.register();
}

// 运行线程
Thread threads[]=new Thread[students.length];
for (int i=0; i<students.length; i++) {
threads[i]=new Thread(students[i],"Student "+i);
threads[i].start();
}

// 等待所有考生做完
for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.printf("Main: The phaser has finished: %s.\n",phaser.isTerminated());

}
}

MyPhaser.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package phaserdemo2;

import java.util.concurrent.Phaser;

public class MyPhaser extends Phaser {

// 不同阶段调用的函数。
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
return studentsArrived();
case 1:
return finishFirstExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}

private boolean studentsArrived() {
System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
System.out.printf("Phaser: We have %d students.\n",getRegisteredParties());
return false;
}

private boolean finishFirstExercise() {
System.out.printf("Phaser: All the students has finished the first exercise.\n");
System.out.printf("Phaser: It's turn for the second one.\n");
return false;
}

private boolean finishSecondExercise() {
System.out.printf("Phaser: All the students has finished the second exercise.\n");
System.out.printf("Phaser: It's turn for the third one.\n");
return false;
}

private boolean finishExam() {
System.out.printf("Phaser: All the students has finished the exam.\n");
System.out.printf("Phaser: Thank you for your time.\n");
return true;
}

}

Student.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package phaserdemo2;

import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Student implements Runnable {

private Phaser phaser;

public Student(Phaser phaser) {
this.phaser=phaser;
}

// 每次await都让phaser计数器+1。
public void run() {
System.out.printf("%s: Has arrived to do the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the first exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise1();
System.out.printf("%s: Has done the first exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the second exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise2();
System.out.printf("%s: Has done the second exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the third exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise3();
System.out.printf("%s: Has finished the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
}

private void doExercise1() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise2() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise3() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


}

异步

在许多语言中都有Promise,java也不例外。

我们先前已经提过Future类,但是Future类的主要问题在于其阻塞性。如果我们用get获得值,这种阻塞会很大程度上降低代码执行效率。

CompletebleFuture就是这样一个工具类。先来看下面的代码:

1
2
3
4
5
6
7
8
9
public CompletableFuture<String> readPage(URL url) {
return CompletableFuture.supplyAsync(() -> {
try {
return new String(url.openStream().readAllBytes(), "UTF-8");
} catch (IOException e) {
throw new UncheckedIOExecption(e);
}
}, executor);
}

这样我们就得到一个异步的网页读取器,并默认放在executor上运行。在执行完成之后,我们可以调用

1
f.thenAccept(s -> ...)

形成一次回调。

我们常见的返回ele,err的回调则是使用

1
2
3
4
5
6
7
f.whenComplete((ele, err) -> {
if (err == null) {
..
} else {
..
}
})

对于一个Promise,我们还可以显式的设置receive

1
2
3
4
5
var f = new CompletebleFuture<Integer>();
executor.execute(() -> {
int n = ...;
f.complete(n);
})

Promise一般还有一个重要特性:组合。

下面列出了一些常见的可组合的函数:

方法名 类型 描述
thenApply T -> U 对结果应用函数
thenAccept T -> void 对结果应用函数
thenCompose T -> CompletebleFuture<U> 调用函数并返回future
handle (res, err) -> U 处理结果或错误
whenComplete (res, err) -> void 处理结果或错误
exceptionally Throwable -> T 从错误计算结果
completeOnTimeout T, long, TimeUnit 超时生成定值
orTimeout long, TimeUnit 超时抛出异常
thenRun Runnable 执行Runnale

下面举一个爬虫的例子。假如我们要爬一个网页上的图片,分成四个步骤

  • 读取页面(url -> CompletebleFuture)
  • 获取页面图片的url (String -> List)
  • 获取图片(List -> CompletebleFuture)
  • 将图片存到本地(List -> void)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CompletableFutureDemo {
public static final Pattern IMG_PATTERN = Pattern.compile(
"[<]\\s*[iI][mM][gG]\\s*[^/>]*[sS][rR][cC]\\s*[=]\\s*['\"]([^'\"]*)['\"][^>]*[>]");
private ExecutorService executor = Executors.newCachedThreadPool();
private URL urlToProcess;

public CompletableFuture<String> readPage(URL url) {
return CompletableFuture.supplyAsync(() -> {
try {
var contents = new String(url.openStream().readAllBytes(), StandardCharsets.UTF_8);
System.out.println("Read Page from " + url);
return contents;
} catch (IOException e) {
e.printStackTrace();
throw new UncheckedIOException(e);
}
}, executor);
}

public List<URL> getImageURLs(String webpage) {
try {
var res = new ArrayList<URL>();
Matcher matcher = IMG_PATTERN.matcher(webpage);
while (matcher.find()) {
var url = new URL(urlToProcess, matcher.group(1));
res.add(url);
System.out.println("Found URL:" + res);
}
return res;
} catch (MalformedURLException e) {
e.printStackTrace();
throw new UncheckedIOException(e);
}
}

public CompletableFuture<List<BufferedImage>> getImage(List<URL> urls) {
return CompletableFuture.supplyAsync(() -> {
try {
var res = new ArrayList<BufferedImage>();
for (URL url : urls) {
res.add(ImageIO.read(url));
System.out.println("Loaded" + url);
}
return res;
} catch (IOException e) {
e.printStackTrace();
throw new UncheckedIOException(e);
}
},executor);
}

public void saveImages(List<BufferedImage> images) {
System.out.println("Svaing " + images.size() + " images");
try {
for (int i = 0; i < images.size(); ++i) {
String filename = "E:\\LittleRewriter\\CS-2020-2021-1" + (i+1) + ".png";
ImageIO.write(images.get(i), "PNG", new File(filename));
System.out.println("write" + filename);
}
} catch (IOException e) {
e.printStackTrace();
throw new UncheckedIOException(e);
}
executor.shutdown();
}

public void run(URL url) {
urlToProcess = url;
CompletableFuture.completedFuture(url)
.thenComposeAsync(this::readPage, executor)
.thenApply(this::getImageURLs)
.thenCompose(this::getImage)
.thenAccept(this::saveImages);

}

public static void main(String[] args) throws MalformedURLException {
new CompletableFutureDemo().run(new URL("http://lirewriter.cn/"));
}

}

值得一提的是,假如一个方法返回的是T -> CompletebleFuture<U> ,另一个方法返回的是U -> R ,那么用.thenCompose().thenApply 的效果相当于执行了一个T -> U

并行流

在java中有parallelStream其实现机制是并行的。

还是举经典的单词筛选的例子,使用

1
2
3
Map<Integer, Long> shortWordCounts = words.parallelStream()
.filter(s -> s.length() < 10)
.collect(groupingBy(String::length, counting()));

一般而言,这种操作都具有顺序性。如果不需要引入顺序化,可以指定unordered()

JavaFX中的Task

// TODO…

进程

最后我们来谈谈进程。

进程的建立

使用ProcessBuilder建立进程

1
var builder = new ProcessBuilder("gcc", "qwq.c");

每个进程有一个工作目录,默认在java程序启动目录,可以更改目录

1
builder = builder.directory(path.toFile());

接下来可以建立输入输出和错误流

1
2
3
OutputStream processIn = p.getOutputStream();
InputStream processOut = p.getInputStream();
InputStream processOut = p.getErrorStream();

可以将输入输出重定向到文件中

1
2
3
builder.redirectInput(inputFile)
.redirectOutput(outputFile)
.redirectError(errorFile)

Java9有一个管道方法startPipeline

1
2
3
4
5
6
7
8
List<Process> processes = ProcessBuilder.startPipeline(
List.of(new ProcessBuilder("find", "/opt/jdk-9"),
new ProcessBuilder("grep", "-o", "\\.[^./]*$"),
new ProcessBuilder("sort"),
new ProcessBuilder("uniq")
));
Process last = processes.get(processes.size()-1);
var res = new String(last.getInputStream().readAllBytes());

进程的执行

现在我们运行进程了

1
2
3
4
5
6
7
Process process = new ProcessBuilder("/bin/ls", "-l")
.directory(Path.of("/tmp").toFile())
.start();
try(var in = new Scanner(process.getInputStream())) {
while (in.hasNextLine())
System.out.println(in.nextLine());
}

阻塞式的写法是

1
int result = process.waitFor();

也可以异步的获取数据:

1
process.onExit().thenExit(p -> ...)

可以用p.toHandle() 获取句柄,查看进程ID、父进程、子进程、后代进程等信息。

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×