I am retrieving a payload from a REST API, and I then want to use that payload to insert data into a Snowflake table.
My current process uses a Snowflake DB connection and iterates over a slice of structs (containing the data from the API). However, this does not seem efficient or optimal. Everything loads successfully, but I am trying to figure out how to optimize a large number of inserts, potentially thousands of records. Maybe the inserts need a separate channel rather than being done synchronously?
General code flow:
import (
    "database/sql"
    "fmt"
    "sync"
    "time"

    _ "github.com/snowflakedb/gosnowflake"
)
func ETL() {
    var wg sync.WaitGroup
    ch := make(chan []*Response)
    defer close(ch)

    // Create requests to the API
    for _, request := range requests {
        // All of this flows fine without issue
        wg.Add(1)
        go func(request Request) {
            defer wg.Done()
            resp, _ := request.Get()
            ch <- resp
        }(request)
    }

    // Connect to Snowflake
    // This is not a problem
    connString := fmt.Sprintf(config...) // DSN construction elided
    db, _ := sql.Open("snowflake", connString)
    defer db.Close()

    // Collect responses from our channel; one receive per request sent
    results := make([][]*Response, len(requests))
    for i := range results {
        results[i] = <-ch
        for _, res := range results[i] {
            // transform is just a function to flatten my structs into entries
            // that I would like to insert into Snowflake. This is not a bottleneck.
            entries := transform(res)
            // Load the data into Snowflake, passing the entries that have been
            // flattened as well as the db connection
            if err := load(entries, db); err != nil {
                fmt.Println(err)
            }
        }
    }
}
type Entry struct {
    field1     string
    field2     string
    statusCode int
}
func load(entries []*Entry, db *sql.DB) error {
    start := time.Now()
    for i, entry := range entries {
        fmt.Printf("Loading entry %d\n", i)
        stmt := `INSERT INTO tbl (field1, field2, updated_date, status_code)
                 VALUES (?, ?, CURRENT_TIMESTAMP(), ?)`
        _, err := db.Exec(stmt, entry.field1, entry.field2, entry.statusCode)
        if err != nil {
            fmt.Println(err)
            return err
        }
    }
    fmt.Println("Load time: ", time.Since(start))
    return nil
}
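One variant I have been considering is batching many rows into a single multi-row INSERT instead of one db.Exec call per row. A rough sketch, assuming the same Entry type and tbl table as above (the batch size of 1000 and the strings.Join construction are just illustrative; it also needs "strings" added to the imports):

import "strings"

// loadBatched builds one multi-row INSERT per batch instead of issuing
// one db.Exec call per row.
func loadBatched(entries []*Entry, db *sql.DB) error {
    const batchSize = 1000 // arbitrary; tune for your workload
    for start := 0; start < len(entries); start += batchSize {
        end := start + batchSize
        if end > len(entries) {
            end = len(entries)
        }
        batch := entries[start:end]

        // One "(?, ?, CURRENT_TIMESTAMP(), ?)" group per row in the batch.
        placeholders := make([]string, 0, len(batch))
        args := make([]interface{}, 0, len(batch)*3)
        for _, e := range batch {
            placeholders = append(placeholders, "(?, ?, CURRENT_TIMESTAMP(), ?)")
            args = append(args, e.field1, e.field2, e.statusCode)
        }

        stmt := "INSERT INTO tbl (field1, field2, updated_date, status_code) VALUES " +
            strings.Join(placeholders, ", ")
        if _, err := db.Exec(stmt, args...); err != nil {
            return err
        }
    }
    return nil
}

This at least amortizes the per-statement round trip, but I am not sure it is the right overall approach.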
1 Answer
Instead of INSERTing individual rows, collect the rows into files; every time you push one of these files to S3/GCS/Azure, it will be loaded immediately. I wrote a post detailing the steps.
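As a rough Go sketch of the files-then-bulk-load idea, here is a variant that uses the table's internal stage and an explicit COPY INTO rather than external cloud storage. It assumes the Entry type from the question, that recent versions of gosnowflake can execute PUT through db.Exec, and a POSIX-style temp path for the file:// URL; the file name is illustrative:

import (
    "compress/gzip"
    "database/sql"
    "encoding/csv"
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "time"
)

// stageAndCopy collects entries into one gzipped CSV file, uploads it to
// the table's internal stage with PUT, and bulk-loads it with one COPY INTO.
func stageAndCopy(entries []*Entry, db *sql.DB) error {
    path := filepath.Join(os.TempDir(), fmt.Sprintf("entries_%d.csv.gz", time.Now().UnixNano()))
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    gz := gzip.NewWriter(f)
    w := csv.NewWriter(gz)
    for _, e := range entries {
        if err := w.Write([]string{e.field1, e.field2, strconv.Itoa(e.statusCode)}); err != nil {
            return err
        }
    }
    w.Flush()
    if err := w.Error(); err != nil {
        return err
    }
    if err := gz.Close(); err != nil {
        return err
    }
    if err := f.Close(); err != nil {
        return err
    }

    // PUT uploads the local file to the table's internal stage (@%tbl).
    // AUTO_COMPRESS=FALSE because the file is already gzipped.
    if _, err := db.Exec(fmt.Sprintf("PUT file://%s @%%tbl AUTO_COMPRESS=FALSE", path)); err != nil {
        return err
    }
    // A single COPY INTO then loads all staged files in bulk.
    _, err = db.Exec(`COPY INTO tbl (field1, field2, updated_date, status_code)
        FROM (SELECT $1, $2, CURRENT_TIMESTAMP(), $3 FROM @%tbl)
        FILE_FORMAT = (TYPE = CSV) PURGE = TRUE`)
    return err
}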
With the proper storage integration, this will ingest the files automatically.
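Schematically, the auto-ingest setup looks something like this (all object names are placeholders, and the storage integration parameters are cloud-provider-specific; see the Snowflake docs for the exact options):

-- Illustrative names only; parameters differ for S3/GCS/Azure.
CREATE STORAGE INTEGRATION my_s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<role_arn>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/etl/');

CREATE STAGE my_stage
  URL = 's3://my-bucket/etl/'
  STORAGE_INTEGRATION = my_s3_int;

-- AUTO_INGEST makes Snowpipe load each file as soon as the bucket
-- sends an event notification for it.
CREATE PIPE my_pipe AUTO_INGEST = TRUE AS
  COPY INTO tbl (field1, field2, updated_date, status_code)
  FROM (SELECT $1, $2, CURRENT_TIMESTAMP(), $3 FROM @my_stage)
  FILE_FORMAT = (TYPE = CSV);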
Also check these considerations:
Coming soon: if you want to send individual rows and have them ingested into Snowflake in real time, that is still in development (https://www.snowflake.com/blog/snowflake-streaming-now-hiring-help-design-and-build-the-future-of-big-data-and-stream-processing/).