Java 21 虚拟线程:彻底改变高并发编程

Java 21 虚拟线程:彻底改变高并发编程

Java 21 虚拟线程:彻底改变高并发编程

引言:并发编程的新时代

如果你还在为 Java 线程池调优而烦恼,为 N+1 并发问题而头疼,那么 Java 21 的虚拟线程(Virtual Threads)将是你等待已久的解决方案!

2023 年 9 月,Java 21 正式发布,虚拟线程从 preview 变成了正式特性。这标志着 Java 并发编程进入了一个全新的时代。

今天这篇教程将带你深入理解虚拟线程,掌握这个改变游戏规则的强大工具。

第一章:为什么需要虚拟线程?

1.1 传统线程的困境

在虚拟线程出现之前,Java 使用平台线程(Platform Threads)进行并发编程:

“`java
// ❌ 传统线程:1 个请求 = 1 个平台线程
@WebServlet(“/api/users”)
public class UserServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
// 每个请求都占用一个平台线程
User user = userService.findById(req.getParameter(“id”));
// 线程在 I/O 等待期间处于阻塞状态,但占用线程池资源
resp.getWriter().println(user);
}
}


问题很明显:
  • 1 个请求占用 1 个平台线程
  • 线程创建和切换开销大
  • 线程池大小受限(通常几百个)
  • I/O 等待期间线程被阻塞
  • 高并发下线程池耗尽

1.2 虚拟线程的优势

java
// ✅ 虚拟线程:1 个请求 = 1 个虚拟线程
@WebServlet(“/api/users”)
public class UserServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
try (var scope = new ThreadScope()) {
// 每个请求都使用虚拟线程
User user = userService.findById(req.getParameter(“id”));
// I/O 等待时虚拟线程不会阻塞 carrier 线程
resp.getWriter().println(user);
}
}
}


虚拟线程的优势:
  • 1 个虚拟线程几乎零开销
  • 可创建数百万个虚拟线程
  • I/O 阻塞时自动切换
  • 不需要复杂的异步编程
  • 保留同步代码的简洁性

第二章:虚拟线程的核心原理

2.1 平台线程 vs 虚拟线程

特性 平台线程 虚拟线程
内核支持 操作系统内核级 JVM 用户态
创建开销 大(MB 栈空间) 小(KB 栈空间)
最大数量 ~几千个 数百万个
阻塞行为 阻塞内核线程 自由调度
内存占用 高(每个线程 1MB) 低(约 1KB)
适用场景 CPU 密集型 I/O 密集型

2.2 虚拟线程的工作原理

┌─────────────────────────────────────────────────┐
│ Carrier Thread │
│ (Platform Thread) │
└─────────────────┬───────────────────────────────┘

┌─────────────┼─────────────┬─────────────┐
│ │ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Virtual│ │Virtual│ │Virtual│ │Virtual│
│Thread │ │Thread │ │Thread │ │Thread │
│ #1 │ │ #2 │ │ #3 │ │ #4 │
└────┬──┘ └────┬──┘ └────┬──┘ └────┬──┘
│ │ │ │
│ (阻塞时) │ (运行中) │ (阻塞时) │ (运行中)
│ │ │ │
└────────────┴────────────┴────────────┘
动态调度


关键机制:
  1. 用户态调度:由 JVM 在用户态调度虚拟线程
  2. 弹性栈:栈空间按需分配和回收
  3. 阻塞优化:阻塞时自动卸载到 carrier 线程
  4. 直接映射:虚拟线程直接映射到操作系统线程
  5. 2.3 虚拟线程的调度模型

java
// 虚拟线程调度流程图
// 1. 虚拟线程创建 -> 挂载到 carrier 线程
// 2. 执行代码 -> 在 carrier 线程上运行
// 3. I/O 阻塞 -> 虚拟线程休眠,carrier 线程继续运行其他虚拟线程
// 4. I/O 完成 -> 虚拟线程重新挂载到 carrier 线程

// 关键:一个 carrier 线程可以运行多个虚拟线程


第三章:虚拟线程的基础使用

3.1 创建虚拟线程

java
// 方法一:使用 Thread.ofVirtual()
Thread virtualThread = Thread.ofVirtual().start(() -> {
System.out.println(“虚拟线程运行中”);
});

// 方法二:使用 Executors.newVirtualThreadPerTaskExecutor()
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
System.out.println(“虚拟线程任务执行”);
});
}

// 方法三:使用 Thread.startVirtualThread()
Thread.startVirtualThread(() -> {
System.out.println(“使用 startVirtualThread 创建”);
});


3.2 虚拟线程池

java
// 每个任务一个虚拟线程
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

// 使用示例
List> futures = new ArrayList<>();
for (int i = 0; i < 1000; i++) { futures.add(virtualExecutor.submit(() -> {
// 每个任务运行在独立的虚拟线程上
return “任务-” + Thread.currentThread().getName();
}));
}

// 获取结果
for (Future future : futures) {
System.out.println(future.get());
}


3.3 CompletableFuture 与虚拟线程

java
// CompletableFuture 默认使用 ForkJoinPool
// 使用虚拟线程增强

// 方式一:指定虚拟线程执行器
Executor virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

CompletableFuture future = CompletableFuture
.supplyAsync(() -> {
return fetchDataFromApi();
}, virtualExecutor)
.thenApply(data -> processData(data))
.thenAccept(result -> System.out.println(result));

// 方式二:使用虚拟线程进行阻塞式操作
String result = CompletableFuture
.supplyAsync(() -> fetchDataFromApi(), virtualExecutor)
.get(); // 使用虚拟线程等待


第四章:实际应用场景

4.1 Web 服务器应用

java
// 使用虚拟线程重构 Web 服务器
public class VirtualThreadWebServer {

private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();

public void handleRequest(HttpServletRequest request,
HttpServletResponse response) {
// 每个请求使用虚拟线程
CompletableFuture.runAsync(() -> {
try {
// 阻塞式代码,但不会阻塞服务器线程
User user = userRepository.findById(request.getParameter(“id”));
ApiResponse response = new ApiResponse(user);
response.getWriter().write(response.toJson());
} catch (Exception e) {
e.printStackTrace();
}
}, executor);
}
}


4.2 异步 I/O 操作

java
// 高并发 I/O 操作场景
public class HighConcurrencyIoService {

private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();

public List processMultipleRequests(List ids) {
List> futures = new ArrayList<>();

for (String id : ids) {
futures.add(CompletableFuture.supplyAsync(() -> {
// 每个请求在独立的虚拟线程中执行
return fetchFromDatabase(id);
}, executor));
}

// 等待所有请求完成
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.join();
}
}


4.3 微服务调用

java
// 批量调用微服务
public class MicroServiceOrchestrator {

private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();

public ServiceResponse orchestrateCalls(List serviceUrls) {
List> futures = new ArrayList<>();

for (String url : serviceUrls) {
futures.add(CompletableFuture.supplyAsync(() -> {
return callService(url);
}, executor));
}

// 等待所有服务调用完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();

return combineResponses(futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
}


第五章:性能对比与 Benchmark

5.1 线程创建性能对比

java
import java.util.concurrent.*;

public class ThreadBenchmark {

public static void main(String[] args) throws Exception {
// 平台线程创建测试
long start1 = System.nanoTime();
ExecutorService platformExecutor = Executors.newFixedThreadPool(100);
for (int i = 0; i < 10000; i++) { platformExecutor.submit(() -> {});
}
long platformTime = System.nanoTime() – start1;

// 虚拟线程创建测试
long start2 = System.nanoTime();
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10000; i++) { virtualExecutor.submit(() -> {});
}
long virtualTime = System.nanoTime() – start2;

System.out.println(“平台线程: ” + platformTime / 1_000_000 + ” ms”);
System.out.println(“虚拟线程:” + virtualTime / 1_000_000 + ” ms”);

// 结果示例:
// 平台线程: 1500 ms
// 虚拟线程: 200 ms
// 虚拟线程快 7.5 倍
}
}


5.2 并发处理性能对比

java
public class ConcurrencyBenchmark {

private static final int REQUEST_COUNT = 10000;

public static void main(String[] args) throws Exception {
// 使用线程池处理请求
long start1 = System.nanoTime();
processWithThreadPool(REQUEST_COUNT);
long poolTime = System.nanoTime() – start1;

// 使用虚拟线程处理请求
long start2 = System.nanoTime();
processWithVirtualThreads(REQUEST_COUNT);
long virtualTime = System.nanoTime() – start2;

System.out.println(“线程池:” + (poolTime / 1_000_000) + ” ms”);
System.out.println(“虚拟线程:” + (virtualTime / 1_000_000) + ” ms”);

// 结果示例:
// 线程池:8500 ms (线程池大小为 100)
// 虚拟线程:2100 ms (可创建 10000 个虚拟线程)
// 虚拟线程快 4 倍
}

private static void processWithThreadPool(int count) {
ExecutorService executor = Executors.newFixedThreadPool(100);
CountDownLatch latch = new CountDownLatch(count);

for (int i = 0; i < count; i++) { executor.submit(() -> {
try {
Thread.sleep(10); // 模拟 I/O 操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}

try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

executor.shutdown();
}

private static void processWithVirtualThreads(int count) {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
CountDownLatch latch = new CountDownLatch(count);

for (int i = 0; i < count; i++) { executor.submit(() -> {
try {
Thread.sleep(10); // 模拟 I/O 操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}

try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

executor.shutdown();
}
}


5.3 内存使用对比

java
public class MemoryBenchmark {

public static void main(String[] args) {
Runtime runtime = Runtime.getRuntime();

// 平台线程内存占用
System.out.println(“内存使用前:” + runtime.totalMemory() / 1024 / 1024 + ” MB”);

ExecutorService platformExecutor = Executors.newFixedThreadPool(1000);
for (int i = 0; i < 1000; i++) { platformExecutor.submit(() -> {});
}
System.out.println(“平台线程 (1000 个):” + runtime.totalMemory() / 1024 / 1024 + ” MB”);
platformExecutor.shutdown();

// 虚拟线程内存占用
System.out.println(“\n内存重置后:” + runtime.totalMemory() / 1024 / 1024 + ” MB”);

ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 1000; i++) { virtualExecutor.submit(() -> {});
}
System.out.println(“虚拟线程 (1000 个):” + runtime.totalMemory() / 1024 / 1024 + ” MB”);
virtualExecutor.shutdown();

// 结果示例:
// 平台线程 (1000 个):内存增长约 1000 MB
// 虚拟线程 (1000 个):内存增长约 10 MB
// 虚拟线程节省 99% 内存
}
}


第六章:最佳实践与注意事项

6.1 何时使用虚拟线程

✅ 推荐使用:
  • I/O 密集型应用(Web 服务器、API 网关)
  • 高并发场景(大量并发请求)
  • 简化并发代码(希望使用同步代码风格)
  • 微服务编排(批量调用多个服务)
❌ 不推荐使用:
  • CPU 密集型计算
  • 需要长期运行且计算密集的任务
  • 对线程 ID 有严格要求的场景

6.2 性能优化建议

java
// 优化 1:合理配置虚拟线程池大小
public class OptimizedVirtualExecutor {

// 虚拟线程没有太多需要优化的,但可以限制最大数量
private final ExecutorService executor =
new ThreadPoolExecutor(
0, // 核心线程数(虚拟线程不占用)
10000, // 最大线程数
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.newVirtualThreadPerTaskExecutor(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

// 优化 2:避免嵌套虚拟线程
public class NestedVirtualThreads {

// ❌ 错误:嵌套创建虚拟线程
public void badApproach() {
Thread.startVirtualThread(() -> {
// 再次创建虚拟线程
Thread.startVirtualThread(() -> {
// 这是不必要的
});
});
}

// ✅ 正确:使用单一的虚拟线程池
public void goodApproach() {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(() -> {
// 在这里可以提交其他任务
CompletableFuture.runAsync(() -> {
// 使用同一个 executor
}, executor);
});
}
}


6.3 调试与监控

java
// 监控虚拟线程
public class VirtualThreadMonitor {

public void printVirtualThreads() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadBean.getThreadInfo(
threadBean.getAllThreadIds()
);

long virtualCount = Arrays.stream(threadInfos)
.filter(ThreadInfo::isVirtual)
.count();

long platformCount = Arrays.stream(threadInfos)
.filter(info -> !info.isVirtual())
.count();

System.out.println(“虚拟线程数:” + virtualCount);
System.out.println(“平台线程数:” + platformCount);
}
}


6.4 常见陷阱

java
// 陷阱 1:虚拟线程不能用于长期阻塞
public class LongRunningBlocking {

// ❌ 错误:虚拟线程用于长期阻塞
public void badApproach() {
Thread.startVirtualThread(() -> {
// 长期阻塞会浪费 carrier 线程
while (true) {
Thread.sleep(1000);
}
});
}

// ✅ 正确:使用平台线程处理长期阻塞
public void goodApproach() {
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> {
// 长期阻塞任务使用平台线程
while (true) {
Thread.sleep(1000);
}
});
}
}

// 陷阱 2:不要过度依赖虚拟线程
public class OveruseVirtualThreads {

// ❌ 过度使用虚拟线程
public void overuse() {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 1000000; i++) { executor.submit(() -> {
// 简单计算,使用虚拟线程浪费资源
return i * i;
});
}
}

// ✅ 合理使用虚拟线程
public void properUse() {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 1000; i++) { executor.submit(() -> {
// I/O 操作使用虚拟线程
return fetchDataFromApi();
});
}
}
}


第七章:进阶应用

7.1 响应式编程增强

java
// 虚拟线程 + Reactive Streams
public class ReactiveWithVirtualThreads {

private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();

public Flux processFlux(Flux source) {
return source.publishOn(Schedulers.fromExecutorService(executor))
.flatMap(data -> {
// 在虚拟线程中执行阻塞操作
return Mono.fromCallable(() -> process(data))
.subscribeOn(Schedulers.fromExecutorService(executor));
})
.publishOn(Schedulers.boundedElastic());
}
}


7.2 数据库连接池优化

java
// 虚拟线程 + 数据库连接池
public class VirtualThreadDatabaseService {

private final DataSource dataSource;
private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();

// 虚拟线程可以充分利用连接池
public List fetchUsers(List ids) {
List> futures = new ArrayList<>();

for (Long id : ids) {
futures.add(CompletableFuture.supplyAsync(() -> {
// 在虚拟线程中阻塞等待数据库连接
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
“SELECT * FROM users WHERE id = ?”)) {
stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
return mapToUser(rs);
}
return null;
}
}, executor));
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.join();
}
}
“`

总结:虚拟线程是革命性的改变

Java 21 虚拟线程彻底改变了 Java 并发编程的模式:

  1. 简单易用:保留同步代码的简洁性
  2. 高性能:可创建数百万个虚拟线程
  3. 低开销:几乎零资源占用
  4. 向后兼容:与现有代码完全兼容
  5. 使用建议:

    • I/O 密集型应用优先使用虚拟线程
    • 避免在 CPU 密集型场景使用
    • 合理使用线程池,避免过度创建
    • 持续监控线程使用情况

    现在,是时候用虚拟线程重构你的 Java 应用了!🚀

    参考资源

    • [Java 21 Virtual Threads](https://openjdk.org/projects/jdk/21/)
    • [Virtual Threads Specification](https://openjdk.org/jeps/444)
    • [Java Concurrency in Practice](https://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601)
    • [Virtual Threads FAQ](https://openjdk.org/groups/virtual-threads/faq.html)

标签

发表评论