m0_63572378 2023-11-05 23:45 采纳率: 0%
浏览 11

Python多线程实现多消费者多生产者模型,子线程无法终止

我是想通过监听键盘实现从键盘输入esc键停止子线程,但是我发现消费者线程停止一个以后就卡住了,有哪位可以帮忙就解答一下吗

# 导入多线程模块
import threading
# 导入时间模块
import time
# 导入随机模块
import random

from pynput import keyboard
from queue import Queue
# 使用共享区模拟变量
_queue=Queue(maxsize=10)
# 创建条件对象
condition = threading.Condition()
count=0
isEnd=False
# 生产者线程类
class Producer(threading.Thread):
    #重写结构方法
    def __init__(self,threadName):
        super(Producer,self).__init__()
        self.threadName = threadName
    def run(self):
        global count # 引用全局共享变量count
        global _queue
        while True:
            if isEnd:
                print("{}stop".format(self.threadName))
                return False
            time.sleep(1)
            # 使用条件对象获取锁并锁定# 使用条件对象获取锁并锁定
            if condition.acquire():
                if _queue.full():
                    print("共享区已满,生产者Producer线程进入阻塞Block状态,停止放入!")
                    condition.wait() # 当前线程进入到阻塞状态
                    
                else:
                    count += 1 # 共享变量自增1
                    _queue.put(count)
                    cqueue=_queue.qsize()
                    msg = time.ctime() + '' + self.threadName + '生产了商品'+str(count)+',队列总计商品个数: '+ str(cqueue)
                    print(msg)
                    
                condition.notify() # 唤醒其他阻塞状态的线程(如,生产这线程)
                condition.release() # 解除锁定
                time.sleep(random.randrange(10)/5) # 随机休眠n秒
# 消费者线程类
class Customer(threading.Thread):
    # 重写构造方法
    def __init__(self,threadName):
        threading.Thread.__init__(self)
        self.threadName = threadName
    # 重写run 方法
    def run(self):
        global count # 引用全局共享变量count
        global _queue
        while True:
            #使用条件对象获取锁并锁定
            if condition.acquire():
                # 判断共享变量是否以为0(已空)
                if _queue.empty():
                    print("共享区已空,{}线程进入阻塞Block状态,停止获取!!".format(self.threadName))
                    #condition.wait() #当前线程进入到阻塞状态
                    print("{}1111111".format(self.threadName))
                    if isEnd:
                        print("{}stop".format(self.threadName))
                        return False
                    condition.wait() #当前线程进入到阻塞状态
                else:
                    msg = time.ctime() + '' + self.threadName + '消费了商品'+str(_queue.get())+',共享区总计商品个数:' + str(_queue.qsize())
                    print(msg)
                    condition.notify() # 唤醒其他阻塞状态的线程(如,生产者线程)
                condition.release() # 接触锁定
                time.sleep(random.randrange(10)) # 随机休眠n秒


def keyboard_on_press(key):
    global isEnd
    try:
        print('字母键{0} press'.format(key.char))
    except AttributeError:
        print('特殊键{0} press'.format(key))
        if key == keyboard.Key.esc:
            isEnd = True
            return False
 
 
# 开启键盘监听的线程函数
def start_key_listen():
    with keyboard.Listener(on_press=keyboard_on_press) as KeyboardListener:
        KeyboardListener.join()

# 脚本程序入口
if __name__ == '__main__':
    
    t1 = threading.Thread(target=start_key_listen)
    t1.start()
 
    print("ESC可以停止生产")
    time.sleep(3)
    # 等待线程结束

    producer1 = Producer('生产者1')
    producer2 = Producer('生产者2')
 
    producer2.start()
    producer1.start()

    customer_threads = []
    for i in range(5):
        customer = Customer('消费者_{}' .format(i))
        customer_threads.append(customer)
        customer.start()
 
    for customer in customer_threads:
        customer.join()
        print("aaaaa")
    producer1.join()
    producer2.join()
    t1.join()

    print('主线程已完成!')
  • 写回答

2条回答 默认 最新

  • 木头人123。 2023-11-06 09:07
    关注

    看起来问题出在消费者线程的逻辑上。你在消费者线程中使用了一个condition.wait(),它会使得线程进入阻塞状态,直到被一个condition.notify()唤醒。当你的生产者线程停止后,生产者线程不再提供condition.notify(),因此消费者线程会一直停在condition.wait()这一步,即使isEnd=True

    你可以尝试在condition.wait()前检查isEnd的状态,如果isEnd=True,就跳出循环,结束线程。以下是修改后的消费者线程的run方法:

    def run(self):
        global count  # 引用全局共享变量count
        global _queue
        while True:
            # 使用条件对象获取锁并锁定
            if condition.acquire():
                # 判断共享变量是否已为空(已空)
                if _queue.empty():
                    print("共享区已空,{}线程进入阻塞Block状态,停止获取!!".format(self.threadName))
                    if isEnd:
                        print("{}stop".format(self.threadName))
                        return False
                    condition.wait()  # 当前线程进入到阻塞状态
                else:
                    msg = time.ctime() + '' + self.threadName + '消费了商品' + str(_queue.get()) + ',共享区总计商品个数:' + str(_queue.qsize())
                    print(msg)
                    condition.notify()  # 唤醒其他阻塞状态的线程(如,生产者线程)
                condition.release()  # 解除锁定
                time.sleep(random.randrange(10))  # 随机休眠n秒
    

    这样修改后,消费者线程在每次检查队列是否为空之前,都会先检查isEnd的状态,如果发现isEnd=True,就会立即结束线程,而不会进入condition.wait()

    评论

报告相同问题?

问题事件

  • 创建了问题 11月5日

悬赏问题

  • ¥15 flexsim统计分布设置
  • ¥15 win10远程协助无法复制粘贴
  • ¥15 关于#stm32#的问题:输出低电平,给出STM32标准库代码,实现测量滴速功能,使用的GPIO口试PA6,定时器是TIM3
  • ¥15 单闭环无静差调速实验
  • ¥15 RK3588s使用IMX219的报错问题
  • ¥15 完成以下任务要代码跟截图
  • ¥20 类型 :异常报告消息:Servlet执行抛出一个异常描述:服务器遇到一个意外的情况,求帮助,急?
  • ¥15 算法流程图(相关搜索:循环结构|选择结构|顺序结构)
  • ¥15 three.js引入camera但是不能实现移动
  • ¥160 关于maxsurf的问题