绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
ASP.NET Core 3.0 gRPC 双向流
2019-11-05 10:08:46

四.gRPC中使用双向流调用

我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。

这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。

① 首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡

syntax = "proto3";

option csharp_namespace = "AspNetCoregRpcService";

import "google/protobuf/empty.proto";

package LuCat; //定义包名

//定义服务

service LuCat{

//定义给猫洗澡双向流rpc

rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);

//定义统计猫数量简单rpc

rpc Count(google.protobuf.Empty) returns (CountCatResult);

}

message SuckingCatResult{

string message=1;

}

message BathTheCatReq{

int32 id=1;

}

message BathTheCatResp{

string message=1;

}

message CountCatResult{

int32 Count=1;

}

② 添加服务的实现

这里安利下Resharper,非常方便

private readonly ILogger _logger;

private static readonly List Cats=new List(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"};

private static readonly Random Rand=new Random(DateTime.Now.Millisecond);

public LuCatService(ILogger logger)

{

_logger = logger;

}

public override async Task BathTheCat(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context)

{

var bathQueue=new Queue();

while (await requestStream.MoveNext())

{

//将要洗澡的猫加入队列

bathQueue.Enqueue(requestStream.Current.Id);

_logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");

}

//遍历队列开始洗澡

while (bathQueue.TryDequeue(out var catId))

{

await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });

await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果

}

}

public override Task Count(Empty request, ServerCallContext context)

{

return Task.FromResult(new CountCatResult()

{

Count = Cats.Count

});

}

BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。

③ 客户端实现

随机发送10个猫Id给服务端,然后接收10个洗澡结果。

var channel = GrpcChannel.ForAddress("https://localhost:5001");

var catClient = new LuCat.LuCatClient(channel);

//获取猫总数

var catCount = await catClient.CountAsync(new Empty());

Console.WriteLine($"一共{catCount.Count}只猫。");

var rand = new Random(DateTime.Now.Millisecond);

var bathCat = catClient.BathTheCat();

//定义接收吸猫响应逻辑

var bathCatRespTask = Task.Run(async() =>

{

await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())

{

Console.WriteLine(resp.Message);

}

});

//随机给10个猫洗澡

for (int i = 0; i < 10; i++)

{

await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});

}

//发送完毕

await bathCat.RequestStream.CompleteAsync();

Console.WriteLine("客户端已发送完10个需要洗澡的猫id");

Console.WriteLine("接收洗澡结果:");

//开始接收响应

await bathCatRespTask;

Console.WriteLine("洗澡完毕");

④ 运行

可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。

这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。

五.流控制

gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。

在流式调用是,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:

改造一下我们在第四小节的代码:

① 客户端

var cts = new CancellationTokenSource();

//指定在2.5s后进行取消操作

cts.CancelAfter(TimeSpan.FromSeconds(2.5));

var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);

//定义接收吸猫响应逻辑

var bathCatRespTask = Task.Run(async() =>

{

try

{

await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())

{

Console.WriteLine(resp.Message);

}

}

catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)

{

Console.WriteLine("Stream cancelled.");

}

});

② 服务端

//遍历队列开始洗澡

while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))

{

_logger.LogInformation($"Cat {catId} Dequeue.");

await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });

await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果

}

③ 运行

设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。

六.结束

这里流式调用可以实现实时推送,服务端到客户端或者客户端到服务端短实时推送消息,但是这个和传统意义上的长连接主动推送、广播消息不一样,不管你是服务端流式、客户端流式还是双向流式,必须要由客户端进行发起,通过客户端请求来建立流通信。

七.参考资料

GRPC的四种服务类型 by twtydgo

HTTP/2笔记之流和多路复用 by 聂永

本文所用代码

出处:https://www.cnblogs.com/stulzq/p/11590088.html

作者:晓晨Master(李志强),版权归作者所有,如有侵权,请联系删除.

分享好友

分享这个小栈给你的朋友们,一起进步吧。

应用开发
创建时间:2020-06-17 15:31:04
应用软件开发是指使用程序语言C#、java、 c++、vb等语言编写,主要是用于商业、生活应用的软件的开发。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 栈栈
    专家
戳我,来吐槽~