I am working with some third-party APIs, each of which has its own rate limit. Endpoint 1 allows 10 requests/second and endpoint 2 allows 20 requests/second.
I need to process my data through endpoint 1, which returns an array of objects (anywhere between 2 and 3,000 of them). I then need to take each object and send some of its data to endpoint 2, while respecting endpoint 2's rate limit.
My plan is to send the requests to endpoint 1 in goroutines, 10 at a time, making sure that if all 10 requests complete in under 1 second, I do not send any more within that second.
Ultimately, I want to be able to cap how many concurrent requests each endpoint has in flight at once, especially since I will have to build retries for requests that fail due to 5xx server responses and the like.
For the purposes of this question, I am using httpbin requests to simulate the scenario below:
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)
type HttpBinGetRequest struct {
	url string
}

type HttpBinGetResponse struct {
	Uuid       string `json:"uuid"`
	StatusCode int
}

type HttpBinPostRequest struct {
	url  string
	uuid string // Item to post to API
}

type HttpBinPostResponse struct {
	Data       string `json:"data"`
	StatusCode int
}
func main() {
	// Prepare 500 GET requests
	var requests []*HttpBinGetRequest
	for i := 0; i < 500; i++ {
		uri := "https://httpbin.org/uuid"
		request := &HttpBinGetRequest{
			url: uri,
		}
		requests = append(requests, request)
	}

	// Create semaphore and rate limiter for the GET endpoint
	getSemaphore := make(chan struct{}, 10)
	getRate := make(chan struct{}, 10)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}
	go func() {
		// Ticker corresponding to 1/10th of a second
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-getRate
			if !ok {
				return
			}
		}
	}()

	// Send our GET requests to obtain a random UUID
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		// Goroutine to make the request and receive the response
		go func(r *HttpBinGetRequest) {
			defer wg.Done()
			// Check the rate limiter and block if it is empty
			getRate <- struct{}{}
			// Add a token to the semaphore
			getSemaphore <- struct{}{}
			// Remove the token when the function is complete
			defer func() {
				<-getSemaphore
			}()
			resp, err := get(r)
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Printf("%+v\n", resp)
		}(request)
	}
	wg.Wait()

	// I need to add code that collects the response data from the loop above,
	// then sends each UUID to its own goroutine for a POST request, following a
	// similar pattern, so as not to violate the second endpoint's rate limit of
	// 20 calls per second.
	// postSemaphore := make(chan struct{}, 20)
	// postRate := make(chan struct{}, 20)
	// for i := 0; i < cap(postRate); i++ {
	// 	postRate <- struct{}{}
	// }
}
func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
	httpResp := &HttpBinGetResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("GET", hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}
	req.Header = http.Header{
		"accept": {"application/json"},
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close()

	// Read the response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	if err := json.Unmarshal(body, httpResp); err != nil {
		fmt.Println("error unmarshaling response body")
		return httpResp, err
	}
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
	httpResp := &HttpBinPostResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}
	req.Header = http.Header{
		"accept": {"application/json"},
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusTooManyRequests {
		fmt.Println(resp.Header.Get("Retry-After"))
	}

	// Read the response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	if err := json.Unmarshal(body, httpResp); err != nil {
		fmt.Println("error unmarshaling response body")
		return httpResp, err
	}
	httpResp.StatusCode = resp.StatusCode
	fmt.Printf("%+v", httpResp)
	return httpResp, nil
}
1 Answer
This is a producer/consumer pattern. You can use a chan to connect the two stages.
For the rate limiting itself, I would use the
golang.org/x/time/rate
package. And since we have already decided to connect the producer and the consumer with a chan, it is natural to send failed tasks back to the same chan so that the consumer can retry them.
I have encapsulated the logic into a type,
Scheduler[T]
. See the demo below. Note that the demo was written in a hurry, just to illustrate the idea; it has not been thoroughly tested. The output looks like this: