Go语言 通过grpc ServerStream发送分块数据

u3r8eeie  于 2023-06-19  发布在  Go
关注(0)|答案(1)|浏览(130)

我定义了这个RPC

rpc Download(DownloadRequest) returns (stream google.api.HttpBody)

我最终想用它来处理CSV和二进制数据,但我将从CSV开始。
在我的处理程序中,我正在执行以下操作,以重复地从存储中获取数据并将其发送到流:

if err := stream.SetHeader(metadata.Pairs("content-disposition", "attachment")); err != nil {
    return err
}
chunkSize := uint64(100) // arbitrary chunk size
for offset := uint64(0); offset < objLen; offset += chunkSize {
    contents, err := getContent(offset, chunkSize)
    if err != nil {
        return err
    }
    err = stream.Send(&httpbody.HttpBody{
        ContentType: "text/csv",
        Data:        contents,
    })
    if err != nil {
        return err
    }
}

这个实现的问题是CSV结果以每chunkSize个字符换行结束。如何避免在输出中的每一个块后面出现换行符?
我使用grpc-gateway通过REST来提供服务,以防万一。

im9ewurl

im9ewurl1#

当该方法具有流式响应时,gRPC-Gateway通过发出换行符分隔的“块”流来处理该响应。如docs中所述。
您可以使用您的自定义封送拆收器用于特定的内容类型(例如'text/csv')更改分隔符(默认为[]byte('\n'))

package marshaler

import (
    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
)

type marshaler struct {
    mime      string
    delimiter []byte
    runtime.Marshaler
}

func NewMarshaler(mime string, delimiter []byte) runtime.Marshaler {
    return &marshaler{mime: mime, delimiter: delimiter}
}

func (m *marshaler) Delimiter() []byte {
    return m.delimiter
}

func (m *marshaler) ContentType(v interface{}) string {
    return m.mime
}

然后在新的复用服务器(网关)

mux := runtime.NewServeMux(
        ....

        // allow chunked stream responses to work with "Content-Type: text/csv" requests
        runtime.WithMarshalerOption("text/csv", marshaler.NewMarshaler("text/csv", nil)),
    )

并发送具有“Content-Type:text/csv”

curl --location 'https://your-api-gateway/download' \
--header 'Content-Type: text/csv'

与你的rpc服务器无关

csvbyte := getCSV()
chunkSize := 1024 * 1024 * 1 // 1MB
    for i := 0; i < len(csvbyte); i += chunkSize {
        end := i + chunkSize
        if end > len(csvbyte) {
            end = len(csvbyte)
        }
        if err := stream.Send(&httpbody.HttpBody{
            ContentType: "text/csv",
            Data:        csvbyte[i:end],
        }); err != nil {
            return err
        }
    }

相关问题