使用 Python 提升生产者-消费者范式

上次我们讨论使用进程和线程的异步编程时,这里有一个链接。 强烈建议在阅读本文之前先阅读这篇文章,因为我们将使用那里提到的概念。 今天我们将简要讨论何时使用什么以及生产者-消费者范式的实现。

流程 线程
进程适用于 CPU 利用率较高的应用程序。 考虑处理大量数字或处理大量数据。 这些任务最好跨进程或 CPU 执行。 线程适用于我们有耗时的 I/O 操作的场景,例如:文件 I/O、http 调用或套接字连接。 进行此类 I/O 调用的线程等待响应,而其他线程可能会继续运行。

生产者-消费者模式

生产者-消费者设计模式今天广泛用于一系列应用程序中。 大多数集成接口都使用这种方法。 消息队列 (MQ) 广泛使用此范例并将其扩展到发布者/订阅者模式、P2P 模式或推/拉模式。

使用这种模式的基本思想是将一组任务分配给异步运行的多个线程。 我们还可以实现生产者和消费者的链接,并使用既充当生产者又充当消费者的中间节点。

以 android 通知为例,服务器将通知推送(生成)到 FCM,FCM 使用它们并进一步推送到各个设备。

另一个现实生活中的例子可能是邮政服务,人们将邮件发送给邮政服务,后者消费它们,然后将它们生产给充当消费者的收件人。

队列

在处理生产者和消费者时,我们需要一个队列。 生产者需要一个地方来推送消息,消费者需要一个地方来读取它们。 考虑到队列是这里广泛使用的数据结构,我们通常希望以 FIFO 方式处理消息。 尽管 List 和 Arrays 等其他数据结构应该可以正常工作。 对生产者和消费者来说非常重要 分享 队列的同一个对象.

生产者

生产者是异步运行并在队列中添加消息的对象。 然后推送到队列中的消息可供消费者使用。 如果消息被消费,生产者不会打扰,他们的工作是有限的,它是推送消息。 可能有多个生产者向队列生成消息。

消费者

消费者不断轮询队列,等待消息到达队列。 一旦消息可用,消费者就开始处理消息。 可能有多个消费者通过队列轮询。

毒药(可选)

帮助确定何时返回正在运行的线程。 当生产者/消费者收到毒药时,这是他们停止的信号。 这确保在处理结束时没有挂起的线程。

自己做:

有多个单一生产者和消费者的例子。 让我们提高一个档次。

假设:

有一些任务(即。 初始任务, final_task)可以比其他人相对更快地完成并且有任务(即。 中介任务) 这需要大量时间来处理。

设计:

将只有一个线程用于运行 初始任务 另一个为 final_task 但是,将有三个线程专用于 中介任务. 只要 初始任务 完成后,它需要被可用的线程拾取,然后执行 中介任务. 一旦完成,结果就会被生成到最终队列中,然后被消耗以执行 final_task.

import time  from queue import Queue  from threading import Thread   Poison = -1    class TimeConsumingNode(Thread):    def __init__(self, consumer_queue: Queue = None,                 producer_queue: Queue = None, task_time: int = 0):        super(TimeConsumingNode, self).__init__()        self.consumer_queue = consumer_queue        self.producer_queue = producer_queue        self.task_time = task_time     def run(self) -> None:       global Poison       while True:          consumed_message = self.consumer_queue.get()          print(self, f"consumed {consumed_message}")          if consumed_message == Poison:             print(self, "got poison")             # Pass the poison to intermediary producers.             self.consumer_queue.put(consumed_message)             # Pass the poison to the final consumer.             self.producer_queue.put(consumed_message)             break          # Mock processing the consumed message.          time.sleep(self.task_time)          # Mock some heavy tasks.          time.sleep(self.task_time)          print(self,                f"took at least {self.task_time * 2} for consuming "                f"{consumed_message} and for producing {consumed_message}")          self.producer_queue.put(consumed_message)    # Comparatively faster task node.  class ProducerNode(Thread):    def __init__(self, consumed_queue: Queue = None,                 produced_queue: Queue = None,                 task_time: int = 0):        super(ProducerNode, self).__init__()        self.consumed_queue = consumed_queue        self.produced_queue = produced_queue        self.task_time = task_time     def run(self) -> None:        global Poison        while not self.consumed_queue.empty():           some_number = self.consumed_queue.get()           time.sleep(self.task_time)           print(self, f"Producing {some_number}")           self.produced_queue.put(some_number)         # Add poison.        print(self, f"Adding poison")        self.produced_queue.put(Poison)    # Last task again comparatively faster one.  class ConsumerNode(Thread):    def __init__(self, consumed_queue: Queue = None, task_time: int = 0):       super(ConsumerNode, self).__init__()       self.consumed_queue = consumed_queue       self.task_time = task_time       self.poisons = 0     def run(self) -> None:       global Poison       while True:          consumed_message = self.consumed_queue.get()          if consumed_message == Poison:             self.poisons = self.poisons + 1             print(self, f"got {self.poisons} poisons")             if self.poisons == 3:                break          # Mock some time taking task          time.sleep(self.task_time / 2)          print(self,  f"took {self.task_time / 2} to consume {consumed_message}")    if __name__ == "__main__":     my_queue = Queue()     my_queue.put(2)     my_queue.put(3)     my_queue.put(4)     my_queue.put(5)     my_queue.put(6)      intermediary_queue = Queue()     final_queue = Queue()     first_prod_node = ProducerNode(consumed_queue=my_queue,  produced_queue=intermediary_queue,  task_time=3)      tc_node_1 = TimeConsumingNode(consumer_queue=intermediary_queue,  producer_queue=final_queue, task_time=4)     tc_node_2 = TimeConsumingNode(consumer_queue=intermediary_queue,  producer_queue=final_queue, task_time=5)     tc_node_3 = TimeConsumingNode(consumer_queue=intermediary_queue,  producer_queue=final_queue, task_time=2)      final_node = ConsumerNode(consumed_queue=final_queue, task_time=1)      first_prod_node.start()     tc_node_1.start()     tc_node_2.start()     tc_node_3.start()     final_node.start()      first_prod_node.join()     tc_node_1.join()     tc_node_2.join()     tc_node_3.join()     final_node.join()

输出:

风险:

如果实施不当,可能会导致

  1. 挂线
  2. 死锁
  3. 读取队列时出现 IndexOutOfBound 异常。

结论

生产者-消费者模式是一种非常有用的设计,可以在不同程度上加以利用,以实现多个耗时任务的异步处理。 该概念已被广泛纳入现代消息队列,即。 卡夫卡,RabbitMQ, Cloud AWS、GCP等提供的MQ。 他们强大而危险! 明智地使用它们!