Python多线程

Python中有一个变量叫做GIL全局解释器锁, 同一时间只能有一个线程拿到GIL运行, 线程进入IO等待态再将GIL释放供其他线程调用, 由于CPU切换花费时间多, 所以仅IO密集型适合多线程, 计算密集型不适合多线程

多线程调用

手动创建

import threading # 导入多线程库

def process_run(name):
    print(f"Hello! I'm {name}!")

if __name__ == '__main__':
    # 创建线程对象
    t1 = threading.Thread(target=process_run, # 告诉线程任务
                          args=('t1',))     # 额外参数
    # 设置t1为守护线程, 守护线程意味着主线程不再等待子线程运行完成再退出, 必须在执行前设置
    t1.setDaemon = True
    # 执行线程
    t1.start()
    # 等待子线程结束, 若不结束主线程会阻塞在这等待
    t1.join()

    # 其余信息
    # t1.is_alive()              返回线程是否存活
    # t1.name                    返回线程的变量
    # threading.current_thread() 返回当前的线程变量
    # threading.enumerate()      返回一个包含正在运行的线程的list
    # threading.active_count()   返回正在运行的线程数量

基于模板创建

import threading

# 基于模板创建类
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):  # 进程将要执行的命令
        self.fun1()
    def fun1(self): # 具体运行命令
        pass

if __name__ == "__main__":
    my_thread = MyThread() # 实例化对象
    my_thread.start()      # 运行

访问公共资源时用到

lock = threading.Lock()  # 创建一把同步锁
lock.acquire()           # 尝试拿锁, 若不可拿则进入等待
lock.release()           # 释放锁

lock = threading.RLock() # 创建一把递归锁, 同一进程可多次递归拿锁
lock.acquire()           # 拿锁, 若不可拿则进入等待
lock.acquire()           # 拿锁, 若上条拿到锁了接下来必拿到
lock.acquire()           # 拿锁
lock.release()           # 释放锁
lock.release()           # 释放锁
lock.release()           # 释放锁

同步

线程之间传递消息

import threading

event = threading.Event() # 创建同步对象
event.is_set()            # 返回event当前状态
event.set()               # 设置event的状态值为True
event.clear()             # 恢复event的状态值为False
event.wait()              # 如果event的状态为Fasle, 则阻塞当前线程

信号量

可视为复数数量的锁

import threading

semaphore = threading.Semaphore(5) # 创建信号量对象, 定义数量为5
semaphore.acquire() # 尝试获取信号量, 若不成功则进入等待状态
semaphore.release() # 释放信号量

List不安全与Queue安全

尝试运行下面代码, 可以看到报错, 原因在于List没有做多线程处理

不安全的List

import threading,time

test_list= [1,2,3,4,5]
print(test_list[-1])

def remove_last():
    remove_obj = test_list[-1]
    name = threading.current_thread().name
    print(f"I'm {name} and I get {remove_obj}! It's time to sleep!")
    time.sleep(1)
    print(f"I'm {name} and I wake up! Try to delete {remove_obj}!")
    test_list.remove(remove_obj)
    print(f"I'm {name} and I finish! Delete successfully!")


t1=threading.Thread(target=remove_last)
t1.start()

t2=threading.Thread(target=remove_last)
t2.start()

"""
5
I'm Thread-1 (remove_last) and I get 5! It's time to sleep!
I'm Thread-2 (remove_last) and I get 5! It's time to sleep!
I'm Thread-1 (remove_last) and I wake up! Try to delete 5!
I'm Thread-1 (remove_last) and I finish! Delete successfully!
I'm Thread-2 (remove_last) and I wake up! Try to delete 5!
Exception in thread Thread-2 (remove_last):
Traceback (most recent call last):
  File "/usr/local/python-3.10.0/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/usr/local/python-3.10.0/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/root/project/DDP/Multiprocess.py", line 12, in remove_last
    test_list.remove(remove_obj)
ValueError: list.remove(x): x not in list
root@torch:~/project/DDP# 
"""

Queue接口

import Queue

"""
三种队列
class queue.Queue(maxsize)         # 队列
class queue.LifoQueue(maxsize)     # 堆
class queue.PriorityQueue(maxsize) # 优先队列
"""

q = Queue.Queue(maxsize = 10) # 创建队列, 最长为10, 设为小于1的数代表无限长
q.put(10) # 向队尾插入一个元素, 传入两个参数, 第一个为item, 第二个block默认为1
          # 若当前队列为满, 且block为1则线程暂停. 若clock为0, 则引发Full异常
q.get()   # 从队尾插入一个元素, 传入两个参数, 第一个为item, 第二个block默认为1
          # 若当前队列为空, 且block为1则线程暂停. 若clock为0, 则引发Empty异常
q.qsize() # 返回队列大小
q.empty() # 返回队列是否为空
q.full()  # 返回队列是否已满
q.get([block[, timeout]]) # 获取队列, timeout等待时间
q.get_nowait()     # 等价q.get(False)
q.put_nowait(item) # 相当q.put(item, False)
q.task_done()      # 完成一项工作后向队列发送信号
q.join()           # 等到队列为空, 再执行操作

Python多进程

进程是python中最小的资源分配单元, 中间的数据内存不共享的, 每启动一个进程, 都要独立分配资源和拷贝访问的数据, 所以进程的启动和销毁的代价比较大

多进程调用

手动创建

from multiprocessing import Process

def fun1(name):
    pass

p = Process(target=fun1,      # 告诉进程任务
            args=('Python',)) # 额外参数

p.start() # 运行进程
p.join()  # 主进程等待子进程完成

基于模板创建

from multiprocessing import Process

class MyProcess(Process): # 继承Process类
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name
    def run(self): # 进程执行操作
        pass

p = MyProcess('Python') # 实例化进程对象

通信

借助Queue队列通信

# 注意, 该Queue要是multiprocessing库实现的
from multiprocessing import Process, Queue

# 向子进程任务中传入Que对象
def fun1(que_obj):
    pass

q = Queue()
p = Process(target=fun1, args=(q,)) # 将Que对象给子进程

借助Pipe管道通信

# 从multiprocessing库里导入管道
from multiprocessing import Process, Pipe

def fun1(conn):
    conn.send('message') # 向管道发送信息
    conn.recv()          # 从管道接受信息
    conn.close()         # 关闭管道

conn1, conn2 = Pipe() # 实例化一个双向管道
p = Process(target=fun1, 
            args=(conn2,)) # 将其中一个管道传给子进程

conn1.send("message") # 主进程调用另外一个管道
conn1.recv()          # 若管道为空则进入阻塞状态

借助Manager共享数据

# 从multiprocessing库中导入Manager
from multiprocessing import Process, Manager

# 从Manager中载入实现的dict和list
dic = Manager().dict()
lis = Manager().list(range(5)

p = Process(target=fun1, 
            args=(dic, lis,)) # 将其传给子进程
# 与正常dict和list使用一样, 但多个进程可对其数据进行修改

进程池管理

创建一个进程池, 只有当其中有空闲时才分配子进程运行, 否则进入阻塞状态

# 导入实现的线程池
from multiprocessing import Process, Pool

pool = Pool(5) # 创建一个5个进程大小的进程池

pool.apply_async(func=fun1, args=(,)) # 异步分配进程, 若进程池已满则阻塞

pool.close() # 调用join前必须close, close后就不能添加新的进程了
pool.join()  # 等待进程池中的子进程执行完毕

进程池map

from multiprocessing import Pool 

pool = Pool()
pool.map(fun1, obj) # 传入一个可迭代对象obj
pool.close()
pool.join()
最后修改:2025 年 03 月 04 日
赛博讨口子