Go:将数据拆分为多个批次,以便在DB中进行并发/并行更新

djmepvbi  于 2023-05-04  发布在  Go
关注(0)|答案(1)|浏览(171)

给定需要在类似sql的数据库中更新数据的大片。一个SQL查询中的更新可能很长,并导致问题。
如何将这些数据拆分成多个批次,以便使用小查询进行并发/并行更新?例如,UPDATE tablename SET uid=12345 WHERE url IN (...)
至少举个小例子。。我已经打破了我的头(

type Task struct {
    URL     string
    UID     string
}

type Pool struct {
    numWorkers  int
    tasks       chan Task
    done        chan bool
}

func New(numWorkers int, buffer int) *Pool {
    return &Pool{
        numWorkers: numWorkers,
        tasks:      make(chan Task, buffer),
        done:       make(chan bool),
    }
}

func (p *Pool) Run() {
    wg := &sync.WaitGroup{}
    for i := 0; i < p.numWorkers; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for {
                select {
                case task := <-p.tasks:
                    // execute 
                case <-p.done:
                    close(p.tasks)
                    return
                }

            }
        }(i)
    }
    wg.Wait()
    p.done <- true
}

func (p *Pool) AddTask(urls []string, uid string) {
    for _, url := range urls {
        p.tasks <- Task{URL: url, UID: uid}
    }
}
ergxz8rk

ergxz8rk1#

让我们来看看这一大块:

wg := &sync.WaitGroup{}
    for i := 0; i < p.numWorkers; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for {
                select {
                case task := <-p.tasks:
                    // execute 
                case <-p.done:
                    close(p.tasks)
                    return
                }

            }
        }(i)
    }
    wg.Wait()
    p.done <- true

你开始一群工人。每一个都将运行,直到它们从p.done读取。在返回(defer)之后,它们将通过等待组发送信号Done
然后等工人回来他们回来后,你发信号p.done
这里有两个问题。
1.死锁。父goroutine无法完成wg.Wait,直到工作线程从p.done读取。工作线程在调用wg.Done之前不会读取p.done。
1.作为一种向工作者发出完成信号的机制,您必须发送到p.donep.numWorkers次。
您根本不需要p.done。相反,使用close(p.tasks)向worker发出通道输入完成的信号,然后使用wg.Wait发出所有任务完成的信号。

go func() {
            defer wg.Done()
            for  task := range p.tasks {
               // execute 
            }
        }()

close一旦你完成了排队工作…

for _, url := range urls {
        p.tasks <- Task{URL: url, UID: uid}
    }
    close(p.tasks)

wg.Wait以确保工人完成所有工作。

wg.Wait()

相关问题