假设我们有一个简单的node. js转换流:
export class JSONParser extends stream.Transform {
constructor() {
super({objectMode: true});
}
}
我想同步处理项目一段时间,然后延迟剩余部分。类似这样:
export class JSONParser extends stream.Transform {
count = 0;
constructor() {
super({objectMode: true});
}
_transform(chunk, encoding, cb) {
const modifiedChunk = this.modify(chunk);
if(count++ % 55 === 0){
process.nextTick(() => this.push(modifiedChunk));
return;
}
this.push(modifiedChunk);
}
}
理论上,这意味着对于每55个左右的项目,流将等待下一个滴答来处理剩余的项目。问题-
>
这确实会延迟所有剩余项目的进程,还是仅仅延迟这一个块?它会保持被推送的块的顺序吗?
我相信令牌桶算法可以做速率限制,也许这是实现非事件循环阻塞流的更好方法?
process. nextTick是异步方法。它将在现有流程堆栈完成后调用您传递的方法。
因此,将被推送的项目的顺序将是(假设有112个项目):
1,2,3,4...54, 56, 57,..., 109, 111, 112, 55, 110
在这种情况下,最好使用setIm和not process. nextTick,因为后者会饿死I/O:
if(count++ % 55 === 0){
setImmediate(() => this.push(modifiedChunk));
return;
}
而不是:
if(count++ % 55 === 0){
process.nextTick(() => this.push(modifiedChunk));
return;
}
我会试着回答你的问题并解释原因:
是的,但是你需要做一个小的修正。在pushcb
被调用之前,转换流不会调用_transform
方法,但是-请注意你实际上根本没有调用cb
。你应该在流很好地处理下一个块之后这样做:
_transform(chunk, encoding, cb) {
const modifiedChunk = this.modify(chunk);
if(count++ % 55 === 0){
process.nextTick(() => {
this.push(modifiedChunk);
cb()
});
return;
}
this.push(modifiedChunk);
cb();
}
你写的算法似乎没有做实际的速率限制——至少在每秒块的意义上没有。它只是推迟了一些处理,然后每隔这么多块进行下一次勾选。
令牌桶将是一个很好的解决方案,您甚至可以使用如下转换方法创建一个简单的PassDirect
流:
new PassThrough({transform(...args) {
// some logic to see if you can push out some chunks
// if so:
return super._transform(...args)
// otherwise
return bucket.ready().then(() => super._transform(...args));
}
如果你需要一些想法,这里有一个我实现的速率限制示例超燃冲压发动机
。它在操作上类似于令牌桶,但它是基于时间的,而不是基于桶大小的——尽管我认为它解释了相同的结果。