`
kanpiaoxue
  • 浏览: 1777289 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Python的生产者消费者模式

 
阅读更多
#!python
# -*- coding: utf-8 -*-
"""
Description: 生产者/消费者模式的模块
File: thread_worker.py
Authors: kanpiaoxue
Date: 2015年11月12日 下午5:51:08
"""

from Queue import Queue
import logging
import threading
import time

import logsetting


class TaskProtocol(object):
    """
            执行任务协议类
    """

    def execute(self):
        """
                    执行任务
        """

        pass


class ProduceTasksProtocol(object):
    """
            产出运行任务(TaskProtocol)的协议类
    """

    def getTasks(self):
        """
                    产出运行的任务(TaskProtocol)列表
        @return: 任务列表
        """

        pass


class Producer(threading.Thread):
    """
        生产者消/费者模式:生产者类
    """

    def __init__(self, name, produceTasksProtocol, queue, sleepTime = 60):
        """
        @param name: 线程名称
        @param produceTasksProtocol: 生成 任务列表的协议类
        @param queue: 运行队列
        @param sleepTime: 生产者两次运行之间的间隔时间,单位:秒
        """

        threading.Thread.__init__(self, name = name)
        self.threadStop = False
        self.name = name
        self.produceTasksProtocol = produceTasksProtocol
        self.queue = queue
        self.sleepTime = int(sleepTime)

    def run(self):
        """
                    线程运行方法
        """

        logging.info('thread %s start to work.' % self.name)
        while not self.threadStop:
            tasks = self.produceTasksProtocol.getTasks()
            size = str(len(tasks))
            logging.info('produce %s tasks.' % size)
            for task in tasks:
                self.queue.put(task)
                logging.info('%s was put into queue.' % task)
            logging.info('%s will sleep %s.' % (self.name, self.sleepTime))
            time.sleep(self.sleepTime)

    def stop(self):
        """
        停止线程运行
        """

        self.threadStop = True

class Consumer(threading.Thread):
    """
        生产者消/费者模式:消费者类
    """

    def __init__(self, name, queue):
        """
        @param name: 线程名称
        @param queue: 运行队列
        """

        threading.Thread.__init__(self, name = name)
        self.threadStop = False
        self.name = name
        self.queue = queue

    def run(self):
        """
                    线程运行方法
        """

        logging.info('thread %s start to work.' % (self.name))
        while not self.threadStop:
            task = self.queue.get()
            logging.info('%s was take from queue and to work.' % task)
            start = time.clock()
            try:
                task.execute()
            except Exception as error:
                # 捕获异常,并记录错误日志。防止因为一个任务的失败造成线程退出工作
                logging.error('when execute task, Occur some exception: %s' % error)
            end = time.clock()
            logging.info('%s finish working. It consumes %s seconds.' % (task,str((end - start))))

    def stop(self):
        """
        停止线程运行
        """

        self.threadStop = True

if __name__ == '__main__':
    logsetting.init_log("./log/ctapi")
    class TaskProtocol_01(TaskProtocol):
        """
                执行任务协议类
        """
        def __init__(self, num):
            TaskProtocol.__init__(self)
            self.num = num

        def execute(self):
            """
                        执行任务
            """
            print self, 'start to work'

        def  __str__(self):
            return 'task-' + str(self.num)

    class ProduceTasksProtocol_01(ProduceTasksProtocol):
        """
                产出运行任务(TaskProtocol)的协议类
        """

        def __init__(self, count):
            ProduceTasksProtocol.__init__(self)
            self.count = count

        def getTasks(self):
            """
                        产出运行的任务(TaskProtocol)列表
            @return: 任务列表
            """
            return [TaskProtocol_01(i) for i in xrange(count)]

    count = 100
    produceTasksProtocol = ProduceTasksProtocol_01(count)
    queue = Queue()
    producer = Producer('producer', produceTasksProtocol, queue,sleepTime=5)
    producer.start()
    consumerCount = 5
    for x in range(consumerCount) :
        consumer = Consumer('consumer-' + str(x), queue)
        consumer.start()
    
    threading.current_thread().join()

 

分享到:
评论

相关推荐

    三分钟带你掌握python中的生产者与消费者模式

    1. 什么是生产者消费者模式?   在线程世界里,生产者就是生产数据(或者说发布任务)的线程,消费者就是消费数据(或者说处理任务)的线程。在任务执行过程中,如果生产者处理速度很快,而消费者处理速度很慢,...

    Python 程序语言设计模式思路-并发模式:消费者模式:协调生产者和消费者之间的数据交换

    生产者-消费者模式作为一种强大的设计模式,通过缓冲区协调生产者和消费者之间的数据交换,提高了系统的...通过合适的设计和实现,可以使生产者-消费者模式在Python应用中发挥重要作用,提高系统的响应速度和可维护性。

    生产者-消费者.zip

    生产者-消费者模式的核心思想是共享资源(通常是一个缓冲区)的分离,生产者负责生成数据并放入缓冲区,而消费者则从缓冲区取出数据进行消费。这种模式利用了线程间的协作,实现了数据的生产和消费的解耦,并避免了...

    Python生成器实现简单"生产者消费者"模型代码实例

    总的来说,Python生成器在实现“生产者消费者”模型时,通过控制数据的生成和传递,有效地管理了内存资源,同时也简化了多线程或并发编程中的同步问题。在实际开发中,生成器常用于处理大数据流、实现惰性计算、以及...

    生产者消费者演示程序

    通过对这个代码的分析和学习,我们可以更深入地理解多线程同步以及生产者消费者模式在实际应用中的实现方法。 总之,生产者消费者问题是多线程编程中的一个重要概念,它展示了如何通过同步机制来协调不同任务之间的...

    理解生产者消费者模型及在Python编程中的运用实例

    ### 理解生产者消费者模型及在Python编程中的运用实例 #### 生产者消费者模型概念 生产者消费者模型是一种经典的计算机科学模型,主要用于解决多线程或并发环境下的资源管理问题。它通过将任务分解为两个独立的...

    kafka模拟生产者消费者(集群模式)实例

    在本文中,我们将深入探讨如何在集群模式下模拟Kafka的生产者和消费者。Kafka是一种分布式流处理平台,常用于大数据实时处理和消息传递。它由Apache开发,以其高吞吐量、低延迟和可扩展性而闻名。 首先,我们要理解...

    详解Python 模拟实现生产者消费者模式的实例

    在Python中,我们可以利用线程、队列以及循环等知识来模拟实现生产者消费者模式。以下是一个简单的实例: 首先,我们需要导入必要的库: ```python import queue import time import threading import random ``` ...

    生产者消费者多线程代码

    下面是一个简单的伪代码示例,展示了如何使用互斥锁和条件变量实现生产者消费者模式: ```python import threading buffer_size = 10 buffer = [None] * buffer_size count = 0 lock = threading.Lock() not_full ...

    python多进程下的生产者和消费者模型

    Python的`multiprocessing.Queue`提供了一种简单且高效的方式来实现生产者消费者模型,通过队列作为桥梁,有效地解决了生产者和消费者之间的同步问题,确保了系统的稳定性和高效性。在设计这类系统时,理解并应用这...

    生产者消费者

    生产者消费者模式是一种经典的多线程同步问题,它在计算机科学和编程中有着广泛的应用。这个模式主要用于解决数据处理过程中的资源协调问题,确保生产者(生成数据的线程)和消费者(消耗数据的线程)之间的工作平衡...

    Python rabbitMQ如何实现生产消费者模式

    本文将详细讲解如何在Python中使用RabbitMQ来实现生产者消费者模式。 首先,我们需要在本地或服务器上安装RabbitMQ。安装完成后,可以通过Web管理界面或命令行工具`rabbitmqctl`来检查和管理队列。在Python中,我们...

    Python之两种模式的生产者消费者模型详解

    这两种模式的生产者消费者模型在 Python 中都有其应用场景。`queue` 模块的实现更适用于多线程环境,能够有效地处理并发问题,而 `yield` 协程的实现则提供了更轻量级的同步机制,适用于低延迟、高效率的需求。根据...

    生产者和消费者

    在计算机科学领域,"生产者和消费者"是一个经典的多线程问题,主要涉及并发编程和资源管理。...在实际工作中,生产者-消费者模式常用于优化系统性能,例如在数据库系统、网络服务器和操作系统内核中都有广泛应用。

    sheng xiao.rar_生产者消费者

    Java的`java.util.concurrent`包提供了`BlockingQueue`接口,简化了生产者消费者模式的实现。 在实际开发中,"生产者消费者"模式广泛应用于各种场景,如消息队列、数据库连接池、缓存系统等。例如,一个网络服务器...

    信号灯法实现生产者与消费者模式.rar

    **生产者与消费者模式** 是一个经典的多线程问题,涉及到两个主要角色:生产者负责生成数据,而消费者则负责消耗这些数据。在共享资源的情况下,如何协调两者之间的操作,避免数据竞争和死锁,就成为了一个关键挑战...

    python条件变量之生产者与消费者操作实例分析

    ### Python条件变量之生产者与消费者操作实例分析 #### 一、引言 在多线程编程中,线程间的同步是非常重要的一个方面。当多个线程共享资源时,为了防止资源冲突,需要采取合适的同步机制来确保程序的正确运行。...

    操作系统生产者消费者课设

    10. **设计模式**:生产者消费者问题也可以用生产者-消费者设计模式来解决,该模式在软件设计中广泛使用,用于解耦数据的生产和消费。 在这个课设中,你需要设计并实现一个能够正确处理生产者和消费者之间同步和...

Global site tag (gtag.js) - Google Analytics