5-6-RxJava

文章目录
  1. 1. 添加
  2. 2. 背景(方便异步并发)
  3. 3. 关于RxJava
  4. 4. 概念
    1. 4.1. RxJava与观察者模式(为什么)
  5. 5. RxJava编程函数
    1. 5.1. 操作符例子
  6. 6. 总结
  7. 7. 案例1
  8. 8. 案例2

添加

https://www.jianshu.com/p/584252c1e410

背景(方便异步并发)

微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?

一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?

本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava

关于RxJava

RxJavaReactive ExtensionsJava实现,通过使用Obserable/Flowable序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。

RxJava基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。

RxJava基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。

响应式编程?是一种基于异步数据流概念的编程模式

关键概念:事件

适用场景:UI(通用)

概念

  • Observable:被订阅者,比如在安卓开发中,可能是某个数据源,数据源的变化要通知到UI,那么UI就是Observer,被订阅者有冷热之分,热Observable无论有没有订阅者订阅,事件流始终发送,而冷Observable则只有订阅者订阅事件流才开始发送数据,它们之间是可以通过API相互转化的,比如使用publish可以冷->热,RefCount可以热->冷;
  • Observer:订阅者;

RxJava与观察者模式(为什么)

RxJava是 ReactiveX在 Java上的开源的实现,简单概括,它就是一个实现异步操作的库,使用时最直观的感受就是在使用一个观察者模式的框架来完成我们的业务需求; 其实java已经有了现成的观察者模式实现:java.util.Observable和java.util.Observer,那么为何还要RxJava呢?

java.util.Observable是典型的观察者模式实现,而RxJava主要功能如下: 1. 生产者加工数据,然后发布給观察者; 2. 观察者处理数据; 3. 从生产者生产数据到观察者处理数据,这之间传递的数据可以被处理; 4. 线程切换,生产者发布数据和观察者处理数据可以在指定线程中处理;

RxJava还有个特点就是支持链式编码,再配合lambda,可以保持简洁和清晰的逻辑(注意是逻辑简洁,代码是否简洁只能取决于实际业务);

看得出,除了实现观察者模式,RxJava还提供了更丰富的能力。

观察者模式的四大要素:

  • Observable:被观察者
  • Observer:观察者
  • Subscribe:订阅
  • Event:事件

image-20210814165845134

RxJava编程函数

image-20210814170018420

  • 被订阅者:用的做多的是Observable,如果要支持背压则使用Flowable,还可以使用Single(只要OnSuccess和onError,没有onComplete),Completable(创建后不发射任何数据,只有onComplete和onError)和Maybe(只发送0或1个数据);
  • 生命周期监听:Observable创建后可使用doXXX监听你说需要的生命周期回调;
  • 流的创建:create(使用一个函数从头创建),just(指定值创建,最多10个),fromXXX(基于X类创建),repeat(特定数据重复N次创建),defer(直到有订阅者订阅时才创建),interval(每隔一段时间创建一个数据发送),timer(延迟一段时间后发送数据);
  • RxJava线程模型: 内置多个线程控制器,包括single(定长为1的线程池),newThread(启动新线程执行),computation(大小为CPU核数线程池,一般用于密集型计算),io(适用IO操作),trampoline(直接在当前线程运行)和Schedulers.from(自定义);
  • 变化操作符:map(数据转型),flatMap(数据转某个Observable后合并发送),scan(每个数据应用一个函数,然后按顺序发送),groupBy(按Key分组拆分成多个Observable),buffer(打包发送),window,cast(强制转换类型);
  • 过滤操作:filter(按条件过滤),takeLast(只发送最后N个数据),last(只发送最后一个数据),lastOrDefault(只发送最后一个数据,为Null发送默认值),takeLastBuffer(将最后N个数据当做单个数据发送),skip(跳过N个发送),skipLast(跳过最后N个),take(只发送开始的N个数据),first,takeFirst(只发送满足条件的第一个数据),elementAt(只发送第N个数据),timeout(指定事件内没发送数据,就发送异常),distinct(去重),ofType(只发送特定类型的数据),ignoreElements(丢失所有正常数据,只发送错误或完成通知),sample(一段时间内,只处理最后一个数据),throttleFirst(一段时间内,只处理第一个数据),debounce(发送一个数据,开始计时,到了规定时间没有再发送数据,则开始处理数据);
  • 条件操作和布尔操作符:all(发送的数据是否都满足条件),contains(发送的数据是否包含某数据),amb(多个被订阅者数据发送只发送首次被订阅的那个数据流),defaultIfEmpty(如果原始被订阅者没有值,则发送一个默认值),sequenceEquals(判定两个数据流是否一样,返回true或false),skipUtil(直到符合条件才发送),skipWhile(直到条件不符合才开始发送),takeUntil(满足条件后不发送)和takeWhile(条件满足的一直发送);
  • 合并和连接操作符:merge(将多个被订阅数据流合并),zip(将多个数据流结合发送,返回数据流的数据个数是最少的那个),combineLastest(类似zip,任意被订阅者开始发送数据时即发送,而zip要每个被订阅者开始发送数据才发送),join(两个被订阅者结合合并,总数据项是M*N项),startWith(在数据序列开头插入指定项),connect,灵活控制发送数据规则可使用push,refCount,replay(保证所有订阅者收到相同数据);
  • 背压:被订阅者发送数据过快以至于订阅者来不及处理的情况;

操作符例子

https://blog.csdn.net/jiduochou963/article/details/115044605

总结

对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。

案例1

从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
long c = System.currentTimeMillis();
System.out.println("顺序执行:");
System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
spendTime(c);
}
//模拟某个服务
private static String service(String srvName){
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
return srvName+"\r\n";
}
private static void spendTime(long preTime) {
System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}

这段代码毫无疑问,打印输出:

花费:781 毫秒

改造一下,使用JDK8的CompletableFuture,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws Exception {
final long cc = System.currentTimeMillis();
CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
s1.thenCombine(s2, (i,j)->{
return i+j;
}).thenCombine(s3, (i,j)->{
System.out.println("使用JDK8的并行编程:");
System.out.println(i+j);
spendTime(cc);
return i+j;
});
}

以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:

花费:311 毫秒

那么以上的代码使用RxJava怎么来写呢?我们可以flatMap将服务分拆到各自独立线程中去执行,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
public static void main(String[] args) throws Exception {
Observable.range(0,3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer t) throws Exception {
return Observable.just(t)
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
return service(ss[t]);
}
});
}
})
.reduce((s1,s2)->s1+s2)
.subscribe(s -> {
System.out.println("Observable:\r\n" + s);
spendTime(cc2);
});
}

花费:455 毫秒

案例2

RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多

第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.sumslack.rxjava;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class TestComputer {
private static final int MAX_I = 210000000;

private static void spendTime(long preTime) {
System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}

private static void spendTime(long preTime,String str) {
System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
private static ExecutorService eService = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception{

int[] ss = new int[MAX_I];
for(int i=1;i<=MAX_I;i++) {
ss[i-1] = i;
}


long c = System.currentTimeMillis();
System.out.println(xx(0,MAX_I));
spendTime(c,"顺序执行");

final long cc5 = System.currentTimeMillis();
Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return Math.sqrt(t);
}
}).reduce((i,j)->i+j)
.subscribeOn(Schedulers.computation())
.subscribe(s -> {
spendTime(cc5,"Observable直接算");
});
final long cc = System.currentTimeMillis();
CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
return xx(0,MAX_I/2);
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return xx(MAX_I/2,MAX_I);
});
cf1.thenCombine(cf2, (i,j)->{
System.out.println(""+(i+j));
spendTime(cc,"CompletableFuture");
return i+j;
});

//也可以用:CompletableFuture.allOf(cf1,cf2).join();
c = System.currentTimeMillis();
Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd);
spendTime(cc,"stream");

c = System.currentTimeMillis();
Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd2);
spendTime(cc,"parallel stream");

final long cc2 = System.currentTimeMillis();
Observable.fromArray(0,1,2)
.flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
@Override
public ObservableSource<Double> apply(Integer t) throws Exception {
if(t%3==0) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(0,MAX_I/3);
}
});
}else if(t%3==1) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I/3,MAX_I*2/3);
}
});
}else {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I*2/3,MAX_I);
}
});
}
}
})
.reduce(new BiFunction<Double, Double, Double>() {
@Override
public Double apply(Double t1, Double t2) throws Exception {
return t1+t2;
}
})
.subscribe( s->{
System.out.println(s);
spendTime(cc2,"Observable");
});
Thread.sleep(100000);
}

private static double xx(int start,int end) {
double sum = 1;
for(int i=start;i<end;i++) {
sum += Math.sqrt(i+1);
}
return sum;
}
}

以下是费时结果:

[顺序执行] 花费:1086 毫秒
[CompletableFuture] 花费:537 毫秒
[stream] 花费:1028 毫秒
[parallel stream] 花费:1305 毫秒
[Observable] 花费:461 毫秒
[Observable直接算] 花费:4265 毫秒

这里使用 RxJava 进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!