# -*- coding: utf8 -*-
import requests
import json
import hashlib
import base64
import hmac
import datetime
# Base URL of the API host targeted by the sample requests at the bottom of
# this file.
HTTP_668 = 'https://sdwan-global.ctapi.ctyun.cn'
# HTTP method names; execute() compares its `method` argument against
# METHOD_GET to decide how to send the request.
METHOD_GET = 'GET'
METHOD_POST = 'POST'
# Default access key / secret key used by fill_headers() when the caller does
# not pass `ak` / `sk`.
# NOTE(review): these credentials are hard-coded in source control. Consider
# loading them from the environment or a secrets store and rotating them.
AK = 'a998119edd7611e8a3d7186590d96509'
SK = 'afe2f64635f4f4171218b422324043302340a717'
def to_md5_base64(params_str):
    """Return the Base64-encoded MD5 digest of ``params_str`` as native text.

    :param params_str: text to hash; it is UTF-8 encoded before hashing
    :return: Base64 text of the 16-byte MD5 digest (``str`` on both
        Python 2 and Python 3)
    """
    digest = hashlib.md5(params_str.encode('utf-8')).digest()
    encoded = base64.b64encode(digest)
    # On Python 3 b64encode() returns bytes; interpolating bytes with '%s'
    # yields "b'...'", which would corrupt the string-to-sign built in
    # fill_headers(). On Python 2 the value is already ``str`` and is
    # returned untouched.
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return encoded
def calculate_hmac(secret_key, data_str):
    """Return the Base64-encoded HMAC-SHA1 of ``data_str`` as native text.

    :param secret_key: signing key (text); UTF-8 encoded before use
    :param data_str: message to sign (text); UTF-8 encoded before use
    :return: Base64 text of the 20-byte HMAC-SHA1 digest (``str`` on both
        Python 2 and Python 3)
    """
    mac = hmac.new(secret_key.encode('utf-8'),
                   data_str.encode('utf-8'),
                   hashlib.sha1)
    encoded = base64.b64encode(mac.digest())
    # b64encode() returns bytes on Python 3; normalise to native text so the
    # value behaves the same as on Python 2 when it is formatted or used as a
    # header value.
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return encoded
def local2utc():
    """Return the current UTC time as ``'Www, DD Mon YYYY HH:MM:SS UTC'``.

    The previous implementation subtracted a fixed 8 hours from the local
    clock, which only produced UTC on hosts configured for UTC+8. The time
    is now taken from ``time.gmtime()`` so the result is correct regardless
    of the host's time zone.

    :return: formatted UTC timestamp string
    """
    # Function-scope import: the module's import block does not include
    # ``time``.
    import time
    return time.strftime('%a, %d %b %Y %H:%M:%S UTC', time.gmtime())
def fill_headers(params_str, servicePath, ak=None, sk=None, request_date=None):
    """Build the authentication headers for one request.

    The string that gets signed is the Base64 MD5 of ``params_str``, the
    request date and ``servicePath`` joined by newlines; it is signed with
    HMAC-SHA1 via calculate_hmac().

    :param params_str: text whose MD5 becomes the ``contentMD5`` header
    :param servicePath: path included as the last line of the signed string
    :param ak: access key; falls back to the module-level ``AK`` when falsy
    :param sk: secret key; falls back to the module-level ``SK`` when falsy
    :param request_date: date string to sign; falls back to local2utc()
        when falsy
    :return: dict with ``ACCESSKEY``, ``contentMD5``, ``requestDate`` and
        ``hmac`` entries
    """
    access_key = ak if ak else AK
    secret_key = sk if sk else SK
    content_md5 = to_md5_base64(params_str)
    date_value = request_date if request_date else local2utc()

    sign_fields = (content_md5, date_value, servicePath)
    string_to_sign = '%s\n%s\n%s' % sign_fields
    # Debug output of the exact string that is about to be signed.
    print(repr('data_str: %s' % string_to_sign))
    signature = calculate_hmac(secret_key, string_to_sign)

    return {
        'ACCESSKEY': access_key,
        'contentMD5': content_md5,
        'requestDate': date_value,
        'hmac': signature,
        # 'platform': '3',
    }
def get(host, uri, params=None, header_params=None, custom_info=None):
    """
    Send a GET request through execute().

    :param host: request address (prepended to ``uri``)
    :param uri: API path
    :param params: request parameters
    :param header_params: extra request-header entries
    :param custom_info: optional custom info forwarded to execute()
    :return: the value returned by execute()
    """
    response = execute(
        host,
        uri,
        METHOD_GET,
        params=params,
        header_params=header_params,
        custom_info=custom_info,
    )
    return response
def post(host, uri, params, header_params=None, custom_info=None):
    """
    Send a POST request through execute().

    :param host: request address (prepended to ``uri``)
    :param uri: API path
    :param params: request parameters
    :param header_params: extra request-header entries
    :param custom_info: optional custom info forwarded to execute()
    :return: the value returned by execute()
    """
    response = execute(
        host,
        uri,
        METHOD_POST,
        params=params,
        header_params=header_params,
        custom_info=custom_info,
    )
    return response
def execute(host, uri, method, params=None, header_params=None, custom_info=None):
    """Send a signed JSON request and return the ``requests`` response.

    Authentication headers come from fill_headers(), which is always called
    with an empty body string and ``uri``. ``custom_info`` is merged into the
    JSON body for non-GET requests and sent as a JSON-encoded ``customInfo``
    header for GET requests.

    :param host: base address; concatenated with ``uri`` to form the URL
    :param uri: API path; also used as the signed service path
    :param method: ``METHOD_GET`` sends via ``requests.get``; any other value
        sends via ``requests.post``
    :param params: mapping serialised with ``json.dumps`` as the request body
        (the caller's mapping is never modified)
    :param header_params: extra headers that override the generated ones
    :param custom_info: optional identity/custom data, see above
    :return: the ``requests`` response object
    """
    # Function-scope import: the module's import block does not include
    # ``copy``.
    import copy

    url = host + uri
    params = params or {}
    header_params = header_params or {}
    # NOTE(review): the body is not part of the signature (contentMD5 is the
    # MD5 of an empty string) - confirm this matches the API's signing rules.
    headers = fill_headers('', uri)
    headers.update(header_params)
    custom_info = custom_info or {}
    if custom_info:
        if method != METHOD_GET:
            # Copy before merging so the caller's dict is not mutated; the
            # original code called params.update() on the caller's object.
            params = copy.copy(params)
            params.update({'customInfo': custom_info})
        else:
            headers.update({'customInfo': json.dumps(custom_info)})
    headers['Content-Type'] = "application/json"
    print_log(u'url: %s' % url)
    print_log(u'请求方式: %s' % method)
    print_log(u'请求头: %s' % headers)
    print_log(u'请求参数: %s' % params)
    body = json.dumps(params)
    # NOTE(review): no timeout is passed to requests, so a stalled server
    # blocks the caller indefinitely.
    if method == METHOD_GET:
        # The parameters are sent as a JSON body even for GET, as before.
        res = requests.get(url, data=body, headers=headers)
    else:
        res = requests.post(url, data=body, headers=headers)
    print_log(u'返回状态码: %s' % res.status_code)
    print_log(u'返回: %s' % res.text)
    return res
def print_log(log_info):
    """Print ``log_info`` to stdout prefixed with the current local time.

    The output line has the form ``[<timestamp>]: <log_info>``.

    :param log_info: message to print
    :return: None
    """
    now = datetime.datetime.now()
    line = '[%s]: %s' % (str(now), log_info)
    # Call form instead of the bare ``print x`` statement: it prints the same
    # text on Python 2 and, unlike the statement, is valid syntax on Python 3
    # (matching the print(...) call already used in fill_headers()).
    print(line)
# Sample invocation: two signed GET requests against HTTP_668.
# NOTE: there is no ``if __name__ == '__main__'`` guard, so these requests are
# sent whenever this module is imported, not only when it is run as a script.

# Identity payload passed as ``custom_info``; for GET requests execute()
# JSON-encodes it into the ``customInfo`` request header.
custom_info = {
"identity": {
"accountId": "bac_9eb5d50887d1442d9feae0b598c3297e",
"userId": "62069fea57934345bc1c311da6e7bf2f",
}
}
# First request: no parameters.
params = {}
get(HTTP_668, '/openapi/v4/sdwan/edge-config-count/list', params=params, custom_info=custom_info)
# Second request: edgeID, page and pageSize are all sent as strings.
params = {
"edgeID": "87a74704-3efd-11ed-910d-005056893e3e",
"page": "0",
"pageSize": "10"
}
get(HTTP_668, '/openapi/v4/sdwan/edge-static-route/list', params=params, custom_info=custom_info)