diff --git a/modules/obs/temporary.go b/modules/obs/temporary.go index 7a2ad9b64..dfb87ffc6 100755 --- a/modules/obs/temporary.go +++ b/modules/obs/temporary.go @@ -791,8 +791,8 @@ func (obsClient ObsClient) GetBucketRequestPaymentWithSignedUrl(signedUrl string } -func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (string, error) { - requestURL := "" +func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uploadId string, partNumber int, partSize int64) (*http.Request, error) { + var req *http.Request input := &UploadPartInput{} input.Bucket = bucketName @@ -808,7 +808,7 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo params, headers, _, err := input.trans(obsClient.conf.signature == SignatureObs) if err != nil { - return requestURL, err + return req, err } if params == nil { @@ -833,10 +833,63 @@ func (obsClient ObsClient) CreateUploadPartSignedUrl(bucketName, objectKey, uplo headers["Content-Length"] = []string{com.ToStr(partNumber,10)} - requestURL, err = obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") + requestURL, err := obsClient.doAuth(HTTP_PUT, bucketName, objectKey, params, headers, "") if err != nil { - return requestURL, nil + return req, nil } - return requestURL, nil + var _data io.Reader + req, err = http.NewRequest(HTTP_PUT, requestURL, _data) + if obsClient.conf.ctx != nil { + req = req.WithContext(obsClient.conf.ctx) + } + if err != nil { + return req, err + } + + if isDebugLogEnabled() { + auth := headers[HEADER_AUTH_CAMEL] + delete(headers, HEADER_AUTH_CAMEL) + + var isSecurityToken bool + var securityToken []string + if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_AMZ]; isSecurityToken { + headers[HEADER_STS_TOKEN_AMZ] = []string{"******"} + } else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken { + headers[HEADER_STS_TOKEN_OBS] = []string{"******"} + } + doLog(LEVEL_DEBUG, "Request headers: %v", headers) + headers[HEADER_AUTH_CAMEL] = auth + if isSecurityToken { + if obsClient.conf.signature == SignatureObs { + headers[HEADER_STS_TOKEN_OBS] = securityToken + } else { + headers[HEADER_STS_TOKEN_AMZ] = securityToken + } + } + } + + for key, value := range headers { + if key == HEADER_HOST_CAMEL { + req.Host = value[0] + delete(headers, key) + } else if key == HEADER_CONTENT_LENGTH_CAMEL { + req.ContentLength = StringToInt64(value[0], -1) + delete(headers, key) + } else { + req.Header[key] = value + } + } + + var lastRequest *http.Request + lastRequest = req + + req.Header[HEADER_USER_AGENT_CAMEL] = []string{USER_AGENT} + + if lastRequest != nil { + req.Host = lastRequest.Host + req.ContentLength = lastRequest.ContentLength + } + + return req, nil } diff --git a/modules/storage/obs.go b/modules/storage/obs.go index 0695dc62c..c80b9612f 100755 --- a/modules/storage/obs.go +++ b/modules/storage/obs.go @@ -140,14 +140,15 @@ func ObsGenMultiPartSignedUrl(uuid string, uploadId string, partNumber int, part */ Key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") - url, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) + req, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize) if err != nil { log.Error("CreateSignedUrl failed:", err.Error()) return "", err } - log.Info(url) + log.Info(req.URL.String()) + log.Info("", req.Header) - return url, nil + return req.URL.String(), nil } diff --git a/routers/repo/attachment.go b/routers/repo/attachment.go index 9dcecb8c3..2be78a2e0 100755 --- a/routers/repo/attachment.go +++ b/routers/repo/attachment.go @@ -11,6 +11,7 @@ import ( "fmt" "mime/multipart" "net/http" + "path" "strconv" "strings" @@ -275,13 +276,29 @@ func GetPresignedPutObjectURL(ctx *context.Context) { // AddAttachment response for add attachment record func AddAttachment(ctx *context.Context) { - uuid := ctx.Query("uuid") - has, err := storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) + typeCloudBrain := ctx.QueryInt("type") + err := checkTypeCloudBrain(typeCloudBrain) if err != nil { - ctx.ServerError("HasObject", err) + ctx.ServerError("checkTypeCloudBrain failed", err) return } + uuid := ctx.Query("uuid") + has := false + if typeCloudBrain == models.TypeCloudBrainOne { + has, err = storage.Attachments.HasObject(models.AttachmentRelativePath(uuid)) + if err != nil { + ctx.ServerError("HasObject", err) + return + } + } else { + has, err = storage.ObsHasObject(models.AttachmentRelativePath(uuid)) + if err != nil { + ctx.ServerError("ObsHasObject", err) + return + } + } + if !has { ctx.Error(404, "attachment has not been uploaded") return @@ -294,6 +311,7 @@ func AddAttachment(ctx *context.Context) { Name: ctx.Query("file_name"), Size: ctx.QueryInt64("size"), DatasetID: ctx.QueryInt64("dataset_id"), + Type: typeCloudBrain, }) if err != nil { @@ -303,16 +321,19 @@ func AddAttachment(ctx *context.Context) { if attachment.DatasetID != 0 { if strings.HasSuffix(attachment.Name, ".zip") { - err = worker.SendDecompressTask(contexExt.Background(), uuid) - if err != nil { - log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) - } else { - attachment.DecompressState = models.DecompressStateIng - err = models.UpdateAttachment(attachment) + if typeCloudBrain == models.TypeCloudBrainOne { + err = worker.SendDecompressTask(contexExt.Background(), uuid) if err != nil { - log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) + log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error()) + } else { + attachment.DecompressState = models.DecompressStateIng + err = models.UpdateAttachment(attachment) + if err != nil { + log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error()) + } } } + //todo:decompress type_two } } @@ -581,6 +602,16 @@ func GetMultipartUploadUrl(ctx *context.Context) { }) } +func GetObsKey(ctx *context.Context) { + uuid := gouuid.NewV4().String() + key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") + + ctx.JSON(200, map[string]string{ + "uuid": uuid, + "key": key, + }) +} + func UploadPart(ctx *context.Context) { tmp, err := ctx.Req.Body().String() log.Info(tmp) diff --git a/routers/routes/routes.go b/routers/routes/routes.go index 7e29ebfa5..a49dd4b65 100755 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -530,6 +530,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Post("/complete_multipart", repo.CompleteMultipart) m.Post("/update_chunk", repo.UpdateMultipart) m.Post("/upload_part", repo.UploadPart) + m.Get("/get_obs_key", repo.GetObsKey) }, reqSignIn) m.Group("/attachments", func() {