一、进程对象的方法
start:启动进程
run:负责进行执行target指定的任务函数
from multiprocessing import Process
import time
def work():
for i in range(6):
print(f"主进程{i}工作")
time.sleep(1)
def work1():
for i in range(5):
print(f"主进程{i}学习")
time.sleep(1)
if __name__ == '__main__':
t1=Process(target=work)
t2=Process(target=work1)
t1.start()
t2.start()
print('主进程执行完毕')
for i in range(6):
print('------------------------hahaha')
time.sleep(1)
主进程执行完毕
------------------------hahaha
主进程0工作
主进程0学习
------------------------hahaha
主进程1工作
主进程1学习
------------------------hahaha
主进程2工作
主进程2学习
------------------------hahaha
主进程3工作
主进程3学习
------------------------hahaha
主进程4工作
主进程4学习
------------------------hahaha
主进程5工作
二、多个进程如何共享数据
案例一、进程与进程之间相互独立,不会改变全局变量
from multiprocessing import Process
import time
n=0
def work():
global n
n=1000+n
print('work',n)
def work1():
global n
n = 10000 + n
print('work1', n)
if __name__ == '__main__':
t1 = Process(target=work)
t2 = Process(target=work1)
t1.start()
t2.start()
print('主进程执行完毕',n)
主进程执行完毕 0
work 1000
work1 10000
案例二、使用进程中得队列JoinableQueue实现
进程的创建由用户控制,进程的调度、执行、阻塞调度由操作系统控制
data=q.get(timeout=1):表示如果超过1s队列中还没有数据,处理数据的线程阻塞
from multiprocessing import Process,JoinableQueue,Manager
import time
def work(q):
for i in range(5):
for j in range(20):
q.put(f'生产数据{j}')
print(f'【生产数据{j}】')
time.sleep(1)
def handle(q):
while True:
for i in range(4):
try:
data=q.get(timeout=1)
except:
return
else:
print('获取数据:',data)
q.task_done()
time.sleep(1)
if __name__ == '__main__':
que = JoinableQueue()
st=time.time()
t=Process(target=work,kwargs={'q':que})
t.start()
thread_list=[]
for i in range(3):
t1=Process(target=handle,kwargs={'q':que})
t1.start()
thread_list.append(t1)
#todo 等待数据生产完成
t.join()
for t1 in thread_list:
t1.join()
#todo 等待队列中的数据处理完成
que.join()
print('主线程执行完毕')
st1=time.time()
print('程序执行时间:',st1-st)
1、queue模块中的队列(Queue),它只能用于单个进程中(多个线程之间的数据共享)
2、multiprocessing中的队列,专为python多进程(Process)数据共享而设计的(多个进程的数据传输)——multiprocessing.Queue()
3、multiprocessing.Manager().Queue():python中进程池通信专用的队列
三、进程池的使用
t1 = ThreadPoolExecutor(max_workers=5):创建一个线程池,线程池中最多支持同时执行多少个任务
t1.submit(*args, **kwargs
):往线程池中提交执行的任务
t1.shutdown():等待线程池中所有的任务执行完毕之后,开始执行
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import time
import threading
def work(name):
for i in range(6):
print(f"进程1工作---{name}")
time.sleep(1)
def work1(name):
for i in range(5):
print(f"进程2学习---{name}")
time.sleep(1)
if __name__ == '__main__':
#todo 创建一个进程池
t1 = ProcessPoolExecutor(max_workers=5) # todo 进程池中最多支持同时执行多少个任务
st=time.time()
#todo 往进程池中提交执行的任务
t1.submit(work,'kobe')
t1.submit(work1,'james')
#todo 等待进程池中所有的任务执行完毕之后,开始执行
t1.shutdown()
et=time.time()
print('执行的时间:',et-st)
四、往线程池中批量提交任务
t1.map(func1,li):往线程池中批量提交任务
等价于
for i in li:
t1.submit(func1,i)
每遍历出一条数据,向线程池中提交一条数据
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import time
import threading
def func1(item):
for i in range(2):
print('正在执行任务{},的第{}轮'.format(item,i))
time.sleep(0.25)
if __name__ == '__main__':
#todo 创建一个进程池
t1 = ProcessPoolExecutor(max_workers=3) # todo 进程池中最多支持同时执行多少个任务
li=[11,6,8,24,22] #todo 用例数据
#todo 批量往进程池中提交任务
t1.map(func1,li)
#todo 等价于
# for i in li:
# t1.submit(func1,i)
五、with操作线程池和进程池对象
from concurrent.futures.process import ProcessPoolExecutor
import time
import threading
def func1(item):
for i in range(2):
print('正在执行任务{},的第{}轮'.format(item,i))
time.sleep(0.25)
if __name__ == '__main__':
st=time.time()
with ProcessPoolExecutor(max_workers=2) as tp:
tp.map(func1,[11,22,33])
et=time.time()
print('时长:',et-st)
六、进程间实现数据通信
q=Manager().Queue():使用进程池专用的队列进行数据传输
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
def work1(q):
number=q.get()
number = number + 100
print("--------work1-----number---", number)
q.put(number)
def work2(q):
number = q.get()
number = number + 66
print("--------work2-----number---", number)
q.put(number)
if __name__ == '__main__':
number = 100
#todo 使用进程池专用的队列进行数据传输
q=Manager().Queue()
q.put(number)
with ProcessPoolExecutor(max_workers=3) as f:
f.submit(work1,q)
f.submit(work2,q)
number=q.get()
print('主进程number',number)
--------work1-----number— 200
--------work2-----number— 266
主进程number 266