Condition条件变量简介
在多线程编程中,线程同步是一个核心概念。当多个线程需要访问共享资源时,必须协调它们的执行顺序,以避免竞态条件和数据不一致问题。Python的threading
模块提供了多种同步原语,其中Condition
(条件变量)是最强大和灵活的机制之一。
条件变量允许一个或多个线程等待特定条件成立,而其他线程可以在条件改变时通知等待的线程。它通常与锁(Lock)结合使用,为复杂的线程间通信提供了更高级别的抽象。
生产者线程
创建数据
消费者线程
处理数据
Condition的基本方法
1. 创建Condition对象
创建Condition对象时可以传入一个锁对象(Lock或RLock),如果不传入则自动创建一个RLock。
import threading
# 创建Condition对象(自动创建关联的锁)
condition = threading.Condition()
# 使用已有的锁创建Condition
lock = threading.Lock()
condition_with_lock = threading.Condition(lock)
2. acquire() 和 release()
Condition对象本身也是一个锁对象,因此支持acquire()
和release()
方法,用于获取和释放底层锁。
3. wait(timeout=None)
调用此方法会释放底层锁,并使线程进入等待状态,直到被其他线程唤醒或超时。被唤醒后,线程会重新获取锁。
4. notify(n=1)
唤醒一个或多个等待此条件的线程(如果有)。调用此方法时必须持有锁。
5. notify_all()
唤醒所有等待此条件的线程。这是notify()
的更安全替代方法,可以避免某些线程被永久遗漏。
import threading
import time
condition = threading.Condition()
shared_data = []
def consumer():
with condition:
print("消费者: 等待数据...")
condition.wait() # 释放锁并等待
print(f"消费者: 接收到数据 -> {shared_data.pop()}")
print("消费者: 处理数据完成")
def producer():
time.sleep(2) # 模拟数据准备时间
with condition:
print("生产者: 准备数据...")
shared_data.append("重要数据")
print("生产者: 通知消费者")
condition.notify() # 唤醒一个消费者线程
# 创建并启动线程
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()
经典案例:生产者-消费者模式
生产者-消费者模式是条件变量的典型应用场景。生产者生成数据放入缓冲区,消费者从缓冲区取出数据进行处理。当缓冲区满时,生产者需要等待;当缓冲区空时,消费者需要等待。
import threading
import time
import random
# 有限大小的缓冲区
BUFFER_SIZE = 5
buffer = []
condition = threading.Condition()
def producer():
global buffer
for i in range(10): # 生产10个产品
time.sleep(random.uniform(0.1, 0.5)) # 模拟生产时间
with condition:
# 检查缓冲区是否已满
while len(buffer) >= BUFFER_SIZE:
print("生产者: 缓冲区已满,等待中...")
condition.wait()
# 生产产品并放入缓冲区
item = f"产品-{i+1}"
buffer.append(item)
print(f"生产者: 生产了 {item} | 缓冲区: {buffer}")
# 通知消费者
condition.notify_all()
def consumer():
global buffer
for _ in range(10): # 消费10个产品
time.sleep(random.uniform(0.2, 0.8)) # 模拟消费时间
with condition:
# 检查缓冲区是否为空
while len(buffer) == 0:
print("消费者: 缓冲区为空,等待中...")
condition.wait()
# 从缓冲区取出产品
item = buffer.pop(0)
print(f"消费者: 消费了 {item} | 缓冲区: {buffer}")
# 通知生产者
condition.notify_all()
# 创建多个生产者和消费者
producers = [threading.Thread(target=producer) for _ in range(2)]
consumers = [threading.Thread(target=consumer) for _ in range(3)]
# 启动所有线程
for t in producers + consumers:
t.start()
# 等待所有线程完成
for t in producers + consumers:
t.join()
Condition与其它同步机制对比
同步机制 | 适用场景 | 特点 | 与Condition的区别 |
---|---|---|---|
Lock(互斥锁) | 简单的互斥访问 | 基本锁,只有两种状态(锁定/非锁定) | Condition内部使用Lock,但增加了等待/通知机制 |
RLock(可重入锁) | 同一线程多次获取锁 | 允许同一线程多次acquire() | Condition默认使用RLock作为底层锁 |
Semaphore(信号量) | 控制资源访问数量 | 维护一个计数器,限制同时访问的线程数 | Condition提供更精细的条件控制,而非简单的计数 |
Event(事件) | 简单线程间通信 | 基于标志的通信机制(set/wait) | Condition更灵活,允许多个条件等待队列 |
Barrier(屏障) | 线程同步点 | 所有线程到达屏障点后才能继续 | 解决不同问题,Condition用于条件满足时唤醒 |
Condition使用的最佳实践
- 始终使用with语句: 确保在退出代码块时自动释放锁,避免死锁
- 使用while检查条件: 防止虚假唤醒,确保条件真正满足
- 优先使用notify_all(): 除非有特殊理由,否则使用notify_all()更安全
- 避免嵌套锁: 如果使用RLock,确保acquire和release次数匹配
- 保持锁内代码简短: 锁定时尽量减少操作,避免长时间阻塞其他线程
- 处理超时情况: 在wait()中使用timeout参数,避免线程永久阻塞
import threading
import time
condition = threading.Condition()
data_ready = False
def worker():
with condition:
print("工作者: 等待数据准备...")
# 最多等待3秒
result = condition.wait(timeout=3.0)
if data_ready:
print("工作者: 数据已准备好,开始工作")
else:
print("工作者: 等待超时,继续执行其他任务")
def preparer():
time.sleep(5) # 模拟长时间准备
with condition:
global data_ready
data_ready = True
print("准备者: 数据已准备好")
condition.notify()
# 创建并启动线程
worker_thread = threading.Thread(target=worker)
preparer_thread = threading.Thread(target=preparer)
worker_thread.start()
preparer_thread.start()
worker_thread.join()
preparer_thread.join()
常见问题解答
1. Condition和Event有什么区别?
Event对象适用于简单的标志通知,而Condition提供了更复杂的等待/通知机制,支持多个等待条件,并且自动管理锁的获取和释放。
2. 为什么有时会看到多个Condition共享同一个锁?
当多个条件变量共享同一个锁时,可以实现更复杂的同步逻辑。例如,在有界缓冲区问题中,可以使用两个条件变量:一个用于"非满"条件(生产者等待),一个用于"非空"条件(消费者等待)。
3. Condition是否支持多个条件等待?
是的,Condition对象可以支持多个等待条件。但更常见的做法是为每个逻辑条件创建单独的Condition对象,这样可以使用notify()更精确地唤醒特定条件的等待线程。
4. 如何避免死锁?
避免死锁的关键原则:
- 按固定顺序获取锁
- 使用超时参数
- 保持锁内代码简短
- 避免在持有锁时调用可能阻塞的操作
- 使用with语句自动管理锁
5. Condition在asyncio中有对应物吗?
在异步编程中,asyncio.Condition提供了类似的功能,但基于协程而非线程。它的工作原理类似,但使用async with语法和await wait()。
发表评论