本章我们继续来看 Node.js
当中的 EventEmitter
模块,在 Node.js
当中许多对象都会分发事件,比如一个 net.Server 对象会在每次有新连接时触发一个事件,一个 fs.readStream 对象会在文件被打开的时候触发一个事件,然而所有这些产生事件的对象都是 events.EventEmitter
的实例
本文主要分为以下几个部分,首先先介绍一下 EventEmitter
类,然后简单的过一遍源码,最后在自己动手来实现一个简单的 EventEmitter
模块,下面就让我们一步一步来实现吧
EventEmitter 类
events
模块只提供了一个对象 events.EventEmitter
,其核心就是事件触发与事件监听器功能的封装,可以通过 require('events')
来访问该模块,如下
1 | // 引入 events 模块 |
EventEmitter
对象如果在实例化时发生错误,会触发 error
事件,
关于
EventEmitter
里面的error
事件有一点需要注意的地方,EventEmitter
即使绑定了error
事件也是不会输出的,而是会在控制台打印该异常的堆栈信息,并结束进程
如下
1 | eventEmitter.on('error', function (err) { |
经过测试可以发现,绑定 error
事件只能自己触发,比如使用 eventEmitter.emit('error')
,当没有错误时,会在控制台打印 Error:undefined
,而当有错误时不会打印而是直接打印该异常的堆栈信息,并结束进程
如果想要获取异常只能通过
try catch
,更多关于EventEmitter
模块的异常处理可见 EventEmitter 模块的异常处理
当添加新的监听器时,newListener
事件会触发,当监听器被移除时,removeListener
事件被触发(这两个是 Node.js
的 EventEmitter
模块自带的特殊事件)
以上就是 EventEmitter
的简单使用方式,但是在深入了解 EventEmitter
之前,我们先来了解一些前置知识
观察者模式
源码部分主要参考的是 深入 EventEmitter
观察者模式是 软件设计模式 的一种,在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知,这通常透过呼叫各观察者所提供的方法来实现,此种模式通常被用来实时事件处理系统 —— 维基百科
EventEmitter
本质上是一个观察者模式的实现,所谓观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己,我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念,期刊订阅包含两个主要的角色,期刊出版方和订阅者,它们之间的关系如下
- 期刊出版方,负责期刊的出版和发行工作
- 订阅者,只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知
在观察者模式中也有两个主要角色,观察者和被观察者,它们之间的关系图如下
观察者模式的优点是支持简单的广播通信,自动通知所有已经订阅过的对象,并且目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用,但是一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间,如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃
下面我们来看一个观察者模式的应用,最为常见的例子就是为 DOM
对象添加事件监听,如下
1 | <button id="btn">确认</button> |
1 | function clickHandler(event) { |
当按钮在点击当同时会触发我们事先绑定好的事件 clickHandler
,会在控制台打印出 按钮已点击!
字样
发布/订阅模式
在 软件架构 中,发布/订阅模式是一种 消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者),而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在,同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在,—— 维基百科
发布/订阅模式与观察者模式非常类似,它们最大的区别是
- 发布者和订阅者不知道对方的存在
- 它们之间需要一个第三方组件,叫做信息中介,它将订阅者和发布者串联起来,它过滤和分配所有输入的消息
- 发布/订阅模式用来处理不同系统组件的信息交流,即使这些组件不知道对方的存在
那么信息中介是如何过滤消息呢?在发布/订阅模型中,订阅者通常接收所有发布的消息的一个子集,选择接受和处理的消息的过程被称作过滤,有两种常用的过滤形式,基于主题的和基于内容的
- 在『基于主题』的系统中,消息被发布到主题或命名通道上,订阅者将收到其订阅的主题上的所有消息,并且所有订阅同一主题的订阅者将接收到同样的消息,发布者负责定义订阅者所订阅的消息类别
- 在『基于内容』的系统中,订阅者定义其感兴趣的消息的条件,只有当消息的属性或内容满足订阅者定义的条件时,消息才会被投递到该订阅者,订阅者需要负责对消息进行分类
一些系统支持两者的混合,即发布者发布消息到主题上,而订阅者将基于内容的订阅注册到一个或多个主题上,基于主题的通信基础结构图如下
最后我们再来总结一下观察者模式与发布/订阅模式之间的区别
观察者模式 VS 发布/订阅模式
根据图片可知,两者的区别如下
- 在观察者模式中,观察者知道
Subject
的存在,Subject
一直保持对观察者进行记录,然而,在发布/订阅模式中,发布者和订阅者不知道对方的存在,它们只有通过信息中介进行通信 - 在发布订阅模式中,组件是松散耦合的,正好和观察者模式相反
- 观察者模式大多数时候是同步的,比如当事件触发
Subject
就会去调用观察者的方法,而发布/订阅模式大多数时候是异步的(使用消息队列)
在大致了解了以上内容之后,下面我们就来正式的看一看 Node.js
当中的 EventEmitter
模块
EventEmitter 模块
在 Node.js
当中,大多数的核心 API
都采用惯用的异步事件驱动架构,所有能触发事件的对象都是 EventEmitter
类的实例,这些对象开放了一个 eventEmitter.on()
函数,允许将一个或多个函数绑定到会被对象触发的命名事件上,当 EventEmitter
对象触发一个事件时,所有绑定在该事件上的函数都被同步地调用, 监听器的返回值会被丢弃,本文主要介绍以下几个核心方法
on(event, listener)
- 为指定事件添加一个监听器到监听器数组的尾部(还有一个
addListener
方法,本质与on
是一致的,见下方)
- 为指定事件添加一个监听器到监听器数组的尾部(还有一个
emit(event, [arg1], [arg2], [...])
- 按监听器的顺序执行执行每个监听器,如果事件有注册监听返回
true
,否则返回false
- 按监听器的顺序执行执行每个监听器,如果事件有注册监听返回
once(event, listener)
- 为指定事件注册一个单次监听器,即监听器最多只会触发一次,触发后立刻解除该监听器
removeListener(event, listener)
- 移除指定事件的某个监听器,监听器必须是该事件已经注册过的监听器,它接受两个参数,第一个是事件名称,第二个是回调函数名称
EventEmitter 基本使用
我们首先先来看一个最基本的 EventEmitter
功能,包含了一个观察者和一个被监听的对象,对应的实现就是 EventEmitter
中的 on
和 emit
1 | const EventEmitter = require('events') |
我们自定义 MyEmitter
类,该类继承于 EventEmitter
类,接着我们通过使用 new
关键字创建了 myEmitter
实例,然后使用 on()
方法监听 event
事件,最后利用 emit()
方法触发 event
事件
EventEmitter 构造函数
源码可见 EventEmitter.init.call(this),如下
1 | function EventEmitter() { |
在 EventEmitter
构造函数内部,会调用 EventEmitter.init
方法执行初始化操作,EventEmitter.init
的具体实现如下
1 | EventEmitter.init = function () { |
在 EventEmitter.init
内部,会根据条件执行初始化操作,这里有一个比较重要的操作 this._events = Object.create(null)
,那么问题来了,为什么要使用 Object.create(null)
来初始化一个新对象而不用更简洁的 {}
呢?简单来说,两者的区别可见下图所示
对比可以发现,使用 create
创建的对象,没有任何属性,可以把它当作一个非常纯净的 map
来使用,可以自己定义 hasOwnProperty
、toString
方法而不必担心会将原型链上的同名方法覆盖掉
on()
源码可见 EventEmitter.prototype.addListener,如下
1 | EventEmitter.prototype.addListener = function addListener(type, listener) { |
通过源码可以发现,其实 EventEmitter
实例上的 addListener
和 on
方法均是调用的 _addListener()
方法,下面我们就来看看 _addListener 的具体实现(这里只截取了一些主要实现部分)
1 | // 接收四个参数,依次为 |
简单的总结一下,主要流程为以下四个步骤
- 验证监听器是否为函数对象
- 避免类型为
newListener
的事件类型造成递归调用,优先触发 - 优化单个监听器的场景,不需使用额外的数组对象
- 基于
prepend
参数的值,控制监听器的添加顺序
emit()
源码见 EventEmitter.prototype.emit,如下
1 | EventEmitter.prototype.emit = function emit(type, ...args) { |
剔除掉多余的部分以后可以发现,逻辑还是比较好理解的,先根据事件类型获取对应的处理器,然后根据事件处理器的类型,选择直接调用或者是循环调用
上面我们简单的介绍了 on()
和 emit()
两个方法,它们主要用来添加事件监听和触发事件监听,下面我们再来看看如何移除事件监听
removeListener()
源码见 EventEmitter.prototype.emit,removeListener()
方法最多只会从监听器数组里移除一个监听器实例,如果任何单一的监听器被多次添加到指定 type
的监听器数组中,则必须多次调用 removeListener()
方法才能移除每个实例,为了方便一次性移除 type
对应的监听器,EventEmitter
为我们提供了 removeAllListeners()
方法
1 | EventEmitter.prototype.removeListener = |
我们可以简单的梳理一下 removeListener()
方法,当我们在调用 removeListener()
方法时,若 type
事件类型上绑定多个事件处理器,那么内部处理程序会先根据 listener
事件处理器,查找该事件处理器对应的索引值,然后在根据索引值的不同再进行不同的处理
但是可以发现,在处理移除对应的事件处理器的时候使用了 spliceOne()
方法,那么为什么不直接利用 Array
的 splice()
方法呢?官方的回答是 spliceOne()
方法的执行速度比 Array#splice()
快大约 1.5
倍,实现如下
1 | // About 1.5x faster than the two-arg version of Array#splice(). |
大致原理是从需要删除的位置开始,依次将后一个元素与前一个元素的位置进行互换,在删除掉最后一项
最后我们来介绍一下 EventEmitter
另一个常用的方法 once()
once()
源码见 EventEmitter.prototype.once,如下
1 | function onceWrapper() { |
梳理后可以发现,once()
方法依次调用了 _onceWrap()
和 onceWrapper()
方法,使用 state
对象的 fired
属性,用来标识是否已触发
手动实现一个 EventEmitter 模块
我们下面就来尝试着手动的实现一个 EventEmitter
模块,我们都知道,每一个 EventEmitter
实例都有一个包含所有事件的对象 _events
,事件的监听和监听事件的触发,以及监听事件的移除等事件都是在这个 _events
对象的基础上实现,我们首先先成一个 EventEmitter
类,在类的初始化方法中生成这个事件对象 _events
1 | class EventEmitter { |
_eventsCount
用于统计事件的个数,也就是 _events
对象有多少个属性,下面我们来实现 emit
方法,emit
所做的事情是在 _events
对象中取出相应 type
的属性,并执行属性所对应的函数
1 | class EventEmitter { |
emit
方法是触发事件,并执行相应的方法,而 on
方法则是对于指定的事件添加监听函数
1 | on(type, listener, prepend){ |
- 如果
_events
存在newListener
属性,也就是说_event
存在监听newListener
监听事件,那么每次on
方法添加事件的时候,都会emit
出一个newListener
- 且在
on
方法的参数中,第三个参数用于指定是在相应事件类型属性所对应的数组头部添加还是尾部添加 - 在
on
方法中为了可以链式的调用,所以返回了EventEmitter
模块的实例化本身
在 on
方法的基础上可以实现 addListener
方法和 prependListener
方法
1 | EventEmitter.prototype.addListener = EventEmitter.prototype.on |
再来看看 removeListener
1 | removeListener(type, listener){ |
以上就是几个核心方法的实现,removeAllListener
与 removeListener
相似,只要找到传入的 type
所对应属性的值,没有遍历过程,直接删除这个属性即可
EventEmitter 模块的异常处理
最后我们再来看看 EventEmitter
模块的异常处理的几种方式,比较常用的有以下几种方式
try-catch
domains
process.on('uncaughtException')
try catch 异常处理方式
在 Node.js
中也可以通过 try catch
方式来捕获和处理异常,比如
1 | try { |
上述 let x = x
赋值语句的错误会被捕获,这里提异常处理,那么跟事件有什么关系呢?Node.js
中有一个特殊的事件 error
,如果异常没有被捕获,就会触发 process
的 uncaughtException
事件抛出,如果你没有注册该事件的监听器(即该事件没有被处理),则 Node.js
会在控制台打印该异常的堆栈信息,并结束进程(崩溃),比如
1 | var events = require('events') |
在上述代码中没有监听 error
的事件函数,因此会触发 process
的 uncaughtException
事件,从而打印异常堆栈信息,并结束进程,对于阻塞或者说非异步的异常捕获,try catch
是没有问题的,但是问题在于 try catch
不能捕获非阻塞或者异步函数里面的异常,举例来说
1 | try { |
上述代码中,因为 try
方法里面是同步的,因此可以捕获异常,但是如果 try
方法里面有异步的函数
1 | try { |
因为 process.nextTick
是异步的,因此在 process.nextTick
内部的错误不能被捕获,也就是说 try catch
不能捕获非阻塞函数内的异常
通过 domains 管理异常
Node.js
中 domain
模块能被用来集中地处理多个异常操作,通过 Node.js
的 domain
模块可以捕获非阻塞函数内的异常
1 | var domain = require('domain') |
同样的,即使 process.nextTick
是一个异步函数,domain.on
方法也可以捕获这个异步函数中的异常,即使更复杂的情况下,比如异步嵌套异步的情况下,domain.on
方法也可以捕获异常
1 | var domain = require('domain') |
在上述的情况下,即使异步嵌套很复杂,也能在最外层捕获到异常,但是往往现实并没有那么美好,在 Node.js
最新的文档中,domain
已经被废除了(Deprecated
),这是因为 domain
从诞生之日起就有着缺陷,举例来说
1 | var domain = require('domain') |
如上述的代码是无法捕获到异常 Error
的,原因在于发出异常的 EventEmitter
实例 e
,以及触发异常的定时函数 timer
没有被 domain
包裹,domain
模块是通过重写事件循环中的 nextTick
和 _tickCallback
来事件将 process.domain
注入到 next
包裹的所有异步事件内,解决上述无法捕获异常的情况,只需要将 e
或者 timer
包裹进 domain
1 | d.add(e) |
就可以成功的捕获异常,但是 domain
模块已经在 Node.js
最新的文档中被废除了
process.on(‘uncaughtException’)
Node.js
中提供了一个最外层的捕获异常的方法,非阻塞或者异步函数中的异常都会抛出到最外层,如果异常没有被捕获,那么会暴露出来被最外层的 process.on('uncaughtException')
所捕获
1 | try { |
这样就能在最外层捕获异步或者说非阻塞函数中的异常,但是需要注意避免 uncaughtException
错误引起 Node.js
进程崩溃