go ants pool 协程池学习笔记

作者:用户722786812344日期:2025/11/24

概述

使用 Go 开发并发程序很容易,一个 go 关键字就可以启动协程处理任务。Go 创建一个 goroutine 只需要 2K 内存空间,并且 go 协程上下文信息仅存储在两个寄存器中,对于 Go 运行时来说,切换上下文特别快。

不过凡事不加限制就会出问题,如果不加节制的滥用 goroutine 就可能导致内存泄露,增加 Go 运行时调度负担等。

下面看一段 Go http 标准库的代码:

1func (s *Server) Serve(l net.Listener) error {
2	for {
3	   ...
4	   c := s.newConn(rw)
5	   go c.serve(connCtx)  
6	}  
7}
8

当来一个 http 请求,http 会启动一个协程 Server.serve() 处理该请求。
这种方式对于请求的响应很快,但是如果有恶意大规模并发请求,就可能导致后端服务内存爆增,响应慢等问题。

测试千万并发的内存使用情况如下:

1const n = 10000000
2    
3func demoFunc() {  
4    time.Sleep(time.Duration(BenchParam) * time.Millisecond)  
5}
6
7func TestNoPool(t *testing.T) {  
8    var start, end runtime.MemStats  
9    runtime.ReadMemStats(&start)  
10    var wg sync.WaitGroup  
11    for i := 0; i < n; i++ {  
12       wg.Add(1)  
13       go func() {  
14          demoFunc()  
15          wg.Done()  
16       }()  
17    }  
18    wg.Wait()  
19    runtime.ReadMemStats(&end)  
20    usageMem := end.TotalAlloc/MiB - start.TotalAlloc/MiB  
21    t.Logf("memory usage:%d MB", usageMem)  
22}
23

测试结果:

1 go test -v -bench=. -benchmem -run=TestNoPool
2=== RUN   TestNoPool
3    ants_test.go:41: memory usage:1304 M
4--- PASS: TestNoPool (5.70s)
5

当并发量到千万时,内存使用率达到了 1 个多 G,看起来好像不多。不过可别忘了,这里处理的任务只是 time.Sleep,如果每个协程处理内存型或 IO 型任务,那该是多大内存或者 CPU 的消耗,并且调度如此多的协程也会让 Go 运行时(调度器,GC)的压力非常大。

因此,需要协程池来限制协程的使用,起到稳定内存使用率,减轻 Go 运行时负担的目的。

协程池

既然协程池的主要任务是限制协程的数量,那么作为池应该有几个需求是要实现的:

  • 协程池生命周期管理,包括创建,释放等操作;
  • 协程池的扩缩容,按需使用;
  • 协程池中过期协程的清理;

ants 是一个满足上述需求的高性能协程池。本文重点学习 ants 的源码了解协程池。

ants 协程池

结构

首先看代码的层次结构,作为一个协程池库,ants 很扁平,只有一层:

1tree
2.
3├── CODE_OF_CONDUCT.md
4├── CONTRIBUTING.md
5├── LICENSE
6├── README.md
7├── README_ZH.md
8├── ants.go
9├── ants_benchmark_test.go
10├── ants_test.go
11├── example_test.go
12├── go.mod
13├── go.sum
14├── multipool.go
15├── multipool_func.go
16├── multipool_func_generic.go
17├── options.go
18├── pkg
19   └── sync
20       ├── spinlock.go
21       ├── spinlock_test.go
22       └── sync.go
23├── pool.go
24├── pool_func.go
25├── pool_func_generic.go
26├── worker.go
27├── worker_func.go
28├── worker_func_generic.go
29├── worker_loop_queue.go
30├── worker_loop_queue_test.go
31├── worker_queue.go
32├── worker_stack.go
33└── worker_stack_test.go
34

从结构可以看出,主要分为三类:

  • pool:实现了协程池,是暴露给用户调用的对象;
  • workqueue:工作队列,负责装载工作协程,其接口实现是 worker_stackworker_loop_queue 对象;
  • worker:工作协程,是实际执行任务的协程;

这个结构还是很清晰的,就不画 UML 图表示了。

流程

设计

在看流程图之前,先自己想一下,如果让我设计协程池该如何做呢?

平时开发用的比较多的是类似这样的写法:

1for i := 0; i < workerNumber; i++ {
2	go worker()
3}
4
5func worker() {
6	for task := range taskC {
7		doTask()
8	}
9}
10

程序运行时启动 workerNumber 个协程监听同一个通道 taskC,如果 taskCtask,多个协程 worker 会抢 task 并调用 doTask() 处理任务。

这种写法的好处在于简单,缺点在于协程数固定,不能按需灵活使用,而且在程序启动时,workerNumber 个协程就开始运行,如果没有 task 会造成资源浪费。

这种写法对于数量少的协程还行,如果并发过大则需要上协程池了。

使用协程池,还是让 worker 去监听 taskC,不同的是每个 worker 都有自己的 taskC,这样设计的好处是协程 worker 不会打架。

让每个 worker 监听自己的通道,那谁来管理这些 worker 呢?我们可以将这些 worker 存入容器(堆或栈)中。让容器对象来管理 worker,包括插入 worker 到容器,弹出 worker 出容器等。

现在 worker 在容器中了,那什么时候该插入 worker 到容器,什么时候该弹出 worker 呢?

我们想每个 worker 都监听自己的通道,如果 worker 执行完任务就可以插入 worker 到容器,插入 worker 到容器的意思是,该 worker 可用,可以被拿出来执行任务。
这是插入逻辑,对于弹出,如果生产者需要往 worker 的通道中写任务,发现容器中有一个 worker 可用,就可以从容器中弹出 worker 给生产者使用。弹出的意思是,该 worker 我用了,别人不能在用了。

那么,谁是生产者呢?
生产者是客户端调用 pool 方法将任务写入 pool 对象,pool 对象从容器中获取可用的 worker,并往该 worker 的通道写入任务。

至此,主要的逻辑都走通了。

还有一点是容器该用栈还是用队列呢?
worker 写入容器后,如果长时间不用的话需要清理,清理需要按照时间排序。队列是符合这种需求的数据结构(按时间顺序,先进先出)。

流程图

ants 的流程图如下:

图片来源于 Github ants

结合图片看源码应该会很清晰,这里不在赘述了。

ants 源码实现

光了解流程还不够,ants 能在众多协程池中脱颖而出,肯定有它的过人之处。下面从细节出发看源码中有哪些过人之处,毕竟细节是魔鬼。

sync.Pool 对象复用

作为协程池,需要频繁创建 worker 对象。ants 使用 sync.Pool 作为对象缓存,复用 worker 对象,降低频繁创建销毁对象导致的内存开销。

1type poolCommon struct {
2	workerCache sync.Pool
3}
4
5func NewPool(size int, options ...Option) (*Pool, error) {  
6    ...
7    
8    // sync.Pool 创建对象
9    pool.workerCache.New = func() any {  
10       return &goWorker{  
11          pool: pool,  
12          task: make(chan func(), workerChanCap),  
13       }  
14    }  
15  
16    return pool, nil  
17}
18
19func (p *poolCommon) retrieveWorker() (w worker, err error) {
20	...
21	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {  
22    p.lock.Unlock()  
23    //  sync.Pool 中取 worker 对象
24    w = p.workerCache.Get().(worker)  
25    w.run()  
26    return  
27}
28

sync.spinLock

ants 实现了指数退避锁,如下:

1type spinLock uint32  
2  
3const maxBackoff = 16  
4  
5func (sl *spinLock) Lock() {  
6    backoff := 1  
7    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {  
8       // Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.  
9       for i := 0; i < backoff; i++ {  
10          runtime.Gosched(https://www.jiwenlaw.com/)  
11       }  
12       if backoff < maxBackoff {  
13          backoff <<= 1  
14       }  
15    }  
16}
17

作为高并发协程池,多个协程抢锁是常态。如果协程长期占有锁,其它协程一直在抢锁会造成资源浪费。ants 使用指数退避算法抢锁减少长时间占有锁的资源抢占。

并且,当抢不到锁时 ants 使用 runtime.Gosched 出让 CPU,而不是空转等待锁(空转协程会一直占用 CPU)。主动出让,而不是被动等待运行时请出 CPU 更加合理且快速,也让其它协程更快的执行抢占逻辑,雨露均沾。

执行基准测试看几种类型锁的执行性能:

1func BenchmarkMutex(b *testing.B) {  
2    m := sync.Mutex{}  
3    b.RunParallel(func(pb *testing.PB) {  
4       for pb.Next() {  
5          m.Lock()  
6          //nolint:staticcheck  
7          m.Unlock()  
8       }  
9    })  
10}  
11  
12type spinLock uint32  
13  
14func (s *spinLock) Lock() {  
15    for !atomic.CompareAndSwapUint32((*uint32)(s), 0, 1) {  
16    }  
17}  
18  
19func (s *spinLock) Unlock() {  
20    atomic.StoreUint32((*uint32)(s), 0)  
21}  
22  
23func newSpinLock() *spinLock {  
24    return (*spinLock)(new(uint32))  
25}  
26  
27func BenchmarkSpinLock(b *testing.B) {  
28    m := newSpinLock()  
29    b.RunParallel(func(pb *testing.PB) {  
30       for pb.Next() {  
31          m.Lock()  
32          //nolint:staticcheck  
33          m.Unlock()  
34       }  
35    })  
36}  
37  
38type backoffSpinLock uint32  
39  
40var backoff = 16  
41  
42func (s *backoffSpinLock) Lock() {  
43    i := 1  
44    for !atomic.CompareAndSwapUint32((*uint32)(s), 0, 1) {  
45       if i == backoff {  
46          continue  
47       }  
48       i <<= 1  
49    }  
50}  
51  
52func (s *backoffSpinLock) Unlock() {  
53    atomic.StoreUint32((*uint32)(s), 0)  
54}  
55  
56func newBackoffSpinLock() *backoffSpinLock {  
57    return (*backoffSpinLock)(new(uint32))  
58}  
59  
60func BenchmarkBackoffSpinLock(b *testing.B) {  
61    m := newBackoffSpinLock()  
62    b.RunParallel(func(pb *testing.PB) {  
63       for pb.Next() {  
64          m.Lock()  
65          //nolint:staticcheck  
66          m.Unlock()  
67       }  
68    })  
69}  
70  
71type spinSchedLock uint32  
72  
73func (s *spinSchedLock) Lock() {  
74    for !atomic.CompareAndSwapUint32((*uint32)(s), 0, 1) {  
75       runtime.Gosched()  
76    }  
77}  
78  
79func (s *spinSchedLock) Unlock() {  
80    atomic.StoreUint32((*uint32)(s), 0)  
81}  
82  
83func newSpinSchedLock() *spinSchedLock {  
84    return (*spinSchedLock)(new(uint32))  
85}  
86  
87func BenchmarkSpinSchedLock(b *testing.B) {  
88    m := newSpinSchedLock()  
89    b.RunParallel(func(pb *testing.PB) {  
90       for pb.Next() {  
91          m.Lock()  
92          //nolint:staticcheck  
93          m.Unlock()  
94       }  
95    })  
96}  
97  
98type backoffSpinSchedLock uint32  
99  
100var maxBackoff = 16  
101  
102func (s *backoffSpinSchedLock)https://www.jiwenlaw.com/ Lock() {  
103    backoff := 1  
104    for !atomic.CompareAndSwapUint32((*uint32)(s), 0, 1) {  
105       for i := 0; i < backoff; i++ {  
106          runtime.Gosched()  
107       }  
108       if backoff < maxBackoff {  
109          backoff <<= 1  
110       }  
111    }  
112}  
113  
114func (s *backoffSpinSchedLock) Unlock() {  
115    atomic.StoreUint32((*uint32)(s), 0)  
116}  
117  
118func newBackoffSpinSchedLock() *backoffSpinSchedLock {  
119    return (*backoffSpinSchedLock)(new(uint32))  
120}  
121  
122func BenchmarkBackoffSpinSchedLock(b *testing.B) {  
123    m := newBackoffSpinSchedLock()  
124    b.RunParallel(func(pb *testing.PB) {  
125       for pb.Next() {  
126          m.Lock()  
127          //nolint:staticcheck  
128          m.Unlock()  
129       }  
130    })  
131}
132

测试结果:

1 go test -v -bench=. -benchmem -run=^$
2goos: darwin
3goarch: arm64
4pkg: github.com/xiahuyun/go-library/ants
5cpu: Apple M3
6BenchmarkMutex
7BenchmarkMutex-8                        17903019                69.40 ns/op            0 B/op          0 allocs/op
8BenchmarkSpinLock
9BenchmarkSpinLock-8                     15894223               130.3 ns/op             0 B/op          0 allocs/op
10BenchmarkBackoffSpinLock
11BenchmarkBackoffSpinLock-8              10448190               124.0 ns/op             0 B/op          0 allocs/op
12BenchmarkSpinSchedLock
13BenchmarkSpinSchedLock-8                339347601                3.026 ns/op           0 B/op          0 allocs/op
14BenchmarkBackoffSpinSchedLock
15BenchmarkBackoffSpinSchedLock-8         400970906                2.641 ns/op           0 B/op          0 allocs/op
16

可以看到在众多锁中 ants 的指数退避锁表现最好。

purge worker

ants 使用一个清理协程定时清理 workerQueue 中的协程(ants 通过二分查找确定哪些协程是过期需要清理的,非常高效):

ticktock

antsworker 在放入队列时需要记录自己放入队列的时间,purgeWorker 会根据该时间确定哪些 worker 需要清理。

如果每个 worker 调用 time.Now() 记录时间,会造成系统资源浪费,time.Now 会执行系统调用,涉及内核态/用户态的切换。

ants 使用 ticktock 协程周期性的调用 time.Now 生成时间戳并存到 pool 对象池中。worker 在记录时间放入队列时,只需要从对象池拿时间戳就行,显著避免系统资源消耗:

1func (p *poolCommon) ticktock() {  https://www.jiwenlaw.com/
2    ticker := time.NewTicker(nowTimeUpdateInterval)  
3    defer func() {  
4       ticker.Stop()  
5       atomic.StoreInt32(&p.ticktockDone, 1)  
6    }()  
7  
8    ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot()  
9    for {  
10       select {  
11       case <-ticktockCtx.Done():  
12          return  
13       case <-ticker.C:  
14       }  
15  
16       if p.IsClosed() {  
17          break  
18       }  
19  
20	   // 记录当前时间到 pool 对象池
21       p.now.Store(time.Now())  
22    }  
23}
24

总结

ants 是一个被广泛使用的高性能协程池,特别适合学习,了解协程池的实现。本文从整体到细节介绍 ants 的实现,希望对大家有所帮助,感谢。


go ants pool 协程池学习笔记》 是转载文章,点击查看原文


相关推荐


Cursor 2.1 版本更新日志
是魔丸啊2025/11/22

转载 发布日期: 2025年11月21日 改进的计划模式、编辑器中的 AI 代码审查,以及即时 Grep 主要功能 改进的计划模式 (Improved Plan Mode) 当创建计划时,Cursor 会通过澄清性问题来回应,以提高计划质量。Cursor 现在显示一个交互式 UI,让回答问题变得简单。 你也可以使用 ⌘+F 在生成的计划内进行搜索。 AI 代码审查 (AI Code Reviews) 你现在可以直接在 Cursor 中通过 AI 代码审查来发现和修复 bug。它会检查你的更改并


JMeter 自动化测试 + 飞书通知完整指南
兔子蟹子2025/11/20

JMeter 自动化测试 + 飞书通知完整指南 从零开始搭建 JMeter 自动化测试系统,并通过飞书机器人实时推送测试报告 📖 目录 项目简介系统架构环境准备配置参数说明实施步骤代码实现常见问题最佳实践 项目简介 背景 在持续集成/持续部署(CI/CD)流程中,自动化测试是保证代码质量的重要环节。本项目实现了: ✅ 自动化执行:批量运行 JMeter 测试脚本✅ 智能分析:自动统计成功率、响应时间、失败详情✅ 实时通知:测试完成后立即推送飞书消息✅ 可视化报告:美


Gemini 3深夜来袭:力压GPT 5.1,大模型谷歌时代来了
机器之心2025/11/19

Gemini 3 还没现身,推特先崩为敬。 没有哪家模型的发布比 Gemini 3 更万众瞩目,根据 Gemini 之前 3 个月更新一次的频率,AI 社区自 9 月起便对 Gemini 3 翘首以盼。 今天,谷歌开发者关系负责人、Google AI Studio 负责人一条仅含「Gemini」一词的推文,积蓄了数月的期待终于迎来了爆发点,推特相关话题瞬间沸腾。 有趣的是,临近发布节点,推特竟「应景」地崩了几次。尽管「幕后黑手」是 Cloudflare,但这崩溃的时机简直精准得让人怀疑有人背后


AI 为啥能回答你的问题?大模型 5 步工作流程,看完秒懂!
印刻君2025/11/18

如今,大语言模型(LLM)已成为我们学习和工作中的常用工具。当你在对话框输入问题时,或许会好奇,这些模型为何能精准、迅速地生成回答?本文将用通俗易懂的语言,为你拆解背后的核心工作流程。 简单来说,大模型处理问题主要包含五个关键环节,分别是: 分词(Tokenization) 词嵌入(Word Embedding) 位置编码(Positional Encoding) 自注意力机制(Self-Attention) 自回归生成(Autoregressive Generation) 这些专业名词虽然


整数序列权重排序——基于深度优先搜索(DFS)以及记忆化搜索
w24167178402025/11/16

提示:本文适合对算法设计感兴趣的道友一起互相学习,如有疑问,欢迎评论区或者私信讨论。 文章目录 前言 一、题目介绍 二、前置知识扩展 1.深度优先遍历 1.1递归DFS 1.1非递归DFS 2.@cache装饰器 3.range()函数 4.sorted()函数 三、解题思路及代码解读


前端开发小技巧-【JavaScript】- 获取元素距离 document 顶部的距离
禁止摆烂_才浅2025/11/15

获取元素距离 document 顶部的距离 方案1:使用 offsetTop(最简单) const element = document.getElementById('myDiv') const distance = element.offsetTop console.log(distance) // 500(像素) 方案2:使用 getBoundingClientRect() + scrollY(最准确) const element = document.getElementById(


稳定边界层高度参数化方案的回归建模
mayubins2025/11/14

稳定边界层高度参数化方案的回归建模 为了发展一个适用于CAS-ESM气候系统模式的稳定边界层高度参数化方案,本研究基于湍流尺度分析理论,采用多元线性回归方法,对Zilitinkevich类型公式中的经验系数进行确定性拟合。该公式综合考虑了地表机械强迫、热力强迫以及自由大气静力稳定度的综合影响。 理论框架 我们所采用的参数化公式源于稳定层结下湍流动能的平衡关系,其函数形式如下: 1/h² = C1 * (f² / τ) + C2 * (N |f| / τ) + C3 * (|f β F₊| / τ


Python 的内置函数 isinstance
IMPYLH2025/11/13

Python 内建函数列表 > Python 的内置函数 isinstance Python 的内置函数 isinstance() 用于判断一个对象是否属于某个类或类型,或者是否属于由这些类型组成的元组中的一个。它是 Python 中类型检查的重要工具,相比于 type() 函数具有更灵活的类型检查能力。 其语法为: isinstance(object, classinfo) 其中: object 是要检查的对象classinfo 可以是一个类型对象,或者由类型对象组成的元组 is


[免费]基于Python的农产品可视化系统(Django+echarts)【论文+源码+SQL脚本】
java1234_小锋2025/11/11

大家好,我是java1234_小锋老师,看到一个不错的基于Python的农产品可视化系统(Django+echarts)【论文+源码+SQL脚本】,分享下哈。 项目视频演示 https://www.bilibili.com/video/BV1mYkoBLEju/ 项目介绍 本研究提出了一种基于Python的农产品可视化系统,结合Django框架和ECharts库,旨在为农产品数据的展示和分析提供便捷、高效的解决方案。系统通过Django框架构建后端服务,使用ECharts实现前端数提供数


用 PyQt 开发一个桌面计算器:从零到完整实战指南
Python私教2025/11/9

作者:张大鹏 时间:2025-11-05 标签:Python、PyQt5、GUI、桌面开发、实战教程 一、前言 在桌面应用开发中,计算器 是一个非常适合入门的练手项目。 它涉及到图形界面设计、事件绑定、信号槽机制、布局管理等核心概念。 今天我们将使用 PyQt5(同样适用于 PyQt6)一步步实现一个可用的计算器程序,从 UI 布局到功能逻辑完整讲解。 最终效果如下👇: 支持加减乘除和小数运算;按钮布局整齐;可通过按钮或键盘输入操作;界面美观,可打包为独立应用。 二、项目环境 项目依赖

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2025 聚合阅读