xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (6) - Client Streaming 通信

前回は Server Streaming 通信について見ていきました。今回は Client 側から連続的にデータ送信を行う Client Streaming について解説していきます。

f:id:xin9le:20170604153550p:plain

Step.1 - サービス定義

今回もまずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ClientStreamingResult<int, int>> SplitUpload();
      //Task<ClientStreamingResult<int, int>> SplitUpload(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<ClientStreamingResult<TRequest, TResponse>> とするのがポイントです。ちなみに、これは gRPC の制限によるものであったと記憶していますが、Client Streaming の際にはメソッドに引数を設定することができないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ClientStreamingResult<int, int>> SplitUpload()
        {
            //--- クライアント側が WriteAsync するたびに呼び出される
            //--- CompleteAsync されるまでメッセージを受信し続ける
            var streaming = this.GetClientStreamingContext<int, int>();
            var sum = 0;
            await streaming.ForEachAsync(x =>
            {
                Console.WriteLine($"Received = {x}");
                sum += x;
            });

            //--- 結果を返す
            return streaming.Result(sum);
        }
    }
}

ポイントは以下の 4 点です。

  • GetClientStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • Result メソッドで結果を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- WriteAsync するとサーバー側の ForEachAsync が動く
            var streaming = await client.SplitUpload();
            foreach (var x in Enumerable.Range(1, 4))
                await streaming.RequestStream.WriteAsync(x);

            //--- 完了通知
            //--- これによりサーバー側の ForEachAsync が終了する
            await streaming.RequestStream.CompleteAsync();

            //--- サーバーからの結果を取得
            var response = await streaming.ResponseAsync;
            Console.WriteLine($"Response = {response}");

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

RequestStreamWriteAsync メソッドでサーバー側へメッセージを送信します。これによってサーバー側に ForEachAsync メソッドが呼び出されます。クライアント側は、必要なデータの送信がすべて完了したら CompleteAsync メソッドを呼び出してやる必要があります。そうしないとサーバー側はいつまでメッセージの受信待機を止めてよいか分からないからです。このような呼び出しのルールだけ守ってやれば難しくないでしょう。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170611180036p:plain