微信扫描,分享到朋友圈和群
一道有趣的并发编程试题
今天看到《如何优雅的让3个线程打印ABC》这篇文章,就像文章里讲到的,3个线程顺序打印ABC确实没有必要,很浪费,单纯考察多线程的控制机制,面稍微有点窄,也是挺鸡肋的。实际上,既要多线程并发执行,又要输出结果有序,这样的场景还真有,类似于存储系统里面的单个文件顺序存储,但是又要支持多个客户端并发写,即:多个线程并发提交IO,由一个类似于总线的机制排序,打包,按包写盘,完成后同时通知各个等待响应的客户端。这里不打算介绍复杂的IO逻辑,想拿一个简单的模型来实例解读一下。
在之前公司经常给面试者出一道题:计算 [10亿, 12亿) 范围内的质数,要求:
- 使用多线程并发计算
- 边计算边输出,按照从小到大排序输出
- 内存使用量越小越优,已知的可用内存量不足以存储所有计算结果
- 耗时越短越优
同上面的题目一样,没有华丽的算法,简单考察应试者对多线程的理解和编码能力。但这道题比上面题目要更注重实用,并不是为了多线程而多线程,并且多线程能真正起到性能提升的效果,上面的ABC只是借锁来串行化了任务执行过程,严格意义上讲,不能算是实用的多线程实例。
下面给出思路,质数计算有一定的工作量,并且随着数字越大计算量越大,题目要求的计算范围很广,因此可以简单的以数字为单位分给多个线程去运算。
尝试写了一版,发现结果队列中的记录积压严重,调整线程池的大小也无济于事。这个并发模型用得不对,用LinkedBlockingQueue仅仅解决两个线程之间的同步,可能太浪费了。take调用的开销应该太大了。
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
private int i = 1000000000;
private static final int MAX_NUMBER = 1100000000;
public static void main(String[] args) throws InterruptedException {
final Main m = new Main();
ThreadPoolExecutor executor = new ThreadPoolExecutor(200, 200, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
CalTask[] cs = new CalTask[200];
for (int i = 0; i < 200; i++) {
cs[i] = new CalTask(m);
executor.submit(cs[i]);
}
TreeSet<OneItem> s = new TreeSet<>();
for (int i = 0; i < 200; i++) {
s.add(new OneItem(i, cs[i].takeFirstOne()));
}
while (true) {
OneItem one = s.first();
if (one.primeNumber == MAX_NUMBER) {
break;
}
s.remove(one);
// System.out.println(one.primeNumber + ", ");
s.add(new OneItem(one.threadNum, cs[one.threadNum].takeFirstOne()));
}
executor.shutdown();
}
private synchronized int getNext() {
return i++;
}
private static class OneItem implements Comparable<OneItem> {
private final int threadNum;
private final int primeNumber;
private OneItem(int t, int p) {
this.threadNum = t;
this.primeNumber = p;
}
@Override
public int compareTo(OneItem oneItem) {
return this.primeNumber - oneItem.primeNumber;
}
}
private static class CalTask implements Runnable {
private final LinkedBlockingQueue<Integer> result = new LinkedBlockingQueue<>();
private final Main m;
private volatile boolean isEnd = false;
public CalTask(Main m) {
this.m = m;
}
public void run() {
while (true) {
int curnum = m.getNext();
if (curnum > MAX_NUMBER) {
System.out.println("end this task.");
isEnd = true;
break;
}
boolean isPrime = true;
int max = (int) Math.sqrt(curnum);
for (int j = 2; j <= max; j++) {
if (curnum % j == 0) {
isPrime = false;
break;
}
}
if (isPrime) {
try {
result.put(curnum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public int takeFirstOne() throws InterruptedException {
if (isEnd && result.isEmpty()) {
return MAX_NUMBER;
}
System.out.println("queue size is: " + result.size());
return result.take();
}
}
}
试着改进回忆当年是怎么给这个答案的,然后又写了下面这一版,似乎能运行起来了 :-),感觉锁的粒度大了,有浪费,然后存在明显的busy-waiting问题。
import java.util.Iterator;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
private static int i = 1000000000;
private static final int MAX = 1100000000;
private static final int THREAD_COUNT = 32;
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
CalTask[] cs = new CalTask[THREAD_COUNT];
TreeSet<Integer> res = new TreeSet<>();
for (int i = 0; i < THREAD_COUNT; i++) {
cs[i] = new CalTask(res);
executor.submit(cs[i]);
}
while (true) {
Thread.sleep(100);
int min = MAX;
for (int i = 0; i < THREAD_COUNT; i++) {
int tmpNum = cs[i].getCurNum();
if (tmpNum <= min) {
min = tmpNum;
}
}
synchronized (Main.class) {
Iterator<Integer> itr = res.iterator();
while (itr.hasNext()) {
int n = itr.next();
if (n <= min) {
System.out.println(n);
itr.remove();
} else {
break;
}
}
}
if (MAX <= min) {
break;
}
}
System.out.println("");
executor.shutdown();
}
private static synchronized int getNext() {
return i++;
}
private static class CalTask implements Runnable {
private final TreeSet<Integer> res;
private volatile int curNum = 0;
public CalTask(TreeSet<Integer> res) {
this.res = res;
}
public void run() {
while (true) {
curNum = getNext();
if (curNum >= MAX) {
System.out.println("end this task.");
break;
}
boolean isPrime = true;
int max = (int) Math.sqrt(curNum);
for (int j = 2; j <= max; j++) {
if (curNum % j == 0) {
isPrime = false;
break;
}
}
if (isPrime) {
synchronized (Main.class) {
res.add(curNum);
// System.out.println("queue size is: " + res.size());
}
}
}
}
public int getCurNum() {
return Math.min(curNum, MAX);
}
}
}