经过前面的学习,我们已经了解到一些`RxJava`的使用方式,今天我们在这里借助一个小小的案例来综合使用
一下我们学到的知识,首先这里我们会使用到`Vert.x`的`RxJava`实现
案例
我们需要创建一个 Http 请求工具,能够满足我们的一些特殊的处理方式
准备
依赖
这里涉及到Http
请求工具所以我们借用一下Vert.x
的RxJava
实现帮助我们简化我们的实现细节。
MAVEN
1 2 3 4 5 6 7 8 9 10 11 12 13
| <dependency> <groupId>io.reactivex.rxjava3</groupId> <artifactId>rxjava</artifactId> </dependency>
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web-client</artifactId> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-rx-java</artifactId> </dependency>
|
接口
Server.java 模拟单个的请求访问
1 2 3 4 5
| @FunctionalInterface public interface Server {
Single<?> service(WebClient client); }
|
JClient.java 对请求进行封装
1 2 3 4 5 6 7 8 9
| public class JClient { private final WebClient client;
public JClient() { Vertx vertx = Vertx.vertx(); this.client = WebClient.create(vertx); } ... }
|
实现:并行访问
首先我们需要了解到,我们是需要将多个请求访问并行执行,并且每个请求都会得到相应的返回值,那么我们很轻易的想到,我们有两个可以使用的创建方式merge
和zip
。所以我们可以很简单的就想到我们的方法声明应该是:
1 2
| public Single<List<?>> multiDoServer(Server... servers);
|
这里所用到`Single`是另一种`RxJava`的创建方式,使用场景和`Flowable`不一样但是,大体的思路一样地,
而且我们用到的`WebClient`使用的就是这个,所以我们先暂时用这个创建方式来示意。
所以我们能够很轻松的想到,接下来我们需要将每一个server
转化成对应的RxJava
能够聚合的实体即可,所以我们的完整实现如下:
1 2 3 4 5
| public Single<List<?>> multiDoServer(Server... servers) { List<? extends Single<?>> list = Arrays.stream(servers).map(server -> server.service(client)) .collect(Collectors.toList()); return Single.zip(list, Arrays::asList); }
|
我们会使用一个OPEN Api来帮助我们模拟网络请求
1 2 3 4 5 6 7
| public static Server makeService(Integer time) { return client -> client.getAbs("http://httpstat.us/200?sleep=" + time) .rxSend() .doOnSuccess(data -> System.out.println(" " + time + " is finish at" + new Date())) .map(data -> data.bodyAsString() + " with " + time + " at" + new Date()); }
|
那么接下来我们通过一个简单的测试,来试一下我们创建的并行请求具体使用情况
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void test() throws InterruptedException { JClient client = new JClient(); long start = System.currentTimeMillis(); Single<List<?>> server = client .multiDoServer(makeService(1000), makeService(2000), makeService(1000), makeService(1000), makeService(3000)); Subscription subscribe = server.subscribe(); while (!subscribe.isUnsubscribed()) {
} System.out.println("Used:" + (System.currentTimeMillis() - start) + " ms"); }
|
我们得到以下结果:
1 2 3 4 5 6
| 1000 is finish atMon Apr 10 22:31:13 CST 2023 1000 is finish atMon Apr 10 22:31:13 CST 2023 1000 is finish atMon Apr 10 22:31:13 CST 2023 2000 is finish atMon Apr 10 22:31:14 CST 2023 3000 is finish atMon Apr 10 22:31:15 CST 2023 Used:4613 ms
|
我们可以看到,它并不是按照我们添加 servise 的顺序取请求的,而且总共消耗了 4.6s,小于五个请求的延迟总和 8s,,其中有一些时间消耗属于网络I/O
和Vert.x
内部的调度,所以总和来看,我们确实简单的实现了一个并行请求的 Client,可能还有很多需要改进的地方。
相关代码地址:
100daysCode
JClient