如何估算 Java 线程池的大小与队列长度

估算算法

第一种估算算法

先来一个天真的估算算法:假设要求一个系统的 TPS(Transaction Per Second 或者 Task Per Second)至少为 20,然后假设每个 Transaction 由一个线程完成,继续假设平均每个线程处理一个 Transaction 的时间为 4s。那么问题可以转化为:如何设计线程池大小,使得可以在 1s 内处理完 20 个 Transaction?这里计算过程可以很简单,每个线程的处理能力为 0.25TPS,那么要达到 20TPS,显然需要 20/0.25=80 个线程。

很显然这个估算算法很天真,因为它没有考虑到 CPU 数目。一般服务器的 CPU 核数为 16 或者 32,如果有 80 个线程,那么肯定会带来太多不必要的线程上下文切换开销。

第二种估算算法

第二种估算算法比较简单,但不知是否可行(N 为 CPU 总核数):

  • 如果是 CPU 密集型应用,则线程池大小设置为 N+1
  • 如果是 IO 密集型应用,则线程池大小设置为 2N+1

如果一台服务器上只部署这一个应用并且只有一个线程池,那么这种估算或许合理,具体还需自行测试验证。

第三种估算算法

第三种方法是在服务器性能 IO 优化中发现的一个估算公式:

最佳线程数目 = ((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间 )* CPU 数目

比如平均每个线程 CPU 运行时间为 0.5s,而线程等待时间(非 CPU 运行时间,比如 IO)为 1.5s,CPU 核心数为 8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式可以进一步转化为:

最佳线程数目 = (线程等待时间与线程 CPU 时间之比 + 1)* CPU 数目

这里可以得出一个结论(第二种估算算法也可以和这个结论相结合):

  • 线程 CPU 时间所占比例越高,需要越少线程
  • 线程等待时间所占比例越高,需要越多线程

估算算法总结

一个系统最快的部分是 CPU,所以决定一个系统吞吐量上限的是 CPU。增强 CPU 处理能力,可以提高系统吞吐量上限。但根据短板效应,真实的系统吞吐量并不能单纯根据 CPU 来计算。那要提高系统吞吐量,就需要从 系统短板(比如网络延迟、磁盘 IO)着手:

  • 尽量提高短板操作的并行化比率,比如多线程下载技术
  • 增强短板能力,比如用 NIO 替代 IO

第一条可以联系到 Amdahl 定律,这条定律定义了串行系统并行化后的加速比计算公式(如下),加速比越大,表明系统并行化的优化效果越好:

加速比 = 优化前系统耗时 / 优化后系统耗时

Addahl 定律还给出了系统并行度、CPU 数目和加速比的关系(如下),加速比为 Speedup,系统串行化比率(指串行执行代码所占比率)为 F,CPU 数目为 N:

Speedup <= 1 / (F + (1-F)/N)

当 N 足够大时,串行化比率 F 越小,加速比 Speedup 越大。

问答

使用线程池后,是不是就一定比使用单线程高效呢?

答案是否定的,比如 Redis 就是单线程的,但它却非常高效,基本操作都能达到十万量级 /s。从线程这个角度来看,部分原因在于多线程带来线程上下文切换开销,单线程就没有这种开销。当然 Redis 速度快的本质原因在于:Redis 基本都是内存操作,这种情况下单线程可以很高效地利用 CPU。而多线程适用场景一般是:存在相当比例的 IO 和网络操作。

所以即使有上面的估算算法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是 IO 密集型或者是 CPU 密集型或者是纯内存操作)和硬件环境(CPU、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。

估算代码

为了方便估算 Java 线程池的大小与队列长度,可以使用下述的两个 Java 类进行多次测试,这样可以得出最终的估算结果。

PoolSizeCalculator 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package threadpool;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;

/**
* A class that calculates the optimal thread pool boundaries. It takes the
* desired target utilization and the desired work queue memory consumption as
* input and retuns thread count and work queue capacity.
*
* @author Niklas Schlimm
*/
public abstract class PoolSizeCalculator {

/**
* The sample queue size to calculate the size of a single {@link Runnable}
* element.
*/
private static final int SAMPLE_QUEUE_SIZE = 1000;

/**
* Accuracy of test run. It must finish within 20ms of the testTime
* otherwise we retry the test. This could be configurable.
*/
private static final int EPSYLON = 20;

/**
* Control variable for the CPU time investigation.
*/
private volatile boolean expired;

/**
* Time (millis) of the test run in the CPU time calculation.
*/
private final long elapsed = 3000;

/**
* Calculates the boundaries of a thread pool for a given {@link Runnable}.
*
* @param targetUtilization the desired utilization of the CPUs (0 <= targetUtilization <= 1)
* @param targetQueueSizeBytes the desired maximum work queue size of the thread pool (bytes)
*/
void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
calculateOptimalCapacity(targetQueueSizeBytes);
Runnable task = createTask();
start(task);
start(task); // warm up phase
long cputime = getCurrentThreadCPUTime();
start(task); // test interval
cputime = getCurrentThreadCPUTime() - cputime;
long waitTime = (elapsed * 1000000) - cputime;
calculateOptimalThreadCount(cputime, waitTime, targetUtilization);
}

private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
long mem = calculateMemoryUsage();
BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem),
RoundingMode.HALF_UP);
System.out.println("Target queue memory usage (bytes): "
+ targetQueueSizeBytes);
System.out.println("createTask() produced " + createTask().getClass().getName() + " which took " + mem + " bytes in a queue");
System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
}

/**
* Brian Goetz' optimal thread count formula, see 'Java Concurrency in
* * Practice' (chapter 8.2) *
* * @param cpu
* * cpu time consumed by considered task
* * @param wait
* * wait time of considered task
* * @param targetUtilization
* * target utilization of the system
*/
private void calculateOptimalThreadCount(long cpu, long wait,
BigDecimal targetUtilization) {
BigDecimal computeTime = new BigDecimal(cpu);
BigDecimal waitTime = new BigDecimal(wait);
BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime()
.availableProcessors());
BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization)
.multiply(new BigDecimal(1).add(waitTime.divide(computeTime,
RoundingMode.HALF_UP)));
System.out.println("Number of CPU: " + numberOfCPU);
System.out.println("Target utilization: " + targetUtilization);
System.out.println("Elapsed time (nanos): " + (elapsed * 1000000));
System.out.println("Compute time (nanos): " + cpu);
System.out.println("Wait time (nanos): " + wait);
System.out.println("Formula: " + numberOfCPU + " * "
+ targetUtilization + " * (1 + " + waitTime + " / "
+ computeTime + ")");
System.out.println("* Optimal thread count: " + optimalthreadcount);
}

/**
* * Runs the {@link Runnable} over a period defined in {@link #elapsed}.
* * Based on Heinz Kabbutz' ideas
* * (http://www.javaspecialists.eu/archive/Issue124.html).
* *
* * @param task
* * the runnable under investigation
*/
public void start(Runnable task) {
long start = 0;
int runs = 0;
do {
if (++runs > 10) {
throw new IllegalStateException("Test not accurate");
}
expired = false;
start = System.currentTimeMillis();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
expired = true;
}
}, elapsed);
while (!expired) {
task.run();
}
start = System.currentTimeMillis() - start;
timer.cancel();
} while (Math.abs(start - elapsed) > EPSYLON);
collectGarbage(3);
}

private void collectGarbage(int times) {
for (int i = 0; i < times; i++) {
System.gc();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

/**
* Calculates the memory usage of a single element in a work queue. Based on
* Heinz Kabbutz' ideas
* (http://www.javaspecialists.eu/archive/Issue029.html).
*
* @return memory usage of a single {@link Runnable} element in the thread
* pools work queue
*/
private long calculateMemoryUsage() {
BlockingQueue<Runnable> queue = createWorkQueue(SAMPLE_QUEUE_SIZE);
for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
queue.add(createTask());
}
long mem0 = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
long mem1 = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
queue = null;
collectGarbage(15);
mem0 = Runtime.getRuntime().totalMemory()
- Runtime.getRuntime().freeMemory();
queue = createWorkQueue(SAMPLE_QUEUE_SIZE);
for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
queue.add(createTask());
}
collectGarbage(15);
mem1 = Runtime.getRuntime().totalMemory()
- Runtime.getRuntime().freeMemory();
return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
}

/**
* Create your runnable task here.
*
* @return an instance of your runnable task under investigation
*/
protected abstract Runnable createTask();

/**
* Return an instance of the queue used in the thread pool.
*
* @return queue instance
*/
protected abstract BlockingQueue<Runnable> createWorkQueue(int capacity);

/**
* Calculate current cpu time. Various frameworks may be used here,
* depending on the operating system in use. (e.g.
* http://www.hyperic.com/products/sigar). The more accurate the CPU time
* measurement, the more accurate the results for thread count boundaries.
*
* @return current cpu time of current thread
*/
protected abstract long getCurrentThreadCPUTime();

}

SimplePoolSizeCaculator 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package threadpool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SimplePoolSizeCaculator extends PoolSizeCalculator {

@Override
protected Runnable createTask() {
return new AsyncIOTask();
}

@Override
protected BlockingQueue<Runnable> createWorkQueue(int capacity) {
return new LinkedBlockingQueue<Runnable>(capacity);
}

@Override
protected long getCurrentThreadCPUTime() {
//the total CPU time for the current thread in nanoseconds
return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
}

public static void main(String[] args) {
PoolSizeCalculator poolSizeCalculator = new SimplePoolSizeCaculator();
poolSizeCalculator.calculateBoundaries(new BigDecimal(1.0), new BigDecimal(100000));
}

}

/**
* 自定义的异步IO任务
* @author Will
*
*/
class AsyncIOTask implements Runnable {

@Override
public void run() {
HttpURLConnection connection = null;
BufferedReader reader = null;
try {
URL url = new URL("http://baidu.com");

connection = (HttpURLConnection) url.openConnection();
connection.connect();
reader = new BufferedReader(new InputStreamReader(
connection.getInputStream()));

String line;
StringBuilder stringBuilder;
while ((line = reader.readLine()) != null) {
stringBuilder = new StringBuilder();
stringBuilder.append(line);
}
}

catch (IOException e) {

} finally {
if(reader != null) {
try {
reader.close();
}
catch(Exception e) {

}
}
if (connection != null)
connection.disconnect();
}

}

}

源码剖析

PoolSizeCalculator 类

  • calculateBoundaries():计算线程池大小和队列长度,接收两个方法参数,分别是 CPU 负载和队列总内存的大小(bytes)

  • calculateMemoryUsage():计算单个任务的内存大小,计算方法如下:

1
2
3
4
5
6
1. 手动 GC
2. 计算可用内存大小 m0
3. 创建一个队列,并往里面放 1000 个任务
4. 再次 GC
5. 计算可用内存大小 m1
6. (m1 - m0) / 1000 即每个任务的大小
  • calculateOptimalCapacity():计算队列长度
    • 计算公式:队列总内存 / 单个任务的内存
    • 接收一个参数,即队列总内存的大小
  • calculateOptimalThreadCount():计算线程池大小

    • 计算公式:CPU 核数 *(1 + 线程等待时间 / 线程 CPU 时间)
  • collectGarbage():循环手动执行 GC 操作

  • start():计算执行 3 秒的任务所消耗 CPU 的实际使用时间

SimplePoolSizeCaculator 类

  • SimplePoolSizeCaculator 类PoolSizeCalculator 抽象类的一个实现,用于计算 CPU 负载 ,包括队列总内存的大小为 100k 左右的 IO 密集型的线程池大小和队列长度

  • AsyncIOTask 类:IO 密集型应用的一个简单例子

估算代码的运行结果

1
2
3
4
5
6
7
8
9
10
11
Target queue memory usage (bytes): 100000
createTask () produced threadpool.AsyncIOTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 4
Target utilization: 1
Elapsed time (nanos): 3000000000
Compute time (nanos): 125000000
Wait time (nanos): 2875000000
Formula: 4 * 1 * (1 + 2875000000 / 125000000)
* Optimal thread count: 96

如果不修改队列内存大小和任务,队列长度可能都是 2500

参考资料