# -*- coding: utf8 -*-
import urllib
import datetime
import requests
import json
import hashlib
import base64
import hmac
import datetime
import uuid
import os
import sys
# Pick the quote() implementation that matches the running interpreter and
# record the interpreter's major version as a string in VERSION.
if sys.version_info.major != 2:
    VERSION = '3'
    from urllib.parse import quote
else:
    from urllib import quote
    VERSION = '2'
# HTTP method names compared against the `method` argument when signing and sending.
METHOD_GET = 'GET'
METHOD_POST = 'POST'
# When truthy, build_sign hashes a POST body as-is (no JSON/text encoding)
# and signs an empty body for every other method.
file = False
# accessKey from the official website
ak = 'xxx'
# securityKey from the official website
sk = 'xxx'
def hmac_sha256(secret, data):
    """
    Compute the binary HMAC-SHA256 digest of ``data`` keyed with ``secret``.
    :param secret: key; text is UTF-8 encoded, bytes/bytearray are used as-is
    :param data: message; text is UTF-8 encoded, bytes/bytearray are used as-is
    :return: bytes, the raw (not hex) digest
    """
    # isinstance rather than an exact type() match, so bytearray and bytes
    # subclasses count as already-encoded input; the same rule now applies to
    # ``data``, which previously had to be text.
    if isinstance(secret, (bytes, bytearray)):
        key = bytearray(secret)
    else:
        key = bytearray(secret, 'utf8')
    if isinstance(data, (bytes, bytearray)):
        message = bytearray(data)
    else:
        message = bytearray(data, 'utf8')
    return hmac.new(key, message, digestmod=hashlib.sha256).digest()
def base64_of_hmac(data):
    """
    Base64-encode a binary value (used on the final HMAC digest).
    :param data: bytes-like value to encode
    :return: bytes, the Base64 text of ``data``
    """
    encoded = base64.b64encode(data)
    return encoded
def get_request_uuid():
    """
    Generate a fresh request identifier.
    :return: str, the string form of a new ``uuid.uuid1()`` value
    """
    request_id = uuid.uuid1()
    return str(request_id)
def get_sorted_str(data, method):
    """
    Arrange parameters for signing: ``key=value`` pairs ordered by key and
    joined with ``&``.
    :param data: dict, the parameters to arrange
    :param method: accepted for interface compatibility; not used here
    :return: str
    """
    ordered_pairs = sorted(data.items(), key=lambda pair: pair[0])
    return '&'.join('%s=%s' % (name, value) for name, value in ordered_pairs)
def _body_digest(body_params, is_post, raw_body):
    """Return the hex SHA-256 digest of the request body as it enters the signature."""
    if not is_post:
        # Non-POST requests sign the JSON text of a truthy body, or the empty
        # string when there is no body or the raw-body flag is set.
        text = json.dumps(body_params) if body_params and not raw_body else ''
        return hashlib.sha256(text.encode('utf-8')).hexdigest()
    if raw_body or isinstance(body_params, (bytes, bytearray)):
        # Already-encoded payloads are hashed exactly as given.
        return hashlib.sha256(body_params).hexdigest()
    if isinstance(body_params, (dict, list, tuple)):
        # Any JSON container is hashed via its json.dumps text; previously only
        # dict was recognised and a list body failed on ``.encode``.
        return hashlib.sha256(json.dumps(body_params).encode('utf-8')).hexdigest()
    # Remaining case: a text body (e.g. a urlencoded form string).
    return hashlib.sha256(body_params.encode('utf-8')).hexdigest()


def build_sign(query_params, body_params, eop_date, request_uuid, method):
    """
    Compute the value of the authorization header.
    :param query_params: dict, parameters carried in the URL / GET request
    :param body_params: parameters of the POST request (dict, list, text or bytes)
    :param eop_date: str, request time formatted as '%Y%m%dT%H%M%SZ'
    :param request_uuid: str, the request id that is also sent as a header
    :param method: str, HTTP method; compared against METHOD_POST
    :return: the header value '<ak> Headers=... Signature=<base64>' encoded as UTF-8
    """
    body_digest = _body_digest(body_params, method == METHOD_POST, file)
    # The two request headers that take part in the signature
    header_str = 'ctyun-eop-request-id:%s\neop-date:%s\n' % (request_uuid, eop_date)
    # Parameters in the URL, i.e. the GET parameters
    query_str = encodeQueryStr(get_sorted_str(query_params, method))
    signature_str = '%s\n%s\n%s' % (header_str, query_str, body_digest)
    print_log(repr('signature_str is: %s' % signature_str))
    # Date part of the timestamp (everything before the 'T')
    sign_date = eop_date.split('T')[0]
    # Derive the signing key: sk -> time -> ak -> date
    k_time = hmac_sha256(sk, eop_date)
    k_ak = hmac_sha256(k_time, ak)
    k_date = hmac_sha256(k_ak, sign_date)
    signature_base64 = base64_of_hmac(hmac_sha256(k_date, signature_str))
    # Assemble the value of the authorization header
    sign_header = '%s Headers=ctyun-eop-request-id;eop-date Signature=%s' % (ak, signature_base64.decode('utf8'))
    print("sign_header :" + sign_header)
    return sign_header.encode('utf8')
def get_sign_headers(query_params, body, method, content_type):
    """
    Build the request headers used for authentication.
    :param query_params: dict, parameters of a GET request
    :param body: dict, parameters of a POST request
    :param method: str, HTTP method, forwarded to build_sign
    :param content_type: str, value placed in the 'Content-type' header
    :return: dict of header name -> value
    """
    # NOTE(review): the local wall-clock time is formatted with a literal 'Z'
    # suffix — confirm which timezone the service expects.
    eop_date = datetime.datetime.now().strftime('%Y%m%dT%H%M%SZ')
    request_uuid = get_request_uuid()
    authorization = build_sign(
        query_params=query_params,
        body_params=body,
        eop_date=eop_date,
        request_uuid=request_uuid,
        method=method,
    )
    # Content type plus the three headers used for authentication
    headers = {}
    headers['Content-type'] = content_type
    headers['ctyun-eop-request-id'] = request_uuid
    headers['Eop-Authorization'] = authorization
    headers['Eop-date'] = eop_date
    return headers
def get(url, query="", params=None, header_params=None):
    """
    Send a signed GET request via execute.
    :param url: str, endpoint URL without a query string
    :param query: str, raw query string forwarded to execute
    :param params: dict, request parameters
    :param header_params: dict, extra headers merged over the signed ones
    :return: whatever execute returns
    """
    return execute(
        url,
        query,
        METHOD_GET,
        params,
        header_params,
    )
def post(url, query="", params=None, header_params=None, content_type='application/json;charset=UTF-8'):
    """
    Send a signed POST request via execute.
    :param url: str, endpoint URL without a query string
    :param query: str, raw query string forwarded to execute
    :param params: request body parameters
    :param header_params: dict, extra headers merged over the signed ones
    :param content_type: str, Content-type of the request body
    :return: whatever execute returns
    """
    return execute(
        url,
        query,
        METHOD_POST,
        params,
        header_params,
        content_type,
    )
def execute(url, query, method, params=None, header_params=None, content_type='application/json;charset=UTF-8'):
    """
    Sign a request, send it with ``requests`` and return the response.
    :param url: str, endpoint URL; '?' plus the encoded ``query`` is appended
    :param query: str, raw query string ('k=v&k2=v2'); signed for non-GET requests
    :param method: str, METHOD_GET or any other value (sent as POST)
    :param params: GET parameters, or the POST body (dict; urlencoded to text
        when the content type is a urlencoded form)
    :param header_params: dict, extra headers applied over the signed headers
    :param content_type: str, decides how the POST body is sent (form data vs JSON)
    :return: the ``requests`` response object
    """
    query = query or ''
    is_form = 'application/x-www-form-urlencoded' in content_type
    if is_form:
        # A missing payload is encoded as an empty form; urlencode(None)
        # would raise TypeError.
        if VERSION == '2':
            params = urllib.urlencode(params or {})
        else:
            params = urllib.parse.urlencode(params or {})
    params = params or {}
    header_params = header_params or {}
    if method == METHOD_GET:
        query_params, body = params, {}
    else:
        query_params = {}
        for fragment in query.split('&'):
            if not fragment:
                continue
            # partition keeps everything after the first '=' as the value and
            # yields '' for a bare key, where split('=')[1] truncated values
            # containing '=' and raised IndexError on a bare key.
            key, _, value = fragment.partition('=')
            query_params[key] = value
        body = params
    headers = get_sign_headers(query_params, body, method, content_type)
    headers.update(header_params)
    headers.update(
        {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0"}
    )
    url = url + "?" + encodeQueryStr(query)
    print_log('url: %s' % url)
    print_log('请求方式: %s' % method)
    print_log('请求头:\n %s' % headers)
    print_log('请求参数:\n %s' % params)
    print_log('请求参数类型:\n %s' % type(params))
    # NOTE(review): certificate verification is disabled (verify=False) and the
    # resulting urllib3 warnings are silenced — confirm this is intended.
    requests.packages.urllib3.disable_warnings()
    if method == METHOD_GET:
        res = requests.get(url, params=params, headers=headers, verify=False)
    elif is_form or 'multipart/form-data' in content_type:
        # Form-style bodies are sent as-is through ``data``.
        res = requests.post(url, data=params, headers=headers, verify=False)
    else:
        res = requests.post(url, json=params, headers=headers, verify=False)
    print_log('返回状态码: %s' % res.status_code)
    try:
        payload = res.json()
    except ValueError:
        # A non-JSON response must not stop the caller from getting ``res``;
        # log the raw text instead.
        payload = res.text
    print_log('返回: %s' % (payload,))
    return res
def print_log(log_info):
    """
    Print a message prefixed with the current local timestamp.
    :param log_info: value to log; rendered with ``%s``
    :return: None
    """
    stamp = str(datetime.datetime.now())
    print('[%s]: %s' % (stamp, log_info))
def encodeQueryStr(query):
    """
    Sort the fragments of a raw query string and percent-encode their values.
    :param query: str, raw query string such as 'b=2&a=hello world'
    :return: str, fragments sorted as whole strings, each rendered as
        'key=<quoted value>' and joined with '&'; '' for an empty query
    """
    encoded = []
    for fragment in sorted(query.split("&")):
        if not fragment:
            # Skip empty fragments (empty query, doubled or trailing '&').
            continue
        # Split on the first '=' only, so a value containing '=' is kept
        # whole and a bare key gets an empty value. The old first-fragment
        # branch tested ``len(s) <= 2`` and either raised IndexError or
        # dropped the value in these cases.
        key, _, value = fragment.partition("=")
        encoded.append(key + "=" + quote(value))
    return "&".join(encoded)
# Example request issued at module level: calls the list-cluster-type endpoint
# with a regionID parameter.
# NOTE(review): this runs on import as well as on direct execution — consider
# an ``if __name__ == "__main__":`` guard.
get(
    url="https://cthpc-global.ctapi.ctyun.cn/v4/cthpc/list-cluster-type",
    params={"regionID": "6019b5007a0b11eab5db0242ac110002"},
)