2018.9.10 响应式一些内容的分享总结

2018.9.10 响应式一些内容的分享总结

响应式到底是什么?

现实生活中,当我们听到有人喊我们的时候,我们会对其进行响应,也就是说,我们是基于事件驱动模式来进行的编程。
所以这个过程其实就是对于所产生事件的下发,我们的消费者对其进行的一系列的消费。
从这个角度,我们可以思考,整个代码的设计我们应该是针对于消费者来讲的,比如看电影,有些画面我们不想看,那就闭上眼睛,有些声音不想听,那就捂上耳朵,说白了,就是对于消费者的增强包装,我们将这些复杂的逻辑给其拆分,然后分割成一个个的小任务进行封装,于是就有了诸如filter、map、skip、limit等操作。而对于其中源码的设计逻辑,我们放在后面来讲。

并发与并行的关系

可以这么说,并发很好的利用了CPU时间片的特性,也就是操作系统选择并运行一个任务,接着在下一个时间片会运行另一个任务,并把前一个任务设置成等待状态。
其实这里想表达的是并发并不意味着并行
具体来举几个情况:

  • 有时候多线程执行会提高应用程序的性能,而有时候反而会降低程序的性能。这在关于JDK中其Stream API的使用上体现的很明显,如果任务量很小,而我们又使用了并行流,反而降低了性能。

  • 我们在多线程编程中可能会同时开启或者关闭多个线程,这会产生的很多性能开销,这也降低了程序性能。

  • 当我们的线程同时都在等待IO过程,此时并发也就可能会阻塞CPU资源,其造成的后果不仅仅是用户在等待结果,同时会浪费CPU的计算资源。

  • 如果几个线程共享了一个数据,情况就变得有些复杂了,我们需要考虑数据在各个线程中状态的一致性。为了达到这个目的,我们很可能会使用Synchronized或者是lock来解决。

    现在,应该对并发有一定的认知了吧。并发是一个很好的东西,但并不一定会实现并行。并行是在多个CPU核心上的同一时间运行多个任务或者一个任务分为多块执行(如ForkJoin)。单核CPU的话就不要考虑了。
    补充一点,实际上多线程就意味着并发,但是并行只发生在当这些线程在同一时间调度分配在不同CPU上执行。也就是说,并行是并发的一种特定的形式。往往我们一个任务里会产生很多元素,然而这些个元素在不做操作的情况下大都只能在当前线程中操作,要么我们就要对其进行ForkJoin,但这些对于我们很多程序员来讲有时候很不好操作控制,上手难度有些高,响应式的话,我们可以简单的通过其调度API就可以轻松做到事件元素的下发分配,其内部将每个元素包装成一个任务提交到线程池中,我们可以根据是否是计算型任务还是IO类型的任务来选择相应的线程池。
    这里,需要强调一下:线程只是一个对象而已,不要把其想象成cpu中的某一个执行核心,这是很多人都在犯的错,cpu时间片切换执行这些个线程。

响应式中的背压到底是一种怎样的理解

用一个不算很恰当的中国的成语来讲,就是承上启下。为了更好的解释,我们来看一个场景,大坝,在洪水时期,下游没有办法一下子消耗那么多水,大坝在此的作用就是拦截洪水,并根据下游的消耗情况酌情排放。再者,父亲的背,我们小时候,社会上很多的事情首先由父亲用自己的背来帮我们来扛起,然后根据我们自身的能力来适当的下发给我们压力,也就是说,背压应该写在连接元素生产者和消费者的一个地方,即生产者和消费者的连线者。然后,通过这里的描述,背压应该具有承载元素的能力,也就是其必须是一个容器的,而且元素的存储与下发应该具有先后的,那么使用队列则是最适合不过了。

如何去看Rxjava或者Reactor的源码,根据源码的接口的设计我们可以得到一些什么启示

关于响应式的Rx标准已经写入了JDK中:java.util.concurrent.Flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@FunctionalInterface
public static interface Publisher<T> {

public void subscribe(Subscriber<? super T> subscriber);
}


public static interface Subscriber<T> {

public void onSubscribe(Subscription subscription);


public void onNext(T item);


public void onError(Throwable throwable);


public void onComplete();
}


public static interface Subscription {

public void request(long n);


public void cancel();
}


public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

可以看到,Flow这个类中包含了这4个接口定义,Publisher 通过subscribe方法来和Subscriber产生订阅关系,而Subscriber依靠onSubscribe来首先和上游产生联系,这里就是靠Subscription来做到的,所以说,Subscription往往会作为生产者的内部类定义其中,其用来接收生产者所生产的元素,支持背压的话,Subscription应该首先将其放入到一个队列中,然后根据请求数量来调用SubscriberonNext等方法进行下发。这个在Rx编程中都是统一的模式,我们通过Reactor中reactor.core.publisher.Flux#fromArray所涉及的FluxArray的源码来对此段内容进行理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
final class FluxArray<T> extends Flux<T> implements Fuseable, Scannable {

final T[] array;

@SafeVarargs
public FluxArray(T... array) {
this.array = Objects.requireNonNull(array, "array");
}

@SuppressWarnings("unchecked")
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
if (array.length == 0) {
Operators.complete(s);
return;
}
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
}
else {
s.onSubscribe(new ArraySubscription<>(s, array));
}
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
subscribe(actual, array);
}


@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
return null;
}

static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {

final CoreSubscriber<? super T> actual;

final T[] array;

int index;

volatile boolean cancelled;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ArraySubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ArraySubscription.class, "requested");

ArraySubscription(CoreSubscriber<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}

@Override
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}

void slowPath(long n) {
final T[] a = array;
final int len = a.length;
final Subscriber<? super T> s = actual;

int i = index;
int e = 0;

for (; ; ) {
if (cancelled) {
return;
}

while (i != len && e != n) {
T t = a[i];

if (t == null) {
s.onError(new NullPointerException("The " + i + "th array element was null"));
return;
}

s.onNext(t);

if (cancelled) {
return;
}

i++;
e++;
}

if (i == len) {
s.onComplete();
return;
}

n = requested;

if (n == e) {
index = i;
n = REQUESTED.addAndGet(this, -e);
if (n == 0) {
return;
}
e = 0;
}
}
}

void fastPath() {...}

}

static final class ArrayConditionalSubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {
....
}

}

我们可以看到之前文字在源码内部的表达。这里就不多说了。而对于各种中间操作的包装我们该如何去做,依据之前的接口定义,我们应该更注重功能的设定,而无论是filter,flatmap,map等这些常用的操作,其实都是消费动作,理应定义在消费者层面,想到这里,我们该如何去做?
这里,我们就要结合我们的设计模式,装饰模式,对subscribe(Subscriber<? super T> subscriber)所传入的Subscriber进行功能增强,即从Subscriber这个角度来讲,使用的是装饰增强模式,但从外面来看,其整体定义的依然是一个Flux或者Mono,这里FluxArray的话就是例子,这样,从这个角度来讲,其属于向上适配,也就是适配模式,这里的适配玩的比较有意思,完全就是靠对内部类的包装然后通过subscribe(Subscriber<? super T> subscriber)衔接来完成的。

所以,我们应该想到中国古代苏轼的题西林壁里有一句话:横看成岭侧成峰 远近高低各不同讲的就是从不同的角度去看待一个事物,就会得到不同的结果。同样,一百个人心中有一百个哈姆雷特,也是对于同一个事物的看法,从这里,我们应该能学到设计模式千万不要特别刻意的去绝对化!

我们可以结合reactor.core.publisher.Flux#filter涉及的FluxFilter来观察理解上述涉及的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
final class FluxFilter<T> extends FluxOperator<T, T> {

final Predicate<? super T> predicate;

FluxFilter(Flux<? extends T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = Objects.requireNonNull(predicate, "predicate");
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
source.subscribe(new FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual,
predicate));
return;
}
source.subscribe(new FilterSubscriber<>(actual, predicate));
}

static final class FilterSubscriber<T>
implements InnerOperator<T, T>,
Fuseable.ConditionalSubscriber<T> {

final CoreSubscriber<? super T> actual;

final Predicate<? super T> predicate;

Subscription s;

boolean done;

FilterSubscriber(CoreSubscriber<? super T> actual, Predicate<? super T> predicate) {
this.actual = actual;
this.predicate = predicate;
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

boolean b;

try {
b = predicate.test(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}
if (b) {
actual.onNext(t);
}
else {
s.request(1);
}
}

@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return false;
}

boolean b;

try {
b = predicate.test(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return false;
}
if (b) {
actual.onNext(t);
}
return b;
}

@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.TERMINATED) return done;

return InnerOperator.super.scanUnsafe(key);
}

@Override
public CoreSubscriber<? super T> actual() {
return actual;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
}
}

static final class FilterConditionalSubscriber<T>
implements InnerOperator<T, T>,
Fuseable.ConditionalSubscriber<T> {

...
}

}

根据这些设计,我们自己也是完全可以作为参考来通过一套api接口设计,可以衍生出很多规范逻辑的开发,比如我们看到的众多的Rx衍生操作API的设计实现,其都是按照一套模板来进行的,我们可以称之为代码层面的微服务设计。

如何去看待众多函数表达式

人类最擅长描述场景,比如一套动作,假如是舞蹈的话,可以讲是什么什么编舞,但是这个编舞又要在一定的框架之下,即有一定的规范,同样,我们施展一套拳法,也需要一个规范,不能踢一脚也叫拳法。而对于这个规范的实现,那就是一套动作,对于拳法来讲,可能就是一个很简单的左勾拳或者右勾拳,也可以是比较复杂的咏春拳,太极拳等,而且一套拳法可能有很多小套路组成,这些小套路也是遵循着这个规范进行的,那么依据这个思路,我们来看下面的函数式接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@FunctionalInterface
public interface Predicate<T> {
boolean test(T t);
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
default Predicate<T> negate() {
return (t) -> !test(t);
}
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
}

@FunctionalInterface
public interface BiConsumer<T, U> {

void accept(T t, U u);


default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);

return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
}

可以看到无论是条件判断表达式Predicate还是无返回值动作处理函数BiConsumer都遵循一个标准动作的设计定义思路,并通过default方法来对同类动作进行编排,以达到更加丰富的效果。所以,函数式的应用更加倾向于干净利落,凸显自己要做的事情就好,未来,我会在自己的Java编程方法论- JDK篇中花大量篇幅来解读函数式编程的各种奇特而实用的使用方法,来降低我们复杂接口的设计逻辑难度,做到知名见义,了然于胸的效果。这个在我的Java编程方法论- Reactor与Spring webflux篇中也是有涉及的。

关于响应式的使用性能的考究

响应式编程知识一种模式,用的好与坏全看自己对于api的理解程度,不要想着会多么的降低性能,这个并没有进行什么过度包装这一说的,当讲到jdbc这里如何表现不行的时候,当前并没有一个开源的Reactor-jdbc的框架,也就造成的测试的不合理性,何况新的知识是需要大家一起共同来学习推动的,不好的地方我们推动就好,不需要上来就对其进行否定,mongodb有提供相应的响应式api,但其内部还是之前的方式,同样,关系型数据库也是一个道理,响应式编程注重的是中间过程的处理,关于生产元素的获取它没太多关系,更多的还是看元素生产者的性能,一家之言,可能有偏颇,希望理解,有问题提出就好。

您的支持将鼓励我继续创作!