当前位置:首页 > Python > 正文

Python队列实现多线程教程 - 线程安全的生产者消费者模型详解

Python队列(Queue)实现多线程教程

本教程将教你:

  • 使用queue.Queue实现线程安全的数据交换
  • 创建生产者-消费者多线程模型
  • 通过队列实现线程间通信
  • 处理多线程任务管理

一、为什么需要队列?

Python多线程编程中,队列(Queue)是线程安全的数据结构,解决线程间数据共享的安全问题,避免使用锁机制带来的复杂性。

二、核心实现步骤

  1. 导入必要模块:threadingqueue
  2. 创建Queue实例
  3. 定义生产者函数(添加数据到队列)
  4. 定义消费者函数(从队列获取数据)
  5. 启动多个生产者和消费者线程
  6. 使用task_done()join()管理任务状态

三、完整代码示例

import threading
import queue
import time

# 创建线程安全队列
task_queue = queue.Queue(maxsize=5)

def producer(name):
    """生产者线程函数"""
    for i in range(3):
        item = f"{name}-产品{i}"
        task_queue.put(item)  # 阻塞添加数据
        print(f"生产者 {name} 创建了: {item}")
        time.sleep(0.5)

def consumer(name):
    """消费者线程函数"""
    while True:
        try:
            # 设置超时防止永久阻塞
            item = task_queue.get(timeout=2)
            print(f"消费者 {name} 处理了: {item}")
            # 标记任务已完成
            task_queue.task_done()
        except queue.Empty:
            break

if __name__ == "__main__":
    # 创建2个生产者线程
    producers = [
        threading.Thread(target=producer, args=("工厂A",)),
        threading.Thread(target=producer, args=("工厂B",))
    ]
    
    # 创建3个消费者线程
    consumers = [
        threading.Thread(target=consumer, args=("客户1",)),
        threading.Thread(target=consumer, args=("客户2",)),
        threading.Thread(target=consumer, args=("客户3",))
    ]
    
    # 启动所有线程
    for t in producers + consumers:
        t.daemon = True
        t.start()
    
    # 阻塞直到所有任务完成
    task_queue.join()
    print("所有任务处理完成!")

四、关键方法说明

方法 说明
Queue.put(item) 添加元素到队列(队列满时阻塞)
Queue.get() 从队列取出元素(队列空时阻塞)
Queue.task_done() 标记任务处理完成
Queue.join() 阻塞直到所有任务完成

五、实际应用场景

  • Web请求的异步处理
  • 大数据分块处理
  • IO密集型任务调度
  • 日志记录系统

发表评论