rxjs 中的高阶操作符

rxjs 中的高阶操作符

在之前的文章当中我们简单的介绍了 什么是 rxjs 以及一些常见的操作符使用场景,那么在本章当中,我们就在之前的基础上来看看另外几个比较常用的高阶操作符,关于更多的操作符的详细内容可以参见官方文档 rxjs

flatMap

当流中的每个元素其本身又是一个流的情况下,高阶操作符就是用来处理这样的情况

1
2
3
4
5
6
7
Rx.Observable
.fromEvent(a, 'keyup')
.pluck('target', 'value')
.map(_ => Rx.Observable.interval(100))
.subscribe(val => {
val.subscribe(val => console.log(val))
})

像这样一层一层的调用不是很好的方法,所以我们可以采用 flatMap(在 rxjs 中是 mergeMap 的别名)

1
2
3
4
5
Rx.Observable
.fromEvent(a, 'keyup')
.pluck('target', 'value')
.flatMap(_ => Rx.Observable.interval(100))
.subscribe(val => console.log(val))

这样一来每次触发都会从新生成一个新流,而两个新流则是并行,其中的每一个流并没有断开,mergeMap 会保证所有的订阅,保证外层元素所对应的子流的订阅

switchMap

flatMap 有些不同,它会断开之前的流,转而从新生成一个新流

1
2
3
4
5
Rx.Observable
.fromEvent(a, 'keyup')
.pluck('target', 'value')
.switchMap(_ => Rx.Observable.interval(1000))
.subscribe(val => console.log(val))

一旦有新的外层元素进来,它就会抛弃掉这个元素之前的外层元素所关联的子元素

count

这个操作符简单来说就是用来计算源的发送数量,并当源完成时发出该数值,可以简单的理解为将流中的数据做一个统计,然后输出出来(最后也是一个值),也就是告知之前的任务已经全部完成了,比如我们有一个删除列表的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
// ...

del(project: Project): Observable<Project> {
// 用 mergeMap 的原因是因为如果在删除过程中有新的 id 进来,原有的删除操作还是需要继续做的,而且新的删除操作也会操作,即所有外层元素的流进来以后,它对应的子流全部都要保持住
const delTasks$ = Observable.from(project.list)
.mergeMap(id => this.http.delete(url))
.count()
// 汇总以后不用去关心其外层的流,利用 switchMap 执行删除操作,因为最后希望返回一个 project 对象,所以使用 mapTo 进行转换一下
return delTasks$.switchMap(_ => this.http.delete(url))
.mapTo(project)
}

// ...

Observable 的冷和热

冷和热的概念可能不太好理解,不过这里我们换一个角度来看待这个问题,我们可以将两者的区别分为一个是看视频,一个是看电视直播,虽然都是同样的内容,但是视频每次都必须需要从头开始看,这就是冷的 Observable,而电视直播,无论合适进来,看到的都是同样的内容,这就是热的 Observable

1
2
3
4
5
6
7
// 冷的 Observable
const count$ = Rx.Observable.interval(1000)
const sub1 = count$.subscribe(v => console.log(v))

setTimeout(() => {
const sub2 = count$.subscribe(v => console.log(v))
}, 2000)

每次有新的流进来,都是从头开始计算,下面来看看热的

1
2
3
4
5
6
7
// 热的 Observable,添加上 share() 即可
const count$ = Rx.Observable.interval(1000).share()
const sub1 = count$.subscribe(v => console.log(v))

setTimeout(() => {
const sub2 = count$.subscribe(v => console.log(v))
}, 2000)

可以发现,后进来的流会将之前的全部抛弃掉,直接从进来时候的流的位置跟着往下走

Subject

Subject 既是 Observable 对象,又是 Observer 对象,是一个特殊的对象,一方面可以作为流的组成也就是输出的一方,另一方可以作为流的观察一方即接收的一方,这个操作符也是平常开发过程当中使用较多的,Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 Subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出

当有新消息时,Subject 会对内部的 observers 列表进行组播(multicast),Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法

  • next,每当 Subject 对象接收到新值的时候,next 方法会被调用
  • error,运行中出现异常,error 方法会被调用
  • completeSubject 订阅的 Observable 对象结束后,complete 方法会被调用
  • subscribe,添加观察者
  • unsubscribe,取消订阅 (设置终止标识符、清空观察者列表)

因为其同时实现了两个接口,在一些特殊的情景下会非常有用,下面是两个延伸方法

  • ReplaySubject 两者相差不多,Replay 会保留最新的 N 个值
  • BehaviorSubject 与上面使用方法一致,不过是一种特殊形式,会保留最新的一个值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 调整一下上面的示例,这种情况下依然是一种冷的 Observable
const counter$ = Rx.Observable.interval(1000).take(5)

const observer1 = {
next: (v) => console.log(`v1 >> ${v}`),
error: (err) => console.log(err),
complete: (_) => console.log(`Completed -`)
}

const observer2 = {
next: (v) => console.log(`v2 >> ${v}`),
error: (err) => console.log(err),
complete: (_) => console.log(`Completed -`)
}

counter$.subscribe(observer1)
setTimeout(() => {
counter$.subscribe(observer2)
}, 2000)

可以发现,此时的输出结果是一样的,但是此时面临的情况为,需要执行两次 subscribe() 操作,而有时的场景为定义好的序列应该会在什么时刻来进行触发,只需要执行一次操作,两个序列都会执行,所以在这种情况下就可以使用 Subject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 利用 subject 来进行中转,这种情况下成为了热的 Observable
const counter$ = Rx.Observable.interval(1000).take(5)
const subject = new Rx.Subject()

const observer1 = {
next: (v) => console.log(`v1 >> ${v}`),
error: (err) => console.log(err),
complete: (_) => console.log(`Completed -`)
}

const observer2 = {
next: (v) => console.log(`v2 >> ${v}`),
error: (err) => console.log(err),
complete: (_) => console.log(`Completed -`)
}

subject.subscribe(observer1)
setTimeout(() => {
subject.subscribe(observer2)
}, 2000)

counter$.subscribe(subject)

subject.next()

可以往流中推送两个新值

1
2
3
4
5
6
7
8
subject.next(10)
subject.next(11)
subject.subscribe(observer1)
setTimeout(() => {
subject.subscribe(observer2)
}, 2000)

counter$.subscribe(subject)

但是需要注意的是,添加的位置很重要,比如上面这样,与之前的输出是一样的,流中的数据没有改变,这是因为在推送新值的时候,还没有进行订阅

1
2
3
4
5
6
7
8
9
10
// 调整为这样即可
subject.subscribe(observer1)
subject.next(10)
subject.next(11)

setTimeout(() => {
subject.subscribe(observer2)
}, 2000)

counter$.subscribe(subject)

可以看到第一个流中就有推送的新值存在了,但是第二个流中是没有新值的,因为在第二个流开始订阅的时候,推送新值的时间点已经过去了

Rx.ReplaySubject()

将过去发生的一些事件进行重播,比如将之前发生的两个事件发生重播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const subject = new Rx.ReplaySubject(2)
subject.next(10)
subject.next(11)

// ...
// v1 >> 10
// v1 >> 11
// v1 >> 0
// v2 >> 11
// v2 >> 0
// v1 >> 1
// v2 >> 1
// v1 >> 2
// v2 >> 2
// v1 >> 3
// v2 >> 3
// v1 >> 4
// v2 >> 4
// Completed -
// Completed -

可以发现,第一个流重播的为 1011,而第二个流重播的则是 110,因为当第二个流开始重播的时候发现前两个的输出分别为 110

Rx.BehaviorSubject()

ReplaySubject() 不同的是,BehaviorSubject() 只会记住最新的值

1
2
3
const subject = new Rx.BehaviorSubject()
subject.next(10)
subject.next(11)

下面是一个示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 使用 BehaviorSubject 去存储,因为其总是可以记住上一次的一个最新值
private _dragData = new BehaviorSubject<DragData>(null)

// 在开始拖拽的时候,把流中新增一个 data 元素,把这个最新值 next 出去
setDragData(data: DragData) {
this._dragData.next(data)
}

// 当放到否个区域的时候,可以得到这个 Observable,所以取值的时候就会取到最新的 data,尽管在拖拽的过程中值已经发射完了,但是依然可以得到上一次发射之后最新的一个值
// this._dragData.asObservable() 的作用是将 Subject 转换成 Observable
getDragData(): Observable<DragData> {
return this._dragData.asObservable()
}

// 清空的时候将一个 null 放到这个流中,可以保证在其他误接收的地方会发现这是一个 null,即没有这个值
clearDragData() {
this._dragData.next(null)
}

Async Pipe

在常规编程中,如果得到一个 Observable 数组,在页面当中是没有办法去直接使用,需要去 Subscribe 一下,然后赋予给声明的本地变量,用来得到返回的数据,最后再用于页面渲染,但是有了 Async Pipe 以后,上面这些操作都不需要了,可以直接在页面当中直接使用 Observable,并且不需要去取消订阅

1
2
3
4
5
<md-option
*ngFor='let item of memberRestlts$ | async'
[value]='item'
(onSelectionChange)='handleMembersSelection(item)'
>{{item.email}}</md-option>
1
2
3
4
5
this.memberRestlts$ = this.form.get('memberSearch').valueChanges
.debounceTime(300)
.distinctUntilChanged()
.filter(s => s && s.length > 1)
.switchMap(str => this.service$.serachUsers(str))

按照之前的操作,我们应当去 subscribe 一下,但是这里使用了 Async Pipe,就不需要那么多麻烦的操作

1
2
// 直接定义一个流
memberRestlts$: Observable<User[]>

然后让本地这个流等于我们组合后的流,然后在 HTML 模版当中便可以直接使用,别忘了加上 | async

取消订阅释放资源

Angular 当中,我们如果使用了数据流的话,一般会在使用完毕后在 ngOnDestroy() 生命周期当中去销毁这个流以节约性能,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ...

getUserInfoSubscription = new Subscription()

ngOnInit() {
this.getUserInfoSubscription.add(this._store.select(fromRoot.getUserInfo).filter(user => !!user).subscribe(user => {
// ...
}))
}

ngOnDestroy(): void {
this.getUserInfoSubscription.unsubscribe()
}

// ...

但是又如上文我们提到过的 Async Pipe,它又是不需要我们手动的去取消订阅,所以下面我们就来看一些需要我们手动的去取消订阅释放资源和不需要我们去手动操作的实际使用场景

需要手动取消订阅释放资源的场景

表单当中的场景

1
2
3
4
5
6
7
8
9
ngOnInit() {
this.form = new FormGroup({...})
// 监听表单值的变化
this.valueChanges = this.form.valueChanges.subscribe(console.log)
}

ngOnDestroy() {
this.valueChanges.unsubscribe()
}

路由当中的场景

1
2
3
4
5
6
7
ngOnInit() {
this.route.params.subscribe(console.log)
}

ngOnDestroy() {
// 手动执行取消订阅的操作
}

Renderer 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
constructor(
private renderer: Renderer2,
private element: ElementRef
) { }

ngOnInit() {
this.click = this.renderer
.listen(this.element.nativeElement, 'click', handler)
}

ngOnDestroy() {
this.click.unsubscribe()
}

interval() & fromEvent()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
constructor(private element : ElementRef) { }

interval: Subscription
click: Subscription

ngOnInit() {
this.interval = Observable.interval(1000).subscribe(console.log)
this.click = Observable
.fromEvent(this.element.nativeElement, 'click')
.subscribe(console.log)
}

ngOnDestroy() {
this.interval.unsubscribe()
this.click.unsubscribe()
}

Redux Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
constructor(private store: Store) { }

todos: Subscription

ngOnInit() {
/**
* select(key : string) {
* return this.map(state => state[key]).distinctUntilChanged()
* }
*/
this.todos = this.store.select('todos').subscribe(console.log)
}

ngOnDestroy() {
this.todos.unsubscribe()
}

无需手动释放资源场景

简单来说主要分为以下两种,所以大部分情况下还是需要我们手动的去取消订阅释放资源

  • AsyncPipe
    • 当组件销毁的时候,async 管道会自动执行取消订阅操作,进而避免内存泄
  • @HostListener
    • 如果使用 @HostListener 装饰器,添加事件监听时,我们无法手动取消订阅

如果需要手动移除事件监听的话,可以使用以下的方式

1
2
3
4
5
// subscribe
this.handler = this.renderer.listen('document', 'click', event =>{...})

// unsubscribe
this.handler()

Finite Observable

当你使用 HTTP 服务或 timer Observable 对象时,你也不需要手动执行取消订阅操作

1
2
3
4
5
ngOnInit() {
// 表示 1s 后发出值,然后就结束了
Observable.timer(1000).subscribe(console.log)
this.http.get('http://api.com').subscribe(console.log)
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×