Skip to main content

8.5. heapq —堆队列算法

源代码: Lib/heapq.py


该模块提供了堆队列算法的实现,也称为优先级队列算法。

堆是二叉树,每个父节点的值小于或等于其任何子节点。这个实现使用数组,其中 heap[k] <= heap[2*k+1]heap[k] <= heap[2*k+2] 用于所有 k,从零计数元素。为了比较起见,不存在的元素被认为是无限的。堆的有趣的属性是它的最小的元素总是根,heap[0]

下面的API与教科书堆算法在两个方面不同:(a)我们使用基于零的索引。这使得节点的索引和其子节点的索引之间的关系稍微不那么明显,但是更合适,因为Python使用基于零的索引。 (b)我们的pop方法返回最小的项,而不是最大的(在教科书中称为“最小堆”;因为它适合于就地排序,所以“最大堆”在文本中更常见)。

这两个使得有可能作为一个普通的Python列表查看堆没有惊喜:heap[0] 是最小的项目,heap.sort() 维护堆不变!

要创建堆,请使用初始化为 [] 的列表,或者可以通过函数 heapify() 将填充列表转换为堆。

提供以下功能:

heapq.heappush(heap, item)

将值 item 推送到 heap,保持堆不变。

heapq.heappop(heap)

heap 弹出并返回最小的项,保持堆不变。如果堆是空的,则 IndexError 被引发。要访问最小的项而不弹出它,请使用 heap[0]

heapq.heappushpop(heap, item)

在堆上推送 item,然后弹出并返回 heap 中的最小项。组合操作比 heappush() 更有效地运行,然后单独调用 heappop()

heapq.heapify(x)

将列表 x 转换为堆,就地,在线性时间。

heapq.heapreplace(heap, item)

heap 弹出并返回最小的项目,并推送新的 item。堆大小不变。如果堆是空的,则 IndexError 被引发。

这一步操作比 heappop() 跟随 heappush() 更有效,并且在使用固定大小的堆时可能更合适。 pop/push组合总是从堆中返回一个元素,并用 item 替换它。

返回的值可能大于添加的 item。如果不需要,请考虑使用 heappushpop()。它的push/pop组合返回两个值中的较小值,在堆上留下较大的值。

该模块还提供了基于堆的三个通用功能。

heapq.merge(*iterables, key=None, reverse=False)

将多个排序的输入合并到单个排序的输出(例如,合并来自多个日志文件的带时间戳的条目)。返回 iterator 超过排序值。

sorted(itertools.chain(*iterables)) 类似,但返回一个可迭代,不会一次将数据拉入内存,并假定每个输入流已经排序(从最小到最大)。

有两个可选参数,必须指定为关键字参数。

key 指定一个参数的 key function,用于从每个输入元素提取比较键。默认值为 None (直接比较元素)。

reverse 是一个布尔值。如果设置为 True,则输入元素将合并,就好像每个比较都相反。

在 3.5 版更改: 添加了可选的 keyreverse 参数。

heapq.nlargest(n, iterable, key=None)

iterable 定义的数据集中返回包含 n 最大元素的列表。 key (如果提供)指定一个参数的函数,用于从迭代中的每个元素提取比较键:key=str.lower 等效于:sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

iterable 定义的数据集中返回包含 n 最小元素的列表。 key (如果提供)指定一个参数的函数,用于从迭代中的每个元素提取比较键:key=str.lower 等效于:sorted(iterable, key=key)[:n]

后两个函数对于较小的 n 值执行得最好。对于较大的值,使用 sorted() 函数更有效。此外,当 n==1 时,使用内置的 min()max() 函数更高效。如果需要重复使用这些函数,请考虑将iterable转换为实际堆。

8.5.1. 基本示例

可以通过将所有值推送到堆上,然后一次一个地弹出最小值来实现:

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

这与 sorted(iterable) 类似,但与 sorted() 不同,此实现不稳定。

堆元素可以是元组。这对于在跟踪的主记录旁边分配比较值(例如任务优先级)很有用:

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

8.5.2. 优先级队列实现说明

优先级队列 是堆的常见用法,它提出了几个实现挑战:

  • 排序稳定性:如何使两个具有相同优先级的任务按照它们最初添加的顺序返回?

  • 如果优先级相等并且任务没有默认比较顺序,则(优先级,任务)对的元组比较中断。

  • 如果任务的优先级改变,你如何将它移动到堆中的新位置?

  • 或者,如果一个挂起的任务需要删除,你如何找到它并将其从队列中删除?

前两个挑战的解决方案是将条目存储为包括优先级,条目计数和任务的3元素列表。条目计数用作tie-breaker,以便按照添加的顺序返回具有相同优先级的两个任务。由于没有两个条目计数是相同的,元组比较永远不会尝试直接比较两个任务。

其余的挑战包括找到一个待完成的任务,并改变其优先级或完全删除它。可以使用指向队列中的条目的字典来完成查找任务。

删除条目或更改其优先级更加困难,因为它会破坏堆结构不变量。因此,一个可能的解决方案是将该条目标记为已删除,并添加具有已修改优先级的新条目:

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

8.5.3. 理论

堆是所有 ka[k] <= a[2*k+1]a[k] <= a[2*k+2] 的数组,从0开始计数元素。为了比较起见,不存在的元素被认为是无限的。堆的有趣的属性是 a[0] 总是其最小的元素。

上面的奇怪的不变式意味着是一个有效的内存表示为比赛。下面的数字是 k,而不是 a[k]:

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

在上面的树中,每个单元格 k 是顶部 2*k+12*k+2。在一个通常的二元比赛,我们看到在体育,每个单元格是两个单元格的冠军,它的顶部,我们可以跟踪赢家在树上查看所有的对手/他有。然而,在这样的比赛的许多计算机应用中,我们不需要追踪获胜者的历史。为了提高内存效率,当优胜者被提升时,我们尝试用更低级别的其他东西取代它,规则变成一个单元格,并且其顶部的两个单元格包含三个不同的项目,但是顶部单元格“胜利”在两个顶端的细胞。

如果这个堆不变量在任何时候都受到保护,索引0显然是整体的赢家。删除它并找到“下一个”赢家的最简单的算法方法是将一些失败者(让我们说上面的图中的单元格30)移动到0位置,然后沿着树向下渗透这个新的0,交换值,直到不变量重新建立。这显然是对树中项目总数的对数。通过遍历所有项,你得到一个O(n log n)排序。

这种类型的一个好处是,您可以在排序进行时有效地插入新项目,前提是插入的项目不比您提取的最后第0个元素“更好”。这在模拟上下文中特别有用,其中树保存所有传入事件,并且“赢”条件意味着最小的调度时间。当事件计划其他事件以供执行时,它们将被调度到将来,以便他们可以轻松地进入堆。所以,堆是一个很好的结构实现调度器(这是我用于我的MIDI音序器:-)。

已经广泛地研究了用于实现调度器的各种结构,并且堆是有益的,因为它们合理地快,速度几乎恒定,并且最坏的情况与平均情况没有太多不同。然而,有其他表示,整体更有效,但最坏的情况可能是可怕的。

堆在大磁盘排序中也非常有用。你很可能都知道一个大排序意味着生成“运行”(这是预排序的序列,其大小通常与CPU内存量相关),然后是这些运行的合并传递,合并通常非常聪明组织 [1]。非常重要的是,初始排序产生尽可能长的运行。比赛是一个很好的方式来实现。如果使用所有可用的内存来举办锦标赛,你可以替换和渗透那些适合当前运行的项目,你会产生两倍于随机输入内存大小的运行,并且更好的输入模糊排序。

此外,如果你在磁盘上输出第0项,并得到一个可能不适合当前比赛的输入(因为值“胜过”最后一个输出值),它不能放在堆中,所以大小堆减少。释放的内存可以立即被巧妙地重用,用于逐步构建第二个堆,第二个堆以与第一个堆融化的速率完全相同的速率增长。当第一个堆完全消失时,您切换堆并开始新的运行。聪明而相当有效!

总之,堆是有用的内存结构要知道。我在一些应用程序中使用它们,我认为保持一个’heap’模块是很好的。 :-)

脚注

[1]

当今的磁盘平衡算法比现在更烦人,这是磁盘搜索能力的结果。在不能寻找的设备,如大磁带驱动器,故事是完全不同的,一个人必须非常聪明,以确保(提前),每个磁带运动将是最有效的可能(也就是说,最好参与“进行“合并)。一些磁带甚至能够向后读,这也用于避免重绕时间。相信我,真正好的磁带类别是相当壮观的观看!从任何时候,排序一直是一个伟大的艺术! :-)