assemble.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package asm
  2. import (
  3. "bytes"
  4. "fmt"
  5. "hash"
  6. "hash/crc64"
  7. "io"
  8. "sync"
  9. "github.com/vbatts/tar-split/tar/storage"
  10. )
  11. // NewOutputTarStream returns an io.ReadCloser that is an assembled tar archive
  12. // stream.
  13. //
  14. // It takes a storage.FileGetter, for mapping the file payloads that are to be read in,
  15. // and a storage.Unpacker, which has access to the rawbytes and file order
  16. // metadata. With the combination of these two items, a precise assembled Tar
  17. // archive is possible.
  18. func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadCloser {
  19. // ... Since these are interfaces, this is possible, so let's not have a nil pointer
  20. if fg == nil || up == nil {
  21. return nil
  22. }
  23. pr, pw := io.Pipe()
  24. go func() {
  25. err := WriteOutputTarStream(fg, up, pw)
  26. if err != nil {
  27. pw.CloseWithError(err)
  28. } else {
  29. pw.Close()
  30. }
  31. }()
  32. return pr
  33. }
  34. // WriteOutputTarStream writes assembled tar archive to a writer.
  35. func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
  36. // ... Since these are interfaces, this is possible, so let's not have a nil pointer
  37. if fg == nil || up == nil {
  38. return nil
  39. }
  40. var copyBuffer []byte
  41. var crcHash hash.Hash
  42. var crcSum []byte
  43. var multiWriter io.Writer
  44. for {
  45. entry, err := up.Next()
  46. if err != nil {
  47. if err == io.EOF {
  48. return nil
  49. }
  50. return err
  51. }
  52. switch entry.Type {
  53. case storage.SegmentType:
  54. if _, err := w.Write(entry.Payload); err != nil {
  55. return err
  56. }
  57. case storage.FileType:
  58. if entry.Size == 0 {
  59. continue
  60. }
  61. fh, err := fg.Get(entry.GetName())
  62. if err != nil {
  63. return err
  64. }
  65. if crcHash == nil {
  66. crcHash = crc64.New(storage.CRCTable)
  67. crcSum = make([]byte, 8)
  68. multiWriter = io.MultiWriter(w, crcHash)
  69. copyBuffer = byteBufferPool.Get().([]byte)
  70. defer byteBufferPool.Put(copyBuffer)
  71. } else {
  72. crcHash.Reset()
  73. }
  74. if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
  75. fh.Close()
  76. return err
  77. }
  78. if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
  79. // I would rather this be a comparable ErrInvalidChecksum or such,
  80. // but since it's coming through the PipeReader, the context of
  81. // _which_ file would be lost...
  82. fh.Close()
  83. return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
  84. }
  85. fh.Close()
  86. }
  87. }
  88. }
  89. var byteBufferPool = &sync.Pool{
  90. New: func() interface{} {
  91. return make([]byte, 32*1024)
  92. },
  93. }
  94. // copyWithBuffer is taken from stdlib io.Copy implementation
  95. // https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367
  96. func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
  97. for {
  98. nr, er := src.Read(buf)
  99. if nr > 0 {
  100. nw, ew := dst.Write(buf[0:nr])
  101. if nw > 0 {
  102. written += int64(nw)
  103. }
  104. if ew != nil {
  105. err = ew
  106. break
  107. }
  108. if nr != nw {
  109. err = io.ErrShortWrite
  110. break
  111. }
  112. }
  113. if er == io.EOF {
  114. break
  115. }
  116. if er != nil {
  117. err = er
  118. break
  119. }
  120. }
  121. return written, err
  122. }