node 路由间事件通信问题

路由间事件通信问题,通过事件监听器问题

...

const EventEmitter = require('events')
const eventBus = new EventEmitter()


fastify.get('/', (request, reply) => {
  // 创建 sse (`EventSource`)事件流
  const sseStream =new eventSourceStream();
  // eventBus 添加 ‘evt’ 事件监听器
  const handleEvt = ()=>{
    sseStream.write({
      event:'visit',
      data: new Deta().getTime(),
    })
  };
  /* **问题在这里每次访问该路由,都会向eventBus添加监听器,导致监听器累计添加,多次触发;预期只想触发一次,该如何实现 sse推送** */
  eventBus.on('evt',handleEvt)
  sseStream.write()
  reply.type('text/event-stream').send(sseStream)
})

// 访问 ‘/visit’时触发,eventBus 上的 ‘evt' 事件,进而触发推送
fastify.get('/visit', (request, reply) => {
  eventBus.emit('evt')
  reply.send({ hello: 'world' })
})

...
阅读 1.9k
2 个回答

问题或许说的不够明确,主要还是被sse流给困住了;后来想想,只需要复制sse流即可解决问题;

...
 // 创建 sse (`EventSource`)事件流
 const sseStream =new eventSourceStream();


fastify.get('/', (request, reply) => {
  // 复制 sse (`EventSource`)事件流
  const k = new PassThrough();
  sseStream.pipe(k);
  
  sseStream.write()
  reply.type('text/event-stream').send(k)
})

// 访问 ‘/visit’时触发,只需要向流中写入东西即可
fastify.get('/visit', (request, reply) => {
   sseStream.write({
      event:'visit',
      data: new Deta().getTime(),
    })
  reply.send({ hello: 'world' })
})

...

其实不复制流也可以解决,只需要继承 node 标准库中 events ,重写 on 事件,使其监听的每个事件只添加一次就行了(默认是可以多次添加的,最多10次,超过会警告,因存在内存泄漏问题)

const EventEmitter = require("events");

// 单事件触发器(系统默认为多事件共存)
class SingleEventEmitter extends EventEmitter {
  constructor(opts) {
    super(opts);
    this._maxListeners = 1;
  }
  on(eventName, listener) {
    // 修改了默认同名事件可共存行为,使其只能存在一份,后边的覆盖之前的事件
    this._events[eventName] = [listener];
    // 需要手动设置事件数量,不然 eventNames 方法获取不到事件名称列表
    this._eventsCount = Object.keys(this._events).length;
  }
}

那就添加的是否做下判断是否有加过吧

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题