0%

队列

今天来学习队列。

如何理解“队列”

先进者先出,就是典型的“队列”。基本操作包括:入队和出队。

队列跟栈一样,也是一种操作受限的线性表数据结构。

队列的应用也非常广泛,特别是一些具有某些额外特性的队列,比如循环队列、阻塞队列、并发队列。它们在很多偏底层系统、框架、中间件的开发中,起着关键性的作用。比如高性能队列DisruptorLinux环形缓存,都用到了循环并发队列;Java concurrent并发包利用ArrayBlockingQueue来实现公平锁等。

顺序队列和链式队列

跟栈一样,队列可以用数组来实现,也可以用链表来实现。用数组实现的栈叫做顺序栈,用链表实现的栈叫做链式栈。同样,用数组实现的队列叫做顺序队列,用链表实现的队列叫做链式队列

顺序队列实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class ArrayQueue:

def __init__(self, capacity: int):
self.items = []
self.capacity = capacity
self.head = 0
self.tail = 0

def enqueue(self, item: str) -> bool:
"""
入队
:return:
"""
if self.tail == self.capacity:
# 说明队列已满
if self.head == 0:
return False
# self.tail指向数据下一个index
# self.head=2 self.tail=7 执行5次数据搬移操作
for i in range(self.tail - self.head):
# 数据搬移操作
self.items[i] = self.items[self.head + i]
# 修改队首 队尾
self.tail = self.tail - self.head
self.head = 0

self.items.insert(self.tail, item)
self.tail += 1
return True

def dequeue(self):
"""
出队
:return:
"""
# 队列为空
if self.tail == self.head:
return None
item = self.items[self.head]
self.head += 1
return item

def __repr__(self):
return " ".join(item for item in self.items[self.head: self.tail])


if __name__ == '__main__':

q = ArrayQueue(7)
for num in range(1, 8):
q.enqueue(str(num))

print(q)
# self.head=0 self.tail=7

q.dequeue()
q.dequeue()
print(q)
# self.head=2 self.tail=7

q.enqueue(str(1))
q.enqueue(str(2))
print(q)

链式队列实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Node:

def __init__(self, data=None):
self.data = data
self.next = None


class LinkedQueue:

def __init__(self):
self.head = None
self.tail = None

def enqueue(self, item):
node = Node(item)
if self.tail:
self.tail.next = node
else:
self.head = node
self.tail = node

def dequeue(self):
if self.head:
item = self.head.data
# 后移head
self.head = self.head.next
# 只有一个元素时
if not self.head:
self.tail = None
return item

def __repr__(self):
values = []
current = self.head
while current:
values.append(current.data)
current = current.next
return '->'.join([value for value in values])


if __name__ == '__main__':
lq = LinkedQueue()
for i in range(1, 10):
lq.enqueue(str(i))

print(lq)

lq.dequeue()
lq.dequeue()
print(lq)

lq.enqueue(str(1))
lq.enqueue(str(2))
print(lq)

感受

多画图来分析

循环队列

确定好队空和队满的判定条件。

用数组实现的非循环队列中,队满的判断条件是tail == n,队空的判断条件是head == tail

用数组实现的循环队列中,队空的判断条件是head == tail队满的判断条件是(tail+1)%n=head

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from itertools import chain


class CircularQueue:

def __init__(self, capacity):
self.items = []
# 循环队列会浪费一个数组的存储空间
self.capacity = capacity + 1
self.head = 0
self.tail = 0

def enqueue(self, item):
if (self.tail + 1) % self.capacity == self.head:
return False
self.items.append(item)
self.tail = (self.tail + 1) % self.capacity
return True

def dequeue(self):
if self.tail == self.head:
return False
value = self.items[self.head]
self.head = (self.head+1) % self.capacity
return value

def __repr__(self):
if self.tail >= self.head:
return ' '.join(item for item in self.items[self.head: self.tail])
else:
return ' '.join(item for item in chain(self.items[self.head:], self.items[:self.tail]))


if __name__ == '__main__':
q = CircularQueue(5)
for i in range(5):
q.enqueue(str(i))
print(q) # 0 1 2 3 4
q.dequeue()
q.dequeue()
print(q) # 2 3 4
q.enqueue(str(5))
print(q) # 2 3 4 5
q.enqueue(str(5))
print(q) # 2 3 4 5 5 0

这是https://github.com/wangzheng0822/algo/blob/master/python/09_queue/circular_queue.py里面的代码,我在最后又加了一次入队enqueue(str(5)),发现结果不是自己预料的那样,输出2 3 4 5 5,而是输出了2 3 4 5 5 0

经过画图以及调试分析,发现直接原因是__repr__里面当self.tail < self.head时输出对象的问题。

根本原因是在初始化的时候,self.items = []以及enqueue()self.items.append(item)

正确代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class CircularQueue(object):

def __init__(self, capacity):
self.capacity = capacity + 1
# 初始化固定长度的列表
self.items = [None] * self.capacity
self.head = 0
self.tail = 0

def enqueue(self, item):
if (self.tail + 1) % self.capacity == self.head:
return False

self.items[self.tail] = item
self.tail = (self.tail + 1) % self.capacity
return True

def dequeue(self):
if self.tail == self.head:
return False

value = self.items[self.head]
self.head = (self.head + 1) % self.capacity
return value

def __repr__(self):
if self.tail >= self.head:
return ' '.join(item for item in self.items[self.head: self.tail])
return ' '.join(item for item in chain(self.items[self.head:], self.items[: self.tail]))


if __name__ == '__main__':
q = CircularQueue(5)
for i in range(5):
q.enqueue(str(i))
print(q) # 0 1 2 3 4
q.dequeue()
q.dequeue()
print(q) # 2 3 4
q.enqueue(str(5))
print(q) # 2 3 4 5
q.enqueue(str(6))
print(q) # 2 3 4 5 6
q.dequeue() # 3 4 5 6
q.enqueue(str(7))
print(q) # 3 4 5 6 7

阻塞队列和并发队列

阻塞队列

阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。

通过阻塞队列,可以实现一个“生产者-消费者模型”

并发队列

线程安全的队列称为并发队列。最简单的实现方式是直接在enqueue()dequeue()方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。实际上,基于数组的循环队列,利用CAS原子操作,可以实现非常高效的并发队列。这也是循环队列比链式队列应用更加广泛的原因。

解答开篇

线程池没有空闲线程时,新的任务请求线程资源时,线程池该如何处理?各种处理策略又是如何实现的?

  1. 非阻塞的处理方式,直接拒绝任务请求。
  2. 阻塞的处理方式,将请求排队,等到有空闲线程时,取出排队的请求继续处理。
    • 基于链表的实现方式,可以实现一个支持无限排队的无界队列,但是可能会导致过多的请求排队等待,请求处理的响应时间过长。所以,针对响应时间比较敏感的系统,基于链表实现的无限排队的线程池是不合适的。
    • 基于数组实现的有界队列,队列的大小有限,所以线程池中排队的请求超过队列大小时,接下来的请求就会被拒绝,这种方式对响应时间敏感的系统来说,就相对更加合理。不过,设置一个合理的队列大小,也是非常有讲究的。队列太大导致等待的请求太多,队列太小会导致无法充分利用系统资源,发挥最大性能。

实际上,对于大部分资源有限的场景,当没有空闲资源时,基本都可以通过“队列”这种数据结构来实现请求排队。

课后思考

  1. 还有哪些类似的池结构或者场景中会用到队列的排队请求?

    redis连接池;kafkaRabbitMQ等消息队列

  2. 如何实现无锁并发队列?

    可以使用CAS+数组的方式来实现

参考链接:

  1. Python实现循环队列(基于list)
您的支持将鼓励我继续创作!