我是一个菜鸡开发,目前遇到一个需求想请教下各位大佬。假设需要改造以下 For 循环
for(int i = 0; i < list.size() ; i ++){
List resultList = HttpRequest.post(url).body(list.get(i));
for(int j = 0 ; j < resultList.size() ; j++){
var resultA = functionA(resultList.get(j));
var resultB = functionB(resultA);
var resultC = functionC(resultB);
}
}
其中 list 数据来源 API 接口,数据量在 100 到 1000 不等。functionA 、B 、C 都有业务逻辑( Http 请求,数据库查询等,都是需要串行执行的)。目前单线程运行比较慢,想问下有什么比较好的办法可以提高处理效率?
我打算使用多线程并行处理 list 的数据,但是里面那层 for 循环数据量也比较大(多的可能有 1 万条),里面那层不知道有没有办法也可以加快效率的?或者针对这类场景是否有比较通用的解决办法?
1
idealhs 2023-03-10 15:03:21 +08:00
没写过 java ,个人思路应该把 List resultList = HttpRequest.post(url).body(list.get(i)); 提出来,结果使用 yield return 。给下面的业务逻辑调用。然后你的 HttpRequest 应该可以换成 Async 的方法,在业务逻辑里面使用到 resultList[i]的时候, 去 await
|
2
sbex 2023-03-10 15:04:49 +08:00
这个主要还是得分析具体性能瓶颈在哪里,单从代码看目前只能想到线程池。
|
3
RyanLeeCUP 2023-03-10 15:07:59 +08:00
你可以先合并一下不可并行的操作,比如 List 可以并行处理的,如果量大就拆分成若干个批次去并行
function A B C 不能并行,所以看成一个行为,对 resultList 也可以并行化 把这个当成管道 pipe(listAction).pipe(resultAction).execute() 大概这样 |
4
LeegoYih 2023-03-10 15:09:46 +08:00
如果只需要保证 functionABC 的调用顺序可以用 Fork/Join
resultList.parallelStream() .map(o -> functionA(o)) .map(o -> functionB(o)) .map(o -> functionC(o)) .collect(Collectors.toList()); |
5
Ericality 2023-03-10 15:11:16 +08:00
function ABC 是否可以直接用多线程来执行? 如果可以的话就可以节约很多资源
或者直接用 parallelStream 来代替 for 循环处理 也可以直接并行处理 但是具体要看对数据的操作方式吧 如果没有时序要求可以考虑 还有我也是个菜鸡 如果说的不对望后面大佬指出 |
6
awalkingman 2023-03-10 15:11:41 +08:00
内层也开线程,CountDownLatch 等待归集结果。
|
7
jiajianjava 2023-03-10 15:14:18 +08:00
这个应该是生产消费模式, 多线程生产者请求数据,把数据提交个阻塞队列, 多线程消费者从队列获取数据 处理 ABC 任务
|
8
Gct012 OP @Ericality 里面的 Function ABC 目前看下来只能串行操作。外层的 For 循环我打算用 parallelStream 来遍历,但是不太确定里面的那层循环是否还能开并行流...
|
9
justNoBody 2023-03-10 15:23:36 +08:00
把 4 楼和 7 楼的答案结合一下就好了,然后记得如果使用 parallelStream 务必要自定义线程池,使用默认的线程池会导致其他任务阻塞
|
10
Gct012 OP @RyanLeeCUP 大佬你说的这个 pipe 是标准还是第三方库的额,我貌似没搜到类似的写法...
|
11
awalkingman 2023-03-10 15:27:00 +08:00 1
for(int i = 0; i < list.size() ; i ++){
List resultList = HttpRequest.post(url).body(list.get(i)); CountDownLatch latch = new CountDownLatch(resultList.size()); for(int j = 0 ; j < resultList.size() ; j++){ // 这里继续开线程 { var resultA = functionA(resultList.get(j)); var resultB = functionB(resultA); var resultC = functionC(resultB); latch.countDown(); } latch.await(); } } |
12
Gct012 OP @justNoBody 那这样的话是不是外层和里层的循环得拆成两个队列?
|
13
dqzcwxb 2023-03-10 15:28:45 +08:00
不要用 parallelStream 去做 io 操作,parallelStream 只推荐在 cpu 密集型任务时使用
你这个用 completablefuture 是很合适的 |
14
Gct012 OP @newskillsget 感谢大佬!我试试
|
15
DreamStar 2023-03-10 15:31:04 +08:00
先从业务上调整, 能整合的整合, 能合并的合并.
其次同步转异步, 事件驱动用消息队列+本地事件表,根据具体的消费能力调整并发即可. 你这个量用单进程多线程做稳定性太差,吞吐量太低,没啥可观测性. |
16
Martin9 2023-03-10 15:31:24 +08:00
以下回答来自 chatgpt, 供你参考。
针对这个场景,使用多线程并行处理 list 数据可以提高处理效率。你可以使用 Java 的线程池来实现多线程处理。Java 提供的线程池可以在多个线程之间共享一组线程,可以重复利用线程,减少线程创建和销毁的开销,从而提高效率。 在处理大量数据时,可以考虑使用分治思想,将数据分成若干份,分别交给不同的线程去处理,处理完成后再将结果合并。这样可以充分利用多核 CPU 的性能,提高并行处理的效率。 对于里面那层 for 循环的处理,你可以使用并行流来提高处理效率。Java 8 引入了 Stream API ,可以方便地进行并行处理。你可以使用 stream() 方法将结果列表转换成流,然后使用 parallel() 方法将流转换为并行流,最后使用 forEach() 方法对流进行处理。 List<List> resultLists = new ArrayList<>(); IntStream.range(0, list.size()) .parallel() .forEach(i -> { List resultList = HttpRequest.post(url).body(list.get(i)); resultLists.add(resultList); }); List results = resultLists.stream() .flatMap(List::stream) .parallel() .map(result -> { var resultA = functionA(result); var resultB = functionB(resultA); var resultC = functionC(resultB); return resultC; }) .collect(Collectors.toList()); 这里使用了 Java 8 的 Stream API ,通过并行处理来提高处理效率。第一个 forEach() 方法将结果列表转换成流,并行地处理列表中的每个元素,将结果添加到结果列表中。第二个流中的 flatMap() 方法将多个结果列表合并成一个流,然后并行地对每个结果进行处理,最后将结果收集到一个列表中。 |
17
Ericality 2023-03-10 15:35:46 +08:00
@Gct012 可以 但是好像你想要 ABC 顺序执行 那为什么不直接在外面开多线程呢
即 ThreadUtils.excutor.excute(list -> functionA(list.get(i)); functionB(resultA); functionC(resultC); ) 同时我注意到 resultList 是一个 http 请求 那是不是这个 list 请求一遍就可以了? List resultList = HttpRequest.post(url).body(list.get(i)); //下面是原来的 for 循环的代替 resultList.stream.paralla.map(currentList ->{ ThreadUtiles.excutor.excute ..... } ) 以上都是伪代码哈 只是提供一个思路 |
18
awalkingman 2023-03-10 15:38:03 +08:00
@jiajianjava 优雅一点做法确实如此。多线程开发调试比较麻烦的,可以把外层结果都提取到队列里,然后再开多个消费者消费,这样观测性高(看生产者有没有生产,看队列有没有消费和消费后的结果是否符合预期)和调试难度也比较低。
|
19
leonshaw 2023-03-10 15:48:46 +08:00
内层 10000 个开线程太多了,池满了一样阻塞。要改成异步或者试试虚线程。
|
20
luckyrayyy 2023-03-10 15:51:52 +08:00
list.parallelStrem()
.flatMap(item -> {return HttpRequest.post(url).body(item).stream();}) .parallel() .forEach(item -> { funcC(funcB(funcA(item))); }); 这样? |
21
Aluneth 2023-03-10 16:18:09 +08:00
能不能 f 将 uncABC 视为一个函数,去掉外部循环,将需要跟中间件交流的地方都改成批量执行。最终还是要看这个循环耗时的点具体在什么地方。
|
22
RyanLeeCUP 2023-03-10 16:28:59 +08:00
@Gct012 不是库 类似伪代码,担心并发量就分批,然后把串行操作抽象成一个节点,节点任务并行化,最后会成管道式的流程,最后管道本身也可以被并行化 按这个思路去设计
|
23
ldyisbest 2023-03-10 16:30:31 +08:00
List resultList = HttpRequest.post(url).body(list.get(i)); 提到最外层, 用批量了接口,思想是减少 http 请求次数,functionA,B,C 里面也这样做
|
24
levintrueno 2023-03-10 17:51:44 +08:00
public class Code {
// 执行外层任务的线程池 static ExecutorService outerExecutor = Executors.newFixedThreadPool(8); // 执行内层任务的线程池 static ExecutorService innerExecutor = Executors.newFixedThreadPool(16); // 任务总数 static AtomicInteger taskCount = new AtomicInteger(); static String url = "url"; static Random random = ThreadLocalRandom.current(); public static void optimization() { StopWatch stopWatch = new StopWatch(); stopWatch.start(); // 模拟任务 final int maxTask = random.nextInt(1000); System.out.println("外层总任务数:" + maxTask); List<String> list = IntStream.rangeClosed(1, maxTask).mapToObj(String::valueOf).collect(Collectors.toList()); // 50 个任务一组 final List<List<String>> partition = Lists.partition(list, 50); System.out.println("拆分任务数量:" + partition.size()); partition.parallelStream() .map(task -> CompletableFuture.runAsync(new OuterTask(task), outerExecutor)) .forEach(CompletableFuture::join); System.out.println("taskCount = " + taskCount); stopWatch.stop(); System.out.println("耗时:" + stopWatch.getTotalTimeSeconds()); innerExecutor.shutdown(); outerExecutor.shutdown(); } private static class OuterTask implements Runnable { private final List<String> tasks; public OuterTask(List<String> tasks) { this.tasks = tasks; } @Override public void run() { tasks.parallelStream() .map(task -> CompletableFuture.runAsync(new InnerTask(task), innerExecutor)) .forEach(CompletableFuture::join); } } private static class InnerTask implements Runnable { private final String body; public InnerTask(String body) { this.body = body; } @Override public void run() { final List<String> responseResult = HttpRequest.post(url).body(body); for (String aParam : responseResult) { final String bParam = functionA(aParam); final String cParam = functionB(bParam); final String result = functionC(cParam); // handle result... taskCount.incrementAndGet(); } } } } 考虑不周,仅作参考。。。 |
25
Dahunvwu 2023-03-10 17:58:03 +08:00
disruptor 框架了解一下,或者使用 1.8 的 CompletionService
disruptor.handleEventsWithWorkerPool(poolA) .thenHandleEventsWithWorkerPool(poolB) .thenHandleEventsWithWorkerPool(poolC) .thenHandleEventsWithWorkerPool(poolD) |
26
ymz 2023-03-10 18:04:42 +08:00
线程池,以及 resultList 需要和数据库打交道的一起提出来,切分后一条 sql 处理若干个,CPU 计算很快的,主要还是 IO
|
27
ration 2023-03-10 18:30:02 +08:00 via Android
看看瓶颈在哪里,http 请求还是数据库,添加日志看看时间多少。接着有些能合并请求的合并,多线程作为最后的手段。曾经试过下载文件的,实际上太多线程没用,网络限制在那里,提高带宽就好了。
|
28
janus77 2023-03-10 18:37:44 +08:00
你拿到 A 只是为了传入 B ?拿到 B 只是为了传入 C ?那可以把 funcABC 压缩合并简化一下
|
29
night98 2023-03-10 22:28:57 +08:00
第二行可以看看接口提供方能否提供批量接口,这样速度会快一点,然后下面的 abc 方法只能通过增加线程数的方式提升效率,或者上异步。
|
30
L0L 2023-03-10 22:47:02 +08:00 via Android
@luckyrayyy 20 楼流操作大师,并发,还不会冲突。
|
31
ychost 2023-03-10 22:55:25 +08:00
Java19 的 Loom 开一开试一试,不行的话实时 reactor
|