(python篇) 多进程与多线程

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、多任务

  • 并发与并行

    • 并发

      CPU调度执行速度太快了,看上去一起执行任务数多于CPU核心数

    • 并行

      真正一起执行任务数小于等于CPU核心数

    • 并发是逻辑上的同时发生并行更多是侧重于物理上的同时发生。

  • 实现多任务的方式

    • 多进程模式

      启动多个进程每个进程虽然只有一个线程但是多个进程可以一起执行多个任务

    • 多线程模式

      启动一个进程在一个进程的内部启动多个线程这样多个线程也可以一起执行多个任务

    • 多进程+多线程

      启动多个进程每个进程再启动多个线程

    • 协程

    • 多进程+协程

二、进程

1、使用进程

  • 启动进程实现多任务

    • multiprocessing模块

      跨平台的多进程模块提供了一个Process类用来实例化一个进程对象

    • Process类

      作用创建进程(子进程)

    • __name__

      这是 Windows 上多进程的实现问题。在 Windows 上子进程会自动 import 启动它的这个文件而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来import 的时候 __name__ 不是 __main__ 就不会递归运行了。

      参数说明
      target指定进程执行的任务
      args给进程函数传递的参数是一个元组

      注意此时进程被创建但是不会启动进程执行

    • 启动进程实现多任务

      from multiprocessing import Process

      创建子进程

 import time
 from multiprocessing import Process

 def run1():
     for i in range(7):
         print("lucky is a good man")
         time.sleep(1)

 def run2(name, word):
     for i in range(5):
         print("%s is a %s man"%(name, word))
         time.sleep(1)

 if __name__ == "__main__":
     t1 = time.time()

	'''
	P = Process(target=run,args=("nice",),name='当前进程名称')
    target指定 子进程运行的函数
    args 指定传递的参数 , 是元组类型
    启动进程Process对象.start()
	'''
     p1 = Process(target=run1)
     p2 = Process(target=run2, args=("lucky", "cool"))

     p1.start()
     p2.start()

     # 主进程的结束不能影响子进程所以可以等待子进程的结束再结束主进程
     # 等待子进程结束才能继续运行主进程
     p1.join()
     p2.join()

     t2 = time.time()
     print("耗时%.2f"%(t2-t1))
**获取进程信息**

+ os.getpid()	 获取当前进程id号
+ os.getppid() 获取当前进程的父进程id号
+ multiprocessing.current_process().name   获取当前进程名称

**父子进程的先后顺序**

+ 默认   父进程的结束不能影响子进程  让父进程等待子进程结束再执行父进程

+ p.join()  阻塞当前进程直到调用join方法的那个进程执行完再继续执行当前进程。

+ 全局变量在过个进程中不能共享

  **注意:** 在子线程中修改全局变量时对父进程中的全局变量没有影响

2、全局变量在多个子进程中不能共享

原因:

​ 在创建子进程时对全局变量做了一个备份,父进程中num变量与子线程中的num不是一个变量

3、启动大量子进程

from multiprocessing import Pool
import time

def fun1(name):
    print(f'我是{name}开始')
    time.sleep(3)
    print(f'我是{name}结束')

if __name__ == '__main__':
	print('CPU number:' + str(multiprocessing.cpu_count())) # 获取CPU核心数
    p = Pool(2)             # 开启并发数,实例化进程池  默认为核心数
    for i in range(1, 11):  # 创建了10个进程
        p.apply_async(fun1, args=(i, ))  # 把任务添加到进程池
        
    p.close()  # 关闭进程池
    p.join()  # 进程池等待
    '''
    调用join之前必须先调用close
    调用close之后就不能再继续添加新的进程
    进程池对象调用join会等待进程池中所有的子进程结束完毕再去执行父进程
    '''

4、map方法

from multiprocessing import Pool
import time

def fun1(name):
    print(f'我是{name}开始')
    time.sleep(3)
    print(f'我是{name}结束')

if __name__ == '__main__':
    p = Pool(2)  # 实例化进程池  默认为核心数
    t_list = list(range(1, 11))  # 传递进去的参数 1-10
    # print(t_list)
    p.map(fun1, t_list)  # 任务添加到进程池

5、进程间通信

队列

  • 队列常用函数

    Queue.empty() 如果队列为空返回True, 反之False

    Queue.full() 如果队列满了返回True,反之False

    Queue.get([block[, timeout]]) 获取队列timeout等待时间

    Queue.get_nowait() 相当Queue.get(False)

    Queue.put(item) 阻塞式写入队列timeout等待时间

    Queue.put_nowait(item) 相当Queue.put(item, False)

  • 特点先进先出

  • 注意

    get方法有两个参数blocked和timeout意思为阻塞和超时时间。默认blocked是true即阻塞式。

    当一个队列为空的时候如果再用get取则会阻塞所以这时候就需要吧blocked设置为false即非阻塞式实际上它就会调用get_nowait()方法此时还需要设置一个超时时间在这么长的时间内还没有取到队列元素那就抛出Queue.Empty异常。

    当一个队列为满的时候如果再用put放则会阻塞所以这时候就需要吧blocked设置为false即非阻塞式实际上它就会调用put_nowait()方法此时还需要设置一个超时时间在这么长的时间内还没有放进去元素那就抛出Queue.Full异常。

  • 队列的大小

    Queue.qsize() 返回队列的大小 不过在 Mac OS 上没法运行。

from multiprocessing import Process, Queue

def test(q):
    # 子进程放入数据
    q.put('a')
    q.put('b')
    q.put('c')

if __name__ == '__main__':
    q = Queue()   #创建队列
    p = Process(target=test, args=(q, ))
    p.start()
    p.join()
    print(q.get())  # 主进程拿数据
    print(q.get())  # 主进程拿数据
    print(q.get())  # 主进程拿数据
    print(q.get(timeout=3))  # 主进程拿数据

字典共享
Manager是一个进程间高级通信的方法 支持Python的字典和列表的数据类型

from multiprocessing import Process, Manager

def test(d):
    d['name'] = 'lucky'
    d['sex'] = 'man'
    
if __name__ == '__main__':
    p_dict = Manager().dict()  # 创建了进程通信的字典类型
    p = Process(target=test, args=(p_dict, ))
    p.start()
    p.join()
    print(p_dict)  # {'name': 'lucky', 'sex': 'man'}

列表共享

from multiprocessing import Process, Manager

def test(l):
    l.append('a')
    l.append('b')
    l.append('c')

if __name__ == '__main__':
    p_list = Manager().list()
    p = Process(target=test, args=(p_list, ))
    p.start()
    p.join()
    print(p_list)  # ['a', 'b', 'c']
  • deamon
 import multiprocessing
 import time
 def fun():
       time.sleep(100)
 if __name__=='__main__':
      p = multiprocessing.Process(target=fun)

	# 设置在start()方法之前,设置为True当父进程结束后子进程会自动被终止
      p.daemon = True 
      p.start()
  • 进程名.terminate() 强行终止子进程
 import multiprocessing
 import time
 def fun():
     time.sleep(100)
     
 if __name__=='__main__':
     p = multiprocessing.Process(target=fun)
     p.start()
     p.terminate()
     p.join()

6、进程实现生产者消费者

生产者消费者模型描述

生产者是指生产数据的任务消费者是指消费数据的任务。

当生产者的生产能力远大于消费者的消费能力生产者就需要等消费者消费完才能继续生产新的数据同理如果消费者的消费能力远大于生产者的生产能力消费者就需要等生产者生产完数据才能继续消费这种等待会造成效率的低下为了解决这种问题就引入了生产者消费者模型。

即两个或者更多的进程线程共享同一个缓冲区其中一个或多个进程线程作为“生产者”会不断地向缓冲区中添加数据另一个或者多个进程线程作为“消费者”从缓冲区中取走数据。

from multiprocessing import Process
from multiprocessing import Queue
import time

def product(q):
    print("启动生产子进程……")
    for data in ["good", "nice", "cool", "handsome"]:
        time.sleep(2)
        print("生产出%s"%data)
        # 将生产的数据写入队列
        q.put(data)
    print("结束生产子进程……")

def customer(q):
    print("启动消费子进程……")
    while 1:
        print("等待生产者生产数据")
        # 获取生产者生产的数据如果队列中没有数据会阻塞等待队列中有数据再获取
        value = q.get()
        print("消费者消费了%s数据"%(value))
    print("结束消费子进程……")

if __name__ == "__main__":
    q = Queue()

    p1 = Process(target=product, args=(q,))
    p2 = Process(target=customer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    # p2子进程里面是死循环无法等待它的结束
    # p2.join()
    # 强制结束子进程
    p2.terminate()

三、线程

1、threading模块

import threading
import time

def run():
    # 获取当前线程名称  默认名称Thread-1
    print(threading.current_thread().name)
    print('run函数开始')
    time.sleep(3)
    print('run函数结束')

if __name__ == '__main__':
    th = threading.Thread(target=run, name='lucky-1')
    th.start()
    th.join()  # 阻塞等待  和进程一样的
    print('over')
    # 查看主线程名称
    print(threading.main_thread().name)

2、多线程

import threading
import time

def run(i):
    print('子线程开始', threading.current_thread().name)
    print(f'{i}开始干活')
    time.sleep(3)
    print(f'{i}干活结束')
    
if __name__ == '__main__':
    t1 = time.time()
    t_list = []   # 存储线程对象
    
    # 并发执行5个线程
    for i in range(1, 6):
        thr = threading.Thread(target=run, args=(i, ))
        t_list.append(thr) # 线程对象添加到列表中
        
    # 循环开启子线程
    for t in t_list:
        t.start()
        
    # 循环阻塞子线程
    for i in t_list:
        i.join()

	# 打印执行时间
    print(time.time() - t1)

3、共享内存

同一进程下不同线程共享数据
使用锁来防止内存混乱

import threading

i = 0
lock = threading.Lock()

def sum1():
    global i
    with lock:
        for x in range(1000000):
            i += x
            i -= x
    print('sum1', i)

def sum2():
    global i
    with lock:
        for x in range(1000000):
            i += x
            i -= x
    print('sum2', i)

if __name__ == '__main__':
    thr1 = threading.Thread(target=sum1)
    thr2 = threading.Thread(target=sum2)
    thr1.start()
    thr2.start()
    thr1.join()
    thr2.join()
    print('over')

4、定时执行

import threading
import time

def run():
    print('执行了')

if __name__ == '__main__':
    # 3秒以后干run的活
    t = threading.Timer(3, run)
    t.start()

5、线程池

from concurrent.futures import ThreadPoolExecutor, as_completed

def run(i):
    print('开始执行子线程', i)
    return i

if __name__ == '__main__':
    pool = ThreadPoolExecutor(3) # 线程并发个数   
    List = [1, 2, 3, 4, 5]
   
    for res in pool.map(run, List):
        print(res)
'''
开始执行子线程 1
开始执行子线程 2
开始执行子线程 3
1
2
3
开始执行子线程 5
开始执行子线程 4
4
5
'''
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: python