
How to implement s3cmd data operations

Published: 2021-12-30 16:31:52 · Source: Yisu Cloud · Author: iii · Category: Cloud Computing

This article introduces "how to implement s3cmd data operations": uploading data to S3 through presigned multipart uploads, a point where many people get stuck in practice. It walks through the basic workflow, its trade-offs, and a complete server-side and client-side demo.

1. Basic principle

(Figure: presigned multipart upload workflow; the numbered steps 1-5 referenced below appear in the diagram)

Workflow:
1. The client splits the file into parts and sends an upload request to the API server, which generates the corresponding presigned URLs (steps 1 and 2; if you want to limit how many clients may upload, this is where you can issue a fixed number of tokens).
2. The client uses the presigned URLs to build HTTP requests and uploads the parts directly to the S3 service (step 3).
3. Once all parts are uploaded, the client sends a Complete request to the API server, which in turn sends the complete-multipart-upload request to the S3 service (steps 4 and 5).
4. The client reads the API server's response and finishes up (this is also a reasonable point to reclaim tokens). A minimal sketch of these HTTP calls follows this list.
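
To make the flow concrete, here is a minimal sketch of those HTTP calls from the client's point of view. It assumes the demo API server from section 3 below is running at http://localhost:5000 and exposes the /presign and /complete endpoints defined there; the part file names and MD5 strings are placeholders, not real values.

import requests

api = "http://localhost:5000"

# Steps 1-2: ask the API server for presigned URLs, one per part.
# The form carries the object name, the signature lifetime in seconds and a
# partNumber -> Content-MD5 mapping (the MD5 values below are placeholders).
presign = requests.post(api + "/presign", data={
    "keyname": "abc.json",
    "expires": 300,
    "1": "base64-md5-of-part-1",
    "2": "base64-md5-of-part-2",
}).json()
upload_id = presign.pop("UploadID")

# Step 3: PUT each part directly to the S3 service through its presigned URL.
for part_number, url in presign.items():
    with open("abc.json.part.%s" % part_number, "rb") as fh:
        requests.put(url, data=fh.read(),
                     headers={"Content-MD5": "base64-md5-of-part-%s" % part_number})

# Steps 4-5: ask the API server to complete the multipart upload on S3.
requests.post(api + "/complete", data={"uploadid": upload_id, "keyname": "abc.json"})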

2. Pros and cons

Advantages:
1. The access key and secret key are never stored on the client, which avoids key leakage.
2. Each presigned URL is tied to one key name; within its validity window the client can upload to it or overwrite the existing object at will, which is quite flexible.
3. The server side can plug into any auth system to authenticate and authorize clients, making it easy to integrate with existing services.
4. Uploads and downloads are flexible on the client side: once it holds the presigned URL, any HTTP-capable client can perform the transfer.
5. It suits large files well. Compared with the single presigned-URL approach introduced earlier, the data-upload phase supports concurrent part uploads, which improves throughput considerably.

Disadvantages:
1. The upload requires several round trips, so the flow is slightly more complex.
2. Because of the S3 multipart upload specification, files smaller than 5 MB cannot use this method (see the note after this list).
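
About the second limitation: the S3 multipart specification requires every part except the last one to be at least 5 MB, so very small files should fall back to a plain presigned PUT. The helper below is an illustrative sketch only (the function name is made up and is not part of the demo) showing how a client could cap the part count accordingly:

import os

MIN_PART_SIZE = 5 * 1024 * 1024  # minimum size of every part except the last

def choose_part_count(file_path, wanted_parts):
    """Illustrative only: cap the part count so no part falls below 5 MB."""
    size = os.path.getsize(file_path)
    if size <= MIN_PART_SIZE:
        return 1  # too small for multipart upload; use a single presigned PUT instead
    max_parts = size // MIN_PART_SIZE
    return min(wanted_parts, max_parts)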

3. Implementation

Install the server-side dependencies

pip install boto
pip install flask-restful

The server-side demo code is as follows:

# -*- coding: utf-8 -*-
import time
import hmac
from hashlib import sha1 as sha
import boto
import boto.s3.connection
import re

py3k = False
try:
    from urlparse import urlparse, unquote
    from base64 import encodestring
except ImportError:
    py3k = True
    from urllib.parse import urlparse, unquote
    from base64 import encodebytes as encodestring
from flask import Flask, request
from flask_restful import Api, Resource

app = Flask(__name__)
api = Api(app)


from boto.s3.multipart import MultiPartUpload

class MultiPartUpload_Presign(MultiPartUpload):
    """Rebuild a MultiPartUpload from a known upload ID so the upload can be
    completed without re-initiating it (the part list is fetched from S3)."""

    def __init__(self, id, bucket, key_name):
        MultiPartUpload.__init__(self, bucket)
        self.id = id
        self.bucket = bucket
        self.key_name = key_name

    def complete_upload(self):
        # Ask S3 to assemble the already-uploaded parts into the final object.
        xml = self.to_xml()
        return self.bucket.complete_multipart_upload(self.key_name,
                                                     self.id, xml)

class S3PreSign():
    def __init__(self, object_name, metadata=None, policy=None):
        self.service_url = 's3.ceph.work'   # endpoint of the S3 service
        self.access_key = ''                # access key
        self.secret_key = ''                # secret key
        self.bucket_name = 'multi-upload'   # bucket name
        self.object_name = str(object_name)
        # self.Expires = int(time.time()) + int(expires)
        conn = boto.connect_s3(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            host=self.service_url,
            port=80,
            is_secure=False,  # set to True if the endpoint uses SSL
            # calling_format=boto.s3.connection.OrdinaryCallingFormat(),
            calling_format=boto.s3.connection.SubdomainCallingFormat(),
        )
        self.bucket = conn.get_bucket(self.bucket_name)
        self.upload_ID = self.Make_uploadID(self.object_name, metadata=metadata, policy=policy)


    def Make_uploadID(self,object_name,metadata=None,policy=None):
        mpu = self.bucket.initiate_multipart_upload(object_name, metadata=metadata, policy=policy)
        return mpu.id

    def complete_upload(self, upload_ID):
        mpu = MultiPartUpload_Presign(id=upload_ID, bucket=self.bucket, key_name=self.object_name)
        status_ = 200
        try:
            mpu.complete_upload()
        except Exception:
            # Completing fails if parts are missing or the upload ID is invalid.
            status_ = 422
        finally:
            return status_

    def get_signature_str(self, sign_str):
        if py3k:
            key = self.secret_key.encode('utf-8')
            msg = sign_str.encode('utf-8')
        else:
            key = self.secret_key
            msg = sign_str
        h = hmac.new(key, msg, digestmod=sha)
        signature = encodestring(h.digest()).strip()
        if isinstance(signature, bytes):
            signature = signature.decode('utf-8')
        # '+' must be percent-encoded so the signature survives the query string.
        return signature.replace('+', '%2b')

    def build_url(self, expires,partNumber, Signature):
        url_ = "http://{bucket_name}.{service_url}/{object_name}?uploadId={uploadId}&partNumber={partNumber}&Expires={Expires}&AWSAccessKeyId={AWSAccessKeyId}&Signature={Signature}".format(
            bucket_name=self.bucket_name,
            service_url=self.service_url,
            object_name=self.object_name,
            uploadId=self.upload_ID,
            partNumber=partNumber,
            Expires= expires,
            AWSAccessKeyId=self.access_key,
            Signature=Signature
        )
        return url_

    def build_url_with_partid(self,expires, partNumber, partMd5 ):
        sign_str = "PUT\n{partMd5}\n\n{Expires}\n/{bucket_name}/{object_name}?partNumber={partNumber}&uploadId={uploadId}".format(
            partMd5=partMd5,
            Expires=expires,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            partNumber=partNumber,
            uploadId=self.upload_ID)
        Signature_ = self.get_signature_str(sign_str)
        return self.build_url(expires, partNumber, Signature_)



class MultiPart_List(Resource):

    def post(self):
        PartNumber_ = {}
        metadata = {}
        policy = None
        # print request.form['keyname']
        if 'keyname' in request.form:
            keyname = request.form['keyname']
        else:
            return "no key", 400
        if 'expires' in request.form:
            expires = request.form['expires']
        else:
            return "no expires", 400
        if 'contenttype' in request.form:
            metadata['Content-Type'] = str(request.form['contenttype'])
        if 'x-amz-acl' in request.form:
            policy = str(request.form['x-amz-acl'])
        for part_ in request.form:
            if re.match(r'^\d{1,}$', part_):
                # numeric form fields map partNumber -> Content-MD5 of that part
                PartNumber_[part_] = request.form[part_]
            metadata_rule = 'x-amz-meta-'
            if re.match(metadata_rule, part_):
                # custom object metadata, e.g. x-amz-meta-abc
                metadata[part_.split(metadata_rule)[1]] = str(request.form[part_])
        print(metadata, policy, keyname, expires)

        s3client = S3PreSign(keyname)
        result = {}
        result['UploadID'] = s3client.upload_ID
        expires = int(time.time()) + int(expires)
        for p_ in PartNumber_:
            result[p_] = s3client.build_url_with_partid(expires,p_,PartNumber_[p_])
        return result, 201


class Complete_MultiPart(Resource):

    def post(self):
        if 'keyname' in request.form:
            keyname = request.form['keyname']
        else:
            return "no key", 400
        if 'uploadid' in request.form:
            uploadid = request.form['uploadid']
        else:
            return "no UploadID", 400

        s3client = S3PreSign(keyname)
        result = s3client.complete_upload(uploadid)
        return {"status_code":result}, result


api.add_resource(MultiPart_List, '/presign')
api.add_resource(Complete_MultiPart, '/complete')


if __name__ == '__main__':
    app.run(debug=True)
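
One operational note before moving to the client: if a client never sends the Complete request (step 4), the multipart upload stays open on the S3 side and its parts keep occupying storage. The sketch below, which reuses the same placeholder endpoint, keys and bucket name as S3PreSign above, lists such pending uploads with boto and can abort them:

import boto
import boto.s3.connection

conn = boto.connect_s3(
    aws_access_key_id='',        # access key (placeholder, as in the demo)
    aws_secret_access_key='',    # secret key (placeholder, as in the demo)
    host='s3.ceph.work',         # the S3 endpoint used in the demo
    port=80,
    is_secure=False,
    calling_format=boto.s3.connection.SubdomainCallingFormat(),
)
bucket = conn.get_bucket('multi-upload')

# Every entry here is a multipart upload that was initiated but never completed.
for mpu in bucket.get_all_multipart_uploads():
    print("pending upload: key=%s id=%s" % (mpu.key_name, mpu.id))
    # mpu.cancel_upload()  # uncomment to abort it and free the uploaded parts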

Install the client-side dependencies

pip install requests

The client-side demo code is as follows:

# -*- coding: utf-8 -*-
import requests
try:
    from base64 import encodestring
except ImportError:  # Python 3
    from base64 import encodebytes as encodestring
from hashlib import md5
import os
import json
from multiprocessing import Pool


def multipart_upload_with_part(url_, part_file_path, partMD5):
    headers = {}
    headers["Content-MD5"] = partMD5
    # read the chunk in binary mode so the body matches the signed Content-MD5
    with open(part_file_path, 'rb') as fh:
        response = requests.put(url_, headers=headers, data=fh.read())
        if response.status_code == 200:
            print("{} upload successful!".format(part_file_path))

class S3client():
    def __init__(self, key_name, expires,part_num, uploadfile_path, policy=None, contenttype=None, metadata=None ,processes_num=2):
        self.multipart_data = {}
        if key_name:
            self.multipart_data['keyname'] = key_name
        if expires:
            self.multipart_data['expires'] = expires
        if policy:
            self.multipart_data['x-amz-acl'] = policy
        if contenttype:
            self.multipart_data['contenttype'] = contenttype
        if metadata:
            for k in metadata:
                self.multipart_data[k] = metadata[k]
        self.part_num = part_num
        self.processes_num = processes_num
        self.uploadfile_path = uploadfile_path
        self.server = 'http://localhost:5000/'  # address of your API server
        self.upload_file_list_ = {}


    def split_file(self):
        filelist = []
        statinfo = os.stat(self.uploadfile_path)
        chunksize = statinfo.st_size // self.part_num  # integer division: bytes per part
        print("File size: %d(MB)" % (statinfo.st_size / (1024 * 1024)))
        print(self.uploadfile_path, chunksize)
        with open(self.uploadfile_path, "rb") as f:
            index = 1
            while True:
                chunk = f.read(chunksize)
                if (chunk):
                    fn = "%s.part.%d" % (self.uploadfile_path, index)
                    # print "creating", fn
                    with open(fn, "wb") as fw:
                        fw.write(chunk)
                    partMD5 = self.compute_hash(fn)
                    tmp_ = {}
                    tmp_[fn] = str(partMD5)
                    filelist.append(tmp_)
                    index = index + 1
                else:
                    break
        return filelist

    def compute_hash(self, filepath, buf_size=8192, size=None, hash_algorithm=md5):
        hash_obj = hash_algorithm()
        with open(filepath, 'rb') as fp:  # binary read so the digest matches the uploaded bytes
            spos = fp.tell()
            if size and size < buf_size:
                s = fp.read(size)
            else:
                s = fp.read(buf_size)
            while s:
                if not isinstance(s, bytes):
                    s = s.encode('utf-8')
                hash_obj.update(s)
                if size:
                    size -= len(s)
                    if size <= 0:
                        break
                if size and size < buf_size:
                    s = fp.read(size)
                else:
                    s = fp.read(buf_size)
            base64_digest = encodestring(hash_obj.digest()).decode('utf-8')
            if base64_digest[-1] == '\n':
                base64_digest = base64_digest[0:-1]
            return base64_digest

    def make_upload_list(self):
        upload_file_list = self.split_file()
        for f in upload_file_list:
            # each entry is a single-item dict: {part_file_path: base64_md5}
            part_path = list(f.keys())[0]
            partMD5 = f[part_path]
            yield {part_path: partMD5}


    def get_multipart_presignurl(self):
        upload_file_list = self.make_upload_list()
        for i in upload_file_list:
            part_path = list(i.keys())[0]
            partMD5 = i[part_path]
            part_number = part_path.split(".")[-1]  # ".part.N" suffix -> part number
            self.multipart_data[part_number] = partMD5
            self.upload_file_list_[part_number] = {part_path: partMD5}
        url_ = self.server + "presign"
        r = requests.post(url_, data=self.multipart_data)
        allurl_ = json.loads(r.text)
        UploadID = allurl_.pop('UploadID')
        return UploadID, allurl_

    def complete(self, UploadID, key_name):
        data = {"uploadid": UploadID, 'keyname': key_name}
        url_ = self.server + "complete"
        r = requests.post(url_, data=data)
        if r.status_code == 200:
            print("Multipart upload finished!")
        else:
            print("Multipart upload failed!")

    def upload_mulprocess(self, allurl_):
        p = Pool(processes=self.processes_num)
        for url in allurl_:
            partNum = url
            tmp_file = self.upload_file_list_[partNum]
            filepath = list(tmp_file.keys())[0]
            partMD5 = tmp_file[filepath]
            put_url = allurl_[url]
            p.apply_async(multipart_upload_with_part, (put_url, filepath, partMD5,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()



if __name__ == "__main__":
    key_name = 'abc.json' #上傳的object名稱(chēng)
    part_num = 6 #文件切分?jǐn)?shù)量
    expires = 300 #簽名有效時(shí)長(zhǎng)
    file_path = '/tmp/abc.json' #上傳文件本地路徑
    processes_num = 2 #上傳并發(fā)數(shù)
    contenttype = 'application/json' #文件的Content-type
    policy = 'public-read' #設(shè)置object的ACL權(quán)限
    metadata = {'x-amz-meta-abc':'abcd'} #object的metadata

    #第一步:參數(shù)初始化
    s3client = S3client(key_name,expires,part_num,file_path,policy,contenttype,metadata,2)
    #第二步:生成PresignURL
    UploadID,upload_file_list = s3client.get_multipart_presignurl()
    #第三步:使用生成的PresignURL上傳數(shù)據(jù)
    s3client.upload_mulprocess(upload_file_list)
    #第四步:提交compelte請(qǐng)求,完成最后的各個(gè)分塊數(shù)據(jù)邏輯合并
    s3client.complete(UploadID,key_name)
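
One housekeeping detail: split_file() writes the temporary *.part.N chunks next to the source file and nothing in the demo removes them. A small cleanup sketch (the helper name is made up for this example) that could run after complete():

import glob
import os

def remove_part_files(uploadfile_path):
    # delete the temporary chunks produced by S3client.split_file()
    for part in glob.glob("%s.part.*" % uploadfile_path):
        os.remove(part)

# e.g. after s3client.complete(UploadID, key_name):
# remove_part_files(file_path)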

That concludes "how to implement s3cmd data operations". Thanks for reading, and I hope the walkthrough above proves useful in practice.
