1. gRPC -- Client-side Streaming RPC

第三种是client-side streaming RPC:客户端通过流向server端连续发送一系列消息;当客户端发送完所有消息后,需要等待server端读取完这些消息并返回一个response。在.proto文件中,只要在request类型前面加上关键字stream,即可定义一个client-side streaming RPC,如下:

// RouteGuide is the service definition; only its client-side streaming
// method is shown in this excerpt.
service RouteGuide {

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  //
  // The `stream` keyword in front of the request type (Point) is what makes
  // this a client-side streaming call; the response type is not streamed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // ... (other RPCs of the service are omitted here)
}

// RouteSummary is the response message of RecordRoute: a set of statistics
// about the stream of Points the client sent.
message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

protoc编译该.proto文件后生成的Go客户端代码如下。相比于第一种simple RPC,可以注意到客户端调用RecordRoute方法时,返回的是接口RouteGuide_RecordRouteClient:用户需要使用该接口的Send方法向server端逐个发送消息(Point类型),发送完毕后再调用CloseAndRecv() (*RouteSummary, error)方法来接收server端最后返回的response:

// RouteGuideClient is the client-side API of the RouteGuide service.
type RouteGuideClient interface {
  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  //
  // No request message is passed in: the call returns a
  // RouteGuide_RecordRouteClient, and the Points are sent through it.
  RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)
  // ... (other methods of the interface are omitted here)
}

// routeGuideClient is the unexported RouteGuideClient implementation.
type routeGuideClient struct {
  // cc is the connection on which the client opens its streams.
  cc *grpc.ClientConn
}

// NewRouteGuideClient returns a RouteGuideClient that issues its calls over
// the given connection.
func NewRouteGuideClient(cc *grpc.ClientConn) RouteGuideClient {
  client := &routeGuideClient{cc: cc}
  return client
}

// RecordRoute opens a new client stream for the RecordRoute method on the
// client's connection and wraps it in a RouteGuide_RecordRouteClient.
//
// If the stream cannot be created, the error from grpc.NewClientStream is
// returned unchanged together with a nil stream.
func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
  // Streams[1] is the descriptor entry used for RecordRoute.
  desc := &_RouteGuide_serviceDesc.Streams[1]
  cs, err := grpc.NewClientStream(ctx, desc, c.cc, "/routeguide.RouteGuide/RecordRoute", opts...)
  if err != nil {
    return nil, err
  }
  return &routeGuideRecordRouteClient{ClientStream: cs}, nil
}

// RouteGuide_RecordRouteClient is the handle a caller gets back from
// RouteGuideClient.RecordRoute.
type RouteGuide_RecordRouteClient interface {
  // Send sends one Point on the stream.
  Send(*Point) error
  // CloseAndRecv closes the sending side of the stream and then receives
  // the RouteSummary response.
  CloseAndRecv() (*RouteSummary, error)
  // The generic client-stream methods are available as well.
  grpc.ClientStream
}

// routeGuideRecordRouteClient implements RouteGuide_RecordRouteClient on top
// of an embedded grpc.ClientStream.
type routeGuideRecordRouteClient struct {
  grpc.ClientStream
}

// Send sends the Point m by passing it to SendMsg on the embedded client
// stream, and returns whatever error SendMsg reports.
func (x *routeGuideRecordRouteClient) Send(m *Point) error {
  return x.ClientStream.SendMsg(m)
}

// CloseAndRecv closes the sending side of the stream and then receives a
// single RouteSummary from it.
//
// If either CloseSend or RecvMsg fails, that error is returned with a nil
// summary.
func (x *routeGuideRecordRouteClient) CloseAndRecv() (*RouteSummary, error) {
  err := x.ClientStream.CloseSend()
  if err != nil {
    return nil, err
  }

  summary := &RouteSummary{}
  err = x.ClientStream.RecvMsg(summary)
  if err != nil {
    return nil, err
  }
  return summary, nil
}

protoc编译.proto文件生成的server端Go代码如下。注意RouteGuideServer接口中定义的方法RecordRoute(RouteGuide_RecordRouteServer) error:由于request类型前面带了关键字stream(对应client-side streaming RPC),因此不同于前面的simple RPC,传入的参数是RouteGuide_RecordRouteServer接口,该接口提供Recv() (*Point, error)和SendAndClose(*RouteSummary) error两个方法:

// RouteGuideServer is the interface a RouteGuide service implementation
// must satisfy.
type RouteGuideServer interface {
  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  //
  // The implementation gets the stream itself rather than a request
  // message: it reads Points from it and sends the RouteSummary through it.
  RecordRoute(RouteGuide_RecordRouteServer) error
  // ... (other methods of the interface are omitted here)
}

// _RouteGuide_RecordRoute_Handler adapts a generic (service, stream) pair to
// the typed RouteGuideServer.RecordRoute method.
//
// srv is type-asserted to RouteGuideServer without a check, so a value that
// does not implement the interface causes a panic. The raw server stream is
// wrapped so the implementation sees a RouteGuide_RecordRouteServer.
func _RouteGuide_RecordRoute_Handler(srv interface{}, stream grpc.ServerStream) error {
  impl := srv.(RouteGuideServer)
  typed := &routeGuideRecordRouteServer{ServerStream: stream}
  return impl.RecordRoute(typed)
}

// RouteGuide_RecordRouteServer is the stream handed to
// RouteGuideServer.RecordRoute.
type RouteGuide_RecordRouteServer interface {
  // SendAndClose sends the RouteSummary response to the client.
  SendAndClose(*RouteSummary) error
  // Recv receives the next Point from the stream.
  Recv() (*Point, error)
  // The generic server-stream methods are available as well.
  grpc.ServerStream
}

// routeGuideRecordRouteServer implements RouteGuide_RecordRouteServer on top
// of an embedded grpc.ServerStream.
type routeGuideRecordRouteServer struct {
  grpc.ServerStream
}

// SendAndClose sends the RouteSummary m by passing it to SendMsg on the
// embedded server stream, and returns whatever error SendMsg reports.
//
// NOTE(review): nothing here closes the stream explicitly; presumably the
// RPC ends when the handler returns — confirm against the grpc docs.
func (x *routeGuideRecordRouteServer) SendAndClose(m *RouteSummary) error {
  return x.ServerStream.SendMsg(m)
}

// Recv receives the next message from the underlying server stream into a
// freshly allocated Point.
//
// Any error from RecvMsg is returned unchanged with a nil Point.
func (x *routeGuideRecordRouteServer) Recv() (*Point, error) {
  pt := &Point{}
  err := x.ServerStream.RecvMsg(pt)
  if err != nil {
    return nil, err
  }
  return pt, nil
}

因此,server端需要实现RecordRoute方法:在for循环中调用接口pb.RouteGuide_RecordRouteServer中定义的Recv方法逐个接收消息;当接收完毕时(err == io.EOF),调用SendAndClose(m *RouteSummary)方法向client端发送最终的response;如果Recv返回其他错误,则直接返回该错误。示例如下:

// RecordRoute consumes a client stream of points and answers with a summary
// of the trip.
//
// For every received point it increments the point count, adds one to the
// feature count for each saved feature whose location equals the point, and
// adds the distance from the previous point (if there was one). When Recv
// reports io.EOF the summary, including the elapsed whole seconds since the
// call started, is sent with SendAndClose and its result is returned. Any
// other Recv error ends the call and is returned as is.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var (
    numPoints, numFeatures, totalDistance int32
    prev                                  *pb.Point
  )
  began := time.Now()

  for {
    cur, err := stream.Recv()
    switch {
    case err == io.EOF:
      // The client has finished sending: report the statistics.
      elapsed := time.Since(began)
      summary := &pb.RouteSummary{
        PointCount:   numPoints,
        FeatureCount: numFeatures,
        Distance:     totalDistance,
        ElapsedTime:  int32(elapsed.Seconds()),
      }
      return stream.SendAndClose(summary)
    case err != nil:
      return err
    }

    numPoints++
    for _, known := range s.savedFeatures {
      if proto.Equal(known.Location, cur) {
        numFeatures++
      }
    }
    // The first point has no predecessor, so it adds no distance.
    if prev != nil {
      totalDistance += calcDistance(prev, cur)
    }
    prev = cur
  }
}

而对于client端,在调用RPC方法RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)得到stream之后,需要使用接口RouteGuide_RecordRouteClient中定义的Send方法向server端逐个发送消息(一系列Point),全部发送完毕后再调用CloseAndRecv方法接收server端返回的RouteSummary,比如下面的用法:

// runRecordRoute streams a randomly generated route to the server and logs
// the RouteSummary it gets back.
//
// The route has between 2 and 101 random points. A failure to open the
// stream, to send a point, or to receive the summary is reported through
// grpclog.Fatalf.
func runRecordRoute(client pb.RouteGuideClient) {
  // Seed from the clock so every run produces a different route.
  rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  total := int(rng.Int31n(100)) + 2 // Traverse at least two points
  route := make([]*pb.Point, total)
  for i := range route {
    route[i] = randomPoint(rng)
  }
  grpclog.Printf("Traversing %d points.", len(route))

  stream, err := client.RecordRoute(context.Background())
  if err != nil {
    grpclog.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
  }

  // Send the points one by one, in order.
  for i := 0; i < len(route); i++ {
    pt := route[i]
    if sendErr := stream.Send(pt); sendErr != nil {
      grpclog.Fatalf("%v.Send(%v) = %v", stream, pt, sendErr)
    }
  }

  // Finish sending and receive the server's single response.
  summary, err := stream.CloseAndRecv()
  if err != nil {
    grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  }
  grpclog.Printf("Route summary: %v", summary)
}

results matching ""

    No results matching ""