因为最近在深入的学习 Angular
相关知识,所以 rxjs
这部分也就是必不可少的了,那么什么是 rxjs
呢?其实简单来说,rxjs
是一种针对异步数据流编程工具,或者叫响应式扩展程序,rxjs
的目标就是异步编程,Angular
引入 rxjs
为了就是让异步可控,更为简单
大部分 rxjs
操作符都不包括在 Angular
的 Observable
基本实现中,基本实现只包括 Angular
本身所需的功能,如果需要更多的 rxjs
功能,必须导入其所定义的库来扩展 Observable
对象,rxjs
是基于观察者模式和迭代器模式以函数式编程思维来实现的,含有两个基本概念
Observables
,作为被观察者,是一个值或事件的流集合Observer
,则作为观察者,根据Observables
进行处理
两者关系如下
- 订阅,
Observer
通过Observable
提供的subscribe()
方法订阅Observable
- 发布,
Observable
通过回调next
方法向Observer
发布事件
下面我们就分别来看看这两者
Observable
Observable
就是一个拥有以下特性的函数
- 它接收一个
observer
对象作为参数,该对象中包含next
、error
和complete
方法 - 它返回一个函数,用于在销毁
Observable
时,执行清理操作,返回的是Subcription
对象,该对象中包含一个unsubscribe
方法
有几个特殊的状态,如下所示
- 永不结束,它没有
complete
状态,比如计时器 Never
,完全不发射,流中没有任何元素,但是也不结束,就是一个空的,没有状态,一般用于测试Empty
,一般也是用于测试,与Never
类似,也是空的,但是会直接进入complete
状态,所以是有结束状态的Throw
,直接进入error
状态,也是不会发射任何元素
never
never
操作符会返回一个无穷的 Observable
,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable
对象
1 | Rx.Observable.never() |
empty
empty
操作符返回一个空的 Observable
对象,如果我们订阅该对象,它会立即返回 Complete
信息
1 | Rx.Observable.empty().subscribe( |
throw
只做一件事,抛出错误
1 | Rx.Observable.throw('err') |
Observer
Observer
(观察者) 是一个普通的对象,该对象会作为subscribe()
方法的参数- 当
Observable
对象产生新值的时候,我们可以通过调用next()
方法来通知对应的观察者 - 若出现异常,则会调用观察者的
error()
方法,当我们订阅Observable
对象后,只要有新的值,都会通知对应的观察者 - 在下面两种情况中,新的值不会再通知对应的观察者
- 已调用
observer
对象的complete()
方法 - 执行取消订阅操作
- 已调用
1 | interface Observer<T> { |
下面是一些 rxjs
常用的操作符
- 创建类操作符
from
,fromEvent
,fromEventPattern
,Interval
,Timer
- 工具类操作符
do
- 变换类操作符
scan
- 数学类操作符
reduce
- 过滤类操作符
filter
,take
,first
,last
,skip
- 过滤类操作符
debounce
,debounceTime
- 过滤类操作符
distinct
,distinctUntilChanged
- 合并类操作符
merge
,concat
,startWith
- 合并类操作符
combineLatest
,withLatestFrom
,zip
下面我们就一个一个来看
from
from
可以支持从数组、类似数组的对象、Promise
、iterable
对象或类似 Observable
的对象(ES5
当中的 Observable
)来创建一个 Observable
,它几乎可以把任何对象转换成 Observable
1 | var array = [10, 20, 30] |
fromEvent
这个操作符是专门为事件转换成 Observable
而制作的,用于处理各种 DOM
中的事件
1 | var click$ = Rx.Observable.fromEvent(document, 'click') |
fromEventPattern
我们经常会遇到一些已有的代码,这些代码和类库往往不受我们的控制,无法重构或代价太大,在 rxjs
中也提供了对应的方法可以转换
1 | function addClickHandler(handler) { |
Interval
Rx
提供内建的可以创建和计时器相关的 Observable
方法,第一个是 Interval
,它可以在指定时间间隔发送整数的自增长序列
1 | // 没有条件的情况下 interval 会一直执行下去,所以触发不了剩余的两种情况(err 和 complete) |
Timer
一共有两种形式的 Timer
,一种是指定时间后返回一个序列中只有一个元素(值为 0
)的 Observable
1 | Rx.Observable.timer(1000).subscribe( |
另外一种很类似于 Interval
,接收两个参数,第一个参数表示延迟多长时间,第二个参数表示之后要以什么样的频率来进行发送,也就是说,在一开始的延迟时间后,每隔一段时间就会返回一个整数序列
1 | Rx.Observable.timer(1000, 1000).subscribe( |
do
一般用来调试,有时也会用来作为外部条件的设置,可以作为一个可以与外部交互的桥梁,因为当 subscribe()
之后,这个流中的东西就已经固定了,就没有办法在对流继续做一些链接的操作,简单来说,do
可以起到一个临时 subscribe()
的作用,但是并没有中断流
1 | // 即取得了这个值,也可以改变这个值 |
scan
接收一个函数作为参数,而函数又接收两个参数
1 | scan((x, y) => { |
x
为累加器,将函数返回的值(比如上面的 x + y
)作为下一次累加的 x
值传入进来,与递归很类似,y
为上一个序列过来所接收的值
1 | Rx.Observable.interval(1000) |
流程图如下所示
1 | 原始序列: 0------1------2------3------4------5------6------ |
在有些情况下,需要记住之前的操作结果,这时候用 scan
就是很好的选择
reduce
我们尝试将上面的例子改为 reduce
1 | Rx.Observable.interval(1000) |
与 scan
有一点不同,每次做叠加之后都会发射出一个值,reduce
会把序列当中所有的东西做最后的一个累加值,只会发射出一个值,而又由于上面是一个无限的序列,所以会是一个 Never
,所以调换一下 take()
的位置
1 | Rx.Observable.interval(1000) |
可以看到结果为 12
,其实本质上与 scan
的运算是一致的,只不过 reduce
要算出一个最终值,而且只发射最终值,reduce
不仅仅可以用于数学运算,还有一些高级的用法
1 | Rx.Observable.interval(100) |
filter
1 | let logLabel = '当前值为:' |
first
其实等同于 take(1)
,如下对比
1 | Rx.Observable.interval(1000) |
last
1 | // 如果替换成 last() 则会变为一个 Never 状态,因为序列永远无法到达 |
skip
1 | // 过滤,这里是过滤掉了前三个,即 0,2,4 |
debounce,debounceTime
两个操作符的作用都是节流器,限制一定时间内的输出,但是在使用上会有一些不同,所以我们分别来进行介绍
debounceTime
直接使用,后面传入指定的时间即可,即可达到在规定的时间内节流的作用
1 | Rx.Observable.fromEvent(number, 'keyup') |
debounce
与 debounceTime
时分类似,但是静默时间段由第二个 Observable
决定,所以可以操作的空间就比较广泛
1 | Rx.Observable.fromEvent(number, 'keyup') |
distinct,distinctUntilChanged
distinct
它的作用是将整个序列的流中不一样的保留下来,一样的重复的则过滤掉,比如下面这个示例,如果将输入框中的元素选定或者删除,然后从新输入相同的内容,是不会触发 keyup
事件的
1 | Rx.Observable.fromEvent(number, 'keyup') |
使用的时候需要小心,尤其是应用在无尽序列当中,因为会极大的消耗内存
distinctUntilChanged
它的作用是只和前一个元素进行对比,前一个元素如果跟其一样,那就抛弃掉
1 | Rx.Observable.fromEvent(number, 'keyup') |
merge,concat,startWith
这几个操作符的作用都是类似的,都是针对多个流,两个或者两个以上的流进行合并,只是合并的方式不同
merge
两个流,按各自的事件顺序进行合并,严格有时间交叉,一种简单的合并,不回去更改两条流的任何东西
1 | const a = document.querySelector('#a') |
在两个输入框内分别输入值,则会交替的输出每个输入框内的值
concat
严格来说不属于合并,应该属于对接,等待前一个流完成了之后,才会进行下一个流,尽管后面的流的序列很快的执行,也会等待前一个流完成之后才会进行输出
1 | Rx.Observable.concat(a$, b$) |
从运行结果可以看出,第一个输入框可以正常的输出内容,但是触发第二个输入框的时候是没有反应的,原因是因为第一个输入框是一个无尽序列,理论上只有第一个序列完成后才会输出后面的值,稍作修改
1 | const a$ = Rx.Observable.fromEvent(a, 'keyup').pluck('target', 'value') |
这样就可以看到输入的值了,因为只有等待第一个序列完成后后续序列输入的值才会输出
startWith
如果希望这个流在一开始的时候就有一个值可以发射出来的话,就可以使用 startWith
设置一个默认值,类似初始值,类似于在序列前 concat
一个值
1 | Rx.Observable.from([1, 2, 3, 4]).startWith(0).subscribe(v => console.log(v)) |
一般都是用来赋予初始值,避免在一开始的时候流是空的
combineLatest,withLatestFrom,zip
combineLatest
组成它的任何一个流当中有新元素出现的话,那么它就会产生一个新流当中对应的一个数据,和 withLatestFrom
相比的话,无论哪个流有改变均会有输出
zip
一对一的输出,和 combineLatest
类似,但是严格要求必须配对,即全都需要新的数据,combineLatest
和 zip
的示例可以参考文章开头部分
withLatestFrom
以一个流为主,然后这个流产生的数据的时候会去获取另一个流的最新值,注意输出的结果为数组
1 | const a$ = Rx.Observable.fromEvent(a, 'keyup').pluck('target', 'value') |
简单来说,只有当第一个输入框内的值发生变化的时候,才会去取第二个输入框内的值组合成一个数组发送出来,所以改变第二个输入框内的值是不会引起变化的