Python多线程
Python中有一个变量叫做GIL全局解释器锁, 同一时间只能有一个线程拿到GIL运行, 线程进入IO等待态再将GIL释放供其他线程调用, 由于CPU切换花费时间多, 所以仅IO密集型适合多线程, 计算密集型不适合多线程
多线程调用
手动创建
import threading # 导入多线程库
def process_run(name):
print(f"Hello! I'm {name}!")
if __name__ == '__main__':
# 创建线程对象
t1 = threading.Thread(target=process_run, # 告诉线程任务
args=('t1',)) # 额外参数
# 设置t1为守护线程, 守护线程意味着主线程不再等待子线程运行完成再退出, 必须在执行前设置
t1.setDaemon = True
# 执行线程
t1.start()
# 等待子线程结束, 若不结束主线程会阻塞在这等待
t1.join()
# 其余信息
# t1.is_alive() 返回线程是否存活
# t1.name 返回线程的变量
# threading.current_thread() 返回当前的线程变量
# threading.enumerate() 返回一个包含正在运行的线程的list
# threading.active_count() 返回正在运行的线程数量基于模板创建
import threading
# 基于模板创建类
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self): # 进程将要执行的命令
self.fun1()
def fun1(self): # 具体运行命令
pass
if __name__ == "__main__":
my_thread = MyThread() # 实例化对象
my_thread.start() # 运行锁
访问公共资源时用到
lock = threading.Lock() # 创建一把同步锁
lock.acquire() # 尝试拿锁, 若不可拿则进入等待
lock.release() # 释放锁
lock = threading.RLock() # 创建一把递归锁, 同一进程可多次递归拿锁
lock.acquire() # 拿锁, 若不可拿则进入等待
lock.acquire() # 拿锁, 若上条拿到锁了接下来必拿到
lock.acquire() # 拿锁
lock.release() # 释放锁
lock.release() # 释放锁
lock.release() # 释放锁同步
线程之间传递消息
import threading
event = threading.Event() # 创建同步对象
event.is_set() # 返回event当前状态
event.set() # 设置event的状态值为True
event.clear() # 恢复event的状态值为False
event.wait() # 如果event的状态为Fasle, 则阻塞当前线程信号量
可视为复数数量的锁
import threading
semaphore = threading.Semaphore(5) # 创建信号量对象, 定义数量为5
semaphore.acquire() # 尝试获取信号量, 若不成功则进入等待状态
semaphore.release() # 释放信号量List不安全与Queue安全
尝试运行下面代码, 可以看到报错, 原因在于List没有做多线程处理
不安全的
List
import threading,time
test_list= [1,2,3,4,5]
print(test_list[-1])
def remove_last():
remove_obj = test_list[-1]
name = threading.current_thread().name
print(f"I'm {name} and I get {remove_obj}! It's time to sleep!")
time.sleep(1)
print(f"I'm {name} and I wake up! Try to delete {remove_obj}!")
test_list.remove(remove_obj)
print(f"I'm {name} and I finish! Delete successfully!")
t1=threading.Thread(target=remove_last)
t1.start()
t2=threading.Thread(target=remove_last)
t2.start()
"""
5
I'm Thread-1 (remove_last) and I get 5! It's time to sleep!
I'm Thread-2 (remove_last) and I get 5! It's time to sleep!
I'm Thread-1 (remove_last) and I wake up! Try to delete 5!
I'm Thread-1 (remove_last) and I finish! Delete successfully!
I'm Thread-2 (remove_last) and I wake up! Try to delete 5!
Exception in thread Thread-2 (remove_last):
Traceback (most recent call last):
File "/usr/local/python-3.10.0/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
self.run()
File "/usr/local/python-3.10.0/lib/python3.10/threading.py", line 946, in run
self._target(*self._args, **self._kwargs)
File "/root/project/DDP/Multiprocess.py", line 12, in remove_last
test_list.remove(remove_obj)
ValueError: list.remove(x): x not in list
root@torch:~/project/DDP#
"""Queue接口
import Queue
"""
三种队列
class queue.Queue(maxsize) # 队列
class queue.LifoQueue(maxsize) # 堆
class queue.PriorityQueue(maxsize) # 优先队列
"""
q = Queue.Queue(maxsize = 10) # 创建队列, 最长为10, 设为小于1的数代表无限长
q.put(10) # 向队尾插入一个元素, 传入两个参数, 第一个为item, 第二个block默认为1
# 若当前队列为满, 且block为1则线程暂停. 若clock为0, 则引发Full异常
q.get() # 从队尾插入一个元素, 传入两个参数, 第一个为item, 第二个block默认为1
# 若当前队列为空, 且block为1则线程暂停. 若clock为0, 则引发Empty异常
q.qsize() # 返回队列大小
q.empty() # 返回队列是否为空
q.full() # 返回队列是否已满
q.get([block[, timeout]]) # 获取队列, timeout等待时间
q.get_nowait() # 等价q.get(False)
q.put_nowait(item) # 相当q.put(item, False)
q.task_done() # 完成一项工作后向队列发送信号
q.join() # 等到队列为空, 再执行操作Python多进程
进程是python中最小的资源分配单元, 中间的数据内存不共享的, 每启动一个进程, 都要独立分配资源和拷贝访问的数据, 所以进程的启动和销毁的代价比较大
多进程调用
手动创建
from multiprocessing import Process
def fun1(name):
pass
p = Process(target=fun1, # 告诉进程任务
args=('Python',)) # 额外参数
p.start() # 运行进程
p.join() # 主进程等待子进程完成基于模板创建
from multiprocessing import Process
class MyProcess(Process): # 继承Process类
def __init__(self, name):
super(MyProcess, self).__init__()
self.name = name
def run(self): # 进程执行操作
pass
p = MyProcess('Python') # 实例化进程对象通信
借助Queue队列通信
# 注意, 该Queue要是multiprocessing库实现的
from multiprocessing import Process, Queue
# 向子进程任务中传入Que对象
def fun1(que_obj):
pass
q = Queue()
p = Process(target=fun1, args=(q,)) # 将Que对象给子进程借助Pipe管道通信
# 从multiprocessing库里导入管道
from multiprocessing import Process, Pipe
def fun1(conn):
conn.send('message') # 向管道发送信息
conn.recv() # 从管道接受信息
conn.close() # 关闭管道
conn1, conn2 = Pipe() # 实例化一个双向管道
p = Process(target=fun1,
args=(conn2,)) # 将其中一个管道传给子进程
conn1.send("message") # 主进程调用另外一个管道
conn1.recv() # 若管道为空则进入阻塞状态借助Manager共享数据
# 从multiprocessing库中导入Manager
from multiprocessing import Process, Manager
# 从Manager中载入实现的dict和list
dic = Manager().dict()
lis = Manager().list(range(5)
p = Process(target=fun1,
args=(dic, lis,)) # 将其传给子进程
# 与正常dict和list使用一样, 但多个进程可对其数据进行修改进程池管理
创建一个进程池, 只有当其中有空闲时才分配子进程运行, 否则进入阻塞状态
# 导入实现的线程池
from multiprocessing import Process, Pool
pool = Pool(5) # 创建一个5个进程大小的进程池
pool.apply_async(func=fun1, args=(,)) # 异步分配进程, 若进程池已满则阻塞
pool.close() # 调用join前必须close, close后就不能添加新的进程了
pool.join() # 等待进程池中的子进程执行完毕进程池map
from multiprocessing import Pool
pool = Pool()
pool.map(fun1, obj) # 传入一个可迭代对象obj
pool.close()
pool.join()