RxJava学习笔记

RxJava 简介

RxJava 是一个为 Java 实现的响应式编程框架,我们可以用它来编写异步/事件驱动的程序。

RxJava 可以看作是一个扩展了的 Observer 模式的实现,其扩展主要体现在如下几个方面:

  • 支持事件序列,多次发射(RxJava 使用 emit 这个术语)
  • 通过操作符来修改/过滤事件序列
  • 封装了底层的异步逻辑和相关数据结构,使得编写异步程序更加容易

Observer 模式

响应式编程的基础建立在最基础的 Observer 模式之上,Observer 模式的类图如下:

Observer 模式

Observer 模式提供了一种事件传播的机制,即事件的关注者(即 Observer)不再主动去询问事件源(即 Subject)某种事件是否发生,而是在 Subject 上维护一个 Observer 的列表,当某种事件发生时,由 Subject 主动通知 Observer 去响应事件。这种结构非常适用于异步处理程序、GUI等事件驱动程序等,例如在 MVC 模式中,View 充当 Observer 角色,Model 充当 Subject 角色,当 Model 变更时(产生事件),View 响应事件,这样 Model 和 View 能专心于自己职责内的逻辑,避免产生耦合。

Publish/Subscribe 模式

Publish/Subscribe 模式(以下简称 P/S 模式)是 Observer 模式的一种变体,相对于 Observer 模式来说,其结构更加松散,Publisher 和 Subscriber 之间耦合度更低。

Publish/Subscribe 模式

Publisher 和 Subscriber 之间通过消息或者事件(Event)进行通信,Publisher 不关心消息发送给哪些 Subscriber,Subscriber 也不关心消息由哪些 Publisher 发出。

ReactiveX

RxJava 的全称是 ReactiveX Java,X 表示对 Reactive 的扩展,最初从微软的 LINQ 扩展而来,其融合了 Observer 模式,Iterator 模式和函数式编程(Functional Programming)。Reactive 这个词中文翻译成响应式,从编程风格分类上来说,Reactive 属于 Declarative(声明式)编程风格的一种,与 Declarative 相对的是 Imperative(命令式)编程风格。这里不讨论声明式编程和命令式编程两种风格的定义,具体定义可以在这里这里查阅到,我们这里只探讨响应式编程。

响应式编程,关注的是数据流,以及数据变化的传播。举个例子:

浏览器发出 GET 请求,服务器收到请求后返回数据,浏览器渲染页面。其中,发出 GET 请求之前,浏览器需要渲染出“loading…”

在这个例子中,如果使用命令式编程风格,这个例子可以实现成如下伪代码形式:

1
2
3
4
5
1. 渲染出 “loading...”
2. 发送 HTTP GET
3. <等待回应>/<做其他事>
4. 接收到 HTTP 回应
5. 渲染数据

可以看到,命令式编程风格里我们首先关注的是控制流,然后才是控制流里的每一步如何实现,如果使用响应式编程,实现则是如下这种方式:

1
2
3
4
5
6
7
8
9
1. 发送 HTTP GET {
before: {
渲染出 “loading...”
},
onSuccess: {
渲染数据
}
}
2. <做其他事>

在响应式编程的世界里,我们关注的是数据流中某些节点我们如何响应变化,我们并不太关心这些变化何时发生。使用响应式编程,异步程序的编写会方便很多。

说完了 Reactive,我们再来看看那个 X 是什么意思,还是刚刚的例子,如果我们期望浏览器能提供渲染到浏览器窗口以外能记录一些日志的功能,简单考虑,可以在每一次渲染之后加上一行代码:记录日志,但这样就又回到了命令式编程的世界了,实际上日志的记录和浏览器渲染完全没有关心,回顾一下开闭原则,我们应该对已有的逻辑进行补充而不是修改来实现额外的功能。

这个时候,这个 X 的功能就体现出来了,我们可以按如下方式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1. 发送 HTTP GET {
before: {
渲染出 “loading...”
},
onSuccess: {
渲染数据
}
}.map {
before: {
记录日志
},
onSuccess: {
记录日志
}
}
2. <做其他事>

可以看到,我们在发送 HTTP GET 命令之后增加了一个 map 操作,在 RxJava 里,我们称之为操作符(在 Functional Programming 里称之为 高阶函数),这样,控制流还是那个控制流,只不过在数据流的变化过程中我们增加了功能,并且和原有的功能完全解耦,这就是 Rx 的威力。

ReactiveX 基于 Observer 模式,其中有两个重要的角色:Subscriber 和 Observable,Subscriber 在 Observer 基础上增加了 unsubscribe 方法,用于取消订阅。刚刚提到,ReactiveX 关心的是数据流,除了能处理单个数据的 emit 之外,其还支持数据序列,即数据流的处理:

ReactiveX 模型

在一个数据序列上,Observable 会依次 emit 出其中的数据项,而 Subscriber 则通过 onNext 方法『响应』这些数据,若所有数据最终都被正确地 emit 出来,Subscriber 的 onCompleted 方法则会被调用,而一旦有错误发生,Subscriber 的 onError 方法则会被调用,并且所有后续没有 emit 的数据都不会被继续 emit 出来。

应用场景

ReactiveX 的机制决定了它适用于各种需要异步,基于事件的场景,相对于回调来说,ReactiveX 提供的 Observable 和相关运算符更为强大,目前其应用于如下一些方面:

  • 异步网络编程
  • GUI应用,这类应用多基于事件驱动,例如基于 RxJava 封装的 RxAndroid 等

一些有意思的 ReactiveX 的封装:

  1. RxNetty ReactiveX 官方出品的一个 Netty 的 Wrapper
  2. RxAndroid 又一个 ReactiveX 官方出品的库,基于 RxJava,应用于 Android 开发

RxJava 使用

下面的章节讲详细讲述 RxJava 中各种接口的使用方式,利用实例来描述 ReactiveX 的各种思想。

实例化

实例化方法用于将某种可观察的行为封装成一个 Observable 对象,有如下一些方法可供使用:

  • create

    该方法用于返回一个 Observable,在 Subscriber 订阅时,会执行传入的 OnSubscribe 函数对象:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Observable observable = Observable.create(subscriber -> {
    try {
    if (!subscriber.isUnsubscribed()) {
    subscriber.onNext(1);
    subscriber.onNext(2);
    subscriber.onCompleted();
    }
    } catch (Throwable t) {
    subscriber.onError(t);
    }
    });

    上述例子中,当调用 observable.subscribe(System.out::println) 时,传入 create 方法的 lambda 表达式将会执行,这里出入的 lambda 表达式会先调用 Subscriber 的 onNext 方法两次,然后调用 onCompleted 方法,若有异常发生,则 onError 方法会被执行。

  • defer

    该方法用于返回一个 Observable 的工厂,和 create 方法的不同点在于,通过 create 创建的 Observable 中的数据流只会被 emit 一遍,而 defer 方法产生的 Observable 在每次有订阅时,会产生一个新的 Observable 来 emit 定义好的数据流。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Observable observable = Observable.defer(() -> return Observable.create(subscriber -> {
    try {
    if (!subscriber.isUnsubscribed()) {
    subscriber.onNext(1);
    subscriber.onCompleted();
    }
    } catch (Throwable t) {
    subscriber.onError(t);
    }
    }));
  • empty, never, error

    这三个方法产生的 Observable 当有 Subscriber 订阅时,分别只会调用其 onCompleted 方法,何种方法都不调用,只调用 onError 方法。

  • from, just

    传入一个数据序列,产生的 Observable 将依次 emit 出其中的数据项

    1
    2
    3
    4
    5
    Observable observable = Observable.just(1, 2, 3); // 依次调用 Subscriber 的 onNext(1), onNext(2), onNext(3)
    Future<Integer> response = requestUrl();
    Observable observable2 = Observable.from(response); // 将一个 Future 对象转换成 Observable 对象
    Observable observable3 = Observable.from(new String[] {"hello", "world"}); // 依次调用 Subscriber 的 onNext("hello"), onNext("world")

运算符

运算符是 RxJava 最重要的一部分内容,也是 RxJava 最为强大的地方,通过运算符,可以修饰已有的 Observable,以改变其行为,产生更为灵活的数据流,就好比在原始的 TCP 请求回应交互中,我们可以在其上封装出 HTTP 协议,以实现比 TCP 更加复杂,更加具有现实意义的语义。

鉴于篇幅有限,本文暂不讨论异步运算符

链式调用

在介绍 RxJava 强大的运算符之前,需要介绍一下链式调用,RxJava 通过链式调用风格来组织 API,使得逻辑更加直观,减少了学习成本。所谓链式调用,就是通过将需要调用的方法组织成一个链状,避免创建临时变量来存储中间结果,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
List<Integer> urlIds = queryUrlIds();
urlIds.stream()
.map(new Function<Integer, String>() {
@Override
public String apply(Integer id) {
return queryUrl(id);
}
})
.filter(new Predicate<String>() {
@Override
public boolean test(String s) {
return !"http://www.baidu.com".equals(s);
}
})
.forEach(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});

使用链式调用能够很清晰地看到针对一个数据,进行了哪些操作,并且操作的顺序是什么样的,并且不会产生大量的临时对象,而一般来说,这些临时对象对我们的逻辑并没有帮助,仅仅是一个临时 Handler 而已。

链式调用促成了 Fluent Interface 的诞生。另外,其实链式调用和 Unix 的管道也比较类似,通过管道,将多个命令(链式调用里即函数,Java 8 之前只能通过接口方式实现 Functor,之后的 Java 通过 Lambda 表达式来实现 Functor)组合起来,共同表达某种完整的逻辑。

Observable.lift 方法

lift 方法是 Observable 中非常重要也是最灵活的一个方法,作者甚至建议:

This method requires advanced knowledge about building operators; please consider other standard composition methods first

该方法的参数是一个 Operator,用于将一个 Subscriber 包装成另一个 Subscriber,以实现修改 Observable emit 出来数据的目的,并在 lift 方法内部创建一个新的 Observable,以避免对旧 Observable 上已有的 Subscriber 产生影响。一般来说,如果需要对原 Observable 产生的所有数据项逐一进行运算,需要使用 lift 方法。

Transformation

这类运算符用于对已有 Observable emit 出来的数据进行变换,主要的运算逻辑有如下一些:

  • Map/FlatMap

    Map 和 FlatMap 均用来转换源 Observable 内 emit 出来的数据。Map 用于简单转换,例如:

    1
    Observable observable = Observable.just(1, 2, 3).map(item -> return "Got " + item);

    可以将 [1, 2, 3] 映射成 ["Got 1", "Got 2", "Got 3"] 这样的数据序列。

    FlatMap 则是将源 Observable 内 emit 出来的每一个数据项转换成新的 Observable,然后将这些 Observable 的结果扁平化之后 emit 出来,例如:

    1
    2
    3
    Observable observable = Observable.just(1, 2, 3).flatMap(item -> {
    return Observable.just(10 * item, 20 * item);
    });

    可以将 [1, 2, 3] 映射成 [10, 20, 20, 40, 30, 60] 这样的数据序列。

  • GroupBy

    用于将源 Observable 内 emit 出来的数据进行分组,分组后的数据由新的 Observable 通知到相应的 Subscriber,例如:

    1
    2
    3
    4
    5
    6
    7
    8
    Observable observable = Observable.just(1, 2, 3).groupBy(item -> item % 2 == 0 ? "EVEN" : "ODD");
    observable.subscribe(
    groupedObservable -> groupedObservable.subscribe(
    item -> {
    System.out.println(groupedObservable.getKey() + ": " + item);
    }
    )
    );

    可以产生如下的序列:

    1
    2
    3
    [1, 2, 3]
    |-- {"ODD": [1, 3]}
    `-- {"EVEN": [2]}

    每一个组里的数据序列由一个单独的 Observable 进行 emit,在同步场景下,所有分组的 Observable 内 emit 出数据项的顺序和源 Observable 内 emit 出数据项的顺序一致。

  • Scan

    这个运算符用于将上一次的 emit 出的数据传入本次 emit 调用中,例如:

    1
    Observable observable = Observable.just(1, 2, 3).scan((a, b) -> a + b);

    在 emit 1 的时候,直接 emit 出 1,但在 emit 2 的时候,会将 1 也传入进来,所以,上述代码执行的是累加源 Observable 内 emit 出来的数据项的逻辑。

  • Buffer

    用于将 emit 出来的数据进行缓存,累积到一定程度后,再统一 emit 出一个 List,例如:

    1
    Observable observable = Observable.just(1, 2, 3, 4, 5, 6).buffer(2);

    可以将 [1, 2, 3, 4, 5, 6] 转换成 [1, 2], [3, 4], [5, 6] 这样的数据序列。

  • Window

    用于定义一些 Window 来将源 Observable 内 emit 出来的数据转换成具有 Window 的 Observable,例如:

    1
    Observable observable = Observable.just(1, 2, 3, 4, 5, 6).window(3);

    可以将 [1, 2, 3, 4, 5, 6] 转换成 [1, 2, 3], [4, 5, 6] 这样的数据序列。Window 和 Buffer 最大的不同是,Buffer 返回的 Observable 每次还是 emit 一个源 Observable 内 emit 出来的数据项,而 Window 返回的 Observable 每次 emit 的是一个新的 Observable,这个新的 Observable 会 emit 源 Observable 里的数据项。

Combination

  • CombineLatest

    用于合并多个 Observable 内 emit 出来的结果,例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Observable observable = Observable.combineLatest(
    ImmutableList.of(
    Observable.just(1, 2, 3),
    Observable.just(4, 5, 6)
    ), new FuncN<Integer>() {
    @Override
    public Integer call(Object... args) {
    int sum = 0;
    for (Object arg : args) {
    sum += (int) arg;
    }
    return sum;
    }
    });

    时序图如下:

    1
    2
    3
    4
    5
    6
    7
    8
    Observable1: 1 2 3
    Observable2: 4 |5 | 6
    ===================|===||===|=====|======
    `---||---|-----|---------> FuncN.call(1, 4)
    `|---|-----|---------> FuncN.call(2, 4)
    `---|-----|---------> FuncN.call(2, 5)
    `-----|---------> FuncN.call(3, 5)
    `---------> FuncN.call(3, 6)

    这个方法常用于多个并行 Observable。

  • Merge

    用于将多个 Observable 合并成一个 Observable,例如:

    1
    2
    3
    4
    5
    6
    Observable observable = Observable.merge(
    ImmutableList.of(
    Observable.just(1, 3, 5),
    Observable.just(2, 4, 6)
    )
    );

    可以将 [[1, 3, 5], [2, 4, 6]] 合并成 [1, 3, 5, 2, 4, 6],注意,这里的数据序列 emit 的时序可能并不是按 Observable 列表里的顺序 emit 完一个 Observable 之后再 emit 下一个,可能有并行的情况。

Filter

过滤器类的操作符用于过滤 emit 出来的数据项,主要有如下一些:

  • First/Last/ElementAt

    用于过滤出特定位置的数据项,并只 emit 这些数据,例如:

    1
    Observable observable = Observable.just(1, 2, 3).first();

    可以只 emit 出第一个元素,然后收到 onCompleted 消息结束。

  • Take/TaskLast/Skip/SkipLast

    用于过滤出数据序列里前 N 个/后 N 个,或者跳过前 N 个/后 N 个数据项,例子:略。

  • Filter

    用于指定 Func1 作为一个 Predicate 提供给源 Observable,若其返回 true,则 emit 出某个数据项,否则丢弃某个数据项,例子:

    1
    2
    3
    4
    5
    6
    Observable observable = Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
    return integer < 3;
    }
    });

    可以过滤出小于 3 的数据项。

  • Distinct

    用于过滤出其中重复的项,使得这些重复的项每个都只被 emit 一次,例如:

    1
    Observable observable = Observable.just(1, 2, 2, 3, 4, 5, 5).distinct();

    可以将 [1, 2, 2, 3, 4, 5, 5] 过滤成 [1, 2, 3, 4, 5] 这样的数据序列。

参考资料

  1. ReactiveX
  2. Awesome-RxJava
  3. 101 Rx Samples