
Merge pull request 'Optimize the OBS upload scheme' (#161) from modelarts into develop

Reviewed-by: 史梦园 <1729788216@qq.com>
master
yuyuanshifu 4 years ago
parent commit bb399c1902
5 changed files with 341 additions and 233 deletions
  1. modules/storage/obs.go (+10, -40)
  2. routers/repo/attachment.go (+11, -42)
  3. routers/routes/routes.go (+0, -1)
  4. web_src/js/components/MinioUploader.vue (+6, -2)
  5. web_src/js/components/ObsUploader.vue (+314, -148)

modules/storage/obs.go (+10, -40)

@@ -5,7 +5,7 @@
package storage

import (
"io"
"github.com/unknwon/com"
"path"
"strconv"
"strings"
@@ -102,57 +102,27 @@ func CompleteObsMultiPartUpload(uuid string, uploadID string) error {
return nil
}

func ObsUploadPart(uuid string, uploadId string, partNumber int, partSize int64, body io.Reader) (string, error) {
input := &obs.UploadPartInput{}
input.PartNumber = partNumber
input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, uuid)), "/")
input.UploadId = uploadId
input.Bucket = setting.Bucket
input.PartSize = partSize
input.Body = body
output, err := ObsCli.UploadPart(input)
if err != nil {
log.Error("UploadPart failed:", err.Error())
return "", err
}

return output.ETag, nil
}

func ObsGenMultiPartSignedUrl(uuid string, uploadId string, partNumber int, partSize int64) (string, error) {
/*

input := &obs.CreateSignedUrlInput{}
input.Bucket = setting.Bucket
input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/")
input.Expires = int(PresignedUploadPartUrlExpireTime)
input.Method = obs.HTTP_PUT
input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, uuid)), "/")
input.Expires = 60 * 60
input.Method = obs.HttpMethodPut

input.QueryParams = map[string]string{
"Bucket": input.Bucket,
"Key": input.Key,
"PartNumber": com.ToStr(partNumber,10),
"UploadId": uploadId,
"PartSize": com.ToStr(partSize,10),
"partNumber": com.ToStr(partNumber,10),
"uploadId": uploadId,
//"partSize": com.ToStr(partSize,10),
}

input.Headers = map[string]string{

}

*/

Key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, uuid)), "/")
req, err := ObsCli.CreateUploadPartSignedUrl(setting.Bucket, Key, uploadId, partNumber, partSize)
output, err := ObsCli.CreateSignedUrl(input)
if err != nil {
log.Error("CreateSignedUrl failed:", err.Error())
return "", err
}

log.Info(req.URL.String())
log.Info("", req.Header)

return req.URL.String(), nil

return output.SignedUrl, nil
}

func ObsGetPreSignedUrl(uuid, fileName string) (string, error) {


routers/repo/attachment.go (+11, -42)

@@ -422,14 +422,13 @@ func GetSuccessChunks(ctx *context.Context) {
return
}
} else {
isExist, err = storage.ObsHasObject(models.AttachmentRelativePath(fileChunk.UUID))
isExist, err = storage.ObsHasObject(setting.BasePath + models.AttachmentRelativePath(fileChunk.UUID) + "/" + fileChunk.UUID)
if err != nil {
ctx.ServerError("ObsHasObject failed", err)
return
}
}


if isExist {
if fileChunk.IsUploaded == models.FileNotUploaded {
log.Info("the file has been uploaded but not recorded")
@@ -633,38 +632,6 @@ func GetObsKey(ctx *context.Context) {
})
}

func UploadPart(ctx *context.Context) {
tmp, err := ctx.Req.Body().String()
log.Info(tmp)

err = ctx.Req.ParseMultipartForm(100*1024*1024)
if err != nil {
ctx.Error(http.StatusBadRequest, fmt.Sprintf("ParseMultipartForm failed: %v", err))
return
}

file, fileHeader, err := ctx.Req.FormFile("file")
log.Info(ctx.Req.Form.Get("file"))
if err != nil {
ctx.Error(http.StatusBadRequest, fmt.Sprintf("FormFile failed: %v", err))
return
}



log.Info(fileHeader.Filename)

etag, err := storage.ObsUploadPart("", "", 1, 1, file)
if err != nil {
ctx.Error(500, fmt.Sprintf("ObsUploadPart failed: %v", err))
return
}

ctx.JSON(200, map[string]string{
"etag": etag,
})
}

func CompleteMultipart(ctx *context.Context) {
uuid := ctx.Query("uuid")
uploadID := ctx.Query("uploadID")
@@ -724,15 +691,17 @@ func CompleteMultipart(ctx *context.Context) {
}

if attachment.DatasetID != 0 {
if strings.HasSuffix(attachment.Name, ".zip") {
err = worker.SendDecompressTask(contexExt.Background(), uuid)
if err != nil {
log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error())
} else {
attachment.DecompressState = models.DecompressStateIng
err = models.UpdateAttachment(attachment)
if typeCloudBrain == models.TypeCloudBrainOne {
if strings.HasSuffix(attachment.Name, ".zip") {
err = worker.SendDecompressTask(contexExt.Background(), uuid)
if err != nil {
log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error())
log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error())
} else {
attachment.DecompressState = models.DecompressStateIng
err = models.UpdateAttachment(attachment)
if err != nil {
log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error())
}
}
}
}


routers/routes/routes.go (+0, -1)

@@ -529,7 +529,6 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/get_multipart_url", repo.GetMultipartUploadUrl)
m.Post("/complete_multipart", repo.CompleteMultipart)
m.Post("/update_chunk", repo.UpdateMultipart)
m.Post("/upload_part", repo.UploadPart)
m.Get("/get_obs_key", repo.GetObsKey)
}, reqSignIn)



web_src/js/components/MinioUploader.vue (+6, -2)

@@ -21,6 +21,7 @@ import qs from 'qs';
import createDropzone from '../features/dropzone.js';

const {_AppSubUrl, _StaticUrlPrefix, csrf} = window.config;
const CloudBrainType = 0;

export default {
data() {
@@ -245,8 +246,8 @@ export default {
file_name: file.name,
size: file.size,
dataset_id: file.datasetId,
_csrf: csrf,
type:0
type: CloudBrainType,
_csrf: csrf
})
);
}
@@ -284,6 +285,7 @@ export default {
md5: file.uniqueIdentifier,
size: file.size,
fileType: file.type,
type: CloudBrainType,
_csrf: csrf
}
});
@@ -324,6 +326,7 @@ export default {
uploadID: file.uploadID,
size: partSize,
chunkNumber: currentChunk + 1,
type: CloudBrainType,
_csrf: csrf
}
});
@@ -382,6 +385,7 @@ export default {
file_name: file.name,
size: file.size,
dataset_id: file.datasetId,
type: CloudBrainType,
_csrf: csrf
})
);


web_src/js/components/ObsUploader.vue (+314, -148)

@@ -4,22 +4,24 @@
id="dataset"
class="dropzone"
/>

<p class="upload-info">
{{ file_status_text }}
<span class="success">{{ status }}</span>
</p>
</div>
</div>
</template>

<script>
/* eslint-disable eqeqeq */
// import Dropzone from 'dropzone/dist/dropzone.js';
// import 'dropzone/dist/dropzone.css'
import SparkMD5 from 'spark-md5';
import axios from 'axios';
import qs from 'qs';
import createDropzone from '../features/dropzone.js';
import ObsClient from 'esdk-obs-browserjs';

const {_AppSubUrl, _StaticUrlPrefix, csrf} = window.config;
const CloudBrainType = 1;

export default {
data() {
@@ -66,7 +68,6 @@ export default {

const $dropzone = $('div#dataset');
console.log('createDropzone');

const dropzoneUploader = await createDropzone($dropzone[0], {
url: '/todouploader',
maxFiles: this.maxFiles,
@@ -77,16 +78,14 @@ export default {
dictInvalidFileType: this.dropzoneParams.data('invalid-input-type'),
dictFileTooBig: this.dropzoneParams.data('file-too-big'),
dictRemoveFile: this.dropzoneParams.data('remove-file'),
previewTemplate,
previewTemplate
});

// add the file to the file list
dropzoneUploader.on('addedfile', (file) => {
if(file.status == 'added'){
this.onFileAdded(file)
}
setTimeout(() => {
// eslint-disable-next-line no-unused-expressions
file.accepted && this.onFileAdded(file);
}, 200);
});

dropzoneUploader.on('maxfilesexceeded', function (file) {
if (this.files[0].status !== 'success') {
alert(this.dropzoneParams.data('waitting-uploading'));
@@ -99,15 +98,14 @@ export default {

this.dropzoneUploader = dropzoneUploader;
},

methods: {
resetStatus() {
this.progress = 0;
this.status = '';
},
updateProgress(file, progress) {
file.previewTemplate.querySelector(
'.dz-upload'
file.previewTemplate.querySelector(
'.dz-upload'
).style.width = `${progress}%`;
},
emitDropzoneSuccess(file) {
@@ -119,158 +117,326 @@ export default {
this.status = this.dropzoneParams.data('falied');
file.status = 'error';
this.dropzoneUploader.emit('error', file);
// this.dropzoneUploader.emit('complete', file);
},

onFileAdded(file) {
file.datasetId = document
.getElementById('datasetId')
.getAttribute('datasetId');
this.resetStatus();
this.status = this.dropzoneParams.data('obs-connecting');
this.do_multi_uploader(file)
this.computeMD5(file);
},

finishUpload(file) {
this.emitDropzoneSuccess(file);
setTimeout(() => {
window.location.reload();
}, 1000);
},

computeMD5(file) {
this.resetStatus();
const blobSlice =
File.prototype.slice ||
File.prototype.mozSlice ||
File.prototype.webkitSlice,
chunkSize = 1024 * 1024 * 64,
chunks = Math.ceil(file.size / chunkSize),
spark = new SparkMD5.ArrayBuffer(),
fileReader = new FileReader();
let currentChunk = 0;

const time = new Date().getTime();
// console.log('Computing MD5...')
this.status = this.dropzoneParams.data('md5-computing');
file.totalChunkCounts = chunks;
loadNext();

fileReader.onload = (e) => {
fileLoaded.call(this, e);
};
fileReader.onerror = (err) => {
console.warn('oops, something went wrong.', err);
file.cancel();
};

function fileLoaded(e) {
spark.append(e.target.result); // Append array buffer
currentChunk++;
if (currentChunk < chunks) {
// console.log(`Chunk ${currentChunk} parsed, starting chunk ${currentChunk + 1}/${chunks}`);
this.status = `${this.dropzoneParams.data('loading-file')} ${(
(currentChunk / chunks) *
100
).toFixed(2)}% (${currentChunk}/${chunks})`;
this.updateProgress(file, ((currentChunk / chunks) * 100).toFixed(2));
loadNext();
return;
}

const md5 = spark.end();
console.log(
`MD5 computed: ${file.name} \nMD5: ${md5} \nchunks: ${chunks} size: ${
file.size
} time: ${(new Date().getTime() - time) / 1000} s`
);
spark.destroy(); // free the hash buffer
file.uniqueIdentifier = md5; // use the file MD5 as the file's unique identifier
file.cmd5 = false; // clear the MD5-computing flag
this.computeMD5Success(file);
}

function loadNext() {
const start = currentChunk * chunkSize;
const end =
start + chunkSize >= file.size ? file.size : start + chunkSize;
fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
}
},

// get the key and uuid
get_result(){
var res
$.ajax({
url: '/attachments/get_obs_key',
type: 'GET',
async: false,
success: function(result){
res = result
async computeMD5Success(md5edFile) {
const file = await this.getSuccessChunks(md5edFile);
try {
if (file.uploadID == '' || file.uuid == '') {
// not uploaded yet
await this.newMultiUpload(file);
if (file.uploadID != '' && file.uuid != '') {
file.chunks = '';
this.multipartUpload(file);
} else {
// how should failure be handled?
return;
}
return;
}
});
return res
},

// build the ObsClient
getObsClient(result){
return new ObsClient({
access_key_id: result.access_key_id,
secret_access_key: result.secret_access_key,
server : result.server
if (file.uploaded == '1') {
// already uploaded successfully
// instant upload
if (file.attachID == '0') {
// the dataset record was deleted, but the file was not
await addAttachment(file);
}
// the same file uploaded to a different dataset
if (file.datasetID != '') {
if (Number(file.datasetID) != file.datasetId) {
var info = "This file has already been uploaded: dataset (" + file.datasetName + ") - file (" + file.realName + ")";
window.alert(info);
window.location.reload();
}
}
console.log('File upload already complete');
this.progress = 100;
this.status = this.dropzoneParams.data('upload-complete');
this.finishUpload(file);
} else {
// resume the interrupted upload
this.multipartUpload(file);
}
} catch (error) {
this.emitDropzoneFailed(file);
console.log(error);
}

async function addAttachment(file) {
return await axios.post(
'/attachments/add',
qs.stringify({
uuid: file.uuid,
file_name: file.name,
size: file.size,
dataset_id: file.datasetId,
type: CloudBrainType,
_csrf: csrf,
})
);
}
},

async getSuccessChunks(file) {
const params = {
params: {
md5: file.uniqueIdentifier,
type: CloudBrainType,
_csrf: csrf
}
};
try {
const response = await axios.get('/attachments/get_chunks', params);
file.uploadID = response.data.uploadID;
file.uuid = response.data.uuid;
file.uploaded = response.data.uploaded;
file.chunks = response.data.chunks;
file.attachID = response.data.attachID;
file.datasetID = response.data.datasetID;
file.datasetName = response.data.datasetName;
file.realName = response.data.fileName;
return file;
} catch (error) {
this.emitDropzoneFailed(file);
console.log('getSuccessChunks catch: ', error);
return null;
}
},

async newMultiUpload(file) {
const res = await axios.get('/attachments/new_multipart', {
params: {
totalChunkCounts: file.totalChunkCounts,
md5: file.uniqueIdentifier,
size: file.size,
fileType: file.type,
type: CloudBrainType,
_csrf: csrf
}
});
file.uploadID = res.data.uploadID;
file.uuid = res.data.uuid;
},

// resumable upload
do_multi_uploader(file){
const result = this.get_result()
const upload_datasetId = document
.getElementById('datasetId')
.getAttribute('datasetId');
const obsClient = this.getObsClient(result)
const _this = this
var cp;
var hook;
multipartUpload(file) {
const blobSlice =
File.prototype.slice ||
File.prototype.mozSlice ||
File.prototype.webkitSlice,
chunkSize = 1024 * 1024 * 64,
chunks = Math.ceil(file.size / chunkSize),
fileReader = new FileReader(),
time = new Date().getTime();
let currentChunk = 0;

obsClient.uploadFile({
Bucket : result.bucket,
Key : result.key,
SourceFile : file,
PartSize : 64 * 1024 * 1024,
function loadNext() {
const start = currentChunk * chunkSize;
const end =
start + chunkSize >= file.size ? file.size : start + chunkSize;
fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
}

// update the progress bar
ProgressCallback : function(transferredAmount, totalAmount, totalSeconds){
_this.updateProgress(file, ((transferredAmount / totalAmount) * 100).toFixed(2))
_this.status = `${_this.dropzoneParams.data('uploading')} ${(
(transferredAmount / totalAmount) *
100
).toFixed(2)}%`;
},
function checkSuccessChunks() {
const index = successChunks.indexOf((currentChunk + 1).toString());
if (index == -1) {
return false;
}
return true;
}

// listen for the file upload result
EventCallback : function(eventType, eventParam, eventResult){
console.log("eventType1= ", eventType)
console.log("eventParam1= ", eventParam)
console.log("eventResult1= ", eventResult)
// file uploaded successfully
if(eventType == 'completeMultipartUploadSucceed'){
console.log("file = ", file)
$.ajax({
url: '/attachments/add',
type: 'POST',
data: {
'uuid': result.uuid,
'file_name': file.name,
'size': file.size,
'dataset_id': upload_datasetId,
'_csrf': csrf,
'type': 1
},
async: false,
success: function (data) {
_this.progress = 100;
_this.status = _this.dropzoneParams.data('upload-complete');
_this.emitDropzoneSuccess(file)
setTimeout(() => {
window.location.reload();
}, 1000);
},
error: function(){
_this.emitDropzoneFailed(file)
}
});
async function getUploadChunkUrl(currentChunk, partSize) {
const res = await axios.get('/attachments/get_multipart_url', {
params: {
uuid: file.uuid,
uploadID: file.uploadID,
size: partSize,
chunkNumber: currentChunk + 1,
type: CloudBrainType,
_csrf: csrf
}
},
ResumeCallback : function(resumeHook, uploadCheckpoint){
hook = resumeHook;
cp = uploadCheckpoint;
}
}, function(err, result){
// on error, call the resumable-upload API again to continue the upload task
if(err){
obsClient.uploadFile({
UploadCheckpoint : cp,
});
urls[currentChunk] = res.data.url;
}

// keep updating the progress bar after resuming
ProgressCallback : function(transferredAmount, totalAmount, totalSeconds){
_this.updateProgress(file, ((transferredAmount / totalAmount) * 100).toFixed(2))
_this.status = `${_this.dropzoneParams.data('uploading')} ${(
(transferredAmount / totalAmount) *
100
).toFixed(2)}%`;
},
async function uploadMinio(url, e) {
let urls = [];
const res = await axios.put(url, e.target.result, {
headers: {
'Content-Type': ''
}});
etags[currentChunk] = res.headers.etag;
}

// listen for the resumed-upload result
EventCallback : function(eventType, eventParam, eventResult){
console.log("eventType2= ", eventType)
console.log("eventParam2= ", eventParam)
console.log("eventResult2= ", eventResult)
// resumed upload succeeded
if(eventType == 'completeMultipartUploadSucceed'){
$.ajax({
url: '/attachments/add',
type: 'POST',
data: {
'uuid': result.uuid,
'file_name': file.name,
'size': file.size,
'dataset_id': upload_datasetId,
'_csrf': csrf,
'type': 1
},
async: false,
success: function (data) {
_this.progress = 100;
_this.status = _this.dropzoneParams.data('upload-complete');
_this.emitDropzoneSuccess(file)
setTimeout(() => {
window.location.reload();
}, 1000);
console.log(data)
},
error: function(){
_this.emitDropzoneFailed(file)
}
});
}
if (eventType == 'uploadPartFailed'){
_this.emitDropzoneFailed(file)
}
async function updateChunk(currentChunk) {
await axios.post(
'/attachments/update_chunk',
qs.stringify({
uuid: file.uuid,
chunkNumber: currentChunk + 1,
etag: etags[currentChunk],
type: CloudBrainType,
_csrf: csrf
})
);
}
async function uploadChunk(e) {
try {
if (!checkSuccessChunks()) {
const start = currentChunk * chunkSize;
const partSize =
start + chunkSize >= file.size ? file.size - start : chunkSize;
// get the pre-signed upload URL for this chunk
await getUploadChunkUrl(currentChunk, partSize);
if (urls[currentChunk] != '') {
// upload to minio
await uploadMinio(urls[currentChunk], e);
if (etags[currentChunk] != '') {
// update the database with the chunk upload result
//await updateChunk(currentChunk);
} else {
console.log("upload to minio: uploadChunk etags[currentChunk] == ''");// TODO
}
});
} else {
console.log("uploadChunk urls[currentChunk] != ''");// TODO
}
}
});
},
} catch (error) {
this.emitDropzoneFailed(file);
console.log(error);
}
}

async function completeUpload() {
return await axios.post(
'/attachments/complete_multipart',
qs.stringify({
uuid: file.uuid,
uploadID: file.uploadID,
file_name: file.name,
size: file.size,
dataset_id: file.datasetId,
type: CloudBrainType,
_csrf: csrf
})
);
}

const successChunks = [];
let successParts = [];
successParts = file.chunks.split(',');
for (let i = 0; i < successParts.length; i++) {
successChunks[i] = successParts[i].split('-')[0];
}
const urls = []; // TODO const ?
const etags = [];
console.log('Uploading chunks...');
this.status = this.dropzoneParams.data('uploading');
loadNext();
fileReader.onload = async (e) => {
await uploadChunk(e);
fileReader.abort();
currentChunk++;
if (currentChunk < chunks) {
console.log(
`Chunk ${currentChunk} uploaded, starting chunk ${currentChunk +
1}/${chunks}`
);
this.progress = Math.ceil((currentChunk / chunks) * 100);
this.updateProgress(file, ((currentChunk / chunks) * 100).toFixed(2));
this.status = `${this.dropzoneParams.data('uploading')} ${(
(currentChunk / chunks) *
100
).toFixed(2)}%`;
await loadNext();
} else {
await completeUpload();
console.log(
`Upload complete: ${file.name} \nchunks: ${chunks} size: ${
file.size
} time: ${(new Date().getTime() - time) / 1000} s`
);
this.progress = 100;
this.status = this.dropzoneParams.data('upload-complete');
this.finishUpload(file);
}
};
}
}
};
</script>

