在 Java 中,并行流 (parallelStream) 和 CompletableFuture 都是处理并发和异步编程的工具,但它们在使用场景和适用性上有一些区别。下面是一些指导原则,可以帮助你在选择使用并行流或 CompletableFuture 时做出决策:

使用场景:

1. 并行流 (parallelStream)
  • 适用场景:

    • 需要对集合进行并行处理。
    • 操作是独立的,无需等待其他元素的结果。
    • 数据量较大,适合并行分割和处理。
  • 优点:

    • 语法简洁,易于使用。
    • 不需要显式的线程管理。

parallelStream 是 Java 8 引入的一个功能,允许对集合进行并行处理。这是传统顺序 stream 的并行版本。以下是一个简要的解释:

  • 用法:

    List<T> myList = // 一些列表或集合
    myList.parallelStream().forEach(element -> {
        // 要在每个元素上并行执行的代码
    });
    
  • 解释:

    • parallelStream() 将顺序流转换为并行流。
    • 流的元素由多个线程并发处理,这有可能提高在多核处理器上的性能。
    • 然而,重要的是要注意,并非所有操作都适合并行处理,这取决于任务的性质。
2. CompletableFuture
  • 适用场景:

    • 需要更细粒度的控制,例如异步任务之间的依赖关系。
    • 需要等待多个异步任务全部完成或任一完成。
    • 需要处理异常和结果。
  • 优点:

    • 提供了更灵活的异步编程能力。
    • 可以使用回调、组合和链式调用等方式。

CompletableFuture.runAsync 是 Java 8 引入的 CompletableFuture API 的一部分,提供了进行异步编程的一种方式。以下是它的工作原理:

  • 用法:

    // 1、使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
    
    System.out.println("当前调用者线程为:" + Thread.currentThread().getName());
    CompletableFuture.runAsync(() -> {
         // 异步方法内当前执行线程为:ForkJoinPool.commonPool-worker-1
         System.out.println("异步方法内当前执行线程为:" + Thread.currentThread().getName());
          System.out.println("---------------------");
    });
    
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 异步执行的代码
    });
    
    //2、使用指定的线程池执行异步代码。此异步方法无法返回值。
    
    System.out.println("当前调用者线程为:" + Thread.currentThread().getName());   
    // fixme 根据阿里规约 建议真实开发时使用 ThreadPoolExecutor 定义线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
            
    CompletableFuture.runAsync(() -> {
         // 异步方法内当前执行线程为:pool-1-thread-1
        System.out.println("异步方法内当前执行线程为:" + Thread.currentThread().getName());
        System.out.println(111);
    }, threadPool);
            
    // 演示代码,执行完后关闭线程池
    threadPool.shutdown();
  • 解释:

    • runAsync 是一个工厂方法,创建一个新的 CompletableFuture 并在单独的线程中运行指定的 Runnable
    • 它返回一个 CompletableFuture<Void>,因为 Runnable 不产生结果。
    • 你可以附加回调或组合多个 CompletableFuture 实例以链接异步操作。

具体指导原则:

并行流parallelStream 与 CompletableFuture的选择

计算密集型操作:推荐使用 parallelStream:

  • 原因

    • 计算密集型操作可以充分利用并行流的简单实现和高效性能。
    • 并行流的底层实现会自动分配工作给可用的处理器核心,提高整体计算速度。

含有I/O操作:推荐使用 CompletableFuture:

  • 原因

    • CompletableFuture 提供更灵活的异步编程能力,特别适用于等待I/O的操作。
    • 可以根据等待和计算的比率,灵活地控制线程数,避免不必要的线程创建。
    • 在涉及到等待I/O的操作时,使用 CompletableFuture 会更加灵活,可以避免并行流的延迟特性造成的难以判断的情况。
    • 通过 CompletableFuture.supplyAsync 创建异步任务,然后使用 CompletableFuture.allOf 等待它们全部完成。
  • 示例
List<CompletableFuture<Integer>> futures = myList.stream()
   .map(element -> CompletableFuture.supplyAsync(() -> performIoOperation(element)))
   .collect(Collectors.toList());

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join();

List<Integer> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
1. 简单的并行处理:
  • 如果任务可以独立并行处理:

    • 选择并行流 (parallelStream):
      • 示例:对集合中的元素执行相同的操作,互不影响。
    myList.parallelStream().forEach(element -> { 
        // 并行处理的操作
    });
2. 复杂的异步任务:
  • 如果任务之间有依赖关系,需要更细粒度的控制:

    • 选择 CompletableFuture:
      • 示例:一个异步任务的结果依赖于另一个异步任务的完成。
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
         // 异步任务1 
    }).thenRun(() -> {
         // 异步任务2,依赖于任务1的完成 
    });
3. 等待多个任务完成:
  • 如果需要等待多个任务全部完成或任一完成:

    • 选择 CompletableFuture:
      • 示例:等待多个异步任务的全部完成或任一完成。
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        // 异步任务1
    });
    
    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        // 异步任务2
    });
    
    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
    combinedFuture.join(); // 等待全部完成
    
4. 异常处理:
  • 如果需要更丰富的异常处理:

    • 选择 CompletableFuture:
      • 示例:对每个异步任务进行异常处理。
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 异步任务
    }).exceptionally(throwable -> {
        // 异常处理
        return null; // 或提供默认值
    });
    
5. 性能考虑:
  • 如果是对集合进行简单操作,数据量较大:

    • 选择并行流 (parallelStream):
      • 示例:对集合进行简单的映射、过滤等操作。
    myList.parallelStream().map(element -> {
        // 简单操作
        return transformedElement;
    });

总结:

  • 简单的并行处理和集合操作:

    • 选择并行流 (parallelStream)
  • 复杂的异步任务、依赖关系、等待多个任务完成:

    • 选择 CompletableFuture

在实际应用中,通常你会根据具体的需求和场景选择合适的工具。有时候也可以结合使用这两者以发挥各自的优势。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐