提问者:小点点

stream. Transform项目的延迟处理


假设我们有一个简单的node. js转换流:

export class JSONParser extends stream.Transform {

  constructor() {
    super({objectMode: true});
  }
}

我想同步处理项目一段时间,然后延迟剩余部分。类似这样:

export class JSONParser extends stream.Transform {

  count = 0;

  constructor() {
    super({objectMode: true});
  }

  _transform(chunk, encoding, cb) {

   const modifiedChunk = this.modify(chunk);

   if(count++ % 55 === 0){
     process.nextTick(() => this.push(modifiedChunk));
     return;
   }

     this.push(modifiedChunk);
  }
}

理论上,这意味着对于每55个左右的项目,流将等待下一个滴答来处理剩余的项目。问题-

>

  • 这确实会延迟所有剩余项目的进程,还是仅仅延迟这一个块?它会保持被推送的块的顺序吗?

    我相信令牌桶算法可以做速率限制,也许这是实现非事件循环阻塞流的更好方法?


  • 共3个答案

    匿名用户

    process. nextTick是异步方法。它将在现有流程堆栈完成后调用您传递的方法。

    因此,将被推送的项目的顺序将是(假设有112个项目):

    1,2,3,4...54, 56, 57,..., 109, 111, 112, 55, 110

    匿名用户

    在这种情况下,最好使用setIm和not process. nextTick,因为后者会饿死I/O:

       if(count++ % 55 === 0){
         setImmediate(() => this.push(modifiedChunk));
         return;
       }
    

    而不是:

       if(count++ % 55 === 0){
         process.nextTick(() => this.push(modifiedChunk));
         return;
       }
    

    匿名用户

    我会试着回答你的问题并解释原因:

    1. 这确实会延迟所有剩余项目的进程,还是仅仅延迟这一个块?它会保持被推送的块的顺序吗?

    是的,但是你需要做一个小的修正。在pushcb被调用之前,转换流不会调用_transform方法,但是-请注意你实际上根本没有调用cb。你应该在流很好地处理下一个块之后这样做:

      _transform(chunk, encoding, cb) {
    
       const modifiedChunk = this.modify(chunk);
    
        if(count++ % 55 === 0){
          process.nextTick(() => {
            this.push(modifiedChunk); 
            cb()
          });
          return;
        }
    
        this.push(modifiedChunk);
        cb();
    
      }
    

    你写的算法似乎没有做实际的速率限制——至少在每秒块的意义上没有。它只是推迟了一些处理,然后每隔这么多块进行下一次勾选。

    令牌桶将是一个很好的解决方案,您甚至可以使用如下转换方法创建一个简单的PassDirect流:

    new PassThrough({transform(...args) {
       // some logic to see if you can push out some chunks
       // if so:
       return super._transform(...args)
       // otherwise
       return bucket.ready().then(() => super._transform(...args));
    }   
    

    如果你需要一些想法,这里有一个我实现的速率限制示例超燃冲压发动机。它在操作上类似于令牌桶,但它是基于时间的,而不是基于桶大小的——尽管我认为它解释了相同的结果。