Python 调用示例
此脚本应能支持 Python 2 和 3。运行前请确保您已安装依赖包 requests
,并且已修改 if __name__ == "__main__":
之后的内容。
# -*- coding: utf8 -*-
import base64
import datetime
import hashlib
import hmac
import json
import os
import sys
import uuid
from typing import Optional, Union
import requests
if sys.version_info.major == 2:
from urllib import quote, urlencode
reload(sys) # type: ignore
sys.setdefaultencoding("utf8")
else:
from urllib.parse import quote, urlencode
METHOD_GET = "GET"
METHOD_POST = "POST"
file = False
def hmac_sha256(secret, data): # type: (Union[bytes, str], str) -> bytes
return hmac.new(
key=bytearray(secret)
if type(secret) == bytes
else bytearray(secret, encoding="utf8"),
msg=bytearray(data, encoding="utf8"),
digestmod=hashlib.sha256,
).digest()
def base64_of_hmac(data): # type: (bytes) -> bytes
return base64.b64encode(s=data)
def get_request_uuid(): # type: () -> str
return str(uuid.uuid1())
def get_sorted_str(data): # type: (dict) -> str
return "&".join(
map(
lambda x_y: "%s=%s" % (x_y[0], x_y[1]),
sorted(data.items(), key=lambda item: item[0]),
)
)
def build_sign(
query_params, body_params, eop_date, request_uuid, method
): # type: (dict, Union[bytearray, dict], str, str, str) -> str
body_str = json.dumps(body_params) if not file and body_params else ""
body = (
(
body_params
if file
else (
json.dumps(body_params).encode(encoding="utf-8")
if isinstance(body_params, dict)
else body_params.encode(encoding="utf-8")
)
)
if method == METHOD_POST
else body_str.encode(encoding="utf-8")
)
header_str = "ctyun-eop-request-id:%s\neop-date:%s\n" % (
request_uuid,
eop_date, # 格式为:%Y%m%dT%H%M%SZ。
) # 请求头中必要的两个参数。
query_str = encode_query_str(query=get_sorted_str(data=query_params))
sign_date = eop_date.split("T")[0]
# 计算鉴权密钥。
k_time = hmac_sha256(data=eop_date, secret=sk)
k_ak = hmac_sha256(data=ak, secret=k_time)
k_date = hmac_sha256(data=sign_date, secret=k_ak)
# 构建请求头的鉴权字段值。
signature_str = "%s\n%s\n%s" % (
header_str,
query_str,
hashlib.sha256(body).hexdigest(),
)
sign_header = "%s Headers=ctyun-eop-request-id;eop-date Signature=%s" % (
ak,
base64_of_hmac(data=hmac_sha256(data=signature_str, secret=k_date)).decode(
encoding="utf8"
),
)
# format: off
print_log(log_info="signature_str: %s" % repr(signature_str))
print_log(log_info="sign_header: %s" % sign_header)
# format: on
return sign_header.encode(encoding="utf8")
def get_sign_headers(
query_params, body, method, content_type
): # type: (dict, dict, str, str) -> dict
eop_date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%dT%H%M%SZ")
request_uuid = get_request_uuid()
return {
"Content-type": content_type,
"ctyun-eop-request-id": request_uuid,
"Eop-Authorization": build_sign(
body_params=body,
eop_date=eop_date,
method=method,
query_params=query_params,
request_uuid=request_uuid,
),
"Eop-date": eop_date,
}
def get(
url, query="", params=None, header_params=None
): # type: (str, str, Optional[dict], Optional[dict]) -> requests.Response
"""发送 GET 请求。
Parameters
----------
url : str
请求的 URL。
query : str, optional
请求的查询字符串(默认值:空字符串)。
params : dict, optional
请求的参数(默认值:空)。
header_params : dict, optional
请求的头部参数(默认值:空)。
Returns
-------
requests.Response
请求的响应。
"""
return execute(
header_params=header_params,
method=METHOD_GET,
params=params,
query=query,
url=url,
)
def post(
url,
query="",
params=None,
header_params=None,
content_type="application/json;charset=UTF-8",
):
"""发送 POST 请求。
Parameters
----------
url : str
请求的 URL。
query : str, optional
请求的查询字符串(默认值:空字符串)。
params : dict, optional
请求的参数(默认值:空)。
header_params : dict, optional
请求的头部参数(默认值,空)。
content_type : str, optional
请求的内容类型(默认值:application/json;charset=UTF-8)。
Returns
-------
requests.Response
请求的响应。
"""
return execute(
content_type=content_type,
header_params=header_params,
method=METHOD_POST,
params=params,
query=query,
url=url,
)
def execute(
url,
query,
method,
params=None,
header_params=None,
content_type="application/json;charset=UTF-8",
): # type: (str, str, str, Optional[dict], Optional[dict], str) -> requests.Response
if "application/x-www-form-urlencoded" in content_type:
params = urlencode(params)
header_params = header_params or {}
params = params or {}
if method == METHOD_GET:
body, query_params = ({}, params)
else:
body, query_params = (params, {})
if len(query) > 0:
for q in query.split("&"):
query_params[q.split("=")[0]] = q.split("=")[1]
print("\n" + "-" * 10)
headers = get_sign_headers(query_params, body, method, content_type)
headers.update(header_params)
headers.update(
{
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0"
}
)
if query:
url = url + "?" + encode_query_str(query)
# fmt: off
print_log(u"URL: %s" % url)
print_log(u"请求方式: %s" % method)
print_log(u"请求头: %s" % headers)
print_log(u"请求参数: %s" % params)
print_log(u"请求参数类型: %s" % type(params))
# fmt: on
requests.packages.urllib3.disable_warnings()
if method == METHOD_GET:
res = requests.get(url, params=params, headers=headers, verify=False)
else:
if "application/x-www-form-urlencoded" in content_type:
res = requests.post(data=params, headers=headers, url=url, verify=False)
elif "multipart/form-data" in content_type:
res = requests.post(data=params, headers=headers, url=url, verify=False)
else:
res = requests.post(headers=headers, json=params, url=url, verify=False)
# fmt: off
print_log(u"返回状态码: %s" % res.status_code)
print_log(u"返回: %s" % res.text)
# fmt: on
return res
def print_log(log_info): # type: (str) -> None
print("[%s]: %s" % (str(object=datetime.datetime.now()), log_info))
def encode_query_str(query): # type: (str) -> str
after_query = ""
if len(query) != 0:
param = query.split("&")
param.sort()
for str in param:
if len(after_query) < 1:
s = str.split("=")
if len(s) <= 2:
encode_str = quote(s[1])
str = s[0] + "=" + encode_str
after_query = after_query + str
else:
encode_str = ""
str = s[0] + "=" + encode_str
after_query = after_query + str
else:
s = str.split("=")
if len(s) >= 2:
encode_str = quote(s[1])
str = s[0] + "=" + encode_str
after_query = after_query + "&" + str
else:
encode_str = ""
str = s[0] + "=" + encode_str
after_query = after_query + "&" + str
return after_query
def generate_body(file_list, boundary, params): # type: (list, str, dict) -> bytearray
lastbody = bytearray()
for file in file_list:
file_name_key = list(file.keys())[0]
file_path = list(file.values())[0]
file_name = os.path.basename(file_path)
body1_array = bytearray(
"--"
+ boundary
+ "\r\n"
+ 'Content-Disposition: form-data; name="'
+ file_name_key
+ '"; filename="'
+ file_name
+ "\r\n"
+ "Content-Type: application/octet-stream"
+ "\r\n"
+ "\r\n",
encoding="utf8",
)
if os.path.exists(file_path):
with open(file_path, "rb") as f:
body2_array = bytearray(f.read())
body3_array = bytearray("\r\n", encoding="utf8")
lastbody.extend(body1_array)
lastbody.extend(body2_array)
lastbody.extend(body3_array)
for param in params:
body1_array = bytearray(
"--"
+ boundary
+ "\r\n"
+ 'Content-Disposition: form-data; name="'
+ list(param.keys())[0]
+ "\r\n"
+ "\r\n"
+ list(param.values())[0]
+ "\r\n",
encoding="utf8",
)
body3_array = bytearray("\r\n")
lastbody.extend(body1_array)
lastbody.extend(body3_array)
body4_array = bytearray("--" + boundary + "--\r\n", encoding="utf8")
lastbody.extend(body4_array)
return lastbody
if __name__ == "__main__":
ak = "xxxxxx" # NOTE: 请替换为您的 AccessKey。
sk = "xxxxxx" # NOTE:请替换为您的 SecurityKey。
# NOTE: 以下为调用示例,请按实际情况做出修改。
get(
params={"regionID": "xxxxxx"},
url="https://ctimage-global.ctapi.ctyun.cn/v4/image/list",
)
post(
params={"regionID": "xxxxxx", "instanceID": "xxxxxx", "imageName": "xxxxxx"},
url="https://ctimage-global.ctapi.ctyun.cn/v4/image/create",
)