|
为什么要使用文件流想象这样一个场景,我要处理一个 10G 的文件,但我的内存大小只有 2G,该怎么办?我们可以分 5 次读取文件,每次只读取 2G 的数据,这样就可以解决这个问题,那么这个分段读取的过程就是流!在 node 中 stream 模块封装了流的基本操作,文件流也是直接依赖的此模块,这里我们借助文件流来深入理解 stream文件可读流读取文件,将文件内容一点一点的读入内存当中。使用方式我们先看一下基本的使用方式。constfs=require('fs')constrs=fs.createReadStream('./w-test.js')rs.on('data',(chunk)=>{console.log(chunk)})rs.on('close',()=>{console.log('close')})如上代码所示,我们通过 fs.createStream() 创建了一个可读流,用来读取 w-test.js 文件。当 on('data') 时,会自动的读取文件数据,每次默认读取 64kb 的内容,也可以通过 highWaterMark 参数来动态改变每次内容流程的阈值。文件读取完毕后会自动触发 close 事件。如下代码为 createReadStream 可以配置的参数constrs=fs.createReadStream('./w-test.js',{flags:'r',//文件系统表示,这里是指以可读的形式操作文件encoding:null,//编码方式autoClose:false,//读取完毕时是否自动触发close事件start:0,//开始读取的位置end:2,//结束读取的位置highWaterMark:2//每次读取的内容大小})注意: start 跟 end 都是包含的,即 [start, end]。其实,fs.crateReadStream 就是返回一个 fs.ReadStream 类的实例,所以上述代码就等同于:constrs=newfs.ReadStream('./w-test.js',{flags:'r',//文件系统表示,这里是指以可读的形式操作文件encoding:null,//编码方式autoClose:false,//读取完毕时是否自动触发close事件start:0,//开始读取的位置end:2,//结束读取的位置highWaterMark:2//每次读取的内容大小})文件可读流的实现了解完使用方式,那我们就应该尝试从原理上去搞定它,接下来,我们手写一个可读流。fs.read / fs.open可读流的本质就是分批读取文件数据,而 fs.read() 方法可以控制读取文件的内容大小fs.read(fd,buffer,offset,length,position,callback)fd:要读取的文件描述符buffer:数据要被写入的 buffer(将读取到的文件内容写入到此 buffer 内)offset:buffer 中开始写入的偏移量(从 buffer 的第几个索引开始写入)length:读取的字节数(从文件中读取几个字节)postion:指定从文件中开始读取的位置(从文件的第几个字节开始读)callback:回调函数errbytesRead:实际读取的字节数读取文件需要一个文件标识符,我们应该需要使用 fs.open 来获取path:文件路径flags:文件系统标志,默认值:'r'。意思是要对文件进行什么操作,常见的有以下几种:r:打开文件用于读取w:打开文件用于写入a:打开文件用于追加mode:文件操作权限,默认值:0o666(可读写)。callback:回调函数。函数上携带的参数如下:err:如果失败,则值为错误原因fd(number):文件描述符,读取、写入文件时都要用到这个值初始化首先,ReadStream 是一个类,从表现上来看这个类可以监听事件即 on('data'),所以我们应该让它继承自 EventEmitter,如下代码:classReadStreamextendsEventEmitter{constructor(){super();}}然后我们初始化参数,并打开文件,如下代码(代码中会对关键代码作注释):classReadStreamextendsEventEmitter{constructor(path,options={}){super()//解析参数this.path=paththis.flags=options.flags'r'this.encoding=options.encoding'utf8'this.autoClose=options.autoClosetruethis.start=options.start0this.end=options.endundefinedthis.highWaterMark=options.highWaterMark16*1024//文件的偏移量this.offset=this.start//是否处于流动状态,调用pause或resume方法时会用到,下文会讲到this.flowing=false//打开文件this.open()//当绑定新事件时会触发newListener//这里当绑定data事件时,自动触发文件的读取this.on('newListener',(type)=>{if(type==='data'){//标记为开始流动this.flowing=true//开始读取文件this.read()}})}}读取文件之前,我们要先打开文件,即 this.open()。on('newListener')是 EventEmitter 的一个事件,每当我们绑定新的事件时都会触发 newListener,例如:当我们 on('data') 时,会触发 newListener 事件,并且 type 为 'data'。这里当我们监听到data事件绑定(即 on('data'))时,就开始读取文件即 this.read(),this.read() 是我们核心方法。openopen 方法如下:open(){fs.open(this.path,this.flags,(err,fd)=>{if(err){//文件打开失败触发error事件this.emit('error',err)return}//记录文件标识符this.fd=fd//文件打开成功后触发open事件this.emit('open')})}当打开文件后记录下文件标识符,即 this.fdreadread 方法如下:read(){//由于```fs.open```是异步操作,//所以当调用read方法时,文件可能还没有打开//所以我们要等open事件触发之后,再次调用read方法if(typeofthis.fd!=='number'){this.once('open',()=>this.read())return}//申请一个highWaterMark字节的buffer,//用来存储从文件读取的内容constbuf=Buffer.alloc(this.highWaterMark)//开始读取文件//每次读取时,都记录下文件的偏移量fs.read(this.fd,buf,0,buf.length,this.offset,(err,bytesRead)=>{this.offset+=bytesRead//bytesRead为实际读取的文件字节数//如果bytesRead为0,则代表没有读取到内容,即读取完毕if(bytesRead){//每次读取都触发data事件this.emit('data',buf.slice(0,bytesRead))//如果处于流动状态,则继续读取//这里当调用pause方法时,会将this.flowing置为falsethis.flowing&this.read()}else{//读取完毕后触发end事件this.emit('end')//如果可以自动关闭,则关闭文件并触发close事件this.autoClose&fs.close(this.fd,()=>this.emit('close'))}})}上述每行代码都有注释,相信也不难理解,这里有几个关键点要注意一下一定要等文件打开后才能开始读取文件,但是文件打开是一个异步操作,我们并不知道具体的打开完毕时间,所以,我们会在文件打开后触发一个 on('open') 事件,read 方法内会等 open 事件触发后再次重新调用 read()fs.read() 方法之前有讲过,可以从前文回顾里看一下 手写 fs 核心方法this.flowing 属性是用来判断是否是流动的,会用对应的 pasue() 方法与 resume() 来控制,下面我们来看一下这两个方法。pausepause(){this.flowing=false}resumeresume(){if(!this.flowing){this.flowing=truethis.read()}}完整代码const{EventEmitter}=require('events')constfs=require('fs')classReadStreamextendsEventEmitter{constructor(path,options={}){super()this.path=paththis.flags=options.flags'r'this.encoding=options.encoding'utf8'this.autoClose=options.autoClosetruethis.start=options.start0this.end=options.endundefinedthis.highWaterMark=options.highWaterMark16*1024this.offset=this.startthis.flowing=falsethis.open()this.on('newListener',(type)=>{if(type==='data'){this.flowing=truethis.read()}})}open(){fs.open(this.path,this.flags,(err,fd)=>{if(err){this.emit('error',err)return}this.fd=fdthis.emit('open')})}pause(){this.flowing=false}resume(){if(!this.flowing){this.flowing=truethis.read()}}read(){if(typeofthis.fd!=='number'){this.once('open',()=>this.read())return}constbuf=Buffer.alloc(this.highWaterMark)fs.read(this.fd,buf,0,buf.length,this.offset,(err,bytesRead)=>{this.offset+=bytesReadif(bytesRead){this.emit('data',buf.slice(0,bytesRead))this.flowing&this.read()}else{this.emit('end')this.autoClose&fs.close(this.fd,()=>this.emit('close'))}})}}文件可写流顾名思义了,将内容一点一点写入到文件里去。fs.writefd:要被写入的文件描述符buffer:将指定 buffer 的内容写入到文件中去offset:指定 buffer 的写入位置(从 buffer 的第 offset 个索引读取内容写入到文件中去)length:指定要写入的字节数position:文件的偏移量(从文件的第 position 个字节开始写入)使用方式//使用方式1:constws=fs.createWriteStream('./w-test.js')//使用方式2:constws=newWriteStream('./w-test.js',{flags:'w',encoding:'utf8',autoClose:true,highWaterMark:2})//写入文件constflag=ws.write('2')ws.on('drain',()=>console.log('drain'))ws.write() 写入文件。这里有一个返回值,代表是否已经达到最大缓存。当我们同步连续调用多次 write()时,并不是每次调用都立即写入文件,而是同一时间只能执行一次写入操作,所以剩下的会被写入到缓存中,等上一次写入完毕后再从缓存中依次取出执行。所以,这时就会有一个最大的缓存大小,默认为 64kb。而这里的返回值则代表是否还可以继续写入,也就是:是否达到了最大缓存。true 代表可以继续写入。ws.on('drain'),如果调用ws.write()返回 false,则当可以继续写入数据到流时会触发 'drain' 事件。文件可写流的实现初始化先定义 WriteStream 类,并继承 EventEmitter,然后,初始化参数。_注意看代码注释_const{EventEmitter}=require('events')constfs=require('fs')classWriteStreamextendsEventEmitter{constructor(path,options={}){super()//初始化参数this.path=paththis.flags=options.flags'w'this.encoding=options.encoding'utf8'this.autoClose=options.autoClosetruethis.highWaterMark=options.highWaterMark16*1024this.offset=0//文件读取偏移量this.cache=[]//缓存的要被写入的内容//将要被写入的总长度,包括缓存中的内容长度this.writtenLen=0//是否正在执行写入操作,//如果正在写入,那以后的操作需放入this.cachethis.writing=false//是否应该触发drain事件this.needDrain=false//打开文件this.open()}}open()跟 ReadStream 一样的代码。open(){fs.open(this.path,this.flags,(err,fd)=>{if(err){this.emit('error',err)return}this.fd=fdthis.emit('open')})}write()执行写入操作write(chunk,encoding,cb=()=>{}){//初始化被写入的内容//如果时字符串,则转为bufferchunk=Buffer.isBuffer(chunk)chunk:Buffer.from(chunk,encoding)//计算要被写入的长度this.writtenLen+=chunk.length//判断是否已经超过highWaterMarkconsthasLimit=this.writtenLen>=this.highWaterMark//是否需要触发drain//如果超过highWaterMark,则代表需要触发this.needDrain=hasLimit//如果没有正在写入的内容,则调用_write直接开始写入//否则放入cache中//写入完成后,调用clearBuffer,从缓存中拿取最近一次内容开始写入if(!this.writing){this.writing=truethis._write(chunk,()=>{cb()this.clearBuffer()})}else{this.cache.push({chunk:chunk,cb})}return!hasLimit}//写入操作_write(chunk,cb){if(typeofthis.fd!=='number'){this.once('open',()=>this._write(chunk,cb))return}//写入文件fs.write(this.fd,chunk,0,chunk.length,this.offset,(err,bytesWritten)=>{if(err){this.emit('error',err)return}//计算偏移量this.offset+=bytesWritten//写入完毕,则减去当前写入的长度this.writtenLen-=bytesWrittencb()})}首先初始化要被写入的内容,只支持 buffer 跟字符串,如果是字符串则直接转为 buffer。计算要被写入的总长度,即 this.writtenLen += chunk.length判断是否已经超过 highWaterMark判断是否需要触发 drain判断是否已经有正在被写入的内容了,如果没有则调用 _write() 直接写入,如果有则放入缓存中。当 _write() 写入完毕后,调用 clearBuffer() 方法,从 this.cache 中取出最先被缓存的内容进行写入操作。clearBuffer 方法如下所示clearBuffer()clearBuffer(){//取出缓存constdata=this.cache.shift()if(data){const{chunk,cb}=data//继续进行写入操作this._write(chunk,()=>{cb()this.clearBuffer()})return}//触发drainthis.needDrain&this.emit('drain')//写入完毕,将writing置为falsethis.writing=false//needDrain置为falsethis.needDrain=false}完整代码const{EventEmitter}=require('events')constfs=require('fs')classWriteStreamextendsEventEmitter{constructor(path,options={}){super()this.path=paththis.flags=options.flags'w'this.encoding=options.encoding'utf8'this.autoClose=options.autoClosetruethis.highWaterMark=options.highWaterMark16*1024this.offset=0this.cache=[]this.writtenLen=0this.writing=falsethis.needDrain=falsethis.open()}open(){fs.open(this.path,this.flags,(err,fd)=>{if(err){this.emit('error',err)return}this.fd=fdthis.emit('open')})}clearBuffer(){constdata=this.cache.shift()if(data){const{chunk,cb}=datathis._write(chunk,()=>{cb()this.clearBuffer()})return}this.needDrain&this.emit('drain')this.writing=falsethis.needDrain=false}write(chunk,encoding,cb=()=>{}){chunk=Buffer.isBuffer(chunk)chunk:Buffer.from(chunk,encoding)this.writtenLen+=chunk.lengthconsthasLimit=this.writtenLen>=this.highWaterMarkthis.needDrain=hasLimitif(!this.writing){this.writing=truethis._write(chunk,()=>{cb()this.clearBuffer()})}else{this.cache.push({chunk:chunk,cb})}return!hasLimit}_write(chunk,cb){if(typeofthis.fd!=='number'){this.once('open',()=>this._write(chunk,cb))return}fs.write(this.fd,chunk,0,chunk.length,this.offset,(err,bytesWritten)=>{if(err){this.emit('error',err)return}this.offset+=bytesWrittenthis.writtenLen-=bytesWrittencb()})}}-END-关于奇舞团奇舞团是 360 集团最大的大前端团队,代表集团参与 W3C 和 ECMA 会员(TC39)工作。奇舞团非常重视人才培养,有工程师、讲师、翻译官、业务接口人、团队 Leader 等多种发展方向供员工选择,并辅以提供相应的技术力、专业力、通用力、领导力等培训课程。奇舞团以开放和求贤的心态欢迎各种优秀人才关注和加入奇舞团。
|
|