从输入通道正确批处理项目

nwnhqdif  于 2021-06-15  发布在  Mysql
关注(0)|答案(2)|浏览(317)

用例
我想在mysql数据库中保存大量数据,这些数据是通过通道接收的。出于性能原因,我将它们分为10个项目进行处理。我每3小时才收到一次输入项。
问题
假设我得到了10004个项目,那么剩下4个项目,因为我的go例程等待10个项目,然后才能成批“清除它们”。我想确保它创建的批处理少于10个项目,以防该通道中没有更多的项目(该通道也被生产商关闭)。
代码:

// ProcessAudits sends the given audits in batches to SQL
func ProcessAudits(done <-chan bq.Audit) {
    var audits []bq.Audit
    for auditRow := range done {
        user := auditRow.UserID.StringVal
        log.Infof("Received audit %s", user)
        audits = append(audits, auditRow)

        if len(audits) == 10 {
            upsertBigQueryAudits(audits)
            audits = []bigquery.Audit{}
        }
    }
}

我是新来的,我不知道如何正确地实施这一点?

ckx4rj1h

ckx4rj1h1#

这是一个有效的例子。当通道关闭时,范围退出,因此您可以在循环之后处理任何剩余的项。

package main

import (
    "fmt"
    "sync"
)

type Audit struct {
    ID int
}

func upsertBigQueryAudits(audits []Audit) {
    fmt.Printf("Processing batch of %d\n", len(audits))
    for _, a := range audits {
        fmt.Printf("%d ", a.ID)
    }
    fmt.Println()
}

func processAudits(audits <-chan Audit, batchSize int) {
    var batch []Audit
    for audit := range audits {
        batch = append(batch, audit)
        if len(batch) == batchSize {
            upsertBigQueryAudits(batch)
            batch = []Audit{}
        }
    }
    if len(batch) > 0 {
        upsertBigQueryAudits(batch)
    }
}

func produceAudits(x int, to chan Audit) {
    for i := 0; i < x; i++ {
        to <- Audit{
            ID: i,
        }
    }
}

const batchSize = 10

func main() {
    var wg sync.WaitGroup
    audits := make(chan Audit)
    wg.Add(1)
    go func() {
        defer wg.Done()
        processAudits(audits, batchSize)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        produceAudits(25, audits)
        close(audits)
    }()
    wg.Wait()
    fmt.Println("Complete")
}

输出:

Processing batch of 10
0 1 2 3 4 5 6 7 8 9
Processing batch of 10
10 11 12 13 14 15 16 17 18 19
Processing batch of 5
20 21 22 23 24
Complete
4uqofj5v

4uqofj5v2#

你也可以用定时器。在这里玩例子https://play.golang.org/p/0atlgvcl-px

相关问题