当前位置:首页 > 科技  > 软件

讲完Go并发控制,讲讲并发抑制

来源: 责编: 时间:2024-06-17 17:40:32 90观看
导读已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。当多个相同的关键词并发查询(

已知有一个函数search,能够按照关键词执行搜索,coSearch能够批量并发查询。GAu28资讯网——每日最新资讯28at.com

让我们把目光定位到search上,search通过查询数据库或者调用其他api来完成搜索,这是一个相对耗时和消耗资源的操作。GAu28资讯网——每日最新资讯28at.com

当多个相同的关键词并发查询(调用search函数)时,我们希望只产生一次数据库调用(调用query),第一个查询未完成时后续的重复查询会等待,当第一个查询完成时则会与其他查询分享结果,这样一来虽然只执行了一次数据库调用但是所有查询都拿到了最终的结果。GAu28资讯网——每日最新资讯28at.com

图片图片GAu28资讯网——每日最新资讯28at.com

什么是并发抑制:GAu28资讯网——每日最新资讯28at.com

package mainimport ( "context" "fmt" "sync" "time" "golang.org/x/sync/errgroup")func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}// 实现search,在重复并发调用下仅执行一次query// 其他并发共享这次query的结果func search(ctx context.Context, word string) (string, error) {    return query(ctx, word)}func coSearch(ctx context.Context, words []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) results := make([]string, len(words)) for i, word := range words {  i, word := i, word  g.Go(func() error {   result, err := search(ctx, word)   if err != nil {    return err   }   results[i] = result   return nil  }) } err := g.Wait() return results, err}func main() { words := []string{"Go","Go", "Go", "Rust", "PHP", "JavaScript", "Java"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}

好了,可以先暂停想想该如何实现search函数了。GAu28资讯网——每日最新资讯28at.com

一步一步实现并发抑制

我们先假设所有查询关键词都一样,那么问题简化成并发执行search时,只在第一次search时调用query,其他的search并发调用等待并共享这次的查询结果。GAu28资讯网——每日最新资讯28at.com

通过waiting变量,其他goroutine等待第一个goroutine数据库调用完成,那么如何让其他goroutine等待在这个位置呢?GAu28资讯网——每日最新资讯28at.com

func main() { words := []string{"Go", "Go", "Go", "Go", "Go"} results, err := coSearch(context.Background(), words) if err != nil {  fmt.Println(err)  return } fmt.Println(results)}var ( waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) {  if waiting {    // 等待resp, err被赋值,即第一个query完成后再返回    // ...?      return resp, err  }  waiting = true  resp, err = query(ctx, word)  waiting = false  return resp, err}func query(ctx context.Context, word string) (string, error) { fmt.Println("searching: ", word) time.Sleep(5 * time.Second) return fmt.Sprintf("result: %s", word), nil // 模拟结果}

sync.WaitGroup{}并发控制

sync.WaitGroup{}是并发控制的核心,这里再次重申下用法:GAu28资讯网——每日最新资讯28at.com

  • 当新运行一个goroutine时,我们需要调用wg.Add(1)。
  • 当一个goroutine运行完成的时候,我们需要调用wg.Done()。
  • wg.Wait()让程序阻塞在此处,直到所有的goroutine运行完毕。

利用 sync.WaitGroup{}便可实现上文代码中等待的效果:GAu28资讯网——每日最新资讯28at.com

var ( wg      sync.WaitGroup waiting bool resp    string err     error)func search(ctx context.Context, word string) (string, error) { if waiting {  // 其他goroutine等待第一个goroutine执行完成  wg.Wait()  return resp, err } waiting = true     wg.Add(1) resp, err = query(ctx, word)    wg.Done() // 第一个goroutine执行完成     waiting = false return resp, err}

并发安全

当多个goroutine对同一个内存区域进行读写时,就会产生并发安全的问题,它会导致程序运行的结果不符合预期,而上文的程序并发的读写了waiting变量,需要给waiting变量加把锁。GAu28资讯网——每日最新资讯28at.com

释放锁的位置非常的有技巧,如果在在wg.Add(1)之前mu.Unlock(),可能 wg.Add(1)还未来得执行其他goroutine已经执行了wg.Wait(),并获取到了错误的数据。GAu28资讯网——每日最新资讯28at.com

unlock在add之前;GAu28资讯网——每日最新资讯28at.com

var (  wg      sync.WaitGroup  mu      sync.Mutex  waiting bool  resp    string  err     error)func search(ctx context.Context, word string) (string, error) { mu.Lock() if waiting {  mu.Unlock()  wg.Wait()  return resp, err } waiting = true     wg.Add(1)    // 在wg.Add(1)之后释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() resp, err = query(ctx, word)    wg.Done() mu.Lock() waiting = false mu.Unlock()      return resp, err}

完整版本

现在可以针对不同的关键词做区分了,使用一个map来代替原有的waiting,并将每一个关键词查询的WaitGroup和结果打包到map的value中。GAu28资讯网——每日最新资讯28at.com

type call struct { wg   sync.WaitGroup resp string err  error}var (    mu sync.Mutex    m = make(map[string]*call))func search(ctx context.Context, word string) (string, error) { mu.Lock() if c, ok := m[word]; ok {  mu.Unlock()  c.wg.Wait()  return c.resp, c.err } c := &call{} m[word] = c c.wg.Add(1) // 在wg.Add(1)之后才释放锁,保证其他goroutine被wg.Wait()阻塞 mu.Unlock() c.resp, c.err = query(ctx, word) c.wg.Done() mu.Lock() delete(m, word) mu.Unlock() return c.resp, c.err}

开源库 golang.org/x/sync/singleflight

上面一步一步教大家手搓了一个并发抑制的逻辑,我们的基本逻辑和开源库golang.org/x/sync/singleflight没有区别,只是singleflight内部实现更加严谨GAu28资讯网——每日最新资讯28at.com

直接使用singleflight非常简单的就可以实现我们的诉求GAu28资讯网——每日最新资讯28at.com

  • singleflight.Group 创建一个需要并发控制的范围
  • Do函数

第一个参数接收一个key来判断否重复调用GAu28资讯网——每日最新资讯28at.com

第二个参数为要执行的函数,函数可以返回正常值或者errorGAu28资讯网——每日最新资讯28at.com

Do函数返回值除了闭包函数的返回值之外,还返回了此次返回值是否由其他goroutine共享GAu28资讯网——每日最新资讯28at.com

import ( "golang.org/x/sync/singleflight")var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  return query(ctx, word) }) return resp.(string), err}

错误处理

因为共享第一个goroutine的结果,因此如果第一次调用失败,那其他goroutine也都会失败GAu28资讯网——每日最新资讯28at.com

如果在某些场景下允许第一个调用失败后再次尝试调用该函数,那么可以通过调用Forget方法来忘记这个keyGAu28资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) { resp, err, _ := g.Do(word, func() (interface{}, error) {  val, err := query(ctx, word)  // 当出错并且允许重试时  if err != nil && true {   g.Forget(word)   return "", err  }  return val, err }) return resp.(string), err}

超时控制

当使用Do函数时,如果query长时间未响应(这里假设qeury不具备超时能力),那么所有的goroutine都会被阻塞并等待,利用DoChan+select可以实现超时逻辑GAu28资讯网——每日最新资讯28at.com

var g = new(singleflight.Group)func search(ctx context.Context, word string) (string, error) {    ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel()     result := g.DoChan(word, func() (interface{}, error) {  return query(ctx, word) }) select { case r := <-result:  return r.Val.(string), r.Err case <-ctx.Done():  return "", ctx.Err() }}

使用场景

预防缓存穿透GAu28资讯网——每日最新资讯28at.com

在高并发的状态下,一般会给热点数据设置缓存。但数据第一次访问或者缓存失效的状态下,如果直接去查询数据库,会给数据库造成极大压力,甚至直接打爆数据库。GAu28资讯网——每日最新资讯28at.com

以上各种分享中被反复提到的场景,但!注意!使用singleflight就一劳永逸了么,不是的,在大规模集群下可能有数百台机器,当处在高并发状态时,即使每台机器只发起一个请求,也足以打爆你的数据库!结合实际,搭配适当的缓存策略、数据预热、限流等手段才能避免潜在的风险。挖个坑,以后有机会聊聊这些问题GAu28资讯网——每日最新资讯28at.com

总结

本篇作为一个例子,给你讲透典型的Go并发控制的姊妹篇,讲述了另外一种并发控制模型,并介绍了开源库golang.org/x/sync/singleflight。GAu28资讯网——每日最新资讯28at.com

当由一个goroutine并发向下发展成多个goroutine时,使用golang.org/x/sync/errgroupGAu28资讯网——每日最新资讯28at.com

当多个goroutine并发向下抑制成一个goroutine时,使用golang.org/x/sync/singleflightGAu28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-94296-0.html讲完Go并发控制,讲讲并发抑制

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 超火超实用的 10 个前端工具库,可能就是你一直在寻找的!

下一篇: 盘点历届 Java 语言的关键字,一定有你不认识的

标签:
  • 热门焦点
  • JavaScript 混淆及反混淆代码工具

    JavaScript 混淆及反混淆代码工具

    介绍在我们开始学习反混淆之前,我们首先要了解一下代码混淆。如果不了解代码是如何混淆的,我们可能无法成功对代码进行反混淆,尤其是使用自定义混淆器对其进行混淆时。什么是混
  • Rust中的高吞吐量流处理

    Rust中的高吞吐量流处理

    作者 | Noz编译 | 王瑞平本篇文章主要介绍了Rust中流处理的概念、方法和优化。作者不仅介绍了流处理的基本概念以及Rust中常用的流处理库,还使用这些库实现了一个流处理程序
  • 微信语音大揭秘:为什么禁止转发?

    微信语音大揭秘:为什么禁止转发?

    大家好,我是你们的小米。今天,我要和大家聊一个有趣的话题:为什么微信语音不可以转发?这是一个我们经常在日常使用中遇到的问题,也是一个让很多人好奇的问题。让我们一起来揭开这
  • 只需五步,使用start.spring.io快速入门Spring编程

    只需五步,使用start.spring.io快速入门Spring编程

    步骤1打开https://start.spring.io/,按照屏幕截图中的内容创建项目,添加 Spring Web 依赖项,并单击“生成”按钮下载 .zip 文件,为下一步做准备。请在进入步骤2之前进行解压。图
  • 一篇文章带你了解 CSS 属性选择器

    一篇文章带你了解 CSS 属性选择器

    属性选择器对带有指定属性的 HTML 元素设置样式。可以为拥有指定属性的 HTML 元素设置样式,而不仅限于 class 和 id 属性。一、了解属性选择器CSS属性选择器提供了一种简单而
  • 电视息屏休眠仍有网络上传 爱奇艺被质疑“薅消费者羊毛”

    电视息屏休眠仍有网络上传 爱奇艺被质疑“薅消费者羊毛”

    记者丨宁晓敏 见习生丨汗青出品丨鳌头财经(theSankei) 前不久,爱奇艺发布了一份亮眼的一季报,不仅营收和会员营收创造历史最佳表现,其运营利润也连续6个月实现增长。自去年年初
  • 中国家电海外掘金正当时|出海专题

    中国家电海外掘金正当时|出海专题

    作者|吴南南编辑|胡展嘉运营|陈佳慧出品|零态LT(ID:LingTai_LT)2023年,出海市场战况空前,中国创业者在海外纷纷摩拳擦掌,以期能够把中国的商业模式、创业理念、战略打法输出海外,他们依
  • 10天营收超1亿美元,《星铁》比《原神》差在哪?

    10天营收超1亿美元,《星铁》比《原神》差在哪?

    来源:伯虎财经作者:陈平安即便你没玩过《原神》,你一定听说过的它的大名。恨它的人把《原神》开服那天称作是中国游戏史上最黑暗的一天,有粉丝因为索尼在PS平台上线《原神》,怒而
  • iQOO Neo8 Pro评测:旗舰双芯加持 最强性能游戏旗舰

    iQOO Neo8 Pro评测:旗舰双芯加持 最强性能游戏旗舰

    【Techweb评测】去年10月,iQOO推出了一款Neo7手机,该机搭载了联发科天玑9000+,配备独显芯片Pro+,带来了同价位段最佳的游戏体验,一经上市便受到了诸多用
Top