http 请求 stream 响应时,response body 打印出来是正确的结果:
{"result":{"code":1,"msg":"1111"}}
{"result":{"code":2,"msg":"2222"}}
{"result":{"code":3,"msg":"3333"}}
{"result":{"code":4,"msg":"4444"}}
{"result":{"code":5,"msg":"5555"}}
{"result":{"code":6,"msg":"6666"}}
但是使用runtime.JSONPb.Decode
时,也会得到五个结果,但每个 decode 出来是个 nil... :
=== RUN TestHttpRespStream
service_test.go:147: resp: <nil>
service_test.go:147: resp: <nil>
service_test.go:147: resp: <nil>
service_test.go:147: resp: <nil>
service_test.go:147: resp: <nil>
service_test.go:147: resp: <nil>
service_test.go:149: EOF
--- PASS: TestHttpRespStream (0.62s)
这是 proto 文件:
// ./pb/test.proto
syntax = "proto3";
package pb;
option go_package = "/pb;pb";
import "google/api/annotations.proto";
message Req {
int32 id = 1;
string name = 2;
}
message Resp {
int32 code = 1;
string msg = 2;
}
service TestService {
rpc QueryStreamResp(Req) returns (stream Resp){
option (google.api.http) = {
post: "/query-stream-resp"
body: "*"
};
};
rpc QueryStreamReq(stream Req) returns (Resp){
option (google.api.http) = {
post: "/query-stream-req"
body: "*"
};
};
rpc Query(stream Req) returns (stream Resp){
option (google.api.http) = {
post: "/query"
body: "*"
};
};
}
grpc 服务端:
func (ts *TestService) QueryStreamResp(req *pb.Req, stream pb.TestService_QueryStreamRespServer) error {
log.Printf("QueryStreamResp|start...|req: %+v\n", req)
result := []*pb.Resp{
{Code: 1, Msg: "1111"},
{Code: 2, Msg: "2222"},
{Code: 3, Msg: "3333"},
{Code: 4, Msg: "4444"},
{Code: 5, Msg: "5555"},
{Code: 6, Msg: "6666"},
}
// header := make(metadata.MD)
// header.Append("content-type", "application/json")
// stream.SendHeader(header)
for i := range result {
log.Printf("resp: %+v", result[i])
if err := stream.Send(result[i]); err != nil {
log.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
}
log.Println("QueryStreamResp|stop...")
return nil
}
单元测试:
func TestHttpRespStream(t *testing.T) {
url := "http://127.0.0.1:8080/query-stream-resp"
reqData := &pb.Req{Id: 1, Name: "111"}
var buffer bytes.Buffer
encoder := (&runtime.JSONPb{}).NewEncoder(&buffer)
if err := encoder.Encode(reqData); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buffer)
if err != nil {
t.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// t.Fatal(err)
// }
// t.Logf("body: %s", string(body))
jsonb := new(runtime.JSONPb)
dencoder := jsonb.NewDecoder(resp.Body)
for {
var result *pb.Resp
err := dencoder.Decode(result)
if err == nil {
t.Logf("resp: %+v", result)
} else {
t.Logf("%+v", err)
break
}
}
}
1
HUNYXV OP |