说明: 场景是, 用python做一些任务, 但是有时间控制需求,所以写一个比较简单的控制办法.
import requests, datetime, time
import threading
class TaskThread(threading.Thread):
def __init__(self, target, args=()):
super(TaskThread, self).__init__()
self.func = target
self.args = args
# 线程入口函数,名字不能变
def run(self):
self.result = self.func(self.args)
def GetResult(self):
try:
return self.result
except Exception:
return None
def LimitDecor():
def Functions(func):
def Run(params):
limit_time = params[0]
thre_func = TaskThread(target=func, args=params)
thre_func.setDaemon(True)
thre_func.start()
sleep_num = int(limit_time // 1)
sleep_left = round(limit_time % 1, 1)
while True:
time.sleep(1)
infor = thre_func.GetResult()
if infor:
return infor
sleep_num -= 1
if sleep_num <= 0:
break
time.sleep(sleep_left)
return thre_func.GetResult()
return Run
return Functions
def Start(limit_time, process):
theadiing = TaskThread(target=Task, args=[limit_time, process])
theadiing.start()
theadiing.join()
return theadiing.GetResult()
@LimitDecor()
def Task(args=()):
process = args[1]
return process.Run()
class Process():
def __init__(self, args=()):
self.args = args
def Run(self):
print(self.args)
print("start Process")
time.sleep(4)
print("over Process")
a = 0
return a
if __name__ == '__main__':
process = Process(["buildAndroid", "debug"])
a = Start(5, process)
print(a)
使用时, Process类中填充业务, Run作为业务的执行入口.