python在同一个线程中多次执行同一方法时,假设该方法执行耗时较长且每次执行过程及结果互不影响,如果只在主进程中执行,效率会很低,因此使用multiprocessing.Pool(processes=n)及其apply_async()方法提高程序执行的并行度从而提高程序的执行效率,其中processes=n为程序并行执行的进程数。
apply()方法是阻塞的,也就是说等待当前子进程执行完毕后,再执行下一个进程。
示例代码:
import time
import multiprocessing
def apply_test(s):
time.sleep(3)
print('info: %s' % s)
if __name__ == '__main__':
print('开始主进程。。。')
start = time.time()
# 使用线程池建立3个子进程
pool = multiprocessing.Pool(3)
print('开始3个子进程。。。')
for i in range(3):
pool.apply(apply_test, [i])
print('主进程结束,耗时 %s' % (time.time() - start))
运行结果:
apply_async()是异步非阻塞式,不用等待当前进程执行完毕,随时跟进操作系统调度来进行进程切换,即多个进程并行执行,提高程序的执行效率。
示例代码1:
import time
import multiprocessing
def apply_test(s):
time.sleep(3)
print('info: %s' % s)
if __name__ == '__main__':
print('开始主进程。。。')
start = time.time()
# 使用线程池建立3个子进程
pool = multiprocessing.Pool(3)
print('开始3个子进程。。。')
for i in range(3):
pool.apply_async(apply_test, [i])
print('主进程结束,耗时 %s' % (time.time() - start))
# 为了演示效果,这儿使用休眠方式
time.sleep(10)
运行结果:
示例代码2: 【主进程等待子进程都结束再结束】
import time
import multiprocessing
def apply_test(s):
time.sleep(3)
print('info: %s' % s)
if __name__ == '__main__':
print('开始主进程。。。')
start = time.time()
# 使用线程池建立3个子进程
pool = multiprocessing.Pool(3)
print('开始3个子进程。。。')
for i in range(3):
pool.apply_async(apply_test, [i])
pool.close()
pool.join()
print('主进程结束,耗时 %s' % (time.time() - start))
运行结果:
示例代码3:
import time
import random
import multiprocessing
def func(x):
ts = random.randint(1, 10)
time.sleep(ts)
print(f'{x}执行完毕!耗时{ts}s')
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(6):
print(f"开始执行第{i}个任务...")
pool.apply_async(func, args=(i, ))
pool.close()
pool.join()
运行结果:
在使用apply_async()方法接收多个参数的方法时,在任务方法中正常定义多个参数,参数以元组形式传入即可 但是给apply_async()方法传入多个值获取多个迭代结果时就会报错,因为该方法只能接收一个值,所以可以将该方法放入一个列表生成式中。
示例代码4:
import multiprocessing
def func(x):
return x ** 2
if __name__ == '__main__':
pool = multiprocessing.Pool()
res = [pool.apply_async(func, (i, )) for i in range(6)]
print([x for x in res])
print([x.get() for x in res])
pool.close()
pool.join()
运行结果:
有时候在使用多进程或者多线程执行程序时,当程序有bug时,某个进程或者线程可能会挂掉,但是自己又不容易或者很难发现是哪个线程或进程挂掉了。如示例代码5所示:
示例代码5:
import time
import random
import multiprocessing
def func(x, y):
ret = x / y
return ret
def task(i):
ts = random.randint(1, 10)
time.sleep(ts)
nums = [-1, 0, 1, 2]
x, y = random.choice(nums), random.choice(nums)
value = func(x, y)
print(f'{i}执行完毕!耗时{ts}s,结果为{value}')
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(6):
print(f"开始执行第{i}个任务...")
pool.apply_async(task, args=(i,))
pool.close()
pool.join()
运行结果:
在上述例子中,我们是打印了某个进程号,但真正项目中是不会这样打印日志的,就很难发现某个进程或者线程已经挂掉了,这时候需要使用回调函数,打印某个进程或者线程挂掉的error信息,如示例代码6所示。
示例代码6:
import time
import random
import multiprocessing
def func(x, y):
ret = x / y
return ret
def task(i):
ts = random.randint(1, 10)
time.sleep(ts)
nums = [-1, 0, 1, 2]
x, y = random.choice(nums), random.choice(nums)
value = func(x, y)
print(f'{i}执行完毕!耗时{ts}s,结果为{value}')
def error_callback(error):
print(f"Error info: {error}")
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(6):
print(f"开始执行第{i}个任务...")
pool.apply_async(task, args=(i,), error_callback=error_callback)
pool.close()
pool.join()
运行结果:
上述执行结果就很容易看出进程或者线程在运行过程中有挂掉的,而且打印出了挂掉的原因,也有利用我们后期排除程序中的bug。
同样的,除了想让报错的程序回调一下,同时也想让异步函数执行完毕也给一个回调响应值,这时可以加上callback参数,响应结果,如示例代码7所示。
示例代码7:
import time
import random
import multiprocessing
def func(x, y):
ret = x / y
return ret
def task(i):
ts = random.randint(1, 10)
time.sleep(ts)
nums = [-1, 0, 1, 2]
x, y = random.choice(nums), random.choice(nums)
value = func(x, y)
# print(f'{i}执行完毕!耗时{ts}s,结果为{value}')
return f'{i}执行完毕!耗时{ts}s,结果为{value}'
def error_callback(error):
print(f"Error info: {error}")
def call_back(info):
print(f"Right info: {info}")
if __name__ == '__main__':
pool = multiprocessing.Pool(6)
for i in range(6):
print(f"开始执行第{i}个任务...")
pool.apply_async(task, args=(i,), callback=call_back, error_callback=error_callback)
pool.close()
pool.join()
运行结果:
注意:join()等待所有子进程结束后再运行,使用join()前先使用close()关闭它。