python队列模块Queue

一、初识Queue模块

Queue模块实现了多生产者、多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。这个模块中的Queue类实现了所有必须的锁语义。它依赖于Python中线程支持的可用性;参见threading模块

模块实现了三类队列:FIFO(First In First Out,先进先出,默认为该队列)、LIFO(Last In First Out,后进先出)、基于优先级的队列。以下为其常用方法:

先进先出  q = Queue.Queue(maxsize)
后进先出  a = Queue.LifoQueue(maxsize)
优先级  Queue.PriorityQueue(maxsize)
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.put(item) 写入队列,timeout等待时间   非阻塞
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,函数向任务已经完成的队列发送一个信号
Queue.join(): 实际上意味着等到队列为空,再执行别的操作

更详细部分可以参看python标准库之Queue模块介绍。

二、队列示列

1、FIFO(先进先出)

import Queue
q = Queue.Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()

其输出结果如下:

[root@361way queue]# python fifo.py
1
2
3
4

其输出顺序与进入顺序相同。

2、LIFO(后进先出)

import Queue
q = Queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()

执行结果如下:

import Queue
q = Queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()

3、带优先级的队列

import Queue
class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'New job:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )
while not q.empty():
    next_job = q.get()
    print 'Processing job:', next_job.description

执行结果如下:

[root@361way queue]# python Queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

从上面的执行结果可以看出,优先级值设置越小,越先执行。另外这里是以单线程为例的,在多thread的示例中,多个线程同时get()  item 时,这时就可以根据优先级决定哪一个任务先执行。

三、队列与线程

在实际使用队列是与线程结合在一起的。这里列几个队列与线程的代码示例:

from Queue import *
from threading import Thread
import sys
'''this function will process the items in the queue, in serial'''
def processor():
    while True:
        if queue.empty() == True:
            print "the Queue is empty!"
            sys.exit(1)
        try:
            job = queue.get()
            print "I'm operating on job item: %s"%(job)
            queue.task_done()
        except:
            print "Failed to operate on job"
'''set variables'''
queue = Queue()
threads = 4
'''a list of job items. you would want this to be more advanced,
like reading from a file or database'''
jobs = [ "job1", "job2", "job3" ]
'''iterate over jobs and put each into the queue in sequence'''
#for job in jobs:
for job in range(100):
     print "inserting job into the queue: %s"%(job)
     queue.put(job)
'''start some threads, each one will process one job from the queue'''
#for i in range(100):
for i in range(threads):
     th = Thread(target=processor)
     th.setDaemon(True)
     th.start()
'''wait until all jobs are processed before quitting'''
queue.join()

需要注意的是processer函数里的“ while True:”行 ,如果没了这行,当线程(thread)数小于队列数时,第一轮循环完后就会卡住,不执行后面的循环了。所以加上该行,就相当于开始了一个死循环,直到所有的队列结束时,队列为空,循环结束。

示例2:

[root@361way tmp]# python queue-example-1.py
task 0 finished
task 1 finished
task 3 finished
task 2 finished
task 5 finished
task 4 finished
task 6 finished
task 7 finished
task 9 finished
task 8 finished
[root@361way tmp]# more queue-example-1.py
# File: queue-example-1.py
import threading
import Queue
import time, random
WORKERS = 2
class Worker(threading.Thread):
    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)
    def run(self):
        while 1:
            item = self.__queue.get()
            if item is None:
                break # reached end of queue
            # pretend we're doing something that takes 10-100 ms
            time.sleep(random.randint(10, 100) / 1000.0)
            print "task", item, "finished"
#
# try it
queue = Queue.Queue(0)
for i in range(WORKERS):
    Worker(queue).start() # start a worker
for i in range(10):
    queue.put(i)
for i in range(WORKERS):
    queue.put(None) # add end-of-queue markers

参考页面:

Queue PyMoTW

librarybook

Python Cookbook

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注