0%

Rxjs常见操作符

一.RxJS 初试

  • 在 javascript 中的试炼
1
2
3
4
5
const height = document.getElementById('height');
const height$ = Rx.Observable.fromEvent(height, 'keyup');//$是约定俗称的命名方式,因为通过Rx.Observable已经将其转换成了流形式,其中含义是:将keyup事件转换成了Observable观察者
//既然是观察者,则可以订阅
height$.subscribe(val => console.log(val));//当输入的时候会输出事件的对象,获得值则使用val.target.value
// 输入是1,console打印的就是1
  • RxJs 的威力在于合并和转换流,实例如下

(1)html

1
2
3
<input type="text" id="length"/>
<input type="text" id="width" />
<div id="area"></div>

(2)js

1
2
3
4
5
6
7
8
const length = document.getElementById('height');
const width = document.getElementById('width');
const area = document.getElementById('area');
const length$ = Rx.Observable.fromEvent(length, 'keyup').pluck('target','value');
const width$ = Rx.Observable.fromEvent(width, 'keyup').pluck('target','value');
//combineLatest表示如果监听的两个值有一个改变则会重新计算一遍
const area$ = Rx.Observable.combineLatest(length$, width$, (l,w)=>{return l*w});//用于将两个流合并成一个数据流
area$.subscribe(val => area.innerHTML = val);
  • 事件流:理解 Rx 的关键是要把任何变化想象成事件流

二.RxJS 常见操作符

1.常见创建类操作符

  • from:可以把数组、Promise、以及 Iterable 转化为 Observable
  • fromEvent:可以把事件转化为 Observable
  • of:接受一系列的数据,并把它们 emit 出去
    • 如可以使用object$ = Rx.Observable.of({id:1,value:20})转化对象,使用的时候应该是 object.value 或 object.id 的方式

2.常见转换操作符

  • map
    • 是 mapTo、pluck 操作符的根本
    • 举例将上述 pluck 操作符取 target 的 value 值转换成使用 map 操作符
1
const width$ = Rx.Observable.fromEvent(width, 'keyup').map(ev=>ev.target.value);
  • 在 angular 中灵活使用 Observable 如下
1
2
3
4
5
// 定义一个发起网络请求获取Observable的方法
getQuote():Observable<Quote> {
const uri = 'http://localhost:3000';
return this.http.get(uri)
.map(res => res.json() as Quote);// 将请求返回的内容转换成json并转换成对应定义的Quote对象
  • mapTo

    • 比如点击按钮事件或其他事件,我们不需要关注其值内容,只需要知道发生了即可,可以使用 mapTo 定义成一个固定值

    • 实例代码

1
2
3
const width$ = Rx.Observable.fromEvent(width, 'keyup').mapTo(1);//此时执行keyup事件,width$的值即为1
//等同于如下
const width$ = Rx.Observable.fromEvent(width, 'keyup').map(_ => 1);//此时执行keyup事件,width$的值即为1
  • pluck

3.Observable 的性质

  • Observable 有三种状态(即 subscribe 的三个参数):next、error、complete
    • next 是正常执行时的内容,subscribe 的第一个参数
    • error 是当执行时出错,或监听了 throw 类型 Observable 时的执行内容,subscribe 的第二个参数
    • complete 是 Observable 执行结束时的内容,subscribe 的第三个参数
  • 特殊的 Observable:永不结束、Never、Empty(结束但不发射)、Throw
    • Never 类型 Observable 表示不会发生也永远不会结束,会在执行过程中导致,也可直接通过 Rx.Observable.error(‘xxx’)的方式声明,结果不会执行 next、error、complete 中任何一个阶段
    • Empty 类型 Observable 不会发射元素会直接结束,会在执行过程中导致,也可以通过 Rx.Observable.empty()的方式声明,结果不会执行 next、error,会执行 complete 阶段
    • Throw 类型 Observable 直接进入 error 状态,会在执行过程中抛出异常导致,也可以直接通过 Rx.Observable.throw(‘xxx’)的方式声明,结果只会执行 error 阶段

4.常见工作操作符:do

  • do 操作符用于在流处理期间对数据进行操作,实例如下
1
2
3
4
5
const interval$ = Rx.Observable.interval(100)
.do(v => {
console.log('val is :'+v);
})
.take(3);

5.常见变换类操作符:scan

  • scan(()=>{})接受一个函数,参数 1 是上次处理后的结果,参数 2 是新值内容。实例代码
1
2
3
4
5
6
7
8
9
10
const interval$ = Rx.Observable.interval(100)
.filter(v => val%2 === 0) //表示是偶数才放行
.scan((x,y)=>{return x+y})//表示结果累加操作
.take(4);
//输出结果
// 0[0+0]
// 2[上次结果0+新值2]
// 6[上次结果2+新值4]
// 12[上次结果6+新值6]
//complete执行内容

6.常见数学类操作符:reduce

  • reduce 是将流计算结果做统一的最后处理并发射,实例代码
1
2
3
4
5
6
const interval$ = Rx.Observable.interval(100)
.filter(v => val%2 === 0) //表示是偶数才放行
.take(4)
.reduce((x,y)=>{return x+y});
//输出结果
// 12[只发射最终值]

7.过滤类操作符:filter、take、first/last、skip…

  • take(num)表示取流中前 num 个
  • filter(()=>{})表示对流处理的放行判断,如果满足条件则放行,不满足条件则不放行
1
2
3
const interval$ = Rx.Observable.interval(100)
.filter(v => val%2 === 0) //表示是偶数才放行
.take(3);//订阅后会输出0 2 4
  • first()表示取流第一个,相当于 take(1)
  • last()表示取流最后一个
  • skip(num)表示跳过前 num 个
1
2
3
const interval$ = Rx.Observable.interval(100)
.filter(v => val%2 === 0) //表示是偶数才放行
.skip(2);//订阅后会输出4 6 8 ...

8.常见创建类操作符:Interval、Timer

  • Interval 实例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const interval$ = Rx.Observable.interval(100).take(3);
interval$.subscribe(
val => {
console.log(val) //next状态
},
err => {
console.log(err) // error状态
},
()=> {
console.log('I am complete') // complete状态
}
);
//输出结果:interval()是做循环用的,每次发射出来的是索引,故生成的是0/1/2/3/4....,又由于take表示取前多少个,故take(3)表示取前3个
// 0
// 1
// 2
// "I am complete" // complete状态执行的内容
  • Timer 实例代码
1
2
3
4
5
6
7
8
const timer$ = Rx.Observable.timer(100,200);//表示100毫秒之后启动,之后以200毫秒的频率一直发送
// 故timer比Interval多出起始延迟时间的设置
timer$.subscribe(v=> console.log(v))
//输出结果
// 0 --运行后100毫秒之后输出
// 1 --输出0后200毫秒之后输出
// 2 --输出1后200毫秒之后输出
// 3 --...

9.实例:自行给 rxjs 中 Observable 添加 debug 方法

  • 通过 Observable 的原型中定义 debug 方法返回 Observable 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import {Observable} from 'rxjs/Observable';
import {environment} from '../../environment/environment'
// typescript用于解决自行添加对象属性的报错问题
declare module 'rxjs/Observable' {
interface Observable<T> {
debug: (...any) => Observable<T>;
}
}
// 给Observable添加debug方法
Observable.prototype.debug = function(message:string) {
return this.do(
(next)=>{
if(!environment.production) {
console.log(message,next)
}
},
(err) => {
if(!environment.production) {
console.error('ERROR>>',message,err);
}
},
()=> {
if(!environment.production) {
console.log('Completed -');
}
}
)
}
//使用时直接在声明Observable期间添加即可以查看对应Observable对象内容,如
this.http.get('url')
.debug('Test:')
.map(res => res.json() as Quote);
//在subscribe监听后的会输出对应的log

10.过滤类操作符:debounce、debounceTime

  • debounce:比 debounceTime 灵活,debounceTime 只能设定固定的毫秒间隔,而 debounce 可以通过接受的 function 设定毫秒间隔
1
2
const length$ = Rx.Observable.fromEvent(length,'keyup').pluck('target','value').debounce(()=>Rx.Observable.interval(300));//可以自行修改function返回内容从而决定滤掉的内容
//上述代码含义是:过滤掉300毫秒以内keyup事件监听内容
  • debouceTime(num):时间滤波器,表示只关注大于等于 num 毫秒间隔的事件内容
1
2
const length$ = Rx.Observable.fromEvent(length,'keyup').pluck('target','value').debounceTime(300);
//上述代码含义是:过滤掉300毫秒以内keyup事件监听内容
  • 执行图片

image

11.过滤类操作符:distinct、distinctUtilChanged

  • distinct:整个序列中,过滤一样的,保留不一样的(要求序列中没有重复的元素)
1
const length$ = Rx.Observable.fromEvent(length,'keyup').pluck('target','value').distinct();//过滤掉整个流中重复的元素
  • distinctUtilChanged:只跟前一个元素比,过滤一样的,保留不一样的
1
const length$ = Rx.Observable.fromEvent(length,'keyup').pluck('target','value').distinctUtilChanged();//过滤掉流中前一个重复的元素
  • 执行图片

image

12.合并类操作符:merge、concat、startWith

  • merge:在整个序列中按照流运行状态进行合并
  • concat:在整个序列中将流前后拼接(如拼接的第一个流是无尽流,则永远只会输出第一个流内容,因为第二个流永远不会发生,第一个流没有执行完)
  • startWith:设定流发射的初始值
  • 执行图片
    image

13.合并类操作符:combineLatest、withLatestFrom、zip

image

  • 通过 combineLatest 可以对两个流进行对应的处理操作,实例请参照开头计算面积
  • zip 有对齐的感觉,将两个流对应位置的元素进行处理操作,慢的流决定最终 zip 生成流的速度
  • withLatestFrom 当基准流改变时才会进行流处理,使用方式是基准流.withLatestFrom(其他流),如下
1
const merged$ = length$.withLatestFrom(width$);
  • 区别:zip 有对齐的特性,withLatestFrom 是以源事件流为基准,combineLatest 是无论任何一个流发生改变时都会处理