在学习异步和并行处理之前,由于业务原因,我同时又遇到了一个问题,
当我有很多数据要进行处理时,由于数据量过大,内存可能无法支持到这么多数据,这个时候我需要对数据
进行分组批处理,那么这个时候`RxJava`能够怎么样轻易的帮助我们完成这项工作呢

buffer

1
2
3
4
5
@Test
public void generateFlowable() {
Flowable<Integer> rangeFlowable = Flowable.range(1, 100000);
rangeFlowable.subscribe(new ConsoleSubscribe());
}

我们假设现在的一个场景,我们有大概 10w 条数需要批处理,但是如果一次性全部都在内存中处理,我们的内存可能就支撑不住,从而导致堆溢出等情况发生,那么为了缓解这一压力,我们能够想到的就是,希望能够将这 10w 条数据拆分成可能大的批次,然后进行分批次处理,我们可以用到各种方式对这批数据进行分批,那利用RxJava,我们可以如何轻松的做到呢?

1
2
3
4
5
6
@Test
public void generateFlowable() {
Flowable<Integer> rangeFlowable = Flowable.range(1, 10);
Flowable<List<Integer>> bufferFlowable = rangeFlowable.buffer(5);
bufferFlowable.subscribe(new ConsoleSubscribe());
}

可以看到就这样简单的一句(由于篇幅原因,我们将数据量改小了),我们就轻松的办到了

1
2
3
4
Thread:【main】 在 2023-04-07T14:13:38.900Z 开启注册消费!
Thread:【main】 在 2023-04-07T14:13:38.916Z 消费了 data【[1, 2, 3, 4, 5]】
Thread:【main】 在 2023-04-07T14:13:38.916Z 消费了 data【[6, 7, 8, 9, 10]】
Thread:【main】 在 2023-04-07T14:13:38.916Z 完成了消费

何以很明确的看到,确实达到了分组的效果,确实很方便。除了简单的分组以外,buffer还给我们提供了时间窗口的功能。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void generateFlowable2() {
Flowable<Integer> rangeFlowable = Flowable.range(1, 10).map(data -> {
TimeUnit.MILLISECONDS.sleep(200);
return data;
}).subscribeOn(Schedulers.io());
Flowable<List<Integer>> bufferFlowable = rangeFlowable.buffer(500, TimeUnit.MILLISECONDS);
bufferFlowable.subscribe(new ConsoleSubscribe());
while (true) {
}
}

同时我们得到了由时间窗口分批的结果

1
2
3
4
5
6
7
8
Thread:【main】 在 2023-04-07T14:27:06.453Z 开启注册消费!
Thread:【RxComputationThreadPool-1】 在 2023-04-07T14:27:06.976Z 消费了 data【[1, 2]】
Thread:【RxComputationThreadPool-1】 在 2023-04-07T14:27:07.478Z 消费了 data【[3, 4, 5]】
Thread:【RxComputationThreadPool-1】 在 2023-04-07T14:27:07.974Z 消费了 data【[6, 7]】
Thread:【RxComputationThreadPool-1】 在 2023-04-07T14:27:08.480Z 消费了 data【[8, 9]】
Thread:【RxCachedThreadScheduler-1】 在 2023-04-07T14:27:08.528Z 消费了 data【[10]】
Thread:【RxCachedThreadScheduler-1】 在 2023-04-07T14:27:08.528Z 完成了消费

所以这里我们可以看到,针对于一些大流量或者大批量的数据需要进行分批处理的时候,我们可以分方便的适用RxJavabuffer功能,当然除了执行分批大小和时间窗口以外,他们还能随意组合,以及更多高级功能可以适用,这个后面我们在使用到的时候在进行说明吧。

相关代码地址:
100daysCode