什么是 rxjs

什么是 rxjs

因为最近在深入的学习 Angular 相关知识,所以 rxjs 这部分也就是必不可少的了,那么什么是 rxjs 呢?其实简单来说,rxjs 是一种针对异步数据流编程工具,或者叫响应式扩展程序,rxjs 的目标就是异步编程,Angular 引入 rxjs 为了就是让异步可控,更为简单

大部分 rxjs 操作符都不包括在 AngularObservable 基本实现中,基本实现只包括 Angular 本身所需的功能,如果需要更多的 rxjs 功能,必须导入其所定义的库来扩展 Observable 对象,rxjs 是基于观察者模式和迭代器模式以函数式编程思维来实现的,含有两个基本概念

  • Observables,作为被观察者,是一个值或事件的流集合
  • Observer,则作为观察者,根据 Observables 进行处理

两者关系如下

  • 订阅,Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable
  • 发布,Observable 通过回调 next 方法向 Observer 发布事件

下面我们就分别来看看这两者

Observable

Observable 就是一个拥有以下特性的函数

  • 它接收一个 observer 对象作为参数,该对象中包含 nexterrorcomplete 方法
  • 它返回一个函数,用于在销毁 Observable 时,执行清理操作,返回的是 Subcription 对象,该对象中包含一个 unsubscribe 方法

有几个特殊的状态,如下所示

  • 永不结束,它没有 complete 状态,比如计时器
  • Never,完全不发射,流中没有任何元素,但是也不结束,就是一个空的,没有状态,一般用于测试
  • Empty,一般也是用于测试,与 Never 类似,也是空的,但是会直接进入 complete 状态,所以是有结束状态的
  • Throw,直接进入 error 状态,也是不会发射任何元素

never

never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象

1
2
3
4
5
6
Rx.Observable.never()
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

empty

empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 Complete 信息

1
2
3
4
5
Rx.Observable.empty().subscribe(
null,
null,
() => { console.log('Completed') }
)

throw

只做一件事,抛出错误

1
2
3
4
5
6
Rx.Observable.throw('err')
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

Observer

  • Observer (观察者) 是一个普通的对象,该对象会作为 subscribe() 方法的参数
  • Observable 对象产生新值的时候,我们可以通过调用 next() 方法来通知对应的观察者
  • 若出现异常,则会调用观察者的 error() 方法,当我们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者
  • 在下面两种情况中,新的值不会再通知对应的观察者
    • 已调用 observer 对象的 complete() 方法
    • 执行取消订阅操作
1
2
3
4
5
6
7
interface Observer<T> {
closed?: boolean // 标识是否已经取消对 Observable 对象的订阅
next: (value: T) => void // 每当 Observable 发送新值的时候,next 方法会被调用
error: (err: any) => void // 当 Observable 内发生错误时,error 方法就会被调用
complete: () => void // 当 Observable 数据终止后,complete 方法会被调用
// 需要注意:在调用 complete 方法之后,next 方法不会再被调用
}

下面是一些 rxjs 常用的操作符

  • 创建类操作符 fromfromEventfromEventPatternIntervalTimer
  • 工具类操作符 do
  • 变换类操作符 scan
  • 数学类操作符 reduce
  • 过滤类操作符 filtertakefirstlastskip
  • 过滤类操作符 debouncedebounceTime
  • 过滤类操作符 distinctdistinctUntilChanged
  • 合并类操作符 mergeconcatstartWith
  • 合并类操作符 combineLatestwithLatestFromzip

下面我们就一个一个来看

from

from 可以支持从数组、类似数组的对象、Promiseiterable 对象或类似 Observable 的对象(ES5 当中的 Observable)来创建一个 Observable,它几乎可以把任何对象转换成 Observable

1
2
3
4
var array = [10, 20, 30]
var result$ = Rx.Observable.from(array)

result$.subscribe(x => console.log(x))

fromEvent

这个操作符是专门为事件转换成 Observable 而制作的,用于处理各种 DOM 中的事件

1
2
var click$ = Rx.Observable.fromEvent(document, 'click')
click$.subscribe(x => console.log(x))

fromEventPattern

我们经常会遇到一些已有的代码,这些代码和类库往往不受我们的控制,无法重构或代价太大,在 rxjs 中也提供了对应的方法可以转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function addClickHandler(handler) {
document.addEventListener('click', handler)
}

function removeClickHandler(handler) {
document.removeEventListener('click', handler)
}

var click$ = Rx.Observable.fromEventPattern(
addClickHandler,
removeClickHandler
)

click$.subscribe(x => console.log(x))

Interval

Rx 提供内建的可以创建和计时器相关的 Observable 方法,第一个是 Interval,它可以在指定时间间隔发送整数的自增长序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 没有条件的情况下 interval 会一直执行下去,所以触发不了剩余的两种情况(err 和 complete)
Rx.Observable.interval(1000).subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

// 稍作修改,添加上条件,便可以看到 Completed,其中的 take() 表示取前几个
Rx.Observable.interval(1000).take(4)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

Timer

一共有两种形式的 Timer,一种是指定时间后返回一个序列中只有一个元素(值为 0)的 Observable

1
2
3
4
5
6
7
8
9
10
11
Rx.Observable.timer(1000).subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

另外一种很类似于 Interval,接收两个参数,第一个参数表示延迟多长时间,第二个参数表示之后要以什么样的频率来进行发送,也就是说,在一开始的延迟时间后,每隔一段时间就会返回一个整数序列

1
2
3
4
5
6
7
8
9
10
11
Rx.Observable.timer(1000, 1000).subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

do

一般用来调试,有时也会用来作为外部条件的设置,可以作为一个可以与外部交互的桥梁,因为当 subscribe() 之后,这个流中的东西就已经固定了,就没有办法在对流继续做一些链接的操作,简单来说,do 可以起到一个临时 subscribe() 的作用,但是并没有中断流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 即取得了这个值,也可以改变这个值
let logLabel = '当前值为:'

Rx.Observable.interval(1000)
.map(val => val * 2)
.do(v => {
console.log(logLabel + v)
logLabel = '修改后,当前值为:'
})
.take(3)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

scan

接收一个函数作为参数,而函数又接收两个参数

1
2
3
scan((x, y) => {
return x + y
})

x 为累加器,将函数返回的值(比如上面的 x + y)作为下一次累加的 x 值传入进来,与递归很类似,y 为上一个序列过来所接收的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.scan((x, y) => { return x + y })
.take(4)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

// 输出结果为
// Value 0
// Value 2
// Value 6
// Value 12
// Completed

流程图如下所示

1
2
3
4
5
6
7
8
9
10
11
12
原始序列: 0------1------2------3------4------5------6------

filter(val => val % 2 === 0)

filter: 0-------------2-------------4-------------6------

scan((x, y) => { return x + y })

x = 0, y = 0 x = 0, y = 2 x = 2, y = 4 x = 6, y = 6
\ \ \ \
\ \ \ \
scan: 0-------------2------------6-------------12-----

在有些情况下,需要记住之前的操作结果,这时候用 scan 就是很好的选择

reduce

我们尝试将上面的例子改为 reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.reduce((x, y) => { return x + y })
.take(4)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

scan 有一点不同,每次做叠加之后都会发射出一个值,reduce 会把序列当中所有的东西做最后的一个累加值,只会发射出一个值,而又由于上面是一个无限的序列,所以会是一个 Never,所以调换一下 take() 的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.take(4)
.reduce((x, y) => { return x + y })
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

可以看到结果为 12,其实本质上与 scan 的运算是一致的,只不过 reduce 要算出一个最终值,而且只发射最终值,reduce 不仅仅可以用于数学运算,还有一些高级的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Rx.Observable.interval(100)
.filter(val => val % 2 === 0)
.take(4)
.reduce((x, y) => { return [...x, y] }, [])
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)
// [0, 2, 4, 6]

filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
let logLabel = '当前值为:'

// 利用 filter 过滤奇数,需要注意这里的 take(3),原本应该是 0,1,2 就结束了
// 但是之前使用了 filter() 使得偶数放行,所以这里表示需要等待三个偶数经过后才会结束
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.do(v => {
console.log(logLabel + v)
logLabel = '修改后,当前值为:'
})
.take(3)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

first

其实等同于 take(1),如下对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.first()
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

// ==> 等同于 take(1)
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.take(1)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

last

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 如果替换成 last() 则会变为一个 Never 状态,因为序列永远无法到达
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.last()
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

skip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 过滤,这里是过滤掉了前三个,即 0,2,4
Rx.Observable.interval(1000)
.filter(val => val % 2 === 0)
.skip(3)
.subscribe(
v => {
console.log('Value', v)
},
e => {
console.log('Error', e)
},
() => {
console.log('Completed')
}
)

debounce,debounceTime

两个操作符的作用都是节流器,限制一定时间内的输出,但是在使用上会有一些不同,所以我们分别来进行介绍

debounceTime

直接使用,后面传入指定的时间即可,即可达到在规定的时间内节流的作用

1
2
3
4
5
6
7
8
Rx.Observable.fromEvent(number, 'keyup')
.pluck('target', 'value')
.debounceTime(300)
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

debounce

debounceTime 时分类似,但是静默时间段由第二个 Observable 决定,所以可以操作的空间就比较广泛

1
2
3
4
5
6
7
8
9
Rx.Observable.fromEvent(number, 'keyup')
.pluck('target', 'value')
// 需要注意,如果写成 debounce(() => { return Rx.Observable.interval(300) }) 这样带有 {} 的形式,需要加上 return
.debounce(() => Rx.Observable.interval(300))
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

distinct,distinctUntilChanged

distinct

它的作用是将整个序列的流中不一样的保留下来,一样的重复的则过滤掉,比如下面这个示例,如果将输入框中的元素选定或者删除,然后从新输入相同的内容,是不会触发 keyup 事件的

1
2
3
4
5
6
7
8
Rx.Observable.fromEvent(number, 'keyup')
.pluck('target', 'value')
.distinct()
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

使用的时候需要小心,尤其是应用在无尽序列当中,因为会极大的消耗内存

distinctUntilChanged

它的作用是只和前一个元素进行对比,前一个元素如果跟其一样,那就抛弃掉

1
2
3
4
5
6
7
8
Rx.Observable.fromEvent(number, 'keyup')
.pluck('target', 'value')
.distinctUntilChanged()
.subscribe(
v => { console.log(v) },
e => { console.log(e) },
() => { console.log('complete') }
)

merge,concat,startWith

这几个操作符的作用都是类似的,都是针对多个流,两个或者两个以上的流进行合并,只是合并的方式不同

merge

两个流,按各自的事件顺序进行合并,严格有时间交叉,一种简单的合并,不回去更改两条流的任何东西

1
2
3
4
5
6
7
8
9
10
11
12
const a = document.querySelector('#a')
const b = document.querySelector('#b')

const a$ = Rx.Observable.fromEvent(a, 'keyup').pluck('target', 'value')
const b$ = Rx.Observable.fromEvent(b, 'keyup').pluck('target', 'value')

Rx.Observable.merge(a$, b$)
.subscribe(
(v) => {
console.log(v)
}
)

在两个输入框内分别输入值,则会交替的输出每个输入框内的值

concat

严格来说不属于合并,应该属于对接,等待前一个流完成了之后,才会进行下一个流,尽管后面的流的序列很快的执行,也会等待前一个流完成之后才会进行输出

1
2
3
4
5
6
Rx.Observable.concat(a$, b$)
.subscribe(
(v) => {
console.log(v)
}
)

从运行结果可以看出,第一个输入框可以正常的输出内容,但是触发第二个输入框的时候是没有反应的,原因是因为第一个输入框是一个无尽序列,理论上只有第一个序列完成后才会输出后面的值,稍作修改

1
2
3
4
5
6
7
8
9
const a$ = Rx.Observable.fromEvent(a, 'keyup').pluck('target', 'value')
const c$ = Rx.Observable.from([1, 2, 3, 4])

Rx.Observable.concat(c$, a$)
.subscribe(
(v) => {
console.log(v)
}
)

这样就可以看到输入的值了,因为只有等待第一个序列完成后后续序列输入的值才会输出

startWith

如果希望这个流在一开始的时候就有一个值可以发射出来的话,就可以使用 startWith 设置一个默认值,类似初始值,类似于在序列前 concat 一个值

1
2
Rx.Observable.from([1, 2, 3, 4]).startWith(0).subscribe(v => console.log(v))
// 输出 0 1 2 3 4

一般都是用来赋予初始值,避免在一开始的时候流是空的

combineLatest,withLatestFrom,zip

combineLatest

组成它的任何一个流当中有新元素出现的话,那么它就会产生一个新流当中对应的一个数据,和 withLatestFrom 相比的话,无论哪个流有改变均会有输出

zip

一对一的输出,和 combineLatest 类似,但是严格要求必须配对,即全都需要新的数据,combineLatestzip 的示例可以参考文章开头部分

withLatestFrom

以一个流为主,然后这个流产生的数据的时候会去获取另一个流的最新值,注意输出的结果为数组

1
2
3
4
5
6
7
8
9
const a$ = Rx.Observable.fromEvent(a, 'keyup').pluck('target', 'value')
const b$ = Rx.Observable.fromEvent(b, 'keyup').pluck('target', 'value')

a$.withLatestFrom(b$)
.subscribe(
(v) => {
console.log(v)
}
)

简单来说,只有当第一个输入框内的值发生变化的时候,才会去取第二个输入框内的值组合成一个数组发送出来,所以改变第二个输入框内的值是不会引起变化的

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×