虚拟线程是一种轻量级线程,可大大减少编写、维护和观察高吞吐量并发应用程序的工作量。从JDK19开始发布了虚拟线程的预览功能,直到JDK21最终确定虚拟线程。
虚拟线程既廉价(相比平台线程)又可以创建非常的多,因此绝不应池化:每个应用任务都应创建一个新的虚拟线程。因此,大多数虚拟线程的寿命都很短,调用堆栈也很浅,只需执行一次 HTTP 客户端调用或一次 JDBC 查询。相比之下,平台线程重量级、成本高,因此通常必须池化。这些线程的寿命往往较长,具有较深的调用堆栈,可在多个任务之间共享。
总之,虚拟线程保留了可靠的每请求线程风格,这种风格与 Java 平台的设计相协调,同时还能优化利用可用硬件。使用虚拟线程不需要学习新的概念,但可能需要放弃为应对当前线程的高成本而养成的习惯。虚拟线程不仅能帮助应用程序开发人员,还能帮助框架设计人员提供易于使用的 API,这些 API 与平台设计兼容,同时又不影响可扩展性。
虚拟线程是 java.lang.Thread 的一个实例,它在底层操作系统线程上运行 Java 代码,但在代码的整个生命周期中不会捕获操作系统线程。这意味着许多虚拟线程可以在同一个操作系统线程上运行 Java 代码,从而有效地共享操作系统线程。平台线程会垄断宝贵的操作系统线程,而虚拟线程不会。虚拟线程的数量可能远远大于操作系统线程的数量。
虚拟线程是线程的一种轻量级实现,由 JDK 而不是操作系统提供。它们是用户模式线程的一种形式,在其他多线程语言(如 Go 中的 goroutines(协程(轻量级线程)) 和 Erlang 中的进程)中取得了成功。用户模式线程在 Java 早期版本中甚至被称为 "绿色线程",当时操作系统线程尚未成熟和普及。然而,Java 的绿色线程都共享一个操作系统线程(M:1 调度),最终被作为操作系统线程包装器(1:1 调度)实现的平台线程所超越。虚拟线程采用 M:N 调度,即大量(M)虚拟线程被安排运行在较少数量(N)的操作系统线程上。
虚拟线程是 java.lang.Thread 的一个实例,与特定操作系统线程无关。相比之下,平台线程是以传统方式实现的 java.lang.Thread 实例,是操作系统线程的薄包装。
通常服务器应用程序处理相互独立的并发请求时,在请求的整个持续声明周期内为该请求指定一个线程来处理该请求。这种按请求线程的风格易于理解、易于编程、易于调试和配置。
对于一个请求处理的处理时间,应用程序同时处理的请求数(即并发数)必须与吞吐量成比例增长。例如,假设一个平均延迟为 50 毫秒的请求并发处理 10 个请求,实现了每秒 200 个请求的吞吐量。若要将该应用的吞吐量提高到到每秒 2000 个请求,则需要并发处理 100 个请求。如果每个请求在请求持续时间内都由一个线程处理,那么要使应用程序跟上进度,线程数必须随着吞吐量的增加而增加。
由于 JDK 将线程作为操作系统(OS)线程的包装器来实现。操作系统线程的成本很高,所以我们不能拥有太多的线程,这就使得线程的实现不适合按请求执行的方式。如果每个请求在其生命周期内都要使用一个线程,也就是一个操作系统线程,那么在 CPU 或网络连接等其他资源耗尽之前,线程数量往往就已经成为限制因素了。JDK 当前的线程实现将应用程序的吞吐量限制在远低于硬件支持的水平。即使对线程进行了池化,也会出现这种情况,因为池化有助于避免启动新线程的高昂成本,但不会增加线程总数。
// 创建一个执行器,为每个任务启动一个新的虚拟线程try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0, 10_000).forEach(i -> { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return i; }); });}
本例中的任务是简单的代码--休眠1秒--现代硬件可以轻松支持 10,000 个虚拟线程同时运行此类代码。而实际上,JDK 只在少量操作系统线程(可能只有一个)上运行此代码代码。
如果该程序使用 ExecutorService(例如 Executors.newCachedThreadPool())为每个任务创建一个新的平台线程,情况就会截然不同。ExecutorService 会尝试创建 10,000 个平台线程,从而创建 10,000 个操作系统线程,根据机器和操作系统的不同,程序可能会崩溃。
即便使用Executors.newFixedThreadPool(200)创建固定数量的线程,情况也不会好到哪里去。ExecutorService 将创建 200 个平台线程,供所有 10,000 个任务共享,因此许多任务将顺序运行而非并发运行,程序将需要很长时间才能完成。对于该程序而言,拥有 200 个平台线程的池每秒只能完成 200 个任务,而虚拟线程每秒可完成约 10,000 个任务(经过充分预热后)。此外,如果将示例程序中的 10_000 改为 1_000_000,那么程序将提交 1,000,000 个任务,创建 1,000,000 个虚拟线程并发运行,(充分预热后)吞吐量将达到每秒约 1,000,000 个任务。
注意:如果程序中的任务在一秒钟内执行计算(例如对一个巨大的数组进行排序),而不仅仅是休眠,那么增加线程数超过处理器内核数将无济于事,无论它们是虚拟线程还是平台线程。虚拟线程不是更快的线程--它们运行代码的速度并不比平台线程快。它们的存在是为了提供规模(更高的吞吐量),而不是速度(更低的延迟)。虚拟线程的数量可能比平台线程多得多,因此根据利特尔定律,虚拟线程可以提供更高吞吐量所需的更高并发性。
手动创建虚拟线程
// 创建虚拟线程OfVirtual virtual = Thread.ofVirtual().name("pack") ;virtual.start(() -> { System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 创建不自动启动的线程Thread thread = virtual.unstarted(() -> { System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 手动启动虚拟线程thread.start() ; // 打印线程对象:VirtualThread[#21,pack]/runnableSystem.out.println(thread) ;// 创建普通线程OfPlatform platform = Thread.ofPlatform().name("pack") ;Thread thread = platform.start(() -> { System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;// 这里输出:Thread[#21,pack,5,main]System.out.println(thread) ;
在上面的代码中,打印thread输出的不是对应的平台线程,而是虚拟线程
VirtualThread[#21,pack]/runnable
在执行的任务中通过Thread.currentThread().getName()方法是没有任何信息,我们可以通过上面的name()方法来设置线程的名称及相关的前缀。如下:
Thread.ofPlatform().name("pack") ;Thread.ofVirtual().name("pack", 0) ;
通过ThreadFactory工厂创建
ThreadFactory threadFactory = Thread.ofVirtual().factory() ;threadFactory.newThread(() -> { System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}).start() ;
直接通过Thread静态方法
Thread.startVirtualThread(() -> { System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ;}) ;
使用虚拟线程
public class Demo06 { static class Task implements Runnable { @Override public void run() { System.err.printf("start - %d%n", System.currentTimeMillis()) ; try { Thread.sleep(Duration.ofSeconds(1)); } catch (InterruptedException e) {} System.err.printf(" end - %d%n", System.currentTimeMillis()) ; } } public static void main(String[] args) throws Exception { ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ; es.submit(new Task()) ; es.submit(new Task()) ; es.submit(new Task()) ; System.in.read() ; }}
输出结果:
start - 1698827467289start - 1698827467289start - 1698827467291 end - 1698827468317 end - 1698827468317 end - 1698827468317
从结果看出,基本是同时开始,结束也是基本一起结束,总耗时1s。
使用传统线程
任务都一样,只是创建线程池的类型修改
public static void main(String[] args) throws Exception { ExecutorService es= Executors.newFixedThreadPool(1) ; es.submit(new Task()) ; es.submit(new Task()) ; es.submit(new Task()) ;}
输出结果:
start - 1698827686133 end - 1698827687165start - 1698827687165 end - 1698827688177start - 1698827688177 end - 1698827689178
从结果知道这里是一个任务一个任务的执行串行化,但是你注意观察,其实每个任务的的开始start 的输出都是要等前一个线程执行完了后才能执行。结合上面的虚拟线程对比,start是同时输出的,这也是虚拟线程的有点了。
这是一个远程接口调用的示例:
远程3个接口,如下:
@GetMapping("/userinfo")public Object queryUserInfo() { try { TimeUnit.SECONDS.sleep(2) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询用户信息" ;}@GetMapping("/stock")public Object queryStock() { try { TimeUnit.SECONDS.sleep(3) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询库存信息" ;}@GetMapping("/order")public Object queryOrder() { try { TimeUnit.SECONDS.sleep(4) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询订单信息" ;}
接口调用服务,如下:
@Resourceprivate RestTemplate restTemplate ;public Map<String, Object> rpc() { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { var start = System.currentTimeMillis() ; // 1.查询用户信息 var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo")); // 2.查询库存信息 var stock = executor.submit(() -> query("http://localhost:8080/demos/stock")); // 3.查询订单信息 var order = executor.submit(() -> query("http://localhost:8080/demos/order")); Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ; System.out.printf("总计耗时:%d毫秒%n", (System.currentTimeMillis() - start)) ; return res ; } catch (Exception e) { return Map.of() ; }}private Object query(String url) { return this.restTemplate.getForObject(url, String.class) ;}
在这个案例中,如果使用传统的线程池,如果并发量大,那么很可能很多的任务都要排队等待,或者你需要创建更多的平台线程来满足吞吐量问题。但是现在有了虚拟线程你可以不用再考虑线程不够用的情况了,每个任务的执行都会被一个虚拟的线程执行(不是平台线程,可能这些虚拟线程只会对应到一个平台线程)。
虚拟线程可在以下情况显著提高应用吞吐量:
结构化并发目前还是预览功能,并没有在JDK21中正式发布,不过我们可以先来看看什么是结构化并发。
结构化并发 API 是来简化并发编程。结构化并发将在不同线程中运行的一组相关任务视为一个工作单元,从而简化了错误处理和取消,提高了可靠性,并增强了可观察性。
结构化并发的目标是:
通过示例来理解结构化并发。
如下示例是通过传统线程池的方式并发的从远程获取信息,代码如下:
static RestTemplate restTemplate = new RestTemplate() ;public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(2) ; Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ; Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ; Object userInfoRet = userInfo.get() ; System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ; Object stockRet = stock.get() ; System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ;}public static Object queryUserInfo() { return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;}public static Object queryStock() { return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;}
上面的代码中没有什么问题,程序都能够运行的正常,结果如下:
08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:用户信息:查询用户信息08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:库存信息:查询库存信息
但是如果其中一个任务执行失败了后会如何呢?将其中一个任务抛出异常,如下代码:
public static Object queryStock() { System.out.println(1 / 0) ; return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;}
再次执行代码,结果如下:
发生异常:java.lang.ArithmeticException: / by zero09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"执行结果:库存信息:查询库存信息
从结果看出,获取用户信息子任务发生异常后,并不会影响到获取库存子任务的执行。
通过结构化并发方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Supplier<Object> userInfo = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ; Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ; // 等待在此任务范围内启动的所有子任务完成或某个子任务失败。 scope.join() ; Object userInfoRet = userInfo.get() ; System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ; Object stockRet = stock.get() ; System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ;}
当一个子任务发生错误时,其它的子任务会在未完成的情况下取消,执行结果如下:
08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936) at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26) at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)
从控制台的输出看出,获取库存的调用被取消了。
完毕!!!
本文链接:http://www.28at.com/showinfo-26-16937-0.html【技术革命】JDK21虚拟线程来袭,让系统的吞吐量翻倍!
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 超越基础:Flutter 中 onTap 事件的 5 条规则让你脱颖而出
下一篇: Sed 原地替换文件时遇到的趣事