无法使用Golang SDK在AWS athena上执行查询

qybjjes1  于 2023-04-18  发布在  Go
关注(0)|答案(3)|浏览(162)

我是AWS和Golang的新手,我正在尝试创建一个lambda函数,它将触发AWS Athena查询并使用AWS SES服务通过电子邮件发送结果。即使在搜索了一个小时后,我也找不到lambda函数(Golang中)的工作示例来执行对Athena的查询并获得查询的输出。
在搜索时,我在Java、Python和Node Js中找到了相同的代码,但在Golang中没有。
甚至Go-SDK页面也会重定向到Java示例。但不幸的是,我甚至不懂Java。
我也看过这个AWS SDK for Go API Reference页面,但是我不明白程序的流程是什么,选择哪个操作。
我已经试着为此创建程序,这可能是完全错误的,我不知道下一步该怎么办。下面是代码-

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

func main() {

    // Create a new session in the us-west-2 region.
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1")},
    )

    // Create an Athena session.
    client := athena.New(sess)

    // Example sending a request using the StartQueryExecutionRequest method.
    query := "SELECT * FROM table1 ;"
    params := query
    req, resp := client.StartQueryExecutionRequest(params)

    err1 := req.Send()
    if err1 == nil { // resp is now filled
        fmt.Println(resp)
    }
}

如果有人可以帮助我执行Athena查询并在Golang中获得结果(最好),或者可以共享一些资源,我将不胜感激。一旦我得到了它,我就可以使用AWS SES发送电子邮件。

uqcuzwp8

uqcuzwp81#

用这个开始吧。

// run as: go run main.go
package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    "github.com/aws/aws-sdk-go-v2/aws/external"
    "github.com/aws/aws-sdk-go-v2/service/athena"
)

const table = "textqldb.textqltable"
const outputBucket = "s3://bucket-name-here/"

func main() {

    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        fmt.Printf("config error: %v\n", err)
        return
    }

    cfg.Region = endpoints.UsEast2RegionID

    client := athena.New(cfg)

    query := "select * from " + table

    resultConf := &athena.ResultConfiguration{
        OutputLocation: aws.String(outputBucket),
    }

    params := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(query),
        ResultConfiguration: resultConf,
    }

    req := client.StartQueryExecutionRequest(params)

    resp, err := req.Send(context.TODO())
    if err != nil {
        fmt.Printf("query error: %v\n", err)
        return
    }

    fmt.Println(resp)
}
kgsdhlau

kgsdhlau2#

@埃弗顿的代码正在Athena上执行查询,其响应被保存在S3 bucket上并且没有返回。因此,我添加了执行Athena查询并获取响应的代码。希望这可以帮助其他人。

// run as: go run main.go
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    "github.com/aws/aws-sdk-go-v2/aws/external"
    "github.com/aws/aws-sdk-go-v2/service/athena"
)

const table = "<Database_Name>.<Table_Name>"
const outputBucket = "s3://bucket-name-here/"

// Execute the query and return the query ID
func executeQuery(query string) *string {

    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        fmt.Printf("config error: %v\n", err)
    }

    cfg.Region = endpoints.UsEast2RegionID

    client := athena.New(cfg)

    resultConf := &athena.ResultConfiguration{
        OutputLocation: aws.String(outputBucket),
    }

    params := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(query),
        ResultConfiguration: resultConf,
    }

    req := client.StartQueryExecutionRequest(params)

    resp, err := req.Send(context.TODO())
    fmt.Println("Response is: ", resp, " Error is:", err)

    if err != nil {
        fmt.Printf("Query Error: %v\n", err)
    }

    fmt.Println("Query Execution Response ID:", resp.QueryExecutionId)
    return resp.QueryExecutionId
}

// Takes queryId as input and returns its response
func getQueryResults(QueryID *string) (*athena.GetQueryResultsResponse, error) {

    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        panic("config error")
    }

    cfg.Region = endpoints.UsEast2RegionID
    client := athena.New(cfg)
    params1 := &athena.GetQueryResultsInput{
        QueryExecutionId: QueryID,
    }
    req := client.GetQueryResultsRequest(params1)

    resp, err := req.Send(context.TODO())

    if err != nil {
        fmt.Printf("Query Response Error: %v\n", err)
        return nil, err
    }
    return resp, nil

}

func main() {

    query := "select * from " + table

    // Execute an Athena Query
    QueryID := executeQuery(query)

    // Get the response of the query

    // Wait for some time for query completion
    time.Sleep(15 * time.Second) // Otherwise create a loop and try for every x seconds
    Resp, err := getQueryResults(QueryID)

    if err != nil {
        fmt.Printf("Error getting Query Response: %v\n", err)
    } else {
        fmt.Println(" \nRows:", Resp.ResultSet.Rows)
    }

}
lrpiutwd

lrpiutwd3#

以防有人怀疑

  • 正在等待查询执行
  • 如何从查询结果中获取数据
  • 如何处理页面较多的查询结果

示例代码,使用aws-sdk-go-v2

package main

import (
  "fmt"
  "context"
  "time"
  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/athena"
  "github.com/aws/aws-sdk-go-v2/service/athena/types"
  "github.com/aws/aws-sdk-go-v2/config"
)

const REGION = "ap-southeast-1"
const QUERY = "SELECT * FROM sampletable"
const OUTPUT_BUCKET = "s3://athena-result-output-bucket"

func main() {

  cfg, err := config.LoadDefaultConfig(context.TODO(), func(o *config.LoadOptions) error {
    o.Region = REGION
    return nil
  })

  if err != nil {
    fmt.Println(err)
    return
  }

  client := athena.NewFromConfig(cfg)

  resultConfig := &types.ResultConfiguration{
    OutputLocation: aws.String(OUTPUT_BUCKET),
  }

  executeParams := &athena.StartQueryExecutionInput{
    QueryString: aws.String(QUERY),
    ResultConfiguration: resultConfig,
  }

  // Start Query Execution
  athenaExecution, err := client.StartQueryExecution(context.TODO(), executeParams)

  if err != nil {
    fmt.Println(err)
    return
  }

  // Get Query execution and check for the Query state constantly every 2 second
  executionId := *athenaExecution.QueryExecutionId

  for {
    time.Sleep(2 * time.Second)

    status, stateErr := client.GetQueryExecution(context.TODO(), &athena.GetQueryExecutionInput{
      QueryExecutionId: &executionId,
    })

    if stateErr != nil {
      fmt.Println(stateErr)
      return
    }

    state := status.QueryExecution.Status.State

    if state == types.QueryExecutionStateSucceeded || state == types.QueryExecutionStateCancelled {
      break
    } else if state == types.QueryExecutionStateFailed {
      fmt.Println("Query Execution failed")
      return
    }
  }

  // Get Query result 

  queryParams := &athena.GetQueryResultsInput{
    QueryExecutionId: &executionId,
  }

  for {

    result, getQueryErr := client.GetQueryResults(context.TODO(), queryParams)

    if getQueryErr != nil {
      fmt.Println(getQueryErr)
      return
    }

    // Get data from query result
    for rowIndex, row := range result.ResultSet.Rows {
      // first row are the Column name
      if rowIndex == 0 {
        continue
      }

      // Display Column data
      fmt.Printf("Col1: %v, Col2: %v \n", *row.Data[0].VarCharValue, *row.Data[1].VarCharValue)
    }
    

    // If NO more next page result set
    if result.NextToken == nil {
      break
    }

    queryParams.NextToken = result.NextToken
  }
}

相关问题