xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (5) - Server Streaming 通信

前回は最も簡単な Unary 通信について解説しました。今回はサーバーからのプッシュ配信を行うための Server Streaming 通信について見ていきます。

f:id:xin9le:20170604153542p:plain

Step.1 - サービス定義

まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ServerStreamingResult<int>> Repeat(int value, int count);
    }
}

ポイントは以下の通りですが、Unary 通信のときと比べ戻り値以外の違いはありません。

  • IService<T> インターフェースを実装する
  • IService<T> の型引数には自身のインターフェースを入れる
  • メソッドの戻り値を Task<ServerStreamingResult<T>> にする

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ServerStreamingResult<int>> Repeat(int value, int count)
        {
            Console.WriteLine($"(value, count) = ({value}, {count})");

            //--- WriteAsync するたびにレスポンスが返る
            var streaming = this.GetServerStreamingContext<int>();
            foreach (var x in Enumerable.Repeat(value, count))
                await streaming.WriteAsync(x);

            //--- 完了信号を返す
            return streaming.Result();
        }
    }
}

ポイントは以下の 4 点です。

  • GetServerStreamingContext<T> からストリーミング通信するためのコンテキストを取得
  • WriteAsync メソッドで値をクライアント側に返す (= プッシュ配信)
  • Result メソッドで完了通知を送る
  • プッシュ配信はサーバー側のメソッドが呼び出されてから終了するまでの間に行う

SignalR などでの開発をしたことがある方は、ここでひとつ疑問に思うかもしれません。たとえば「Unary 通信をしている途中で Server Streaming で複数のユーザーに結果を返したいときはどうしたら良いのか」などです。今回は説明を割愛しますが、ちょこっと手間をかければ可能です。これについては別途解説したいと思います。

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- サーバーが WriteAsync すると ForEachAsync が動く
            //--- サーバーから完了信号が送られると ForEachAsync が終了する
            var streaming = await client.Repeat(3, 5);
            await streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Result : {x}"));

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

サーバーからの送信されたメッセージは ResponseStream から流れてきます。標準ではこれを MoveNext / Current で読み出すのですが、これは結構に煩雑な書き方です。なので MagicOnion では簡略化した方法として ForEachAsync メソッドを用意しています。ForEachAsync メソッドを利用すれば、await が完了するまでストリームを読み続けることができます。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170606011225p:plain