在之前的文章当中我们简单的介绍了 什么是 rxjs 以及一些常见的操作符使用场景,那么在本章当中,我们就在之前的基础上来看看另外几个比较常用的高阶操作符,关于更多的操作符的详细内容可以参见官方文档 rxjs
flatMap
当流中的每个元素其本身又是一个流的情况下,高阶操作符就是用来处理这样的情况
1 2 3 4 5 6 7
| Rx.Observable .fromEvent(a, 'keyup') .pluck('target', 'value') .map(_ => Rx.Observable.interval(100)) .subscribe(val => { val.subscribe(val => console.log(val)) })
|
像这样一层一层的调用不是很好的方法,所以我们可以采用 flatMap
(在 rxjs
中是 mergeMap
的别名)
1 2 3 4 5
| Rx.Observable .fromEvent(a, 'keyup') .pluck('target', 'value') .flatMap(_ => Rx.Observable.interval(100)) .subscribe(val => console.log(val))
|
这样一来每次触发都会从新生成一个新流,而两个新流则是并行,其中的每一个流并没有断开,mergeMap
会保证所有的订阅,保证外层元素所对应的子流的订阅
switchMap
与 flatMap
有些不同,它会断开之前的流,转而从新生成一个新流
1 2 3 4 5
| Rx.Observable .fromEvent(a, 'keyup') .pluck('target', 'value') .switchMap(_ => Rx.Observable.interval(1000)) .subscribe(val => console.log(val))
|
一旦有新的外层元素进来,它就会抛弃掉这个元素之前的外层元素所关联的子元素
count
这个操作符简单来说就是用来计算源的发送数量,并当源完成时发出该数值,可以简单的理解为将流中的数据做一个统计,然后输出出来(最后也是一个值),也就是告知之前的任务已经全部完成了,比如我们有一个删除列表的操作
1 2 3 4 5 6 7 8 9 10 11 12 13
|
del(project: Project): Observable<Project> { const delTasks$ = Observable.from(project.list) .mergeMap(id => this.http.delete(url)) .count() return delTasks$.switchMap(_ => this.http.delete(url)) .mapTo(project) }
|
Observable 的冷和热
冷和热的概念可能不太好理解,不过这里我们换一个角度来看待这个问题,我们可以将两者的区别分为一个是看视频,一个是看电视直播,虽然都是同样的内容,但是视频每次都必须需要从头开始看,这就是冷的 Observable
,而电视直播,无论合适进来,看到的都是同样的内容,这就是热的 Observable
1 2 3 4 5 6 7
| const count$ = Rx.Observable.interval(1000) const sub1 = count$.subscribe(v => console.log(v))
setTimeout(() => { const sub2 = count$.subscribe(v => console.log(v)) }, 2000)
|
每次有新的流进来,都是从头开始计算,下面来看看热的
1 2 3 4 5 6 7
| const count$ = Rx.Observable.interval(1000).share() const sub1 = count$.subscribe(v => console.log(v))
setTimeout(() => { const sub2 = count$.subscribe(v => console.log(v)) }, 2000)
|
可以发现,后进来的流会将之前的全部抛弃掉,直接从进来时候的流的位置跟着往下走
Subject
Subject
既是 Observable
对象,又是 Observer
对象,是一个特殊的对象,一方面可以作为流的组成也就是输出的一方,另一方可以作为流的观察一方即接收的一方,这个操作符也是平常开发过程当中使用较多的,Subject
其实是观察者模式的实现,所以当观察者订阅 Subject
对象时,Subject
对象会把订阅者添加到观察者列表中,每当有 Subject
对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next()
方法,把值一一送出
当有新消息时,Subject
会对内部的 observers
列表进行组播(multicast
),Subject
之所以具有 Observable
中的所有方法,是因为 Subject
类继承了 Observable
类,在 Subject
类中有五个重要的方法
next
,每当 Subject
对象接收到新值的时候,next
方法会被调用
error
,运行中出现异常,error
方法会被调用
complete
,Subject
订阅的 Observable
对象结束后,complete
方法会被调用
subscribe
,添加观察者
unsubscribe
,取消订阅 (设置终止标识符、清空观察者列表)
因为其同时实现了两个接口,在一些特殊的情景下会非常有用,下面是两个延伸方法
ReplaySubject
两者相差不多,Replay
会保留最新的 N
个值
BehaviorSubject
与上面使用方法一致,不过是一种特殊形式,会保留最新的一个值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| const counter$ = Rx.Observable.interval(1000).take(5)
const observer1 = { next: (v) => console.log(`v1 >> ${v}`), error: (err) => console.log(err), complete: (_) => console.log(`Completed -`) }
const observer2 = { next: (v) => console.log(`v2 >> ${v}`), error: (err) => console.log(err), complete: (_) => console.log(`Completed -`) }
counter$.subscribe(observer1) setTimeout(() => { counter$.subscribe(observer2) }, 2000)
|
可以发现,此时的输出结果是一样的,但是此时面临的情况为,需要执行两次 subscribe()
操作,而有时的场景为定义好的序列应该会在什么时刻来进行触发,只需要执行一次操作,两个序列都会执行,所以在这种情况下就可以使用 Subject
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| const counter$ = Rx.Observable.interval(1000).take(5) const subject = new Rx.Subject()
const observer1 = { next: (v) => console.log(`v1 >> ${v}`), error: (err) => console.log(err), complete: (_) => console.log(`Completed -`) }
const observer2 = { next: (v) => console.log(`v2 >> ${v}`), error: (err) => console.log(err), complete: (_) => console.log(`Completed -`) }
subject.subscribe(observer1) setTimeout(() => { subject.subscribe(observer2) }, 2000)
counter$.subscribe(subject)
|
subject.next()
可以往流中推送两个新值
1 2 3 4 5 6 7 8
| subject.next(10) subject.next(11) subject.subscribe(observer1) setTimeout(() => { subject.subscribe(observer2) }, 2000)
counter$.subscribe(subject)
|
但是需要注意的是,添加的位置很重要,比如上面这样,与之前的输出是一样的,流中的数据没有改变,这是因为在推送新值的时候,还没有进行订阅
1 2 3 4 5 6 7 8 9 10
| subject.subscribe(observer1) subject.next(10) subject.next(11)
setTimeout(() => { subject.subscribe(observer2) }, 2000)
counter$.subscribe(subject)
|
可以看到第一个流中就有推送的新值存在了,但是第二个流中是没有新值的,因为在第二个流开始订阅的时候,推送新值的时间点已经过去了
Rx.ReplaySubject()
将过去发生的一些事件进行重播,比如将之前发生的两个事件发生重播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| const subject = new Rx.ReplaySubject(2) subject.next(10) subject.next(11)
|
可以发现,第一个流重播的为 10
和 11
,而第二个流重播的则是 11
和 0
,因为当第二个流开始重播的时候发现前两个的输出分别为 11
和 0
Rx.BehaviorSubject()
与 ReplaySubject()
不同的是,BehaviorSubject()
只会记住最新的值
1 2 3
| const subject = new Rx.BehaviorSubject() subject.next(10) subject.next(11)
|
下面是一个示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private _dragData = new BehaviorSubject<DragData>(null)
setDragData(data: DragData) { this._dragData.next(data) }
getDragData(): Observable<DragData> { return this._dragData.asObservable() }
clearDragData() { this._dragData.next(null) }
|
Async Pipe
在常规编程中,如果得到一个 Observable
数组,在页面当中是没有办法去直接使用,需要去 Subscribe
一下,然后赋予给声明的本地变量,用来得到返回的数据,最后再用于页面渲染,但是有了 Async Pipe
以后,上面这些操作都不需要了,可以直接在页面当中直接使用 Observable
,并且不需要去取消订阅
1 2 3 4 5
| <md-option *ngFor='let item of memberRestlts$ | async' [value]='item' (onSelectionChange)='handleMembersSelection(item)' >{{item.email}}</md-option>
|
1 2 3 4 5
| this.memberRestlts$ = this.form.get('memberSearch').valueChanges .debounceTime(300) .distinctUntilChanged() .filter(s => s && s.length > 1) .switchMap(str => this.service$.serachUsers(str))
|
按照之前的操作,我们应当去 subscribe
一下,但是这里使用了 Async Pipe
,就不需要那么多麻烦的操作
1 2
| memberRestlts$: Observable<User[]>
|
然后让本地这个流等于我们组合后的流,然后在 HTML
模版当中便可以直接使用,别忘了加上 | async
取消订阅释放资源
在 Angular
当中,我们如果使用了数据流的话,一般会在使用完毕后在 ngOnDestroy()
生命周期当中去销毁这个流以节约性能,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
getUserInfoSubscription = new Subscription()
ngOnInit() { this.getUserInfoSubscription.add(this._store.select(fromRoot.getUserInfo).filter(user => !!user).subscribe(user => { })) }
ngOnDestroy(): void { this.getUserInfoSubscription.unsubscribe() }
|
但是又如上文我们提到过的 Async Pipe
,它又是不需要我们手动的去取消订阅,所以下面我们就来看一些需要我们手动的去取消订阅释放资源和不需要我们去手动操作的实际使用场景
需要手动取消订阅释放资源的场景
表单当中的场景
1 2 3 4 5 6 7 8 9
| ngOnInit() { this.form = new FormGroup({...}) this.valueChanges = this.form.valueChanges.subscribe(console.log) }
ngOnDestroy() { this.valueChanges.unsubscribe() }
|
路由当中的场景
1 2 3 4 5 6 7
| ngOnInit() { this.route.params.subscribe(console.log) }
ngOnDestroy() { }
|
Renderer 服务
1 2 3 4 5 6 7 8 9 10 11 12 13
| constructor( private renderer: Renderer2, private element: ElementRef ) { }
ngOnInit() { this.click = this.renderer .listen(this.element.nativeElement, 'click', handler) }
ngOnDestroy() { this.click.unsubscribe() }
|
interval() & fromEvent()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| constructor(private element : ElementRef) { }
interval: Subscription click: Subscription
ngOnInit() { this.interval = Observable.interval(1000).subscribe(console.log) this.click = Observable .fromEvent(this.element.nativeElement, 'click') .subscribe(console.log) }
ngOnDestroy() { this.interval.unsubscribe() this.click.unsubscribe() }
|
Redux Store
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| constructor(private store: Store) { }
todos: Subscription
ngOnInit() {
this.todos = this.store.select('todos').subscribe(console.log) }
ngOnDestroy() { this.todos.unsubscribe() }
|
无需手动释放资源场景
简单来说主要分为以下两种,所以大部分情况下还是需要我们手动的去取消订阅释放资源
AsyncPipe
- 当组件销毁的时候,
async
管道会自动执行取消订阅操作,进而避免内存泄
@HostListener
- 如果使用
@HostListener
装饰器,添加事件监听时,我们无法手动取消订阅
如果需要手动移除事件监听的话,可以使用以下的方式
1 2 3 4 5
| this.handler = this.renderer.listen('document', 'click', event =>{...})
this.handler()
|
Finite Observable
当你使用 HTTP
服务或 timer Observable
对象时,你也不需要手动执行取消订阅操作
1 2 3 4 5
| ngOnInit() { Observable.timer(1000).subscribe(console.log) this.http.get('http://api.com').subscribe(console.log) }
|