node pipe流时 双工流中当前流如何获取到上一个流已经结束

node pipe流时 双工流中当前流如何获取到上一个流已经结束。如下代码中在SetIndexHtmlDuplexStream. prototype._write中我无法知道上一个可读流中何时结束,所以无法知道在什么时候 this.push(null) 造成无法触发currentCreateWriteStream finish事件。麻烦知道的大神指点一条明路

util.inherits(SetIndexHtmlDuplexStream, stream.Duplex);

function SetIndexHtmlDuplexStream(program, opt) {
    stream.Duplex.call(this, {
        objectMode: true,
        // allowHalfOpen: false
    });
    this.program = program;
}

SetIndexHtmlDuplexStream.prototype._write = function(chunk, encoding, callback) {

    let {
        output,
        name
    } = this.program;
    chunk = chunk.toString()
        .replace(/(src\=\"\.\/)(.+?)(\")/, '$1' + output + '$3') //设置rollup.config.js input 入口文件名字
        .replace(/(new\s)(.+?)(\(\))/, '$1' + name + '$3') //设置rollup.config.js input 入口文件名字

    this.push(chunk);

    callback();

}

SetIndexHtmlDuplexStream.prototype._read = function(size) {
}

let currentCreateReadStream = fs.createReadStream(inputPath);
let currentCreateWriteStream = fs.createWriteStream(outputPath);
const setIndexHtmlDuplexStream = new SetIndexHtmlDuplexStream(program);


currentCreateReadStream.pipe(setIndexHtmlDuplexStream).pipe(currentCreateWriteStream);

currentCreateWriteStream.on('finish', function() {
        console.log('   \x1b[36mcreate\x1b[0m : ' + path.relative(cwd, outputPath));
});
阅读 3.4k
1 个回答

查看了源码发现pipe其实是做了上一个流结束会触发下一个流结束的操作。

var endFn = doEnd ? onend : cleanup;
    if (state.endEmitted)
        process.nextTick(endFn);
    else
        src.once('end', endFn);

    dest.on('unpipe', onunpipe);

    function onunpipe(readable) {
        debug('onunpipe');
        if (readable === src) {
            cleanup();
        }
    }

    function onend() {
        debug('onend');
        dest.end();
    }
    function cleanup() {
        debug('cleanup');
        // cleanup event handlers once the pipe is broken
        dest.removeListener('close', onclose);
        dest.removeListener('finish', onfinish);
        dest.removeListener('drain', ondrain);
        dest.removeListener('error', onerror);
        dest.removeListener('unpipe', onunpipe);
        src.removeListener('end', onend);
        src.removeListener('end', cleanup);
        src.removeListener('data', ondata);

        cleanedUp = true;

        // if the reader is waiting for a drain event from this
        // specific writer, then it would cause it to never start
        // flowing again.
        // So, if this is awaiting a drain, then we just call it now.
        // If we don't know, then assume that we are waiting for one.
        if (state.awaitDrain &&
            (!dest._writableState || dest._writableState.needDrain))
            ondrain();
    }

但是不知道duplex抽什么风了。然后改用transiform就妥妥了

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题