给定需要在类似sql的数据库中更新数据的大片。一个SQL查询中的更新可能很长,并导致问题。
如何将这些数据拆分成多个批次,以便使用小查询进行并发/并行更新?例如,UPDATE tablename SET uid=12345 WHERE url IN (...)
至少举个小例子。。我已经打破了我的头(
type Task struct {
URL string
UID string
}
type Pool struct {
numWorkers int
tasks chan Task
done chan bool
}
func New(numWorkers int, buffer int) *Pool {
return &Pool{
numWorkers: numWorkers,
tasks: make(chan Task, buffer),
done: make(chan bool),
}
}
func (p *Pool) Run() {
wg := &sync.WaitGroup{}
for i := 0; i < p.numWorkers; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for {
select {
case task := <-p.tasks:
// execute
case <-p.done:
close(p.tasks)
return
}
}
}(i)
}
wg.Wait()
p.done <- true
}
func (p *Pool) AddTask(urls []string, uid string) {
for _, url := range urls {
p.tasks <- Task{URL: url, UID: uid}
}
}
1条答案
按热度按时间ergxz8rk1#
让我们来看看这一大块:
你开始一群工人。每一个都将运行,直到它们从
p.done
读取。在返回(defer
)之后,它们将通过等待组发送信号Done
。然后等工人回来他们回来后,你发信号
p.done
。这里有两个问题。
1.死锁。父goroutine无法完成
wg.Wait
,直到工作线程从p.done
读取。工作线程在调用wg.Done
之前不会读取p.done。1.作为一种向工作者发出完成信号的机制,您必须发送到
p.done
p.numWorkers
次。您根本不需要
p.done
。相反,使用close(p.tasks)
向worker发出通道输入完成的信号,然后使用wg.Wait
发出所有任务完成的信号。close
一旦你完成了排队工作…wg.Wait
以确保工人完成所有工作。