前提条件
- 开通对象存储服务。
- 在天翼云对象存储控制台获取Access Key、Secret Key和外网访问控制的endpoint。
- 安装Python环境,下载对应版本的ZOS官方Python SDK (本文以sdk_python3.X.tar为例),请参见开发者文档。
环境配置
- 解压sdk_python3.X.tar,在解压出的sdk文件夹下,打开cmd导入需要的包。
pip3 install -r requirements.txt
- 把sdk文件夹中的service-2.sns.sdk-extras.json和service-2.s3.sdk-extras.json文件放到“C:\Users\你的用户名.aws\models\s3\2006-03-01”文件夹(如果不存在该文件夹,可自行创建)下。
分段上传
操作场景
- 大型文件上传:当需要上传大型文件时,使用分段上传可以更高效地完成任务。相比于一次性上传整个文件,分段上传允许在网络故障或其他原因导致上传中断时,只需要重新传输中断的片段,而不需要重新上传整个文件。
- 网络不稳定环境:在网络连接不稳定或带宽有限的环境中,分段上传可以降低因网络中断或超时导致整个文件上传失败的风险。当上传过程中发生中断,只需要重新上传中断的片段,而不需要重新上传整个文件。
- 文件大小不确定:可以在需要上传的文件大小还不确定的情况下开始上传,这种场景在视频监控等行业应用中比较常见。
操作步骤
- 创建py文件,并引入boto3包的session模块。
from boto3.sessionimport Session
- 配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。
access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint
- 获取对象存储服务的操作客户端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url)
- 开启分块上传,获取uploadID。
# 在’’中填入要上传对象的桶名(必须已存在)和对象名。 dict_cmp = s3_client.create_multipart_upload(Bucket='?', Key='?') uploadID = dict_cmp['UploadId'] # 记录uploadID,用于后续上传分块。
- 分块读取要上传的文件。
# 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open("此处输入你的文件地址", 'rb') as f: while True: chunk = f.read(5 * 1024 * 1024) # 此处输入分块大小,本示例是5M,分块大小最低为5M if not chunk: break chunks.append(chunk)
- 上传所有分块并记录各个分块的ETag和编号。
# 设置列表,用于记录上传分块的ETag和编号。 parts = [] i = 0 for chunk in chunks: i += 1 # 上传分块 dict_up = s3_client.upload_part(Bucket='?', Body=chunk, Key='?', PartNumber=i, UploadId=uploadID) # 记录该分块的ETag和编号 part = { 'ETag': dict_up['ETag'], 'PartNumber': i, } parts.append(part)
- 完成所有分块的拼接,存入ZOS。
# 完成所有分块拼接后,上传成功。 s3_client.complete_multipart_upload( Bucket='?', Key='?', MultipartUpload={ 'Parts': parts }, UploadId=uploadID, )
整体代码
from boto3.session import Session
access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint
key = '?' # 传入桶内后的文件名
bucketName = '?' # 要传入文件的桶名
try:
session = Session(access_key, secret_key)
s3_client = session.client('s3', endpoint_url=url)
# 开启分块上传,获取uploadID
dict_cmp = s3_client.create_multipart_upload(Bucket=bucketName, Key=key)
uploadID = dict_cmp['UploadId']#记录uploadID,用于后续上传分块。
filePos = "此处输入文件路径"
size = 5 * 1024 * 1024 #此处输入分块大小,本示例是5M,分块大小最低为5M
# 设置列表,用于收集文件的所有分块
chunks = []
# 分块读取文件
with open(filePos, 'rb') as f:
while True:
chunk = f.read(size)
if not chunk:
break
chunks.append(chunk)
# 设置列表,用于收集上传分块的ETag和编号。
parts = []
i = 0
for chunk in chunks:
i += 1
# 上传分块
dict_up = s3_client.upload_part(Bucket=bucketName, Body=chunk,
Key=key, PartNumber=i,
UploadId=uploadID)
# 记录该分块的ETag和编号
part = {
'ETag': dict_up['ETag'],
'PartNumber': i,
}
parts.append(part)
# 完成整个拼接传入
s3_client.complete_multipart_upload(
Bucket=bucketName,
Key=key,
MultipartUpload={
'Parts': parts
},
UploadId=uploadID,
)
print('文件上传成功!')
except BaseException:
print("上传异常,请检查各个参数后重试。")
finally:
# 关闭文件流
f.close()
追加上传
操作场景
通过普通上传创建的对象,用户无法在原对象上进行追加写操作,如果对象内容发生了改变,只能重新上传同名对象来进行修改。这在日志、视频监控等数据复写较频繁的场景下使用不方便。所以可以通过追加上传的方式来只上传增加部分的内容,增强扩展性,提高文件的上传效率。
操作步骤
-
创建py文件,并引入boto3包的session模块。
from boto3.sessionimport Session
-
配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。
access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint
-
获取对象存储服务的操作客户端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url)
-
读取文件并记录文件md5值。
with open('输入要上传文件的路径', 'rb') as file: data = file.read() md5 = hashlib.md5(data).digest() md5 = base64.b64encode(md5)
-
上传对象。
(1)首次上传
s3_client.put_object(Bucket='输入要传入的桶名', Metadata=dict(m1='m1'), Body=data, Key='输入存入后对象的键值', ContentMD5=str(md5, 'utf-8'), Append=True,# 开启追加上传 AppendPosition=0)# 指定追加上传开始的位置
(2)追加上传
# 获取需要追加上传的对象信息 response = s3_client.head_object(Bucket="bucketblocks", Key="mac.txt") # 获取当前对象长度 appendPos = response['ContentLength'] s3_client.put_object(Bucket='输入要传入的桶名', Metadata=dict(m1='m1'), Body=data, Key='输入存入后对象的键值', ContentMD5=str(md5, 'utf-8'), Append=True,# 开启追加上传 AppendPosition=appendPos)# 指定追加上传开始的位置
整体代码
from boto3.session import Session
import hashlib
import base64
access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint
# 上传文件桶的桶名
bname = '输入要传入的桶名'
# 文件在桶内存储的key值
key = '输入存入后对象的键值'
# 追加上传开始的位置,默认为0
appendPos = 0
try:
# 读取文件并记录文件md5值
with open('输入要上传文件的路径', 'rb') as file:
data = file.read()
md5 = hashlib.md5(data).digest()
md5 = base64.b64encode(md5)
# 获取对象存储服务的操作客户端
session = Session(access_key, secret_key)
s3_client = session.client("s3", endpoint_url=url)
# 获取桶内所有对象信息
response = s3_client.list_objects(Bucket=bname)
# 遍历所有对象键值
for obj in response['Contents']:
# 桶中存在与待上传对象同key的对象,需要判断该对象能否进行追加上传
if obj['Key'] == key:
res = s3_client.head_object(Bucket=bname, Key=key)
if res['ResponseMetadata']['HTTPHeaders']['x-rgw-object-type'] != 'Appendable':
print('该对象不是Appendable类型,无法进行追加上传.')
raise Exception
#桶内对象可以进行追加上传,更新追加上传开始的位置
appendPos = res['ContentLength']
# 完成追加上传
s3_client.put_object(Bucket=bname,
Metadata=dict(m1='m1'),
Body=data,
Key=key,
ContentMD5=str(md5, 'utf-8'),
Append=True,
AppendPosition=appendPos)
if appendPos == 0:
print('初次上传成功.')
else:
print('追加上传成功.')
except BaseException:
print("参数有问题,请修改后重试.")
finally:
# 关闭文件流
file.close()
断点续传
操作场景
- 大文件上传:当需要上传大文件时,如视频、备份文件或软件包等,由于上传时间较长,在上传过程中可能会遇到各种问题,例如网络中断、上传工具崩溃或用户手动中断等。断点续传的操作场景允许用户在上传中断后,能够从中断点继续上传,而不需要重新上传整个文件。
- 网络不稳定:在网络环境不稳定的情况下,如移动网络或低带宽连接,大文件的上传过程可能会中断或超时。通过断点续传,在宽带不稳定的情况下,上传仍可恢复并继续进行。
- 长时间上传:某些上传任务可能需要较长的时间才能完成,这可能会增加上传过程中意外中断的风险。断点续传可以将上传任务分段处理,并保存上传进度信息,以便在上传中断后能够恢复并继续上传。
- 客户端或服务端故障:在上传过程中,当客户端或服务端发生故障时,断点续传允许用户重新连接并从中断点继续上传,而不会对上传任务造成重大影响。
操作步骤
- 创建py文件,并引入boto3包的session模块。
from boto3.sessionimport Session
- 配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。
access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint
- 获取对象存储服务的操作客户端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url)
- 查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。
response = s3_client.list_multipart_uploads(Bucket='输入对象传入的桶名') uploadInfo = response['Uploads']
- 根据上一步得到的UploadId和Key来获取当前对象已经上传完成的分段信息。
response = s3_client.list_parts(Bucket='输入对象传入的桶名', Key='Key值', UploadId='UploadId值') finishedPartsInfo = response['Parts'] # 设置列表,用于记录当前已经完成上传分块的编号。 finishedPartNumbers = [] # 记录分块的大小。 partSize = 0 # 设置列表,用于收集上传分块的ETag和编号。 parts = [] for finishedPart in finishedPartsInfo: finishedPartNumbers.append(finishedPart['PartNumber']) if partSize < finishedPart['Size']: partSize = finishedPart['Size'] part = { 'ETag': finishedPart['ETag'], 'PartNumber': finishedPart['PartNumber'] } parts.append(part)
- 上传对象中未完成的分段,并记录各分段的ETag信息。
# 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open("此处输入你的文件地址", 'rb') as f: while True: chunk = f.read(5*1024*1024) # 此处输入分块大小,本示例是5M,分块大小最低为5M if not chunk: break chunks.append(chunk) i = 0 for chunk in chunks: i += 1 # 当该分块已经完成上传时,跳过。 if i in finishedPartNumbers: continue # 上传分块 response = s3_client.upload_part(Bucket=bucketName, Body=chunk, Key=key, PartNumber=i, UploadId='UploadId值') # 记录该分块的ETag和编号 part = { 'ETag': response['ETag'], 'PartNumber': i, } parts.append(part)
- 完成所有分块的拼接,存入ZOS。
# 完成所有分块拼接后,上传成功。 s3_client.complete_multipart_upload( Bucket=bucketName, Key=key, MultipartUpload={ 'Parts': parts }, UploadId=uploadID, )
整体代码
from boto3.session import Session
access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint
bucketName = '?' # 要传入文件的桶名
filePos = "?" # 待上传文件所在位置
try:
session = Session(access_key, secret_key)
s3_client = session.client('s3', endpoint_url=url)
# 查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。
response = s3_client.list_multipart_uploads(Bucket=bucketName)
uploadInfo = response['Uploads']
if uploadInfo is None:
print(bucketName + '桶内没有需要断点上传的对象。')
raise BaseException
# 获取需要续传的对象的UploadId和Key,这里以第一个对象为例
uploadId = uploadInfo[0]['UploadId']
key = uploadInfo[0]['Key']
# 获取已经完成上传的分块信息
response = s3_client.list_parts(Bucket=bucketName, Key=key, UploadId=uploadId)
finishedPartsInfo = response['Parts']
# 设置列表,用于记录当前已经完成上传分块的编号。
finishedPartNumbers = []
# 记录分块的大小。
partSize = 0
# 设置列表,用于收集上传分块的ETag和编号。
parts = []
for finishedPart in finishedPartsInfo:
finishedPartNumbers.append(finishedPart['PartNumber'])
if partSize < finishedPart['Size']:
partSize = finishedPart['Size']
part = {
'ETag': finishedPart['ETag'],
'PartNumber': finishedPart['PartNumber']
}
parts.append(part)
# 设置列表,用于收集文件的所有分块
chunks = []
# 分块读取文件
with open(filePos, 'rb') as f:
while True:
chunk = f.read(partSize) # 此处输入分块大小,本示例是5M,分块大小最低为5M
if not chunk:
break
chunks.append(chunk)
i = 0
for chunk in chunks:
i += 1
# 当该分块已经完成上传时,跳过。
if i in finishedPartNumbers:
continue
# 上传分块
response = s3_client.upload_part(Bucket=bucketName, Body=chunk,
Key=key, PartNumber=i,
UploadId=uploadId)
# 记录该分块的ETag和编号
part = {
'ETag': response['ETag'],
'PartNumber': i,
}
parts.append(part)
# 完成整个拼接传入
s3_client.complete_multipart_upload(
Bucket=bucketName,
Key=key,
MultipartUpload={
'Parts': parts
},
UploadId=uploadId
)
print('文件上传成功!')
except BaseException:
print("上传异常,请检查各个参数后重试。")
finally:
f.close()