我有一段在Go语言中的通道之间进行通信的代码。它似乎完成了所需的操作,但它在最后挂起。我试图诊断它挂起的原因。
代码使用httpbin.org
获取随机UUID,然后发布它,同时遵守我通过信号量通道和速率通道建立的并发性和速率限制。
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
)
type HttpBinGetRequest struct {
url string
}
type HttpBinGetResponse struct {
Uuid string `json:"uuid"`
StatusCode int
}
type HttpBinPostRequest struct {
url string
uuid string // Item to post to API
}
type HttpBinPostResponse struct {
Data string `json:"data"`
StatusCode int
}
func main() {
// Prepare GET requests for n requests
var requests []*HttpBinGetRequest
for i := 0; i < 10; i++ {
uri := "https://httpbin.org/uuid"
request := &HttpBinGetRequest{
url: uri,
}
requests = append(requests, request)
}
// Create semaphore and rate limit for the GET endpoint
getSemaphore := make(chan struct{}, 10)
getRate := make(chan struct{}, 10)
defer close(getRate)
defer close(getSemaphore)
for i := 0; i < cap(getRate); i++ {
getRate <- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-getRate
if !ok {
return
}
}
}()
// Send our GET requests to obtain a random UUID
respChan := make(chan HttpBinGetResponse)
var wg sync.WaitGroup
for _, request := range requests {
wg.Add(1)
// cnt := c
// Go func to make request and receive the response
go func(r *HttpBinGetRequest) {
defer wg.Done()
// Check the rate limiter and block if it is empty
getRate <- struct{}{}
// fmt.Printf("Request #%d at: %s\n", cnt, time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"))
resp, _ := get(r, getSemaphore)
fmt.Printf("%+v\n", resp)
// Place our response into the channel
respChan <- *resp
// fmt.Printf("%+v,%s\n", resp, time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"))
}(request)
}
// Set up for POST requests 10/s
postSemaphore := make(chan struct{}, 10)
postRate := make(chan struct{}, 10)
defer close(postRate)
defer close(postSemaphore)
for i := 0; i < cap(postRate); i++ {
postRate <- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-postRate
if !ok {
return
}
}
}()
// Read responses as they become available
for ele := range respChan {
postReq := &HttpBinPostRequest{
url: "https://httpbin.org/post",
uuid: ele.Uuid,
}
go func(r *HttpBinPostRequest) {
postRate <- struct{}{}
postResp, err := post(r, postSemaphore)
if err != nil {
fmt.Println(err)
}
fmt.Printf("%+v\n", postResp)
}(postReq)
}
wg.Wait()
close(respChan)
}
func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {
// Add a token to the semaphore
sem <- struct{}{}
// Remove token when function is complete
defer func() { <-sem }()
httpResp := &HttpBinGetResponse{}
client := &http.Client{}
req, err := http.NewRequest("GET", hbgr.url, nil)
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
fmt.Println("error getting response")
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {
// Add a token to the semaphore
sem <- struct{}{}
defer func() { <-sem }()
httpResp := &HttpBinPostResponse{}
client := &http.Client{}
req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println("error getting response")
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
1条答案
按热度按时间brc7rcf01#
您正在通过
range
语句从代码末尾的respChan
阅读。在通道关闭之前,此代码不会退出-这发生在该代码块之后。所以程序永远不会退出--因为所有这些逻辑都在同一个goroutine中。
为了修复-并确保所有记录在程序退出之前被处理-将通道阅读代码保留在主goroutine中,并将等待/关闭逻辑放在它自己的goroutine中:
range
循环中的任何子goroutine-可能有一种更干净的方法来使用一个waitgroup,但一个快速解决方案可能是: