You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

transfer.go 28 kB

4 years ago

  1. // Copyright 2019 Huawei Technologies Co.,Ltd.
  2. // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
  3. // this file except in compliance with the License. You may obtain a copy of the
  4. // License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software distributed
  9. // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  10. // CONDITIONS OF ANY KIND, either express or implied. See the License for the
  11. // specific language governing permissions and limitations under the License.
  12. //nolint:golint, unused
  13. package obs
  14. import (
  15. "bufio"
  16. "encoding/xml"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "os"
  22. "path/filepath"
  23. "sync"
  24. "sync/atomic"
  25. "syscall"
  26. )
// errAbort is the sentinel value a part task returns from Run when the shared
// abort flag is already set. The task-result handlers skip it, so it is never
// reported as the error of the whole transfer.
var errAbort = errors.New("AbortError")
  28. // FileStatus defines the upload file properties
  29. type FileStatus struct {
  30. XMLName xml.Name `xml:"FileInfo"`
  31. LastModified int64 `xml:"LastModified"`
  32. Size int64 `xml:"Size"`
  33. }
  34. // UploadPartInfo defines the upload part properties
  35. type UploadPartInfo struct {
  36. XMLName xml.Name `xml:"UploadPart"`
  37. PartNumber int `xml:"PartNumber"`
  38. Etag string `xml:"Etag"`
  39. PartSize int64 `xml:"PartSize"`
  40. Offset int64 `xml:"Offset"`
  41. IsCompleted bool `xml:"IsCompleted"`
  42. }
  43. // UploadCheckpoint defines the upload checkpoint file properties
  44. type UploadCheckpoint struct {
  45. XMLName xml.Name `xml:"UploadFileCheckpoint"`
  46. Bucket string `xml:"Bucket"`
  47. Key string `xml:"Key"`
  48. UploadId string `xml:"UploadId,omitempty"`
  49. UploadFile string `xml:"FileUrl"`
  50. FileInfo FileStatus `xml:"FileInfo"`
  51. UploadParts []UploadPartInfo `xml:"UploadParts>UploadPart"`
  52. }
  53. func (ufc *UploadCheckpoint) isValid(bucket, key, uploadFile string, fileStat os.FileInfo) bool {
  54. if ufc.Bucket != bucket || ufc.Key != key || ufc.UploadFile != uploadFile {
  55. doLog(LEVEL_INFO, "Checkpoint file is invalid, the bucketName or objectKey or uploadFile was changed. clear the record.")
  56. return false
  57. }
  58. if ufc.FileInfo.Size != fileStat.Size() || ufc.FileInfo.LastModified != fileStat.ModTime().Unix() {
  59. doLog(LEVEL_INFO, "Checkpoint file is invalid, the uploadFile was changed. clear the record.")
  60. return false
  61. }
  62. if ufc.UploadId == "" {
  63. doLog(LEVEL_INFO, "UploadId is invalid. clear the record.")
  64. return false
  65. }
  66. return true
  67. }
  68. type uploadPartTask struct {
  69. UploadPartInput
  70. obsClient *ObsClient
  71. abort *int32
  72. extensions []extensionOptions
  73. enableCheckpoint bool
  74. }
  75. func (task *uploadPartTask) Run() interface{} {
  76. if atomic.LoadInt32(task.abort) == 1 {
  77. return errAbort
  78. }
  79. input := &UploadPartInput{}
  80. input.Bucket = task.Bucket
  81. input.Key = task.Key
  82. input.PartNumber = task.PartNumber
  83. input.UploadId = task.UploadId
  84. input.SseHeader = task.SseHeader
  85. input.SourceFile = task.SourceFile
  86. input.Offset = task.Offset
  87. input.PartSize = task.PartSize
  88. extensions := task.extensions
  89. var output *UploadPartOutput
  90. var err error
  91. if extensions != nil {
  92. output, err = task.obsClient.UploadPart(input, extensions...)
  93. } else {
  94. output, err = task.obsClient.UploadPart(input)
  95. }
  96. if err == nil {
  97. if output.ETag == "" {
  98. doLog(LEVEL_WARN, "Get invalid etag value after uploading part [%d].", task.PartNumber)
  99. if !task.enableCheckpoint {
  100. atomic.CompareAndSwapInt32(task.abort, 0, 1)
  101. doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.PartNumber)
  102. }
  103. return fmt.Errorf("get invalid etag value after uploading part [%d]", task.PartNumber)
  104. }
  105. return output
  106. } else if obsError, ok := err.(ObsError); ok && obsError.StatusCode >= 400 && obsError.StatusCode < 500 {
  107. atomic.CompareAndSwapInt32(task.abort, 0, 1)
  108. doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.PartNumber)
  109. }
  110. return err
  111. }
  112. func loadCheckpointFile(checkpointFile string, result interface{}) error {
  113. ret, err := ioutil.ReadFile(checkpointFile)
  114. if err != nil {
  115. return err
  116. }
  117. if len(ret) == 0 {
  118. return nil
  119. }
  120. return xml.Unmarshal(ret, result)
  121. }
  122. func updateCheckpointFile(fc interface{}, checkpointFilePath string) error {
  123. result, err := xml.Marshal(fc)
  124. if err != nil {
  125. return err
  126. }
  127. err = ioutil.WriteFile(checkpointFilePath, result, 0666)
  128. return err
  129. }
  130. func getCheckpointFile(ufc *UploadCheckpoint, uploadFileStat os.FileInfo, input *UploadFileInput, obsClient *ObsClient, extensions []extensionOptions) (needCheckpoint bool, err error) {
  131. checkpointFilePath := input.CheckpointFile
  132. checkpointFileStat, err := os.Stat(checkpointFilePath)
  133. if err != nil {
  134. doLog(LEVEL_DEBUG, fmt.Sprintf("Stat checkpoint file failed with error: [%v].", err))
  135. return true, nil
  136. }
  137. if checkpointFileStat.IsDir() {
  138. doLog(LEVEL_ERROR, "Checkpoint file can not be a folder.")
  139. return false, errors.New("checkpoint file can not be a folder")
  140. }
  141. err = loadCheckpointFile(checkpointFilePath, ufc)
  142. if err != nil {
  143. doLog(LEVEL_WARN, fmt.Sprintf("Load checkpoint file failed with error: [%v].", err))
  144. return true, nil
  145. } else if !ufc.isValid(input.Bucket, input.Key, input.UploadFile, uploadFileStat) {
  146. if ufc.Bucket != "" && ufc.Key != "" && ufc.UploadId != "" {
  147. _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions)
  148. if _err != nil {
  149. doLog(LEVEL_WARN, "Failed to abort upload task [%s].", ufc.UploadId)
  150. }
  151. }
  152. _err := os.Remove(checkpointFilePath)
  153. if _err != nil {
  154. doLog(LEVEL_WARN, fmt.Sprintf("Failed to remove checkpoint file with error: [%v].", _err))
  155. }
  156. } else {
  157. return false, nil
  158. }
  159. return true, nil
  160. }
  161. func prepareUpload(ufc *UploadCheckpoint, uploadFileStat os.FileInfo, input *UploadFileInput, obsClient *ObsClient, extensions []extensionOptions) error {
  162. initiateInput := &InitiateMultipartUploadInput{}
  163. initiateInput.ObjectOperationInput = input.ObjectOperationInput
  164. initiateInput.ContentType = input.ContentType
  165. var output *InitiateMultipartUploadOutput
  166. var err error
  167. if extensions != nil {
  168. output, err = obsClient.InitiateMultipartUpload(initiateInput, extensions...)
  169. } else {
  170. output, err = obsClient.InitiateMultipartUpload(initiateInput)
  171. }
  172. if err != nil {
  173. return err
  174. }
  175. ufc.Bucket = input.Bucket
  176. ufc.Key = input.Key
  177. ufc.UploadFile = input.UploadFile
  178. ufc.FileInfo = FileStatus{}
  179. ufc.FileInfo.Size = uploadFileStat.Size()
  180. ufc.FileInfo.LastModified = uploadFileStat.ModTime().Unix()
  181. ufc.UploadId = output.UploadId
  182. err = sliceFile(input.PartSize, ufc)
  183. return err
  184. }
  185. func sliceFile(partSize int64, ufc *UploadCheckpoint) error {
  186. fileSize := ufc.FileInfo.Size
  187. cnt := fileSize / partSize
  188. if cnt >= 10000 {
  189. partSize = fileSize / 10000
  190. if fileSize%10000 != 0 {
  191. partSize++
  192. }
  193. cnt = fileSize / partSize
  194. }
  195. if fileSize%partSize != 0 {
  196. cnt++
  197. }
  198. if partSize > MAX_PART_SIZE {
  199. doLog(LEVEL_ERROR, "The source upload file is too large")
  200. return fmt.Errorf("The source upload file is too large")
  201. }
  202. if cnt == 0 {
  203. uploadPart := UploadPartInfo{}
  204. uploadPart.PartNumber = 1
  205. ufc.UploadParts = []UploadPartInfo{uploadPart}
  206. } else {
  207. uploadParts := make([]UploadPartInfo, 0, cnt)
  208. var i int64
  209. for i = 0; i < cnt; i++ {
  210. uploadPart := UploadPartInfo{}
  211. uploadPart.PartNumber = int(i) + 1
  212. uploadPart.PartSize = partSize
  213. uploadPart.Offset = i * partSize
  214. uploadParts = append(uploadParts, uploadPart)
  215. }
  216. if value := fileSize % partSize; value != 0 {
  217. uploadParts[cnt-1].PartSize = value
  218. }
  219. ufc.UploadParts = uploadParts
  220. }
  221. return nil
  222. }
  223. func abortTask(bucket, key, uploadID string, obsClient *ObsClient, extensions []extensionOptions) error {
  224. input := &AbortMultipartUploadInput{}
  225. input.Bucket = bucket
  226. input.Key = key
  227. input.UploadId = uploadID
  228. if extensions != nil {
  229. _, err := obsClient.AbortMultipartUpload(input, extensions...)
  230. return err
  231. }
  232. _, err := obsClient.AbortMultipartUpload(input)
  233. return err
  234. }
  235. func handleUploadFileResult(uploadPartError error, ufc *UploadCheckpoint, enableCheckpoint bool, obsClient *ObsClient, extensions []extensionOptions) error {
  236. if uploadPartError != nil {
  237. if enableCheckpoint {
  238. return uploadPartError
  239. }
  240. _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions)
  241. if _err != nil {
  242. doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId)
  243. }
  244. return uploadPartError
  245. }
  246. return nil
  247. }
  248. func completeParts(ufc *UploadCheckpoint, enableCheckpoint bool, checkpointFilePath string, obsClient *ObsClient, extensions []extensionOptions) (output *CompleteMultipartUploadOutput, err error) {
  249. completeInput := &CompleteMultipartUploadInput{}
  250. completeInput.Bucket = ufc.Bucket
  251. completeInput.Key = ufc.Key
  252. completeInput.UploadId = ufc.UploadId
  253. parts := make([]Part, 0, len(ufc.UploadParts))
  254. for _, uploadPart := range ufc.UploadParts {
  255. part := Part{}
  256. part.PartNumber = uploadPart.PartNumber
  257. part.ETag = uploadPart.Etag
  258. parts = append(parts, part)
  259. }
  260. completeInput.Parts = parts
  261. var completeOutput *CompleteMultipartUploadOutput
  262. if extensions != nil {
  263. completeOutput, err = obsClient.CompleteMultipartUpload(completeInput, extensions...)
  264. } else {
  265. completeOutput, err = obsClient.CompleteMultipartUpload(completeInput)
  266. }
  267. if err == nil {
  268. if enableCheckpoint {
  269. _err := os.Remove(checkpointFilePath)
  270. if _err != nil {
  271. doLog(LEVEL_WARN, "Upload file successfully, but remove checkpoint file failed with error [%v].", _err)
  272. }
  273. }
  274. return completeOutput, err
  275. }
  276. if !enableCheckpoint {
  277. _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions)
  278. if _err != nil {
  279. doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId)
  280. }
  281. }
  282. return completeOutput, err
  283. }
  284. func (obsClient ObsClient) resumeUpload(input *UploadFileInput, extensions []extensionOptions) (output *CompleteMultipartUploadOutput, err error) {
  285. uploadFileStat, err := os.Stat(input.UploadFile)
  286. if err != nil {
  287. doLog(LEVEL_ERROR, fmt.Sprintf("Failed to stat uploadFile with error: [%v].", err))
  288. return nil, err
  289. }
  290. if uploadFileStat.IsDir() {
  291. doLog(LEVEL_ERROR, "UploadFile can not be a folder.")
  292. return nil, errors.New("uploadFile can not be a folder")
  293. }
  294. ufc := &UploadCheckpoint{}
  295. var needCheckpoint = true
  296. var checkpointFilePath = input.CheckpointFile
  297. var enableCheckpoint = input.EnableCheckpoint
  298. if enableCheckpoint {
  299. needCheckpoint, err = getCheckpointFile(ufc, uploadFileStat, input, &obsClient, extensions)
  300. if err != nil {
  301. return nil, err
  302. }
  303. }
  304. if needCheckpoint {
  305. err = prepareUpload(ufc, uploadFileStat, input, &obsClient, extensions)
  306. if err != nil {
  307. return nil, err
  308. }
  309. if enableCheckpoint {
  310. err = updateCheckpointFile(ufc, checkpointFilePath)
  311. if err != nil {
  312. doLog(LEVEL_ERROR, "Failed to update checkpoint file with error [%v].", err)
  313. _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, &obsClient, extensions)
  314. if _err != nil {
  315. doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId)
  316. }
  317. return nil, err
  318. }
  319. }
  320. }
  321. uploadPartError := obsClient.uploadPartConcurrent(ufc, checkpointFilePath, input, extensions)
  322. err = handleUploadFileResult(uploadPartError, ufc, enableCheckpoint, &obsClient, extensions)
  323. if err != nil {
  324. return nil, err
  325. }
  326. completeOutput, err := completeParts(ufc, enableCheckpoint, checkpointFilePath, &obsClient, extensions)
  327. return completeOutput, err
  328. }
  329. func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex) (err error) {
  330. if uploadPartOutput, ok := result.(*UploadPartOutput); ok {
  331. lock.Lock()
  332. defer lock.Unlock()
  333. ufc.UploadParts[partNum-1].Etag = uploadPartOutput.ETag
  334. ufc.UploadParts[partNum-1].IsCompleted = true
  335. if enableCheckpoint {
  336. _err := updateCheckpointFile(ufc, checkpointFilePath)
  337. if _err != nil {
  338. doLog(LEVEL_WARN, "Failed to update checkpoint file with error [%v].", _err)
  339. }
  340. }
  341. } else if result != errAbort {
  342. if _err, ok := result.(error); ok {
  343. err = _err
  344. }
  345. }
  346. return
  347. }
  348. func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpointFilePath string, input *UploadFileInput, extensions []extensionOptions) error {
  349. pool := NewRoutinePool(input.TaskNum, MAX_PART_NUM)
  350. var uploadPartError atomic.Value
  351. var errFlag int32
  352. var abort int32
  353. lock := new(sync.Mutex)
  354. for _, uploadPart := range ufc.UploadParts {
  355. if atomic.LoadInt32(&abort) == 1 {
  356. break
  357. }
  358. if uploadPart.IsCompleted {
  359. continue
  360. }
  361. task := uploadPartTask{
  362. UploadPartInput: UploadPartInput{
  363. Bucket: ufc.Bucket,
  364. Key: ufc.Key,
  365. PartNumber: uploadPart.PartNumber,
  366. UploadId: ufc.UploadId,
  367. SseHeader: input.SseHeader,
  368. SourceFile: input.UploadFile,
  369. Offset: uploadPart.Offset,
  370. PartSize: uploadPart.PartSize,
  371. },
  372. obsClient: &obsClient,
  373. abort: &abort,
  374. extensions: extensions,
  375. enableCheckpoint: input.EnableCheckpoint,
  376. }
  377. pool.ExecuteFunc(func() interface{} {
  378. result := task.Run()
  379. err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock)
  380. if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) {
  381. uploadPartError.Store(err)
  382. }
  383. return nil
  384. })
  385. }
  386. pool.ShutDown()
  387. if err, ok := uploadPartError.Load().(error); ok {
  388. return err
  389. }
  390. return nil
  391. }
  392. // ObjectInfo defines download object info
  393. type ObjectInfo struct {
  394. XMLName xml.Name `xml:"ObjectInfo"`
  395. LastModified int64 `xml:"LastModified"`
  396. Size int64 `xml:"Size"`
  397. ETag string `xml:"ETag"`
  398. }
  399. // TempFileInfo defines temp download file properties
  400. type TempFileInfo struct {
  401. XMLName xml.Name `xml:"TempFileInfo"`
  402. TempFileUrl string `xml:"TempFileUrl"`
  403. Size int64 `xml:"Size"`
  404. }
  405. // DownloadPartInfo defines download part properties
  406. type DownloadPartInfo struct {
  407. XMLName xml.Name `xml:"DownloadPart"`
  408. PartNumber int64 `xml:"PartNumber"`
  409. RangeEnd int64 `xml:"RangeEnd"`
  410. Offset int64 `xml:"Offset"`
  411. IsCompleted bool `xml:"IsCompleted"`
  412. }
  413. // DownloadCheckpoint defines download checkpoint file properties
  414. type DownloadCheckpoint struct {
  415. XMLName xml.Name `xml:"DownloadFileCheckpoint"`
  416. Bucket string `xml:"Bucket"`
  417. Key string `xml:"Key"`
  418. VersionId string `xml:"VersionId,omitempty"`
  419. DownloadFile string `xml:"FileUrl"`
  420. ObjectInfo ObjectInfo `xml:"ObjectInfo"`
  421. TempFileInfo TempFileInfo `xml:"TempFileInfo"`
  422. DownloadParts []DownloadPartInfo `xml:"DownloadParts>DownloadPart"`
  423. }
  424. func (dfc *DownloadCheckpoint) isValid(input *DownloadFileInput, output *GetObjectMetadataOutput) bool {
  425. if dfc.Bucket != input.Bucket || dfc.Key != input.Key || dfc.VersionId != input.VersionId || dfc.DownloadFile != input.DownloadFile {
  426. doLog(LEVEL_INFO, "Checkpoint file is invalid, the bucketName or objectKey or downloadFile was changed. clear the record.")
  427. return false
  428. }
  429. if dfc.ObjectInfo.LastModified != output.LastModified.Unix() || dfc.ObjectInfo.ETag != output.ETag || dfc.ObjectInfo.Size != output.ContentLength {
  430. doLog(LEVEL_INFO, "Checkpoint file is invalid, the object info was changed. clear the record.")
  431. return false
  432. }
  433. if dfc.TempFileInfo.Size != output.ContentLength {
  434. doLog(LEVEL_INFO, "Checkpoint file is invalid, size was changed. clear the record.")
  435. return false
  436. }
  437. stat, err := os.Stat(dfc.TempFileInfo.TempFileUrl)
  438. if err != nil || stat.Size() != dfc.ObjectInfo.Size {
  439. doLog(LEVEL_INFO, "Checkpoint file is invalid, the temp download file was changed. clear the record.")
  440. return false
  441. }
  442. return true
  443. }
  444. type downloadPartTask struct {
  445. GetObjectInput
  446. obsClient *ObsClient
  447. extensions []extensionOptions
  448. abort *int32
  449. partNumber int64
  450. tempFileURL string
  451. enableCheckpoint bool
  452. }
  453. func (task *downloadPartTask) Run() interface{} {
  454. if atomic.LoadInt32(task.abort) == 1 {
  455. return errAbort
  456. }
  457. getObjectInput := &GetObjectInput{}
  458. getObjectInput.GetObjectMetadataInput = task.GetObjectMetadataInput
  459. getObjectInput.IfMatch = task.IfMatch
  460. getObjectInput.IfNoneMatch = task.IfNoneMatch
  461. getObjectInput.IfModifiedSince = task.IfModifiedSince
  462. getObjectInput.IfUnmodifiedSince = task.IfUnmodifiedSince
  463. getObjectInput.RangeStart = task.RangeStart
  464. getObjectInput.RangeEnd = task.RangeEnd
  465. var output *GetObjectOutput
  466. var err error
  467. if task.extensions != nil {
  468. output, err = task.obsClient.GetObject(getObjectInput, task.extensions...)
  469. } else {
  470. output, err = task.obsClient.GetObject(getObjectInput)
  471. }
  472. if err == nil {
  473. defer func() {
  474. errMsg := output.Body.Close()
  475. if errMsg != nil {
  476. doLog(LEVEL_WARN, "Failed to close response body.")
  477. }
  478. }()
  479. _err := updateDownloadFile(task.tempFileURL, task.RangeStart, output)
  480. if _err != nil {
  481. if !task.enableCheckpoint {
  482. atomic.CompareAndSwapInt32(task.abort, 0, 1)
  483. doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.partNumber)
  484. }
  485. return _err
  486. }
  487. return output
  488. } else if obsError, ok := err.(ObsError); ok && obsError.StatusCode >= 400 && obsError.StatusCode < 500 {
  489. atomic.CompareAndSwapInt32(task.abort, 0, 1)
  490. doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.partNumber)
  491. }
  492. return err
  493. }
  494. func getObjectInfo(input *DownloadFileInput, obsClient *ObsClient, extensions []extensionOptions) (getObjectmetaOutput *GetObjectMetadataOutput, err error) {
  495. if extensions != nil {
  496. getObjectmetaOutput, err = obsClient.GetObjectMetadata(&input.GetObjectMetadataInput, extensions...)
  497. } else {
  498. getObjectmetaOutput, err = obsClient.GetObjectMetadata(&input.GetObjectMetadataInput)
  499. }
  500. return
  501. }
  502. func getDownloadCheckpointFile(dfc *DownloadCheckpoint, input *DownloadFileInput, output *GetObjectMetadataOutput) (needCheckpoint bool, err error) {
  503. checkpointFilePath := input.CheckpointFile
  504. checkpointFileStat, err := os.Stat(checkpointFilePath)
  505. if err != nil {
  506. doLog(LEVEL_DEBUG, fmt.Sprintf("Stat checkpoint file failed with error: [%v].", err))
  507. return true, nil
  508. }
  509. if checkpointFileStat.IsDir() {
  510. doLog(LEVEL_ERROR, "Checkpoint file can not be a folder.")
  511. return false, errors.New("checkpoint file can not be a folder")
  512. }
  513. err = loadCheckpointFile(checkpointFilePath, dfc)
  514. if err != nil {
  515. doLog(LEVEL_WARN, fmt.Sprintf("Load checkpoint file failed with error: [%v].", err))
  516. return true, nil
  517. } else if !dfc.isValid(input, output) {
  518. if dfc.TempFileInfo.TempFileUrl != "" {
  519. _err := os.Remove(dfc.TempFileInfo.TempFileUrl)
  520. if _err != nil {
  521. doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _err)
  522. }
  523. }
  524. _err := os.Remove(checkpointFilePath)
  525. if _err != nil {
  526. doLog(LEVEL_WARN, "Failed to remove checkpoint file with error [%v].", _err)
  527. }
  528. } else {
  529. return false, nil
  530. }
  531. return true, nil
  532. }
  533. func sliceObject(objectSize, partSize int64, dfc *DownloadCheckpoint) {
  534. cnt := objectSize / partSize
  535. if objectSize%partSize > 0 {
  536. cnt++
  537. }
  538. if cnt == 0 {
  539. downloadPart := DownloadPartInfo{}
  540. downloadPart.PartNumber = 1
  541. dfc.DownloadParts = []DownloadPartInfo{downloadPart}
  542. } else {
  543. downloadParts := make([]DownloadPartInfo, 0, cnt)
  544. var i int64
  545. for i = 0; i < cnt; i++ {
  546. downloadPart := DownloadPartInfo{}
  547. downloadPart.PartNumber = i + 1
  548. downloadPart.Offset = i * partSize
  549. downloadPart.RangeEnd = (i+1)*partSize - 1
  550. downloadParts = append(downloadParts, downloadPart)
  551. }
  552. dfc.DownloadParts = downloadParts
  553. if value := objectSize % partSize; value > 0 {
  554. dfc.DownloadParts[cnt-1].RangeEnd = dfc.ObjectInfo.Size - 1
  555. }
  556. }
  557. }
  558. func createFile(tempFileURL string, fileSize int64) error {
  559. fd, err := syscall.Open(tempFileURL, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
  560. if err != nil {
  561. doLog(LEVEL_WARN, "Failed to open temp download file [%s].", tempFileURL)
  562. return err
  563. }
  564. defer func() {
  565. errMsg := syscall.Close(fd)
  566. if errMsg != nil {
  567. doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg)
  568. }
  569. }()
  570. err = syscall.Ftruncate(fd, fileSize)
  571. if err != nil {
  572. doLog(LEVEL_WARN, "Failed to create file with error [%v].", err)
  573. }
  574. return err
  575. }
  576. func prepareTempFile(tempFileURL string, fileSize int64) error {
  577. parentDir := filepath.Dir(tempFileURL)
  578. stat, err := os.Stat(parentDir)
  579. if err != nil {
  580. doLog(LEVEL_DEBUG, "Failed to stat path with error [%v].", err)
  581. _err := os.MkdirAll(parentDir, os.ModePerm)
  582. if _err != nil {
  583. doLog(LEVEL_ERROR, "Failed to make dir with error [%v].", _err)
  584. return _err
  585. }
  586. } else if !stat.IsDir() {
  587. doLog(LEVEL_ERROR, "Cannot create folder [%s] due to a same file exists.", parentDir)
  588. return fmt.Errorf("cannot create folder [%s] due to a same file exists", parentDir)
  589. }
  590. err = createFile(tempFileURL, fileSize)
  591. if err == nil {
  592. return nil
  593. }
  594. fd, err := os.OpenFile(tempFileURL, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
  595. if err != nil {
  596. doLog(LEVEL_ERROR, "Failed to open temp download file [%s].", tempFileURL)
  597. return err
  598. }
  599. defer func() {
  600. errMsg := fd.Close()
  601. if errMsg != nil {
  602. doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg)
  603. }
  604. }()
  605. if fileSize > 0 {
  606. _, err = fd.WriteAt([]byte("a"), fileSize-1)
  607. if err != nil {
  608. doLog(LEVEL_ERROR, "Failed to create temp download file with error [%v].", err)
  609. return err
  610. }
  611. }
  612. return nil
  613. }
  614. func handleDownloadFileResult(tempFileURL string, enableCheckpoint bool, downloadFileError error) error {
  615. if downloadFileError != nil {
  616. if !enableCheckpoint {
  617. _err := os.Remove(tempFileURL)
  618. if _err != nil {
  619. doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _err)
  620. }
  621. }
  622. return downloadFileError
  623. }
  624. return nil
  625. }
  626. func (obsClient ObsClient) resumeDownload(input *DownloadFileInput, extensions []extensionOptions) (output *GetObjectMetadataOutput, err error) {
  627. getObjectmetaOutput, err := getObjectInfo(input, &obsClient, extensions)
  628. if err != nil {
  629. return nil, err
  630. }
  631. objectSize := getObjectmetaOutput.ContentLength
  632. partSize := input.PartSize
  633. dfc := &DownloadCheckpoint{}
  634. var needCheckpoint = true
  635. var checkpointFilePath = input.CheckpointFile
  636. var enableCheckpoint = input.EnableCheckpoint
  637. if enableCheckpoint {
  638. needCheckpoint, err = getDownloadCheckpointFile(dfc, input, getObjectmetaOutput)
  639. if err != nil {
  640. return nil, err
  641. }
  642. }
  643. if needCheckpoint {
  644. dfc.Bucket = input.Bucket
  645. dfc.Key = input.Key
  646. dfc.VersionId = input.VersionId
  647. dfc.DownloadFile = input.DownloadFile
  648. dfc.ObjectInfo = ObjectInfo{}
  649. dfc.ObjectInfo.LastModified = getObjectmetaOutput.LastModified.Unix()
  650. dfc.ObjectInfo.Size = getObjectmetaOutput.ContentLength
  651. dfc.ObjectInfo.ETag = getObjectmetaOutput.ETag
  652. dfc.TempFileInfo = TempFileInfo{}
  653. dfc.TempFileInfo.TempFileUrl = input.DownloadFile + ".tmp"
  654. dfc.TempFileInfo.Size = getObjectmetaOutput.ContentLength
  655. sliceObject(objectSize, partSize, dfc)
  656. _err := prepareTempFile(dfc.TempFileInfo.TempFileUrl, dfc.TempFileInfo.Size)
  657. if _err != nil {
  658. return nil, _err
  659. }
  660. if enableCheckpoint {
  661. _err := updateCheckpointFile(dfc, checkpointFilePath)
  662. if _err != nil {
  663. doLog(LEVEL_ERROR, "Failed to update checkpoint file with error [%v].", _err)
  664. _errMsg := os.Remove(dfc.TempFileInfo.TempFileUrl)
  665. if _errMsg != nil {
  666. doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _errMsg)
  667. }
  668. return nil, _err
  669. }
  670. }
  671. }
  672. downloadFileError := obsClient.downloadFileConcurrent(input, dfc, extensions)
  673. err = handleDownloadFileResult(dfc.TempFileInfo.TempFileUrl, enableCheckpoint, downloadFileError)
  674. if err != nil {
  675. return nil, err
  676. }
  677. err = os.Rename(dfc.TempFileInfo.TempFileUrl, input.DownloadFile)
  678. if err != nil {
  679. doLog(LEVEL_ERROR, "Failed to rename temp download file [%s] to download file [%s] with error [%v].", dfc.TempFileInfo.TempFileUrl, input.DownloadFile, err)
  680. return nil, err
  681. }
  682. if enableCheckpoint {
  683. err = os.Remove(checkpointFilePath)
  684. if err != nil {
  685. doLog(LEVEL_WARN, "Download file successfully, but remove checkpoint file failed with error [%v].", err)
  686. }
  687. }
  688. return getObjectmetaOutput, nil
  689. }
  690. func updateDownloadFile(filePath string, rangeStart int64, output *GetObjectOutput) error {
  691. fd, err := os.OpenFile(filePath, os.O_WRONLY, 0666)
  692. if err != nil {
  693. doLog(LEVEL_ERROR, "Failed to open file [%s].", filePath)
  694. return err
  695. }
  696. defer func() {
  697. errMsg := fd.Close()
  698. if errMsg != nil {
  699. doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg)
  700. }
  701. }()
  702. _, err = fd.Seek(rangeStart, 0)
  703. if err != nil {
  704. doLog(LEVEL_ERROR, "Failed to seek file with error [%v].", err)
  705. return err
  706. }
  707. fileWriter := bufio.NewWriterSize(fd, 65536)
  708. part := make([]byte, 8192)
  709. var readErr error
  710. var readCount int
  711. for {
  712. readCount, readErr = output.Body.Read(part)
  713. if readCount > 0 {
  714. wcnt, werr := fileWriter.Write(part[0:readCount])
  715. if werr != nil {
  716. doLog(LEVEL_ERROR, "Failed to write to file with error [%v].", werr)
  717. return werr
  718. }
  719. if wcnt != readCount {
  720. doLog(LEVEL_ERROR, "Failed to write to file [%s], expect: [%d], actual: [%d]", filePath, readCount, wcnt)
  721. return fmt.Errorf("Failed to write to file [%s], expect: [%d], actual: [%d]", filePath, readCount, wcnt)
  722. }
  723. }
  724. if readErr != nil {
  725. if readErr != io.EOF {
  726. doLog(LEVEL_ERROR, "Failed to read response body with error [%v].", readErr)
  727. return readErr
  728. }
  729. break
  730. }
  731. }
  732. err = fileWriter.Flush()
  733. if err != nil {
  734. doLog(LEVEL_ERROR, "Failed to flush file with error [%v].", err)
  735. return err
  736. }
  737. return nil
  738. }
  739. func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex) (err error) {
  740. if _, ok := result.(*GetObjectOutput); ok {
  741. lock.Lock()
  742. defer lock.Unlock()
  743. dfc.DownloadParts[partNum-1].IsCompleted = true
  744. if enableCheckpoint {
  745. _err := updateCheckpointFile(dfc, checkpointFile)
  746. if _err != nil {
  747. doLog(LEVEL_WARN, "Failed to update checkpoint file with error [%v].", _err)
  748. }
  749. }
  750. } else if result != errAbort {
  751. if _err, ok := result.(error); ok {
  752. err = _err
  753. }
  754. }
  755. return
  756. }
  757. func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc *DownloadCheckpoint, extensions []extensionOptions) error {
  758. pool := NewRoutinePool(input.TaskNum, MAX_PART_NUM)
  759. var downloadPartError atomic.Value
  760. var errFlag int32
  761. var abort int32
  762. lock := new(sync.Mutex)
  763. for _, downloadPart := range dfc.DownloadParts {
  764. if atomic.LoadInt32(&abort) == 1 {
  765. break
  766. }
  767. if downloadPart.IsCompleted {
  768. continue
  769. }
  770. task := downloadPartTask{
  771. GetObjectInput: GetObjectInput{
  772. GetObjectMetadataInput: input.GetObjectMetadataInput,
  773. IfMatch: input.IfMatch,
  774. IfNoneMatch: input.IfNoneMatch,
  775. IfUnmodifiedSince: input.IfUnmodifiedSince,
  776. IfModifiedSince: input.IfModifiedSince,
  777. RangeStart: downloadPart.Offset,
  778. RangeEnd: downloadPart.RangeEnd,
  779. },
  780. obsClient: &obsClient,
  781. extensions: extensions,
  782. abort: &abort,
  783. partNumber: downloadPart.PartNumber,
  784. tempFileURL: dfc.TempFileInfo.TempFileUrl,
  785. enableCheckpoint: input.EnableCheckpoint,
  786. }
  787. pool.ExecuteFunc(func() interface{} {
  788. result := task.Run()
  789. err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock)
  790. if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) {
  791. downloadPartError.Store(err)
  792. }
  793. return nil
  794. })
  795. }
  796. pool.ShutDown()
  797. if err, ok := downloadPartError.Load().(error); ok {
  798. return err
  799. }
  800. return nil
  801. }