xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (8) - 独自型を送受信する

ここまで gRPC / MagicOnion を基本的な通信方法について見てきました。しかし、送受信に利用した型はすべて Primitive 型ばかりでした。独自に定義した型で通信したいと思うのは当然!ということで、今回は独自型でサーバー / クライアント間のやり取りをする方法について解説していきます。

MessagePack for C# のルールに従う

MagicOnion で通信されるデータのシリアライズには MessagePack for C# が利用されています。これにより MessagePack for C# が提供する超高速なシリアライズの恩恵を受けられるだけでなく IDL も不要になるため、.NETer が gRPC を使う環境としては最高に快適なものとなっています。

MagicOnion のシリアライザとして MessagePack for C# が利用されているということは、独自型をやり取りする場合も MessagePack for C# のルールに則るということになります。

独自型を定義する

通信でやりとりする独自型はサーバーとクライアントで型が共有されなければなりません。なので、サービス定義のプロジェクト内 (もしくはそれに類するサーバーとクライアントで共有可能なプロジェクト) に型を定義します。例えば以下のようにします。

using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    //--- MessagePack for C# によるシリアライズ対象であることマーク
    [MessagePackObject]
    public struct Vector2
    {
        //--- バイナリのレイアウトの順番を設定
        [Key(0)]
        public float X { get; }

        [Key(1)]
        public float Y { get; }

        //--- デシリアライズに使うコンストラクタであることをマーク
        //--- ※ルールに沿っていれば必須ではない
        [SerializationConstructor]
        public Vector2(float x, float y)
        {
            this.X = x;
            this.Y = y;
        }
    }
}

MessagePack for C# のルールに則った記述をソラでするのはなかなかに厳しいと思います。そんなときに便利なのが MessagePackAnalyzer です。サービス定義のプロジェクトに NuGet からアナライザーを取得して追加しておきましょう。

PM> Install-Package MessagePackAnalyzer

MessagePack for C# に関する詳細は README を参照ください。

独自型を利用して通信する

独自型の定義ができたので、これを利用して実装してみましょう。例えば以下のようにすれば OK です。

using MagicOnion;
using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        //--- 独自型を使ったサービス定義
        UnaryResult<Nil> Sample(Vector2 point);
    }
}
using System;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        //--- そのまま受け取れます
        public async UnaryResult<Nil> Sample(Vector2 point)
        {
            Console.WriteLine($"(x, y) = ({point.X}, {point.Y})");
            return Nil.Default;  //--- 独自型を返せます
        }
    }
}
using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- そのまま送受信できます
            var point = new Vector2(3, 5);
            var result = await client.Sample(point);
            Console.WriteLine(result.GetType().FullName);

            Console.ReadLine();
        }
    }
}

Nil という初出の型がありますが、これは MessagePack for C# に定義されている「何もない」を表現する型です。Rx を知っている方であれば Unit 型と同じようなものと言えば良いでしょうか。実行結果は以下のようになります。

f:id:xin9le:20170612013336p:plain

まとめ

MessagePack for C# のルールに則って独自型を定義するだけで難なくサーバー/クライアント間でやりとりすることができるようになりました。今回は説明していませんが、独自型のネストやコレクションにも対応しています。結局のところ gRPC は HTTP/2 上でバイナリデータを送受信しているだけなので、MessagePack for C# でシリアライズ / デシリアライズできるものであれば何だって通信できます。

gRPC / MagicOnion 入門 (7) - Duplex Streaming 通信

前回前々回と Server Streaming 通信と Client Streaming 通信について解説してきました。今回はこれらを合わせた通信方法である Duplex Streaming 通信について見ていきます。

f:id:xin9le:20170604153558p:plain

Step.1 - サービス定義

いつも通り、まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<DuplexStreamingResult<int, int>> DuplexSample();
      //Task<DuplexStreamingResult<int, int>> DuplexSample(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<DuplexStreamingResult<TRequest, TResponse>> とするのがポイントです。また、Client Streaming のときと同様にメソッドに引数を設定できないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<DuplexStreamingResult<int, int>> DuplexSample()
        {
            var streaming = this.GetDuplexStreamingContext<int, int>();
            var task = streaming.ForEachAsync(async x =>
            {
                //--- クライアントから送信された値が偶数だったら 2 倍にして返してみたり
                Console.WriteLine($"Received : {x}");
                if (x % 2 == 0)
                    await streaming.WriteAsync(x * 2);
            });

            //--- サーバー側から任意のタイミングで送信してみたり
            await Task.Delay(100);  // テキトーにずらしたり
            await streaming.WriteAsync(123);
            await streaming.WriteAsync(456);

            //--- メッセージの受信がすべて終わるまで待つ
            await task;

            //--- サーバーからの送信が完了したことを通知
            return streaming.Result();
        }
    }
}

ポイントは以下の 5 点です。これまで Server Streaming と Client Streaming で見てきたもののミックスになっているので結構複雑です。

  • GetDuplexStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • WriteAsync でサーバー側からメッセージを送信
  • Result メソッドで完了通知を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。こちらもサーバー側と同様に、受信と送信が入り乱れるので難易度が高いです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- ForEachAsync でサーバーからのメッセージを受信
            var streaming = await client.DuplexSample();
            var task = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Response : {x}"));

            //--- WriteAsync でサーバー側にメッセージを送信
            //--- CompleteAsync で送信完了を通知
            foreach (var x in Enumerable.Range(0, 5))
                await streaming.RequestStream.WriteAsync(x);
            await streaming.RequestStream.CompleteAsync();

            //--- メッセージの受信完了を待機
            await task;

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

実行してみる

実行すると以下のような結果が得られます。結構に複雑なので、実際にデバッガーでひとつずつステップ実行して試してみることをおすすめします。

f:id:xin9le:20170611235710p:plain

gRPC / MagicOnion 入門 (6) - Client Streaming 通信

前回は Server Streaming 通信について見ていきました。今回は Client 側から連続的にデータ送信を行う Client Streaming について解説していきます。

f:id:xin9le:20170604153550p:plain

Step.1 - サービス定義

今回もまずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ClientStreamingResult<int, int>> SplitUpload();
      //Task<ClientStreamingResult<int, int>> SplitUpload(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<ClientStreamingResult<TRequest, TResponse>> とするのがポイントです。ちなみに、これは gRPC の制限によるものであったと記憶していますが、Client Streaming の際にはメソッドに引数を設定することができないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ClientStreamingResult<int, int>> SplitUpload()
        {
            //--- クライアント側が WriteAsync するたびに呼び出される
            //--- CompleteAsync されるまでメッセージを受信し続ける
            var streaming = this.GetClientStreamingContext<int, int>();
            var sum = 0;
            await streaming.ForEachAsync(x =>
            {
                Console.WriteLine($"Received = {x}");
                sum += x;
            });

            //--- 結果を返す
            return streaming.Result(sum);
        }
    }
}

ポイントは以下の 4 点です。

  • GetClientStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • Result メソッドで結果を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- WriteAsync するとサーバー側の ForEachAsync が動く
            var streaming = await client.SplitUpload();
            foreach (var x in Enumerable.Range(1, 4))
                await streaming.RequestStream.WriteAsync(x);

            //--- 完了通知
            //--- これによりサーバー側の ForEachAsync が終了する
            await streaming.RequestStream.CompleteAsync();

            //--- サーバーからの結果を取得
            var response = await streaming.ResponseAsync;
            Console.WriteLine($"Response = {response}");

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

RequestStreamWriteAsync メソッドでサーバー側へメッセージを送信します。これによってサーバー側に ForEachAsync メソッドが呼び出されます。クライアント側は、必要なデータの送信がすべて完了したら CompleteAsync メソッドを呼び出してやる必要があります。そうしないとサーバー側はいつまでメッセージの受信待機を止めてよいか分からないからです。このような呼び出しのルールだけ守ってやれば難しくないでしょう。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170611180036p:plain

gRPC / MagicOnion 入門 (5) - Server Streaming 通信

前回は最も簡単な Unary 通信について解説しました。今回はサーバーからのプッシュ配信を行うための Server Streaming 通信について見ていきます。

f:id:xin9le:20170604153542p:plain

Step.1 - サービス定義

まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ServerStreamingResult<int>> Repeat(int value, int count);
    }
}

ポイントは以下の通りですが、Unary 通信のときと比べ戻り値以外の違いはありません。

  • IService<T> インターフェースを実装する
  • IService<T> の型引数には自身のインターフェースを入れる
  • メソッドの戻り値を Task<ServerStreamingResult<T>> にする

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ServerStreamingResult<int>> Repeat(int value, int count)
        {
            Console.WriteLine($"(value, count) = ({value}, {count})");

            //--- WriteAsync するたびにレスポンスが返る
            var streaming = this.GetServerStreamingContext<int>();
            foreach (var x in Enumerable.Repeat(value, count))
                await streaming.WriteAsync(x);

            //--- 完了信号を返す
            return streaming.Result();
        }
    }
}

ポイントは以下の 4 点です。

  • GetServerStreamingContext<T> からストリーミング通信するためのコンテキストを取得
  • WriteAsync メソッドで値をクライアント側に返す (= プッシュ配信)
  • Result メソッドで完了通知を送る
  • プッシュ配信はサーバー側のメソッドが呼び出されてから終了するまでの間に行う

SignalR などでの開発をしたことがある方は、ここでひとつ疑問に思うかもしれません。たとえば「Unary 通信をしている途中で Server Streaming で複数のユーザーに結果を返したいときはどうしたら良いのか」などです。今回は説明を割愛しますが、ちょこっと手間をかければ可能です。これについては別途解説したいと思います。

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- サーバーが WriteAsync すると ForEachAsync が動く
            //--- サーバーから完了信号が送られると ForEachAsync が終了する
            var streaming = await client.Repeat(3, 5);
            await streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Result : {x}"));

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

サーバーからの送信されたメッセージは ResponseStream から流れてきます。標準ではこれを MoveNext / Current で読み出すのですが、これは結構に煩雑な書き方です。なので MagicOnion では簡略化した方法として ForEachAsync メソッドを用意しています。ForEachAsync メソッドを利用すれば、await が完了するまでストリームを読み続けることができます。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170606011225p:plain

gRPC / MagicOnion 入門 (4) - Unary 通信

ここから実際に gRPC で API を作成し、クライアント/サーバー間で通信していきます。まず「gRPC / MagicOnion 入門 (2) - 4 種類の通信方式」で紹介した最も簡単な Unary 通信から見て行きます。

f:id:xin9le:20170604153513p:plain

Step.1 - サービス定義

最初のステップでは、サーバー側にどのような API を用意するのか、そのサービスのインターフェースを決定していきます。例えば以下のような感じになります。

using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        UnaryResult<int> Sum(int x, int y);
    }
}

非常にシンプルなインターフェースです。ポイントは以下の 3 点のみです。

  • IService<T> インターフェースを実装する
  • IService<T> の型引数には自身のインターフェースを入れる
  • メソッドの戻り値を UnaryResult<T> にする

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装する形でサービスの中身を書いていきます。今回は加算メソッドなので、例えば以下のような感じになります。

using System;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        //--- async/await をそのまま書ける
        public async UnaryResult<int> Sum(int x, int y)
        {
            Console.WriteLine($"(x, y) = ({x}, {y})");
            await Task.Delay(10);  // 何か非同期処理したり
            return x + y;
        }

        /*
        //--- 「await がないぞ!」と警告されるけれど、これでも OK
        //--- プロジェクトレベルで警告 CS1998 を抑制するのがオススメです
        public async UnaryResult<int> Sum(int x, int y)
            => x + y;

        //--- async を使わない書き方もできるけれどスマートじゃないので not recommend
        public UnaryResult<int> Sum(int x, int y)
            => new UnaryResult<int>(x + y);
        */
    }
}

実装も非常にシンプルだと思います。ポイントは以下の 3 点です。

  • ServiceBase<T> を継承する
  • サービス定義インターフェースを実装する
  • ServiceBase<T> の型引数にはサービス定義のインターフェースを入れる

また、UnaryResult<T> は C# 7.0 で追加された Task-like に対応しているため、C# 7.0 以降であれば Task<T> 抜きで自然に async/await を記述することができます。

作者である @neuecc さんによる解説記事はこちら。

neue cc - C# 7.0 custom task-like の正しいフレームワークでの利用法

ちなみに「async キーワードがあるのにメソッド本体に await がない」という警告 (CS1998) をプロジェクトレベルで抑制する場合は、プロジェクトのプロパティで以下のように設定します。

f:id:xin9le:20170605233428p:plain

Step.3 - gRPC サーバーを起動

ここまでで API 本体の実装が終わったので、次に gRPC サーバーを起動します。以下のようなコードを書けば OK です。

using System;
using Grpc.Core;
using MagicOnion.Server;

namespace MagicOnionSample.Service
{
    class Program
    {
        static void Main()
        {
            //--- ここで動的にアセンブリを解釈し、通信されてきたデータと API 本体とのマップを作る
            var service = MagicOnionEngine.BuildServerServiceDefinition();

            //--- API を公開する IP / Port / 認証情報などを決定し、gRPC サーバーを起動
            var port = new ServerPort("localhost", 12345, ServerCredentials.Insecure);
            var server = new Server{ Services = { service }, Ports = { port } };
            server.Start();

            //--- exe が終了しちゃわないように
            Console.ReadLine();
        }
    }
}

これでサーバー側の実装はすべて完了です。実行すれば localhost:12345 で gRPC サーバーが起動します。

Step.4 - クライアントの実装

最後に、Step.3 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API が公開されている IP / Port / 認証情報を設定して通信路を生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            //--- 指定されたサービス定義用のクライアントを作成
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- RPC 形式で超お手軽呼び出し
            var result = await client.Sum(1, 2);

            //--- 結果表示
            Console.WriteLine($"Result : {result}");
            Console.ReadLine();
        }
    }
}

見た通り、非常にシンプルな書き心地でサーバー側のメソッドを叩くことができます。

実行してみる

実行してみると (当然ですが) 以下のような結果が得られます。

f:id:xin9le:20170604231733p:plain

あとは同様の手順で API を増やしていくだけです。簡単ですね!