# -*- coding: utf8 -*-
import urllib
import datetime
import requests
import json
import hashlib
import base64
import hmac
import datetime
import uuid
import os
import sys
# Python 2/3 compatibility shim: binds the right ``quote`` implementation and
# records the interpreter's major version in VERSION for later branching.
if sys.version_info.major == 2:
    from urllib import quote
    # site.py deletes sys.setdefaultencoding during start-up; reloading sys
    # (``reload`` is a builtin on Python 2) brings it back, so the reload has
    # to happen BEFORE the call, not after it.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf8')
    VERSION = '2'
else:
    VERSION = '3'
    from urllib.parse import quote
    # Imported so the module keeps exposing ``reload``; it is deliberately not
    # applied to ``sys`` here: Python 3 text is already Unicode and reloading
    # sys is discouraged by the importlib documentation.
    from importlib import reload
from urllib3 import encode_multipart_formdata
from collections import OrderedDict
# HTTP method names passed to execute() by get() and post().
METHOD_GET ='GET'
METHOD_POST ='POST'
# Flag read by build_sign(): when truthy, a POST body is hashed as-is (raw
# bytes) instead of being JSON-serialised / UTF-8 encoded first.
file=False
# accessKey from the official website
ak =''
# securityKey from the official website
sk =''
def hmac_sha256(secret, data):
    """
    Return the raw HMAC-SHA256 digest of ``data`` keyed with ``secret``.

    Both arguments may be text (encoded as UTF-8) or bytes/bytearray
    (used unchanged), so the digest of one call can be fed straight back in
    as the key of the next.

    :param secret: str, bytes or bytearray key
    :param data: str, bytes or bytearray message
    :return: the 32-byte digest as a byte string
    """
    def as_bytes(value):
        # isinstance (not ``type(...) ==``) so bytearray and bytes subclasses
        # are recognised as already-binary input.
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return value.encode('utf8')

    return hmac.new(as_bytes(secret), as_bytes(data), digestmod=hashlib.sha256).digest()
def base64_of_hmac(data):
    """
    Base64-encode a binary digest.

    :param data: bytes-like value to encode
    :return: the Base64 encoding, as bytes
    """
    encoded = base64.b64encode(data)
    return encoded
def get_request_uuid():
    """
    Create a new request id.

    :return: str, a freshly generated version-1 UUID in its text form
    """
    request_id = uuid.uuid1()
    return str(request_id)
def get_sorted_str(data, method):
    """
    Arrange the parameters used for signing.

    Entries are ordered by key, rendered as ``key=value`` and joined with
    ``&``. ``method`` is accepted but not used by this function.

    :param data: dict of parameters to arrange
    :param method: HTTP method name (unused)
    :return: str
    """
    ordered_items = sorted(data.items(), key=lambda item: item[0])
    pairs = ['%s=%s' % (name, value) for name, value in ordered_items]
    return '&'.join(pairs)
def build_sign(query_params, body_params, eop_date, request_uuid, method):
    """
    Compute the value of the authorization header.

    The string to sign is made of the two signed headers, the sorted and
    percent-encoded query string, and the SHA-256 hex digest of the body.
    The signing key is derived by chaining HMAC-SHA256 over ``eop_date``,
    the access key ``ak`` and the date part of ``eop_date``, starting from
    the secret key ``sk``.

    :param query_params: dict, parameters of a GET request
    :param body_params: dict, parameters of a POST request (raw bytes when
        the module-level ``file`` flag is set, otherwise a dict or text)
    :param eop_date: str, request time formatted as '%Y%m%dT%H%M%SZ'
    :param request_uuid: str, value of the ctyun-eop-request-id header
    :param method: HTTP method name; selects how the body digest is built
    :return: the header value, UTF-8 encoded
    """
    # Serialised up front whenever the ``file`` flag is off, although only
    # non-POST requests end up hashing this text.
    json_body = ""
    if not file:
        json_body = json.dumps(body_params) if body_params else ''

    # Pick the exact bytes that get hashed for the body digest.
    if method != METHOD_POST:
        payload = json_body.encode('utf-8')
    elif file:
        payload = body_params
    elif isinstance(body_params, dict):
        payload = json.dumps(body_params).encode('utf-8')
    else:
        payload = body_params.encode('utf-8')
    body_digest = hashlib.sha256(payload).hexdigest()

    # The two mandatory request headers.
    header_str = 'ctyun-eop-request-id:%s\neop-date:%s\n' % (request_uuid, eop_date)
    # Parameters carried in the URL (or the GET parameters).
    query_str = encodeQueryStr(get_sorted_str(query_params, method))
    signature_str = '%s\n%s\n%s' % (header_str, query_str, body_digest)
    print_log(repr('signature_str is: %s' % signature_str))

    # Derive the signing key: sk -> eop_date -> ak -> date part of eop_date.
    sign_date = eop_date.split('T')[0]
    signing_key = sk
    for material in (eop_date, ak, sign_date):
        signing_key = hmac_sha256(signing_key, material)
    signature_base64 = base64_of_hmac(hmac_sha256(signing_key, signature_str))

    # Assemble the authorization header value.
    sign_header = '%s Headers=ctyun-eop-request-id;eop-date Signature=%s' % (
        ak, signature_base64.decode('utf8'))
    print("sign_header :" + sign_header)
    return sign_header.encode('utf8')
def get_sign_headers(query_params, body, method, content_type):
    """
    Build the request headers needed for authentication.

    :param query_params: dict, parameters of a GET request
    :param body: dict, parameters of a POST request
    :param method: HTTP method name, forwarded to build_sign
    :param content_type: value to send as the Content-type header
    :return: dict with the Content-type header and the three signing headers
    """
    # NOTE(review): the local clock is formatted with a literal 'Z' suffix -
    # confirm the service expects local time rather than UTC.
    eop_date = datetime.datetime.now().strftime('%Y%m%dT%H%M%SZ')
    request_uuid = get_request_uuid()
    authorization = build_sign(
        query_params=query_params,
        body_params=body,
        eop_date=eop_date,
        request_uuid=request_uuid,
        method=method,
    )
    # Content type plus the three headers used for authentication.
    return {
        'Content-type': content_type,
        'ctyun-eop-request-id': request_uuid,
        'Eop-Authorization': authorization,
        'Eop-date': eop_date,
    }
def get(url, query="", params=None, header_params=None):
    """
    Send a signed GET request through execute().

    :param url: str, endpoint URL without a query string
    :param query: str, raw ``k=v&k2=v2`` query string appended to the URL
    :param params: dict of GET parameters, or None
    :param header_params: dict of extra request headers, or None
    :return: whatever execute() returns
    """
    return execute(url, query, METHOD_GET, params, header_params)
def post(url,query="", params=None, header_params=None, content_type='application/json;charset=UTF-8'):
    """
    Send a signed POST request through execute().

    :param url: str, endpoint URL without a query string
    :param query: str, raw ``k=v&k2=v2`` query string appended to the URL
    :param params: request body, or None
    :param header_params: dict of extra request headers, or None
    :param content_type: Content-type header value; also selects how
        execute() transmits the body
    :return: whatever execute() returns
    """
    return execute(url, query, METHOD_POST, params, header_params, content_type)
def execute(url, query, method, params=None, header_params=None, content_type='application/json;charset=UTF-8'):
    """
    Sign and send one HTTP request, logging the request and the response.

    For GET, ``params`` are both signed and sent as query parameters. For
    any other method the request is sent as a POST: the pairs parsed from
    ``query`` are signed as query parameters and ``params`` is the body
    (form-encoded first when the content type is
    application/x-www-form-urlencoded).

    :param url: str, endpoint URL without a query string
    :param query: str, raw ``k=v&k2=v2`` query string appended to the URL
    :param method: METHOD_GET or METHOD_POST
    :param params: dict (or, for multipart, the pre-built body), or None
    :param header_params: dict of extra headers; they override the
        generated signing headers
    :param content_type: Content-type header value; selects the body format
    :return: the requests response object
    """
    params = params or {}
    header_params = header_params or {}
    is_form = 'application/x-www-form-urlencoded' in content_type

    if method == METHOD_GET:
        # GET parameters are signed from the dict, so they must not be
        # flattened into a form-encoded string beforehand.
        query_params, body = params, {}
    else:
        if is_form:
            if VERSION == '2':
                params = urllib.urlencode(params)
            else:
                params = urllib.parse.urlencode(params)
            # An empty encoding falls back to {} like any other empty body.
            params = params or {}
        query_params = {}
        for pair in (query.split('&') if query else []):
            if not pair:
                continue
            # Split on the first '=' only: a key without a value maps to ''
            # and a value that itself contains '=' is kept whole.
            key, _, value = pair.partition('=')
            query_params[key] = value
        body = params

    headers = get_sign_headers(query_params, body, method, content_type)
    headers.update(header_params)
    headers.update(
        {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0"}
    )

    encoded_query = encodeQueryStr(query)
    if encoded_query:
        url = url + "?" + encoded_query

    print_log('url: %s' % url)
    print_log('请求方式: %s' % method)
    print_log('请求头:\n %s' % headers)
    print_log('请求参数:\n %s' % params)
    print_log('请求参数类型:\n %s' % type(params))

    # SECURITY NOTE(review): certificate verification is switched off
    # (verify=False) and the matching warnings are silenced. Kept because
    # callers may rely on it; revisit before using this against production.
    requests.packages.urllib3.disable_warnings()
    if method == METHOD_GET:
        res = requests.get(url, params=params, headers=headers, verify=False)
    elif is_form or 'multipart/form-data' in content_type:
        res = requests.post(url, data=params, headers=headers, verify=False)
    else:
        res = requests.post(url, json=params, headers=headers, verify=False)

    print_log('返回状态码: %s' % res.status_code)
    # Logging must not turn a non-JSON response (HTML error page, empty
    # body) into an exception; fall back to the raw text.
    try:
        response_payload = res.json()
    except ValueError:
        response_payload = res.text
    print_log('返回: %s' % (response_payload,))
    return res
def print_log(log_info):
    """
    Print a message prefixed with the current local timestamp.

    :param log_info: the message; rendered with ``%s`` formatting
    """
    timestamp = str(datetime.datetime.now())
    print('[%s]: %s' % (timestamp, log_info))
def encodeQueryStr(query):
    """
    Sort a raw query string and percent-encode its values.

    The ``&``-separated segments are sorted as plain strings; each one is
    split on its first ``=`` and emitted as ``key=`` followed by the quoted
    value. A key with no value yields ``key=``; empty segments (for example
    from a trailing ``&``) are dropped. Keys are not encoded.

    :param query: str such as ``b=2&a=1``; empty or None gives ``""``
    :return: str, the normalised query string
    """
    if not query:
        return ""
    encoded_pairs = []
    for segment in sorted(query.split("&")):
        if not segment:
            continue
        # Every segment is handled the same way regardless of its position,
        # and splitting on the first '=' keeps values that contain '='.
        key, _, value = segment.partition("=")
        encoded_pairs.append(key + "=" + quote(value))
    return "&".join(encoded_pairs)
# params = {
# "clusterID": "50cde668-5678-11ec-91ee-005056b415b1"
# }
# # Send the request
# get("https://cck-global.ctapi-test.ctyun.cn/v1/cce/nodepool/list", query='', params=params)
def generate_body(file_list, boundary, params):
    """
    Assemble a multipart/form-data request body.

    Each file becomes an application/octet-stream part and each parameter a
    plain form field; the body ends with the closing ``--boundary--`` line.

    :param file_list: iterable of single-entry dicts ``{field_name: file_path}``
    :param boundary: str, the boundary token without the leading ``--``
    :param params: iterable of single-entry dicts ``{field_name: text_value}``
    :return: bytearray holding the complete body
    :raises IOError: (OSError on Python 3) when a listed file cannot be read
    """
    crlf = "\r\n"
    body = bytearray()

    for file_entry in file_list:
        field_name, file_path = list(file_entry.items())[0]
        file_name = os.path.basename(file_path)
        part_header = (
            '--' + boundary + crlf
            + 'Content-Disposition: form-data; name="' + field_name
            + '"; filename="' + file_name + '"' + crlf
            + 'Content-Type: application/octet-stream' + crlf
            + crlf
        )
        # Open unconditionally: a missing file must fail loudly rather than
        # reuse the previous file's bytes or hit an unbound variable.
        with open(file_path, 'rb') as f:
            content = bytearray(f.read())
        body.extend(bytearray(part_header, 'utf8'))
        body.extend(content)
        body.extend(bytearray(crlf, 'utf8'))

    for param in params:
        field_name, field_value = list(param.items())[0]
        # The single CRLF after the value is the one that precedes the next
        # boundary line; an extra one would become part of the value.
        part = (
            '--' + boundary + crlf
            + 'Content-Disposition: form-data; name="' + field_name + '"' + crlf
            + crlf
            + field_value + crlf
        )
        body.extend(bytearray(part, 'utf8'))

    body.extend(bytearray('--' + boundary + '--' + crlf, 'utf8'))
    return body