假设一个场景:对外提供一个 http 接口 A,A 接口内部实际上又是调用 B C D 三个微服务接口,B C D 之间没有依赖关系,可以同时执行,最后将结果组合起来返回。
我们正常一般都是顺序同步执行,假设每个服务用时 1 秒,那么至少需要 3 秒才能返回结果,假如 BCD 并行执行的话,理论上只要 1 秒就能返回结果了。
在 Java 里只想到了用多线程来做这样的优化,可以用 callable 这种可以获取异步返回结果的类。
在公司内网上看到有人说可以用 RxJava 来做这样的优化,但是花了些时间发现好像还不如上面的方法,也可能是自己对 RxJava 不熟悉,如果有熟悉 RxJava 的不吝赐教。还是用这里的例子来说明,在 RxJava 里 B C D 服务需要各封装成一个 observable,然后和 observer 用异步线程模式来关联。那么 BCD 会同时开始执行,但是我发现没有已经封装好的工具来阻塞主线程等大家一起执行完返回结果,callable 好歹还有个 get ()方法,不过也不好用就是了。最好还是借助 CountDownLatch 类的工具来协助。 不知道 RxJava 在服务端应用不多的原因是不是因为这个,实在是没有发现什么特别好的地方,唯一我觉得好用的可能就是在一个 observerable 里 onnext 链式调用,对于有层层依赖的情况可能比较好用,还有一些操作符之类的。
另外在知乎上也看到一篇同样主题的文章。
希望大家来点建议。
1
ysweics 2018-10-22 16:24:56 +08:00
同有这种疑惑,坐等大神
|
2
DeadLion OP 贴一个 Callable demo 出来
``` @RequestMapping("/testWhitCallable") public String testWhitCallable() throws ExecutionException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); ExecutorService es = Executors.newCachedThreadPool(); Future fb = es.submit(() -> { Object result = b.methodB(); countDownLatch.countDown(); return result; }); Future fa = es.submit(() -> { Object result = a.methodA(); countDownLatch.countDown(); return result; }); countDownLatch.await(); return fa.get().toString() + fb.get().toString(); } ``` |
3
sagaxu 2018-10-22 16:38:55 +08:00 via Android
可以用 future 来做,每个 http 请求一个 future,再聚合这 3 个 future 得到一个用来等待的 future。http 请求改成异步的,不要让它阻塞你的线程。
|
4
lhx2008 2018-10-22 16:39:18 +08:00 via Android
有一个 zip 方法,我用的是 reactor3 的,把各个请求任务发到不同线程上面,然后 zip 一起,再 onNext,reactor3 有一个 block 方法转回同步。rxjava 不太清楚。
或者 vert.x 的 future 也有一个类似的 ,叫 CompositeFuture,底层可以看到是 Countdown latch,所以其实 jdk 自带就是 countdownlatch |
5
lhx2008 2018-10-22 16:46:20 +08:00 via Android
reacto3 demo,手机打的,意思一下
Mono.zip(Mono.fromCallable(), Mono.fromCallable(),Mono.fromCallable()) .doOnNext(xxx) .doOnError(xxx) .block |
6
lhx2008 2018-10-22 16:48:05 +08:00 via Android
忘了发线程了
Mono.zip(Mono.fromCallable().subscribeOn(线程池), Mono.fromCallable().subsribeOn(),Mono.fromCallable().subscribeOn) .doOnNext(xxx) .doOnError(xxx) .block |
7
xcstream 2018-10-22 16:48:36 +08:00
java 有协程么
没有的话本质上就是多线程了 |
8
ffeii 2018-10-22 16:49:21 +08:00 via iPhone
Spring webflux 里有 Flux.zip 方法
https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/zip.png |
9
jinhan13789991 2018-10-22 16:50:10 +08:00
android 开发一枚,我们一般用 combineLatest 来做一些表单验证操作。没有涉及到太多多线程。
你可以看一下 ~ https://blog.csdn.net/johnny901114/article/details/61191723 |
11
hpeng 2018-10-22 16:57:42 +08:00
|
12
zhangwugui 2018-10-22 17:08:01 +08:00
对 RxJava 同样不太熟悉,楼上倒是也说了这个 CompletableFuture,这个是 JDK8 基于 Future 异步处理的增强,这个倒是可以用在这里。
|
13
DeadLion OP 搜到一篇 https://medium.com/@nithinmallya4/processing-streaming-data-with-spring-webflux-ed0fc68a14de
里面提到了很多方案,不过好像大家都说的差不多了 1.Java 5 提供的 Futures 2.Java 8 提供的 CompletableFutures 3.第三方库 RxJava 4.Spring 5 提供的 Reactive Streams,实际就是上面提到的 Reactor 实现的。 5.Spring 5 中的 Webflux,不过我看上面说只能基于 http 或者 websockets 请求 ,不过我们说的不仅限于 http 形式的微服务,还有 rpc 也算。 @ysweics @lhx2008 @ffeii |
14
janus77 2018-10-22 17:10:20 +08:00
zip 操作符即可
|
15
lhx2008 2018-10-22 17:14:02 +08:00 via Android 1
@DeadLion webflux 就是 reactor,只是 webflux 提供了一个配套的异步网络 API,如果是 RPC 的,Reactor 支持接入别的异步或者 fromCallable
|
17
wengang285 2018-10-22 17:37:49 +08:00
promise/future
|
18
yaoliyc 2018-10-22 19:05:07 +08:00
jdk7 提供了 fork-join
|
19
v3exhost 2018-10-24 11:27:30 +08:00
```java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); ``` |