引用
译者注:想要程序的性能得到提升吗?尝试下通过找到慢的东西,然后用更快的东西来代替它。作者以寻找网状图的直径为例,仅仅七步性能就提升了100倍,值得学习。
我有一种热衷于让事情变得更快的强迫症。这里有一个故事,那就是利用编译器中的神奇工具,把GO语言的代码运行速度提高100倍。这并不是让代码变得怪异和不可维护来达到目的,而是通过找到慢的东西,然后用更快的东西来代替它。
为了进行这个练习,我想试着确定一个图形的
“直径”是很有趣的。这与工作中所做的事情很相似(比如在ravelin.com网站上用
图表来捕捉坏人),而且要花很长时间才能计算出来。
这里的直径是图中所有点对间最短路径长度的最大值。图中的任意两点之间有一条最短路径。如果一对点的最短路径要比其他所有点对长,那么这条路径的长度就是直径。
首先每个节点都由一个字符串来标识。
type nodeId string
每个节点都有许多与之直接相连的相邻节点。当把这些放在map集合中,就可以很容易地添加相邻的节点,而不会有重复的风险。
type node struct {
id nodeId
adj map[nodeId]*node
}
通过id找到的所有的节点(至少当从输入列表的边缘构建结构时)使用map集合保存。
type nodes map[nodeId]*node
为了找到两个节点之间的最短路径,可以使用
广度优先搜索(BFS)算法。现在,从任何节点开始的BFS将找到从该节点到其他所有节点的最短路径,因此只需要在每个节点上运行BFS,就能找到它的直径。下面是执行此操作的代码。当访问每个节点时,关键元素在某个地方进行跟踪,以获取已经完成了多少步骤,以及接下来要考虑的节点队列。
// diameter is the maximum length of a shortest path in the network
func (nodes nodes) diameter() int {
var diameter int
for id := range nodes {
df := nodes.longestShortestPath(id)
if df > diameter {
diameter = df
}
}
return diameter
}
type bfsNode struct {
// bfs tracking data
parent *node
depth int
}
func (nodes nodes) longestShortestPath(start nodeId) int {
q := list.New()
bfsData := make(map[nodeId]bfsNode, len(nodes))
n := nodes.get(start)
bfsData[n.id] = bfsNode{parent: n, depth: 0}
q.PushBack(n)
for {
elt := q.Front()
if elt == nil {
break
}
n = q.Remove(elt).(*node)
for id, m := range n.adj {
bm := bfsData[id]
if bm.parent == nil {
bfsData[id] = bfsNode{parent: n, depth: bfsData[n.id].depth + 1}
q.PushBack(m)
}
}
}
return bfsData[n.id].depth
}
第一个版本的这段代码在这里:
https://github.com/philpearl/graphblog/commit/f4742fb1c65a896562052990780fe27b9ce85e3f
如果为一个有9997条边的网状图运行一个基准调用diameter(),将得到以下结果。
BenchmarkDiameter/diameter-8 1 38108360293 ns/op 9170172832 B/op 82451888 allocs/op
1次迭代需要38s,分配8200万个对象。所以要让它更快一点是值得考虑的。因此使用cpu分析(go test -bench . -cpuprofile cpu.prof),然后运行profiler来创建a.svg显示时间的去向(go tool pprof -svg graphblog.test cpu.prof > cpu1.svg)。这里有一个有趣的部分。
许多CPU使用都是在longestShortestPath()的map中分配和迭代的。也有一部分时间消耗在将数据项插入到list。如果仔细查看这个配置文件,会看到其实更多的时间是花在垃圾回收上。
因此,如果能够删除或改进map和list,或者减少消耗,那就最好了。
对于改进的第一个设想是将字符串节点id转换为int32。int32应该更小,更容易散列和比较,因此可能会使map访问更快。这里保留了一个“符号表”,从字符串节点名映射到一个int32。第一个ID为0,并将为每个新节点增加一个ID。这是符号表。
type nodeName string
type symbolTable map[nodeName]nodeId
func (s symbolTable) getId(name nodeName) nodeId {
id, ok := s[name]
if !ok {
id = nodeId(len(s))
s[name] = id
}
return id
}
然后需要将节点和symbolTable封装成一个graph结构体。
type graph struct {
symbolTable
nodes
}
func New() *graph {
return &graph{
symbolTable: make(symbolTable),
nodes: make(nodes),
}
}
func (g *graph) addEdge(a, b nodeName) {
aid := g.symbolTable.getId(a)
bid := g.symbolTable.getId(b)
g.nodes.addEdge(aid, bid)
}
这是有对一系列改变的完整
提交。如果再次运行基准测试,我看到运行时间大约是30s,整整减少8s。
BenchmarkDiameter/diameter-8 1 29804044414 ns/op 7311829424 B/op 82451563 allocs/op
如果查看
CPU的profile,会发现并没有什么需要改变,map仍然是问题的主要部分。但现在nodeid是从0开始的连续整数。可以用切片来保存节点吗?nodeId作为slice中的索引,查找节点数据会非常快。如果有一个固定数目的节点(或一个合理的上界),这是一个可以想象的简单改变。可以预先分配适当大小的切片,然后只需要
改变一些类型。
type nodes []node
func (nl nodes) init() {
for i := range nl {
nl[i].id = nodeId(i)
nl[i].adj = make(map[nodeId]*node)
}
}
func (nl nodes) get(id nodeId) *node {
return &nl[id]
}
测试结果再次提高,时间从原来的38s降到14s。
BenchmarkDiameter/diameter-8 1 13927658892 ns/op 5563335728 B/op 81779354 allocs/op
这一次性能分析数据
发生了改变。
现在存在巨大的时间消耗有
- 在longestShortestPath中推送list时所做的分配
- 分配用于保存每个BFS运行状态信息的切片
- 还有一些与相邻节点使用的map相关的消耗
现在把最大的目标放在首位,然后把它替换掉。
下面将列出可以重用的从一个BFS运行到另一个BFS的list。该list需要支持从开始的节点出发并将节点保留到最后。它只需要知道如何存储指向节点结构的指针。如果它能重新使用list元素,从而减少分配的数量,那就更好了,所以对此添加了一个内部的免费元素list。
由于这个list比container/list更简单,所以在可能的情况下重新使用它的元素,并且不使用接口。甚至在BFS运行之间
尝试重用它,在此之前可能是一个更快的替换。
package graphblog
type listElt struct {
next *listElt
node *node
}
type list struct {
head *listElt
tail *listElt
free *listElt
}
func (l *list) getHead() *node {
elt := l.head
if elt == nil {
return nil
}
// Remove elt from the list
l.head = elt.next
if l.head == nil {
l.tail = nil
}
// Add elt to the free list
elt.next = l.free
l.free = elt
n := elt.node
elt.node = nil
return n
}
func (l *list) pushBack(n *node) {
// Get a free listElt to use to point to this node
elt := l.free
if elt == nil {
elt = &listElt{}
} else {
l.free = elt.next
elt.next = nil
}
// Add the element to the tail of the list
elt.node = n
if l.tail == nil {
l.tail = elt
l.head = elt
} else {
l.tail.next = elt
l.tail = elt
}
}
测试结果确实有所改善,从14s下降到了9.5s。
BenchmarkDiameter/diameter-8 1 9453521303 ns/op 1762047056 B/op 7737600 allocs/op
这个概要数据显示仍然是GC在浪费时间。这可能发生在后台CPU上,因此对计算直径所花费的时间几乎没有影响。但排序将降低这段代码的总体影响。
我们可以简单地
在diameter()中分配该list,并将其作为参数传递给longestShortestPath()。这不会对整体的时间造成很大的影响,但会极大地减少分配。
BenchmarkDiameter/diameter-8 1 9211307204 ns/op 1638428512 B/op 11301 allocs/op
现在map迭代是消耗
时间最多的,下面用切片替换相邻的节点map。我们不期望每个节点都有上百个相邻节点,因此就算不能比以前更快,对重复的线性搜索也至少和使用map一样快。而且只需要相邻的节点id,而不是指向节点的指针,所以在同一时间改变它,就能把时间降到了8s以下。
BenchmarkDiameter/diameter-8 1 7815493067 ns/op 1638427296 B/op 11273 allocs/op
下一个
改进的备选内容是针对longestShortestPath()中分配的状态数据,以跟踪第一次搜索的进度。可以重用它吗?可以的,通过将内存分配给diameter()而不是longestShortestPath()来重新使用内存,但是需要重置运行之间的内容,因此从表面上看可能正在做更多的工作,但
不妨一试。
事实证明,这是一个巨大的胜利,因为现在降到了1.8s.
BenchmarkDiameter/diameter-8 1 1840585891 ns/op 185584 B/op 1233 allocs/op
现在这个profile.svg显示的时间并不多,这些成果归功于编写的代码。
大多数时间消耗在longestShortestPath()中。如果仔细看看就会发并没有真正地在bfsNode结构中使用parent字段。只是用它来表示在搜索之前是否访问过这个节点。可以尝试
删除该字段,并使用-1的深度来表示节点没有被检查。这会将时间消耗降到1.5s(注意我在基准命令行中添加了-benchtime 10s)。
BenchmarkDiameter/diameter-8 10 1543700500 ns/op 101552 B/op 1229 allocs/op
如果假设直径不大于32000,那么可以用一个int16替换int深度。这减少了需要更新的内存数量,并将时间减少到1.4s以下。
BenchmarkDiameter/diameter-8 10 1389635165 ns/op 40112 B/op 1229 allocs/op
现在已经实现想法的关键点,最后可以做的一件事是让代码
更容易运行。现在是从一个没有变化的graph的每个结点上运行BFS。如果将这些节点拆分,那么就可以在不同的CPU核心上运行来自不同节点的BFS,这将会让执行时间从原来的38秒降到0.3s。
BenchmarkDiameter/diameter-8 5 303704709 ns/op 309097 B/op 8969 allocs/op
所有这些代码都可以在
https://github.com/philpearl/graphblog上看到,过程中的每一个阶段作为一次单独的提交,并在最后的提交中添加profiling.svgs文件。
如果你已经做到了这一点,那么恭喜你!如果你也喜欢这篇文章,请点击推荐中心,我的心情会因此像沐浴在阳光中。
如果还有最后一项改进可以将运行时间减半,希望能在评论区看到这一项改进的内容。