Go Source Code Analysis (3) - io

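The io directory contains the io package and its io/ioutil subpackage. Together they define the basic I/O interfaces plus helpers for common operations such as copying data between streams (with or without a caller-supplied buffer) and reading or writing whole files.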

io.go

It defines four basic, general-purpose interfaces:

```go
type Reader interface {
	Read(p []byte) (n int, err error)
}

type Writer interface {
	Write(p []byte) (n int, err error)
}

type Closer interface {
	Close() error
}

type Seeker interface {
	Seek(offset int64, whence int) (int64, error)
}
```
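  • Reader defines the read operation.
  • Writer defines the write operation.
  • Closer closes the underlying I/O object and releases its resources.
  • Seeker sets the offset for the next Read or Write; whence selects whether offset is counted from the start, the current position, or the end.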
```go
// ReadWriter is the interface that groups the basic Read and Write methods.
type ReadWriter interface {
	Reader
	Writer
}

// ReadCloser is the interface that groups the basic Read and Close methods.
type ReadCloser interface {
	Reader
	Closer
}

// WriteCloser is the interface that groups the basic Write and Close methods.
type WriteCloser interface {
	Writer
	Closer
}

// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods.
type ReadWriteCloser interface {
	Reader
	Writer
	Closer
}

// ReadSeeker is the interface that groups the basic Read and Seek methods.
type ReadSeeker interface {
	Reader
	Seeker
}

// WriteSeeker is the interface that groups the basic Write and Seek methods.
type WriteSeeker interface {
	Writer
	Seeker
}

// ReadWriteSeeker is the interface that groups the basic Read, Write and Seek methods.
type ReadWriteSeeker interface {
	Reader
	Writer
	Seeker
}
```

The interfaces above are simply combinations of the basic ones, built by interface embedding.

```go
// ReaderFrom is the interface that wraps the ReadFrom method.
//
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
//
// The Copy function uses ReaderFrom if available.
type ReaderFrom interface {
	ReadFrom(r Reader) (n int64, err error)
}

// WriterTo is the interface that wraps the WriteTo method.
//
// WriteTo writes data to w until there's no more data to write or
// when an error occurs. The return value n is the number of bytes
// written. Any error encountered during the write is also returned.
//
// The Copy function uses WriterTo if available.
type WriterTo interface {
	WriteTo(w Writer) (n int64, err error)
}
```
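  • ReaderFrom reads all data from a Reader r into the receiver, until EOF or an error.
  • WriterTo writes the receiver's own data into another Writer w, until there is nothing left or an error occurs.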
```go
// ReaderAt is the interface that wraps the basic ReadAt method.
//
// ReadAt reads len(p) bytes into p starting at offset off in the
// underlying input source. It returns the number of bytes
// read (0 <= n <= len(p)) and any error encountered.
//
// When ReadAt returns n < len(p), it returns a non-nil error
// explaining why more bytes were not returned. In this respect,
// ReadAt is stricter than Read.
//
// Even if ReadAt returns n < len(p), it may use all of p as scratch
// space during the call. If some data is available but not len(p) bytes,
// ReadAt blocks until either all the data is available or an error occurs.
// In this respect ReadAt is different from Read.
//
// If the n = len(p) bytes returned by ReadAt are at the end of the
// input source, ReadAt may return either err == EOF or err == nil.
//
// If ReadAt is reading from an input source with a seek offset,
// ReadAt should not affect nor be affected by the underlying
// seek offset.
//
// Clients of ReadAt can execute parallel ReadAt calls on the
// same input source.
//
// Implementations must not retain p.
type ReaderAt interface {
	ReadAt(p []byte, off int64) (n int, err error)
}

// WriterAt is the interface that wraps the basic WriteAt method.
//
// WriteAt writes len(p) bytes from p to the underlying data stream
// at offset off. It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// WriteAt must return a non-nil error if it returns n < len(p).
//
// If WriteAt is writing to a destination with a seek offset,
// WriteAt should not affect nor be affected by the underlying
// seek offset.
//
// Clients of WriteAt can execute parallel WriteAt calls on the same
// destination if the ranges do not overlap.
//
// Implementations must not retain p.
type WriterAt interface {
	WriteAt(p []byte, off int64) (n int, err error)
}
```

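ReaderAt and WriterAt read from or write to a given offset. Unlike Read and Write, they neither use nor change the underlying seek offset, which is why clients may call ReadAt in parallel on the same source, and WriteAt in parallel on non-overlapping ranges.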

```go
// StringWriter is the interface that wraps the WriteString method.
type StringWriter interface {
	WriteString(s string) (n int, err error)
}

// WriteString writes the contents of the string s to w, which accepts a slice of bytes.
// If w implements StringWriter, its WriteString method is invoked directly.
// Otherwise, w.Write is called exactly once.
func WriteString(w Writer, s string) (n int, err error) {
	if sw, ok := w.(StringWriter); ok {
		return sw.WriteString(s)
	}
	return w.Write([]byte(s))
}
```

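WriteString writes a string to a Writer. If that Writer also implements StringWriter, its own WriteString method is called directly, which can avoid converting the string to a []byte; otherwise w.Write is called exactly once with []byte(s).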

```go
func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
	if len(buf) < min {
		return 0, ErrShortBuffer
	}
	for n < min && err == nil {
		var nn int
		nn, err = r.Read(buf[n:])
		n += nn
	}
	if n >= min {
		err = nil
	} else if n > 0 && err == EOF {
		err = ErrUnexpectedEOF
	}
	return
}
```
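  1. If buf is shorter than min, it returns ErrShortBuffer immediately.
  2. Otherwise it keeps calling r.Read on the unfilled part of buf until at least min bytes have been read or an error occurs.
  3. If at least min bytes were read, any error is discarded. If fewer than min bytes were read and the error is EOF, the reader did not hold enough data: ErrUnexpectedEOF is returned when some bytes were read, and plain EOF when none were.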
```go
func CopyN(dst Writer, src Reader, n int64) (written int64, err error) {
	written, err = Copy(dst, LimitReader(src, n))
	if written == n {
		return n, nil
	}
	if written < n && err == nil {
		// src stopped early; must have been EOF.
		err = EOF
	}
	return
}

func Copy(dst Writer, src Reader) (written int64, err error) {
	return copyBuffer(dst, src, nil)
}

func CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	if buf != nil && len(buf) == 0 {
		panic("empty buffer in io.CopyBuffer")
	}
	return copyBuffer(dst, src, buf)
}

func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw > 0 {
				written += int64(nw)
			}
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}
```
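  • CopyN(dst Writer, src Reader, n int64) (written int64, err error) wraps src in a LimitReader and calls Copy. If fewer than n bytes were copied without an error, the source ended early, so it returns EOF.
  • Copy(dst Writer, src Reader) (written int64, err error) calls copyBuffer(dst, src, nil). Passing a nil buf does not mean that no buffer is used; it tells copyBuffer to allocate one itself if it needs one.
  • CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) is the same, except that the caller supplies the buffer; a non-nil but empty buffer causes a panic.
  • copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) is the key function behind all of them:
    1. If src implements WriterTo, it calls src.WriteTo(dst) directly.
    2. Otherwise, if dst implements ReaderFrom, it calls dst.ReadFrom(src) directly.
    3. If buf is nil (the Copy and CopyN case), it allocates a buffer:
      1. The default size is 32*1024 bytes, i.e. 32 KB (not 32 MB).
      2. If src is a *LimitedReader whose N is smaller than that, the buffer shrinks to N bytes (at least 1 byte).
    4. It then loops, reading from src into buf and writing buf[:nr] to dst, until src reports EOF (which is returned as err == nil), a read or write error occurs, or the writer accepts fewer bytes than were read (ErrShortWrite).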
```go
// LimitReader returns a Reader that reads from r
// but stops with EOF after n bytes.
// The underlying implementation is a *LimitedReader.
func LimitReader(r Reader, n int64) Reader { return &LimitedReader{r, n} }

// A LimitedReader reads from R but limits the amount of
// data returned to just N bytes. Each call to Read
// updates N to reflect the new amount remaining.
// Read returns EOF when N <= 0 or when the underlying R returns EOF.
type LimitedReader struct {
	R Reader // underlying reader
	N int64  // max bytes remaining
}

func (l *LimitedReader) Read(p []byte) (n int, err error) {
	if l.N <= 0 {
		return 0, EOF
	}
	if int64(len(p)) > l.N {
		p = p[0:l.N]
	}
	n, err = l.R.Read(p)
	l.N -= int64(n)
	return
}
```

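LimitedReader caps how many bytes can be read: before each Read it reslices p to at most N bytes, afterwards it subtracts the bytes actually read from N, and once N reaches 0 it returns EOF.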

```go
// NewSectionReader returns a SectionReader that reads from r
// starting at offset off and stops with EOF after n bytes.
func NewSectionReader(r ReaderAt, off int64, n int64) *SectionReader {
	return &SectionReader{r, off, off, off + n}
}

// SectionReader implements Read, Seek, and ReadAt on a section
// of an underlying ReaderAt.
type SectionReader struct {
	r     ReaderAt
	base  int64
	off   int64
	limit int64
}

func (s *SectionReader) Read(p []byte) (n int, err error) {
	if s.off >= s.limit {
		return 0, EOF
	}
	if max := s.limit - s.off; int64(len(p)) > max {
		p = p[0:max]
	}
	n, err = s.r.ReadAt(p, s.off)
	s.off += int64(n)
	return
}

var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset")

func (s *SectionReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	default:
		return 0, errWhence
	case SeekStart:
		offset += s.base
	case SeekCurrent:
		offset += s.off
	case SeekEnd:
		offset += s.limit
	}
	if offset < s.base {
		return 0, errOffset
	}
	s.off = offset
	return offset - s.base, nil
}

func (s *SectionReader) ReadAt(p []byte, off int64) (n int, err error) {
	if off < 0 || off >= s.limit-s.base {
		return 0, EOF
	}
	off += s.base
	if max := s.limit - off; int64(len(p)) > max {
		p = p[0:max]
		n, err = s.r.ReadAt(p, off)
		if err == nil {
			err = EOF
		}
		return n, err
	}
	return s.r.ReadAt(p, off)
}

// Size returns the size of the section in bytes.
func (s *SectionReader) Size() int64 { return s.limit - s.base }
```

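SectionReader reads one section of an underlying ReaderAt. base is the absolute offset where the section starts, off is the current read position, and limit is the absolute offset where the section ends (exclusive).

  • Read calls the underlying ReadAt at s.off, capped so that it cannot go past limit, and advances off by the number of bytes read.
  • In Seek(offset int64, whence int) (int64, error), whence is one of three constants, SeekStart, SeekCurrent, or SeekEnd, meaning offset is counted from the start of the section, the current position, or the end of the section. It returns the new position relative to the section start. Seeking before the start returns errOffset; seeking past the end is allowed, and later Reads simply return EOF.
  • (s *SectionReader) ReadAt(p []byte, off int64) (n int, err error) reads starting at off, measured from the section start. If off lies outside the section, it returns EOF. If fewer than len(p) bytes remain before limit, it reslices p so the read cannot run past the section, calls the underlying ReadAt, and returns EOF even when that call succeeded, because the caller received fewer than len(p) bytes.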
```go
// TeeReader returns a Reader that writes to w what it reads from r.
// All reads from r performed through it are matched with
// corresponding writes to w. There is no internal buffering -
// the write must complete before the read completes.
// Any error encountered while writing is reported as a read error.
func TeeReader(r Reader, w Writer) Reader {
	return &teeReader{r, w}
}

type teeReader struct {
	r Reader
	w Writer
}

func (t *teeReader) Read(p []byte) (n int, err error) {
	n, err = t.r.Read(p)
	if n > 0 {
		if n, err := t.w.Write(p[:n]); err != nil {
			return n, err
		}
	}
	return
}
```

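TeeReader returns a Reader that writes everything it reads from r to w before returning it to the caller. There is no internal buffering, and any error from w is reported as a read error.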

multi.go

```go
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) {
	return 0, EOF
}
```

eofReader is a Reader that always returns EOF; multiReader uses it as a placeholder for readers that are already exhausted.

```go
type multiReader struct {
	readers []Reader
}

func (mr *multiReader) Read(p []byte) (n int, err error) {
	for len(mr.readers) > 0 {
		// Optimization to flatten nested multiReaders (Issue 13558).
		if len(mr.readers) == 1 {
			if r, ok := mr.readers[0].(*multiReader); ok {
				mr.readers = r.readers
				continue
			}
		}
		n, err = mr.readers[0].Read(p)
		if err == EOF {
			// Use eofReader instead of nil to avoid nil panic
			// after performing flatten (Issue 18232).
			mr.readers[0] = eofReader{} // permit earlier GC
			mr.readers = mr.readers[1:]
		}
		if n > 0 || err != EOF {
			if err == EOF && len(mr.readers) > 0 {
				// Don't return EOF yet. More readers remain.
				err = nil
			}
			return
		}
	}
	return 0, EOF
}

// MultiReader returns a Reader that's the logical concatenation of
// the provided input readers. They're read sequentially. Once all
// inputs have returned EOF, Read will return EOF. If any of the readers
// return a non-nil, non-EOF error, Read will return that error.
func MultiReader(readers ...Reader) Reader {
	r := make([]Reader, len(readers))
	copy(r, readers)
	return &multiReader{r}
}
```

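multiReader concatenates several readers and implements the Reader interface. Its constructor MultiReader takes any number of readers and copies the slice with the built-in copy(), so that multiReader's later changes to its own slice never modify the caller's slice.

  • Read(p []byte) (n int, err error) loops while readers remain. If the only remaining reader is itself a *multiReader, its readers slice is adopted directly, flattening the nesting. It then calls readers[0].Read(p). When that reader returns EOF, it is dropped from the slice; its slot is first overwritten with eofReader{} rather than nil, which avoids a nil panic after flattening (Issue 18232) and drops the reference so the exhausted reader can be garbage-collected earlier. If any bytes were read, or the error is something other than EOF, Read returns immediately, turning EOF into nil while more readers remain. In effect, multiReader reads sequentially from readers[0] through readers[n-1], and a single Read never spans two readers.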
```go
type multiWriter struct {
	writers []Writer
}

func (t *multiWriter) Write(p []byte) (n int, err error) {
	for _, w := range t.writers {
		n, err = w.Write(p)
		if err != nil {
			return
		}
		if n != len(p) {
			err = ErrShortWrite
			return
		}
	}
	return len(p), nil
}

var _ StringWriter = (*multiWriter)(nil)

func (t *multiWriter) WriteString(s string) (n int, err error) {
	var p []byte // lazily initialized if/when needed
	for _, w := range t.writers {
		if sw, ok := w.(StringWriter); ok {
			n, err = sw.WriteString(s)
		} else {
			if p == nil {
				p = []byte(s)
			}
			n, err = w.Write(p)
		}
		if err != nil {
			return
		}
		if n != len(s) {
			err = ErrShortWrite
			return
		}
	}
	return len(s), nil
}

// MultiWriter creates a writer that duplicates its writes to all the
// provided writers, similar to the Unix tee(1) command.
//
// Each write is written to each listed writer, one at a time.
// If a listed writer returns an error, that overall write operation
// stops and returns the error; it does not continue down the list.
func MultiWriter(writers ...Writer) Writer {
	allWriters := make([]Writer, 0, len(writers))
	for _, w := range writers {
		if mw, ok := w.(*multiWriter); ok {
			allWriters = append(allWriters, mw.writers...)
		} else {
			allWriters = append(allWriters, w)
		}
	}
	return &multiWriter{allWriters}
}
```

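multiWriter is implemented much like multiReader: each Write goes to writers[0] through writers[n-1] in order, stopping at the first error or short write. MultiWriter also flattens nested multiWriters when it is constructed, and WriteString uses each writer's own WriteString when available, converting s to []byte at most once for the others.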

pipe.go

```go
// atomicError is a type-safe atomic value for errors.
// We use a struct{ error } to ensure consistent use of a concrete type.
type atomicError struct{ v atomic.Value }

func (a *atomicError) Store(err error) {
	a.v.Store(struct{ error }{err})
}
func (a *atomicError) Load() error {
	err, _ := a.v.Load().(struct{ error })
	return err.error
}

// ErrClosedPipe is the error used for read or write operations on a closed pipe.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
```

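The file defines atomicError, an unexported, type-safe atomic holder for errors that is used only inside the package. It wraps every error in struct{ error } so that the atomic.Value always stores the same concrete type. It also defines ErrClosedPipe, the error returned by reads and writes on a closed pipe.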

```go
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
	wrMu sync.Mutex // Serializes Write operations
	wrCh chan []byte
	rdCh chan int

	once sync.Once // Protects closing done
	done chan struct{}
	rerr atomicError
	werr atomicError
}
```
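  • wrMu: a mutex that serializes Write calls on the pipe, so concurrent writes do not interleave.
  • wrCh: the channel on which the writer hands its byte slice to the reader.
  • rdCh: the channel on which the reader reports how many bytes it consumed.
  • once: ensures that done is closed only once.
  • done: closed when the pipe is closed; every operation selects on it.
  • rerr: the error recorded when the read side is closed.
  • werr: the error recorded when the write side is closed.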
```go
func (p *pipe) Read(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.readCloseError()
	default:
	}

	select {
	case bw := <-p.wrCh:
		nr := copy(b, bw)
		p.rdCh <- nr
		return nr, nil
	case <-p.done:
		return 0, p.readCloseError()
	}
}
```

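Read first checks, without blocking, whether the pipe is already closed, and if so returns the close error. Otherwise it waits for a slice on wrCh, copies as much of it as fits into b with copy(), and sends the number of bytes copied back on rdCh so the writer knows how far the reader got. Copying into b, rather than handing out the writer's slice, means the reader never holds a reference to the writer's data, so the writer may reuse its buffer once Write returns. The second select also watches done, so a Read that is blocked waiting for data returns as soon as the pipe is closed.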

```go
func (p *pipe) readCloseError() error {
	rerr := p.rerr.Load()
	if werr := p.werr.Load(); rerr == nil && werr != nil {
		return werr
	}
	return ErrClosedPipe
}
```

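readCloseError returns the error a Read should report once the pipe is closed: if only the write side was closed (werr is set and rerr is not), it returns werr, which is EOF after a plain Close; otherwise it returns ErrClosedPipe.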

```go
func (p *pipe) CloseRead(err error) error {
	if err == nil {
		err = ErrClosedPipe
	}
	p.rerr.Store(err)
	p.once.Do(func() { close(p.done) })
	return nil
}
```

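CloseRead stores the close error in rerr (ErrClosedPipe if err is nil) and then closes p.done to tell every pending and future operation that the pipe is closed. once.Do ensures done is closed only once, because closing an already-closed channel panics.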

```go
func (p *pipe) Write(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.writeCloseError()
	default:
		p.wrMu.Lock()
		defer p.wrMu.Unlock()
	}

	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b:
			nw := <-p.rdCh
			b = b[nw:]
			n += nw
		case <-p.done:
			return n, p.writeCloseError()
		}
	}
	return n, nil
}
```

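Write likewise first checks whether the pipe is closed; if it is not, it locks wrMu for the rest of the call so that writes are serialized. It then sends b on wrCh, receives from rdCh the number of bytes the reader consumed, and advances b past them, repeating until b is empty or the pipe is closed. The once flag makes the loop run at least once, so even an empty Write is handed to a reader. Finally it returns the number of bytes written.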

```go
func (p *pipe) writeCloseError() error {
	werr := p.werr.Load()
	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
		return rerr
	}
	return ErrClosedPipe
}

func (p *pipe) CloseWrite(err error) error {
	if err == nil {
		err = EOF
	}
	p.werr.Store(err)
	p.once.Do(func() { close(p.done) })
	return nil
}
```

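These two methods mirror the read side: writeCloseError returns rerr if only the read side was closed and ErrClosedPipe otherwise, and CloseWrite stores err in werr (EOF if err is nil, so readers see a normal end of stream) before closing done once.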

Summary

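Go's io package contains many elegant details. For example, multiReader does not set an exhausted slot in its readers slice to nil, which could cause a panic; it replaces it with another object, eofReader{}, so the original reader is no longer referenced from the slice, stays in the white set during marking, and can be reclaimed in the next garbage collection. The package also defines a large number of interfaces, most of which are implemented in other packages rather than in io itself. Because io takes on few responsibilities of its own, it is easy to understand and to extend. Beyond the interfaces, it provides general-purpose tools and structs: the multi-reader and multi-writer, the pipe, the section reader, TeeReader, and more.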
