Bulk inserts with Snowflake & Go

3pvhb19x · posted 2023-02-01 · in Go

I'm retrieving a payload from a REST API and then want to use that payload to insert data into a Snowflake table.
My current process uses a Snowflake DB connection and iterates over a slice of structs (holding the data from the API). However, this doesn't seem efficient or optimal. Everything loads successfully, but I'm trying to figure out how to optimize a large volume of inserts for what could be thousands of records. Maybe the inserts need a separate channel rather than inserting synchronously?
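The "separate channel for inserts" idea can be sketched as a single consumer that pulls rows off a channel, groups them into batches, and flushes each batch with one bulk operation. This is a minimal, hypothetical sketch: the `consumeBatches` helper, the batch size, and the counting stand-in for the real database load are all mine, not part of the original code.

```go
package main

import "fmt"

type Entry struct{ field1 string }

// consumeBatches drains ch, calling flush once per full batch and once for
// any trailing partial batch. In a real pipeline, flush would wrap a bulk
// INSERT or a stage+COPY; flush must finish using the slice before returning,
// because the backing array is reused for the next batch.
func consumeBatches(ch <-chan *Entry, size int, flush func([]*Entry)) {
	batch := make([]*Entry, 0, size)
	for e := range ch {
		batch = append(batch, e)
		if len(batch) == size {
			flush(batch)
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		flush(batch)
	}
}

func main() {
	ch := make(chan *Entry, 25)
	for i := 0; i < 25; i++ {
		ch <- &Entry{fmt.Sprintf("row%d", i)}
	}
	close(ch)

	total, flushes := 0, 0
	consumeBatches(ch, 10, func(b []*Entry) { total += len(b); flushes++ })
	fmt.Println(total, flushes) // 25 3
}
```

The producer goroutines (the API requests) would write to `ch` instead of the transform loop calling `load` per response, decoupling fetch speed from insert speed.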
Generic code flow:

import (
    "database/sql"
    "fmt"
    "sync"
    "time"

    _ "github.com/snowflakedb/gosnowflake"
)

func ETL() {
     var wg sync.WaitGroup
     ch := make(chan []*Response)
     defer close(ch)
     
     // Create requests to API
     for _, req := range requests {
          // All of this flows fine without issue
          wg.Add(1)
          go func(request Request) {
               defer wg.Done()
               resp, _ := request.Get() 
               ch <- resp
          }(request)
     }

     // Connect to snowflake
     // This is not a problem
     connString := fmt.Sprintf(config...)
     db, _ := sql.Open("snowflake", connString)
     defer db.Close()

     // Collect responses from our channel
     results := make([][]*Response, len(requests))
     for i := range results {
         results[i] = <-ch
         for _, res := range results[i] {
             // transform is just a function to flatten my structs into entries that I would like to insert into Snowflake. This is not a bottleneck.
             entries := transform(res)

             // Load the data into Snowflake, passing the entries that have been
             // flattened as well as the db connection
             if err := load(entries, db); err != nil {
                 fmt.Println(err)
             }
         }
     }
     }
}

type Entry struct {
    field1 string
    field2 string
    statusCode int
}

func load(entries []*Entry, db *sql.DB) error {
    start := time.Now()
    for i, entry := range entries {
        fmt.Printf("Loading entry %d\n", i)

        stmt := `INSERT INTO tbl (field1, field2, updated_date, status_code)
             VALUES (?, ?, CURRENT_TIMESTAMP(), ?)`

        _, err := db.Exec(stmt, entry.field1, entry.field2, entry.statusCode)
        if err != nil {
            fmt.Println(err)
            return err
        }
    }
    fmt.Println("Load time: ", time.Since(start))
    return nil
}

qnakjoqk1#

Instead of INSERTing individual rows, collect the rows into files; each time you push one of those files to S3/GCS/Azure, it will be loaded immediately.
I wrote a post detailing the steps:

With the appropriate storage integration, this pipe will ingest the files automatically:

create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;

Also check these considerations:

Coming soon: if you want to send individual rows and have them ingested into Snowflake in real time — this is still under development (https://www.snowflake.com/blog/snowflake-streaming-now-hiring-help-design-and-build-the-future-of-big-data-and-stream-processing/).
