// Copyright 2019 Huawei Technologies Co.,Ltd. // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the // License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. //nolint:golint, unused package obs import ( "bufio" "encoding/xml" "errors" "fmt" "io" "io/ioutil" "os" "path/filepath" "sync" "sync/atomic" "syscall" ) var errAbort = errors.New("AbortError") // FileStatus defines the upload file properties type FileStatus struct { XMLName xml.Name `xml:"FileInfo"` LastModified int64 `xml:"LastModified"` Size int64 `xml:"Size"` } // UploadPartInfo defines the upload part properties type UploadPartInfo struct { XMLName xml.Name `xml:"UploadPart"` PartNumber int `xml:"PartNumber"` Etag string `xml:"Etag"` PartSize int64 `xml:"PartSize"` Offset int64 `xml:"Offset"` IsCompleted bool `xml:"IsCompleted"` } // UploadCheckpoint defines the upload checkpoint file properties type UploadCheckpoint struct { XMLName xml.Name `xml:"UploadFileCheckpoint"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` UploadId string `xml:"UploadId,omitempty"` UploadFile string `xml:"FileUrl"` FileInfo FileStatus `xml:"FileInfo"` UploadParts []UploadPartInfo `xml:"UploadParts>UploadPart"` } func (ufc *UploadCheckpoint) isValid(bucket, key, uploadFile string, fileStat os.FileInfo) bool { if ufc.Bucket != bucket || ufc.Key != key || ufc.UploadFile != uploadFile { doLog(LEVEL_INFO, "Checkpoint file is invalid, the bucketName or objectKey or uploadFile was changed. clear the record.") return false } if ufc.FileInfo.Size != fileStat.Size() || ufc.FileInfo.LastModified != fileStat.ModTime().Unix() { doLog(LEVEL_INFO, "Checkpoint file is invalid, the uploadFile was changed. clear the record.") return false } if ufc.UploadId == "" { doLog(LEVEL_INFO, "UploadId is invalid. clear the record.") return false } return true } type uploadPartTask struct { UploadPartInput obsClient *ObsClient abort *int32 extensions []extensionOptions enableCheckpoint bool } func (task *uploadPartTask) Run() interface{} { if atomic.LoadInt32(task.abort) == 1 { return errAbort } input := &UploadPartInput{} input.Bucket = task.Bucket input.Key = task.Key input.PartNumber = task.PartNumber input.UploadId = task.UploadId input.SseHeader = task.SseHeader input.SourceFile = task.SourceFile input.Offset = task.Offset input.PartSize = task.PartSize extensions := task.extensions var output *UploadPartOutput var err error if extensions != nil { output, err = task.obsClient.UploadPart(input, extensions...) } else { output, err = task.obsClient.UploadPart(input) } if err == nil { if output.ETag == "" { doLog(LEVEL_WARN, "Get invalid etag value after uploading part [%d].", task.PartNumber) if !task.enableCheckpoint { atomic.CompareAndSwapInt32(task.abort, 0, 1) doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.PartNumber) } return fmt.Errorf("get invalid etag value after uploading part [%d]", task.PartNumber) } return output } else if obsError, ok := err.(ObsError); ok && obsError.StatusCode >= 400 && obsError.StatusCode < 500 { atomic.CompareAndSwapInt32(task.abort, 0, 1) doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.PartNumber) } return err } func loadCheckpointFile(checkpointFile string, result interface{}) error { ret, err := ioutil.ReadFile(checkpointFile) if err != nil { return err } if len(ret) == 0 { return nil } return xml.Unmarshal(ret, result) } func updateCheckpointFile(fc interface{}, checkpointFilePath string) error { result, err := xml.Marshal(fc) if err != nil { return err } err = ioutil.WriteFile(checkpointFilePath, result, 0666) return err } func getCheckpointFile(ufc *UploadCheckpoint, uploadFileStat os.FileInfo, input *UploadFileInput, obsClient *ObsClient, extensions []extensionOptions) (needCheckpoint bool, err error) { checkpointFilePath := input.CheckpointFile checkpointFileStat, err := os.Stat(checkpointFilePath) if err != nil { doLog(LEVEL_DEBUG, fmt.Sprintf("Stat checkpoint file failed with error: [%v].", err)) return true, nil } if checkpointFileStat.IsDir() { doLog(LEVEL_ERROR, "Checkpoint file can not be a folder.") return false, errors.New("checkpoint file can not be a folder") } err = loadCheckpointFile(checkpointFilePath, ufc) if err != nil { doLog(LEVEL_WARN, fmt.Sprintf("Load checkpoint file failed with error: [%v].", err)) return true, nil } else if !ufc.isValid(input.Bucket, input.Key, input.UploadFile, uploadFileStat) { if ufc.Bucket != "" && ufc.Key != "" && ufc.UploadId != "" { _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions) if _err != nil { doLog(LEVEL_WARN, "Failed to abort upload task [%s].", ufc.UploadId) } } _err := os.Remove(checkpointFilePath) if _err != nil { doLog(LEVEL_WARN, fmt.Sprintf("Failed to remove checkpoint file with error: [%v].", _err)) } } else { return false, nil } return true, nil } func prepareUpload(ufc *UploadCheckpoint, uploadFileStat os.FileInfo, input *UploadFileInput, obsClient *ObsClient, extensions []extensionOptions) error { initiateInput := &InitiateMultipartUploadInput{} initiateInput.ObjectOperationInput = input.ObjectOperationInput initiateInput.ContentType = input.ContentType var output *InitiateMultipartUploadOutput var err error if extensions != nil { output, err = obsClient.InitiateMultipartUpload(initiateInput, extensions...) } else { output, err = obsClient.InitiateMultipartUpload(initiateInput) } if err != nil { return err } ufc.Bucket = input.Bucket ufc.Key = input.Key ufc.UploadFile = input.UploadFile ufc.FileInfo = FileStatus{} ufc.FileInfo.Size = uploadFileStat.Size() ufc.FileInfo.LastModified = uploadFileStat.ModTime().Unix() ufc.UploadId = output.UploadId err = sliceFile(input.PartSize, ufc) return err } func sliceFile(partSize int64, ufc *UploadCheckpoint) error { fileSize := ufc.FileInfo.Size cnt := fileSize / partSize if cnt >= 10000 { partSize = fileSize / 10000 if fileSize%10000 != 0 { partSize++ } cnt = fileSize / partSize } if fileSize%partSize != 0 { cnt++ } if partSize > MAX_PART_SIZE { doLog(LEVEL_ERROR, "The source upload file is too large") return fmt.Errorf("The source upload file is too large") } if cnt == 0 { uploadPart := UploadPartInfo{} uploadPart.PartNumber = 1 ufc.UploadParts = []UploadPartInfo{uploadPart} } else { uploadParts := make([]UploadPartInfo, 0, cnt) var i int64 for i = 0; i < cnt; i++ { uploadPart := UploadPartInfo{} uploadPart.PartNumber = int(i) + 1 uploadPart.PartSize = partSize uploadPart.Offset = i * partSize uploadParts = append(uploadParts, uploadPart) } if value := fileSize % partSize; value != 0 { uploadParts[cnt-1].PartSize = value } ufc.UploadParts = uploadParts } return nil } func abortTask(bucket, key, uploadID string, obsClient *ObsClient, extensions []extensionOptions) error { input := &AbortMultipartUploadInput{} input.Bucket = bucket input.Key = key input.UploadId = uploadID if extensions != nil { _, err := obsClient.AbortMultipartUpload(input, extensions...) return err } _, err := obsClient.AbortMultipartUpload(input) return err } func handleUploadFileResult(uploadPartError error, ufc *UploadCheckpoint, enableCheckpoint bool, obsClient *ObsClient, extensions []extensionOptions) error { if uploadPartError != nil { if enableCheckpoint { return uploadPartError } _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions) if _err != nil { doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId) } return uploadPartError } return nil } func completeParts(ufc *UploadCheckpoint, enableCheckpoint bool, checkpointFilePath string, obsClient *ObsClient, extensions []extensionOptions) (output *CompleteMultipartUploadOutput, err error) { completeInput := &CompleteMultipartUploadInput{} completeInput.Bucket = ufc.Bucket completeInput.Key = ufc.Key completeInput.UploadId = ufc.UploadId parts := make([]Part, 0, len(ufc.UploadParts)) for _, uploadPart := range ufc.UploadParts { part := Part{} part.PartNumber = uploadPart.PartNumber part.ETag = uploadPart.Etag parts = append(parts, part) } completeInput.Parts = parts var completeOutput *CompleteMultipartUploadOutput if extensions != nil { completeOutput, err = obsClient.CompleteMultipartUpload(completeInput, extensions...) } else { completeOutput, err = obsClient.CompleteMultipartUpload(completeInput) } if err == nil { if enableCheckpoint { _err := os.Remove(checkpointFilePath) if _err != nil { doLog(LEVEL_WARN, "Upload file successfully, but remove checkpoint file failed with error [%v].", _err) } } return completeOutput, err } if !enableCheckpoint { _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, obsClient, extensions) if _err != nil { doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId) } } return completeOutput, err } func (obsClient ObsClient) resumeUpload(input *UploadFileInput, extensions []extensionOptions) (output *CompleteMultipartUploadOutput, err error) { uploadFileStat, err := os.Stat(input.UploadFile) if err != nil { doLog(LEVEL_ERROR, fmt.Sprintf("Failed to stat uploadFile with error: [%v].", err)) return nil, err } if uploadFileStat.IsDir() { doLog(LEVEL_ERROR, "UploadFile can not be a folder.") return nil, errors.New("uploadFile can not be a folder") } ufc := &UploadCheckpoint{} var needCheckpoint = true var checkpointFilePath = input.CheckpointFile var enableCheckpoint = input.EnableCheckpoint if enableCheckpoint { needCheckpoint, err = getCheckpointFile(ufc, uploadFileStat, input, &obsClient, extensions) if err != nil { return nil, err } } if needCheckpoint { err = prepareUpload(ufc, uploadFileStat, input, &obsClient, extensions) if err != nil { return nil, err } if enableCheckpoint { err = updateCheckpointFile(ufc, checkpointFilePath) if err != nil { doLog(LEVEL_ERROR, "Failed to update checkpoint file with error [%v].", err) _err := abortTask(ufc.Bucket, ufc.Key, ufc.UploadId, &obsClient, extensions) if _err != nil { doLog(LEVEL_WARN, "Failed to abort task [%s].", ufc.UploadId) } return nil, err } } } uploadPartError := obsClient.uploadPartConcurrent(ufc, checkpointFilePath, input, extensions) err = handleUploadFileResult(uploadPartError, ufc, enableCheckpoint, &obsClient, extensions) if err != nil { return nil, err } completeOutput, err := completeParts(ufc, enableCheckpoint, checkpointFilePath, &obsClient, extensions) return completeOutput, err } func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex) (err error) { if uploadPartOutput, ok := result.(*UploadPartOutput); ok { lock.Lock() defer lock.Unlock() ufc.UploadParts[partNum-1].Etag = uploadPartOutput.ETag ufc.UploadParts[partNum-1].IsCompleted = true if enableCheckpoint { _err := updateCheckpointFile(ufc, checkpointFilePath) if _err != nil { doLog(LEVEL_WARN, "Failed to update checkpoint file with error [%v].", _err) } } } else if result != errAbort { if _err, ok := result.(error); ok { err = _err } } return } func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpointFilePath string, input *UploadFileInput, extensions []extensionOptions) error { pool := NewRoutinePool(input.TaskNum, MAX_PART_NUM) var uploadPartError atomic.Value var errFlag int32 var abort int32 lock := new(sync.Mutex) for _, uploadPart := range ufc.UploadParts { if atomic.LoadInt32(&abort) == 1 { break } if uploadPart.IsCompleted { continue } task := uploadPartTask{ UploadPartInput: UploadPartInput{ Bucket: ufc.Bucket, Key: ufc.Key, PartNumber: uploadPart.PartNumber, UploadId: ufc.UploadId, SseHeader: input.SseHeader, SourceFile: input.UploadFile, Offset: uploadPart.Offset, PartSize: uploadPart.PartSize, }, obsClient: &obsClient, abort: &abort, extensions: extensions, enableCheckpoint: input.EnableCheckpoint, } pool.ExecuteFunc(func() interface{} { result := task.Run() err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock) if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) { uploadPartError.Store(err) } return nil }) } pool.ShutDown() if err, ok := uploadPartError.Load().(error); ok { return err } return nil } // ObjectInfo defines download object info type ObjectInfo struct { XMLName xml.Name `xml:"ObjectInfo"` LastModified int64 `xml:"LastModified"` Size int64 `xml:"Size"` ETag string `xml:"ETag"` } // TempFileInfo defines temp download file properties type TempFileInfo struct { XMLName xml.Name `xml:"TempFileInfo"` TempFileUrl string `xml:"TempFileUrl"` Size int64 `xml:"Size"` } // DownloadPartInfo defines download part properties type DownloadPartInfo struct { XMLName xml.Name `xml:"DownloadPart"` PartNumber int64 `xml:"PartNumber"` RangeEnd int64 `xml:"RangeEnd"` Offset int64 `xml:"Offset"` IsCompleted bool `xml:"IsCompleted"` } // DownloadCheckpoint defines download checkpoint file properties type DownloadCheckpoint struct { XMLName xml.Name `xml:"DownloadFileCheckpoint"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` VersionId string `xml:"VersionId,omitempty"` DownloadFile string `xml:"FileUrl"` ObjectInfo ObjectInfo `xml:"ObjectInfo"` TempFileInfo TempFileInfo `xml:"TempFileInfo"` DownloadParts []DownloadPartInfo `xml:"DownloadParts>DownloadPart"` } func (dfc *DownloadCheckpoint) isValid(input *DownloadFileInput, output *GetObjectMetadataOutput) bool { if dfc.Bucket != input.Bucket || dfc.Key != input.Key || dfc.VersionId != input.VersionId || dfc.DownloadFile != input.DownloadFile { doLog(LEVEL_INFO, "Checkpoint file is invalid, the bucketName or objectKey or downloadFile was changed. clear the record.") return false } if dfc.ObjectInfo.LastModified != output.LastModified.Unix() || dfc.ObjectInfo.ETag != output.ETag || dfc.ObjectInfo.Size != output.ContentLength { doLog(LEVEL_INFO, "Checkpoint file is invalid, the object info was changed. clear the record.") return false } if dfc.TempFileInfo.Size != output.ContentLength { doLog(LEVEL_INFO, "Checkpoint file is invalid, size was changed. clear the record.") return false } stat, err := os.Stat(dfc.TempFileInfo.TempFileUrl) if err != nil || stat.Size() != dfc.ObjectInfo.Size { doLog(LEVEL_INFO, "Checkpoint file is invalid, the temp download file was changed. clear the record.") return false } return true } type downloadPartTask struct { GetObjectInput obsClient *ObsClient extensions []extensionOptions abort *int32 partNumber int64 tempFileURL string enableCheckpoint bool } func (task *downloadPartTask) Run() interface{} { if atomic.LoadInt32(task.abort) == 1 { return errAbort } getObjectInput := &GetObjectInput{} getObjectInput.GetObjectMetadataInput = task.GetObjectMetadataInput getObjectInput.IfMatch = task.IfMatch getObjectInput.IfNoneMatch = task.IfNoneMatch getObjectInput.IfModifiedSince = task.IfModifiedSince getObjectInput.IfUnmodifiedSince = task.IfUnmodifiedSince getObjectInput.RangeStart = task.RangeStart getObjectInput.RangeEnd = task.RangeEnd var output *GetObjectOutput var err error if task.extensions != nil { output, err = task.obsClient.GetObject(getObjectInput, task.extensions...) } else { output, err = task.obsClient.GetObject(getObjectInput) } if err == nil { defer func() { errMsg := output.Body.Close() if errMsg != nil { doLog(LEVEL_WARN, "Failed to close response body.") } }() _err := updateDownloadFile(task.tempFileURL, task.RangeStart, output) if _err != nil { if !task.enableCheckpoint { atomic.CompareAndSwapInt32(task.abort, 0, 1) doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.partNumber) } return _err } return output } else if obsError, ok := err.(ObsError); ok && obsError.StatusCode >= 400 && obsError.StatusCode < 500 { atomic.CompareAndSwapInt32(task.abort, 0, 1) doLog(LEVEL_WARN, "Task is aborted, part number is [%d]", task.partNumber) } return err } func getObjectInfo(input *DownloadFileInput, obsClient *ObsClient, extensions []extensionOptions) (getObjectmetaOutput *GetObjectMetadataOutput, err error) { if extensions != nil { getObjectmetaOutput, err = obsClient.GetObjectMetadata(&input.GetObjectMetadataInput, extensions...) } else { getObjectmetaOutput, err = obsClient.GetObjectMetadata(&input.GetObjectMetadataInput) } return } func getDownloadCheckpointFile(dfc *DownloadCheckpoint, input *DownloadFileInput, output *GetObjectMetadataOutput) (needCheckpoint bool, err error) { checkpointFilePath := input.CheckpointFile checkpointFileStat, err := os.Stat(checkpointFilePath) if err != nil { doLog(LEVEL_DEBUG, fmt.Sprintf("Stat checkpoint file failed with error: [%v].", err)) return true, nil } if checkpointFileStat.IsDir() { doLog(LEVEL_ERROR, "Checkpoint file can not be a folder.") return false, errors.New("checkpoint file can not be a folder") } err = loadCheckpointFile(checkpointFilePath, dfc) if err != nil { doLog(LEVEL_WARN, fmt.Sprintf("Load checkpoint file failed with error: [%v].", err)) return true, nil } else if !dfc.isValid(input, output) { if dfc.TempFileInfo.TempFileUrl != "" { _err := os.Remove(dfc.TempFileInfo.TempFileUrl) if _err != nil { doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _err) } } _err := os.Remove(checkpointFilePath) if _err != nil { doLog(LEVEL_WARN, "Failed to remove checkpoint file with error [%v].", _err) } } else { return false, nil } return true, nil } func sliceObject(objectSize, partSize int64, dfc *DownloadCheckpoint) { cnt := objectSize / partSize if objectSize%partSize > 0 { cnt++ } if cnt == 0 { downloadPart := DownloadPartInfo{} downloadPart.PartNumber = 1 dfc.DownloadParts = []DownloadPartInfo{downloadPart} } else { downloadParts := make([]DownloadPartInfo, 0, cnt) var i int64 for i = 0; i < cnt; i++ { downloadPart := DownloadPartInfo{} downloadPart.PartNumber = i + 1 downloadPart.Offset = i * partSize downloadPart.RangeEnd = (i+1)*partSize - 1 downloadParts = append(downloadParts, downloadPart) } dfc.DownloadParts = downloadParts if value := objectSize % partSize; value > 0 { dfc.DownloadParts[cnt-1].RangeEnd = dfc.ObjectInfo.Size - 1 } } } func createFile(tempFileURL string, fileSize int64) error { fd, err := syscall.Open(tempFileURL, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { doLog(LEVEL_WARN, "Failed to open temp download file [%s].", tempFileURL) return err } defer func() { errMsg := syscall.Close(fd) if errMsg != nil { doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg) } }() err = syscall.Ftruncate(fd, fileSize) if err != nil { doLog(LEVEL_WARN, "Failed to create file with error [%v].", err) } return err } func prepareTempFile(tempFileURL string, fileSize int64) error { parentDir := filepath.Dir(tempFileURL) stat, err := os.Stat(parentDir) if err != nil { doLog(LEVEL_DEBUG, "Failed to stat path with error [%v].", err) _err := os.MkdirAll(parentDir, os.ModePerm) if _err != nil { doLog(LEVEL_ERROR, "Failed to make dir with error [%v].", _err) return _err } } else if !stat.IsDir() { doLog(LEVEL_ERROR, "Cannot create folder [%s] due to a same file exists.", parentDir) return fmt.Errorf("cannot create folder [%s] due to a same file exists", parentDir) } err = createFile(tempFileURL, fileSize) if err == nil { return nil } fd, err := os.OpenFile(tempFileURL, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { doLog(LEVEL_ERROR, "Failed to open temp download file [%s].", tempFileURL) return err } defer func() { errMsg := fd.Close() if errMsg != nil { doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg) } }() if fileSize > 0 { _, err = fd.WriteAt([]byte("a"), fileSize-1) if err != nil { doLog(LEVEL_ERROR, "Failed to create temp download file with error [%v].", err) return err } } return nil } func handleDownloadFileResult(tempFileURL string, enableCheckpoint bool, downloadFileError error) error { if downloadFileError != nil { if !enableCheckpoint { _err := os.Remove(tempFileURL) if _err != nil { doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _err) } } return downloadFileError } return nil } func (obsClient ObsClient) resumeDownload(input *DownloadFileInput, extensions []extensionOptions) (output *GetObjectMetadataOutput, err error) { getObjectmetaOutput, err := getObjectInfo(input, &obsClient, extensions) if err != nil { return nil, err } objectSize := getObjectmetaOutput.ContentLength partSize := input.PartSize dfc := &DownloadCheckpoint{} var needCheckpoint = true var checkpointFilePath = input.CheckpointFile var enableCheckpoint = input.EnableCheckpoint if enableCheckpoint { needCheckpoint, err = getDownloadCheckpointFile(dfc, input, getObjectmetaOutput) if err != nil { return nil, err } } if needCheckpoint { dfc.Bucket = input.Bucket dfc.Key = input.Key dfc.VersionId = input.VersionId dfc.DownloadFile = input.DownloadFile dfc.ObjectInfo = ObjectInfo{} dfc.ObjectInfo.LastModified = getObjectmetaOutput.LastModified.Unix() dfc.ObjectInfo.Size = getObjectmetaOutput.ContentLength dfc.ObjectInfo.ETag = getObjectmetaOutput.ETag dfc.TempFileInfo = TempFileInfo{} dfc.TempFileInfo.TempFileUrl = input.DownloadFile + ".tmp" dfc.TempFileInfo.Size = getObjectmetaOutput.ContentLength sliceObject(objectSize, partSize, dfc) _err := prepareTempFile(dfc.TempFileInfo.TempFileUrl, dfc.TempFileInfo.Size) if _err != nil { return nil, _err } if enableCheckpoint { _err := updateCheckpointFile(dfc, checkpointFilePath) if _err != nil { doLog(LEVEL_ERROR, "Failed to update checkpoint file with error [%v].", _err) _errMsg := os.Remove(dfc.TempFileInfo.TempFileUrl) if _errMsg != nil { doLog(LEVEL_WARN, "Failed to remove temp download file with error [%v].", _errMsg) } return nil, _err } } } downloadFileError := obsClient.downloadFileConcurrent(input, dfc, extensions) err = handleDownloadFileResult(dfc.TempFileInfo.TempFileUrl, enableCheckpoint, downloadFileError) if err != nil { return nil, err } err = os.Rename(dfc.TempFileInfo.TempFileUrl, input.DownloadFile) if err != nil { doLog(LEVEL_ERROR, "Failed to rename temp download file [%s] to download file [%s] with error [%v].", dfc.TempFileInfo.TempFileUrl, input.DownloadFile, err) return nil, err } if enableCheckpoint { err = os.Remove(checkpointFilePath) if err != nil { doLog(LEVEL_WARN, "Download file successfully, but remove checkpoint file failed with error [%v].", err) } } return getObjectmetaOutput, nil } func updateDownloadFile(filePath string, rangeStart int64, output *GetObjectOutput) error { fd, err := os.OpenFile(filePath, os.O_WRONLY, 0666) if err != nil { doLog(LEVEL_ERROR, "Failed to open file [%s].", filePath) return err } defer func() { errMsg := fd.Close() if errMsg != nil { doLog(LEVEL_WARN, "Failed to close file with error [%v].", errMsg) } }() _, err = fd.Seek(rangeStart, 0) if err != nil { doLog(LEVEL_ERROR, "Failed to seek file with error [%v].", err) return err } fileWriter := bufio.NewWriterSize(fd, 65536) part := make([]byte, 8192) var readErr error var readCount int for { readCount, readErr = output.Body.Read(part) if readCount > 0 { wcnt, werr := fileWriter.Write(part[0:readCount]) if werr != nil { doLog(LEVEL_ERROR, "Failed to write to file with error [%v].", werr) return werr } if wcnt != readCount { doLog(LEVEL_ERROR, "Failed to write to file [%s], expect: [%d], actual: [%d]", filePath, readCount, wcnt) return fmt.Errorf("Failed to write to file [%s], expect: [%d], actual: [%d]", filePath, readCount, wcnt) } } if readErr != nil { if readErr != io.EOF { doLog(LEVEL_ERROR, "Failed to read response body with error [%v].", readErr) return readErr } break } } err = fileWriter.Flush() if err != nil { doLog(LEVEL_ERROR, "Failed to flush file with error [%v].", err) return err } return nil } func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex) (err error) { if _, ok := result.(*GetObjectOutput); ok { lock.Lock() defer lock.Unlock() dfc.DownloadParts[partNum-1].IsCompleted = true if enableCheckpoint { _err := updateCheckpointFile(dfc, checkpointFile) if _err != nil { doLog(LEVEL_WARN, "Failed to update checkpoint file with error [%v].", _err) } } } else if result != errAbort { if _err, ok := result.(error); ok { err = _err } } return } func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc *DownloadCheckpoint, extensions []extensionOptions) error { pool := NewRoutinePool(input.TaskNum, MAX_PART_NUM) var downloadPartError atomic.Value var errFlag int32 var abort int32 lock := new(sync.Mutex) for _, downloadPart := range dfc.DownloadParts { if atomic.LoadInt32(&abort) == 1 { break } if downloadPart.IsCompleted { continue } task := downloadPartTask{ GetObjectInput: GetObjectInput{ GetObjectMetadataInput: input.GetObjectMetadataInput, IfMatch: input.IfMatch, IfNoneMatch: input.IfNoneMatch, IfUnmodifiedSince: input.IfUnmodifiedSince, IfModifiedSince: input.IfModifiedSince, RangeStart: downloadPart.Offset, RangeEnd: downloadPart.RangeEnd, }, obsClient: &obsClient, extensions: extensions, abort: &abort, partNumber: downloadPart.PartNumber, tempFileURL: dfc.TempFileInfo.TempFileUrl, enableCheckpoint: input.EnableCheckpoint, } pool.ExecuteFunc(func() interface{} { result := task.Run() err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock) if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) { downloadPartError.Store(err) } return nil }) } pool.ShutDown() if err, ok := downloadPartError.Load().(error); ok { return err } return nil }