xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (9) - 明示的にステータスコードを返す

HTTP/1.1 にステータスコードというレスポンスの意味を表す数値があったように、HTTP/2 をベースとする gRPC にもステータスコードがあります。これまでの解説ではレスポンスとして正常値のみを返していたので、HTTP/1.1 のステータスコードで言うところの 200/OK にあたるものが返っていました。今回はエラーハンドリングなどをした際に利用する、独自のステータスコードを返す方法について見ていきます。

サーバー側でステータスコードを返す

サーバー側からステータスコードを返すのは非常に簡単で、ReturnStatusCode メソッドを利用するだけです。例えば以下のように、エラーが発生したときだけエラーステータスを返します。

using System;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async UnaryResult<Nil> Sample()
        {
            try
            {
                //--- 何かエラーが起こったことにする
                throw new Exception("エラーだよ ☆(ゝω・)vキャピ");
                return Nil.Default;  //--- 正常系
            }
            catch (Exception ex)
            {
                //--- 異常系ではステータスコード + エラー詳細を返す
                return this.ReturnStatusCode<Nil>((int)StatusCode.Internal, ex.Message);
            }
        }
    }
}

正常系の場合、ステータスコードは StatusCode.OK が戻されます。また、サーバー側で不意のエラーが発生した場合は適切にハンドリングされて StatusCode.Unknown が返されます。

public async UnaryResult<Nil> Sample()
    => throw new Exception("エラーだよ ☆(ゝω・)vキャピ");

// Status(StatusCode=Unknown, Detail="Exception was thrown by handler.")

クライアント側でステータスコードを取得する

サーバーから送信されてきたステータスコードをクライアントで取得する場合は、以下のように GetStatus メソッドを呼び出します。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- 重要 : ヘッダーを取得してからステータスコードを読む
            var call = client.Sample();
            var header = await call.ResponseHeadersAsync;
            var status = call.GetStatus();

            Console.WriteLine(status);
            // Status(StatusCode=Internal, Detail="エラーだよ☆(ゝω・)vキャピ")

            Console.ReadLine();
        }
    }
}

重要なポイントは ResponseHeadersAsyncawait してから GetStatus をする必要があるということです。ResponseHeadersAsync を呼び出すことなしに GetStatus を呼び出すと例外が発生します。また、OK 以外のステータスコードが送られているレスポンスに対して値を読み出そうとしても例外が発生します。

try
{
    var result = await client.Sample();

    //--- 上の書き方は以下のショートカット記法
    //var call = client.Sample();
    //var result = await call.ResponseAsync;
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
    // Status(StatusCode=Internal, Detail="エラーだよ☆(ゝω・)vキャピ")
}

ステータスコードの種類

gRPC が提供している定義済みステータスコードは (執筆時点で) 全 17 種類あります。

コード 説明
OK 0 正常終了
Cancelled 1 操作がキャンセルされた
(通常、クライアント側からの要求で)
Unknown 2 不明なエラー
InvalidArgument 3 クライアント側が不適切な引数を与えた
DeadlineExceeded 4 操作完了前に期限切れ
NotFound 5 要求されたものが見つからなかった
AlreadyExists 6 作成しようとしたものが既に存在する
PermissionDenied 7 操作を実行する権限がない
ResourceExhausted 8 リソースが使い果たされている / 容量不足
FailedPrecondition 9 操作が実行可能な状態にないため拒否された
Aborted 10 中止された
OutOfRange 11 有効な範囲を超えて操作しようとした
Unimplemented 12 指定された操作が未実装 / 未サポート
Internal 13 内部的なエラー
Unavailable 14 現在サービスを利用できない
DataLoss 15 回復不能なデータの損失または破損
Unauthenticated 16 有効な認証資格がない

もちろんこれらは定義済みというだけなので、それ以外の独自コードを定義して送信することも可能です。ReturnStatusCode メソッドの引数が int になっているのはそのためです。

また、gRPC のフレームワークが返す可能性のあるステータスコードも以下にまとまっています。もし見たことのないステータスコードを目撃したら、gRPC 自体が原因かどうかを調べるために使ってみてください。

gRPC / MagicOnion 入門 (8) - 独自型を送受信する

ここまで gRPC / MagicOnion を基本的な通信方法について見てきました。しかし、送受信に利用した型はすべて Primitive 型ばかりでした。独自に定義した型で通信したいと思うのは当然!ということで、今回は独自型でサーバー / クライアント間のやり取りをする方法について解説していきます。

MessagePack for C# のルールに従う

MagicOnion で通信されるデータのシリアライズには MessagePack for C# が利用されています。これにより MessagePack for C# が提供する超高速なシリアライズの恩恵を受けられるだけでなく IDL も不要になるため、.NETer が gRPC を使う環境としては最高に快適なものとなっています。

MagicOnion のシリアライザとして MessagePack for C# が利用されているということは、独自型をやり取りする場合も MessagePack for C# のルールに則るということになります。

独自型を定義する

通信でやりとりする独自型はサーバーとクライアントで型が共有されなければなりません。なので、サービス定義のプロジェクト内 (もしくはそれに類するサーバーとクライアントで共有可能なプロジェクト) に型を定義します。例えば以下のようにします。

using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    //--- MessagePack for C# によるシリアライズ対象であることマーク
    [MessagePackObject]
    public struct Vector2
    {
        //--- バイナリのレイアウトの順番を設定
        [Key(0)]
        public float X { get; }

        [Key(1)]
        public float Y { get; }

        //--- デシリアライズに使うコンストラクタであることをマーク
        //--- ※ルールに沿っていれば必須ではない
        [SerializationConstructor]
        public Vector2(float x, float y)
        {
            this.X = x;
            this.Y = y;
        }
    }
}

MessagePack for C# のルールに則った記述をソラでするのはなかなかに厳しいと思います。そんなときに便利なのが MessagePackAnalyzer です。サービス定義のプロジェクトに NuGet からアナライザーを取得して追加しておきましょう。

PM> Install-Package MessagePackAnalyzer

MessagePack for C# に関する詳細は README を参照ください。

独自型を利用して通信する

独自型の定義ができたので、これを利用して実装してみましょう。例えば以下のようにすれば OK です。

using MagicOnion;
using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        //--- 独自型を使ったサービス定義
        UnaryResult<Nil> Sample(Vector2 point);
    }
}
using System;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        //--- そのまま受け取れます
        public async UnaryResult<Nil> Sample(Vector2 point)
        {
            Console.WriteLine($"(x, y) = ({point.X}, {point.Y})");
            return Nil.Default;  //--- 独自型を返せます
        }
    }
}
using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- そのまま送受信できます
            var point = new Vector2(3, 5);
            var result = await client.Sample(point);
            Console.WriteLine(result.GetType().FullName);

            Console.ReadLine();
        }
    }
}

Nil という初出の型がありますが、これは MessagePack for C# に定義されている「何もない」を表現する型です。Rx を知っている方であれば Unit 型と同じようなものと言えば良いでしょうか。実行結果は以下のようになります。

f:id:xin9le:20170612013336p:plain

まとめ

MessagePack for C# のルールに則って独自型を定義するだけで難なくサーバー/クライアント間でやりとりすることができるようになりました。今回は説明していませんが、独自型のネストやコレクションにも対応しています。結局のところ gRPC は HTTP/2 上でバイナリデータを送受信しているだけなので、MessagePack for C# でシリアライズ / デシリアライズできるものであれば何だって通信できます。

gRPC / MagicOnion 入門 (7) - Duplex Streaming 通信

前回前々回と Server Streaming 通信と Client Streaming 通信について解説してきました。今回はこれらを合わせた通信方法である Duplex Streaming 通信について見ていきます。

f:id:xin9le:20170604153558p:plain

Step.1 - サービス定義

いつも通り、まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<DuplexStreamingResult<int, int>> DuplexSample();
      //Task<DuplexStreamingResult<int, int>> DuplexSample(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<DuplexStreamingResult<TRequest, TResponse>> とするのがポイントです。また、Client Streaming のときと同様にメソッドに引数を設定できないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<DuplexStreamingResult<int, int>> DuplexSample()
        {
            var streaming = this.GetDuplexStreamingContext<int, int>();
            var task = streaming.ForEachAsync(async x =>
            {
                //--- クライアントから送信された値が偶数だったら 2 倍にして返してみたり
                Console.WriteLine($"Received : {x}");
                if (x % 2 == 0)
                    await streaming.WriteAsync(x * 2);
            });

            //--- サーバー側から任意のタイミングで送信してみたり
            await Task.Delay(100);  // テキトーにずらしたり
            await streaming.WriteAsync(123);
            await streaming.WriteAsync(456);

            //--- メッセージの受信がすべて終わるまで待つ
            await task;

            //--- サーバーからの送信が完了したことを通知
            return streaming.Result();
        }
    }
}

ポイントは以下の 5 点です。これまで Server Streaming と Client Streaming で見てきたもののミックスになっているので結構複雑です。

  • GetDuplexStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • WriteAsync でサーバー側からメッセージを送信
  • Result メソッドで完了通知を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。こちらもサーバー側と同様に、受信と送信が入り乱れるので難易度が高いです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- ForEachAsync でサーバーからのメッセージを受信
            var streaming = await client.DuplexSample();
            var task = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Response : {x}"));

            //--- WriteAsync でサーバー側にメッセージを送信
            //--- CompleteAsync で送信完了を通知
            foreach (var x in Enumerable.Range(0, 5))
                await streaming.RequestStream.WriteAsync(x);
            await streaming.RequestStream.CompleteAsync();

            //--- メッセージの受信完了を待機
            await task;

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

実行してみる

実行すると以下のような結果が得られます。結構に複雑なので、実際にデバッガーでひとつずつステップ実行して試してみることをおすすめします。

f:id:xin9le:20170611235710p:plain

gRPC / MagicOnion 入門 (6) - Client Streaming 通信

前回は Server Streaming 通信について見ていきました。今回は Client 側から連続的にデータ送信を行う Client Streaming について解説していきます。

f:id:xin9le:20170604153550p:plain

Step.1 - サービス定義

今回もまずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ClientStreamingResult<int, int>> SplitUpload();
      //Task<ClientStreamingResult<int, int>> SplitUpload(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<ClientStreamingResult<TRequest, TResponse>> とするのがポイントです。ちなみに、これは gRPC の制限によるものであったと記憶していますが、Client Streaming の際にはメソッドに引数を設定することができないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ClientStreamingResult<int, int>> SplitUpload()
        {
            //--- クライアント側が WriteAsync するたびに呼び出される
            //--- CompleteAsync されるまでメッセージを受信し続ける
            var streaming = this.GetClientStreamingContext<int, int>();
            var sum = 0;
            await streaming.ForEachAsync(x =>
            {
                Console.WriteLine($"Received = {x}");
                sum += x;
            });

            //--- 結果を返す
            return streaming.Result(sum);
        }
    }
}

ポイントは以下の 4 点です。

  • GetClientStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • Result メソッドで結果を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- WriteAsync するとサーバー側の ForEachAsync が動く
            var streaming = await client.SplitUpload();
            foreach (var x in Enumerable.Range(1, 4))
                await streaming.RequestStream.WriteAsync(x);

            //--- 完了通知
            //--- これによりサーバー側の ForEachAsync が終了する
            await streaming.RequestStream.CompleteAsync();

            //--- サーバーからの結果を取得
            var response = await streaming.ResponseAsync;
            Console.WriteLine($"Response = {response}");

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

RequestStreamWriteAsync メソッドでサーバー側へメッセージを送信します。これによってサーバー側に ForEachAsync メソッドが呼び出されます。クライアント側は、必要なデータの送信がすべて完了したら CompleteAsync メソッドを呼び出してやる必要があります。そうしないとサーバー側はいつまでメッセージの受信待機を止めてよいか分からないからです。このような呼び出しのルールだけ守ってやれば難しくないでしょう。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170611180036p:plain

gRPC / MagicOnion 入門 (5) - Server Streaming 通信

前回は最も簡単な Unary 通信について解説しました。今回はサーバーからのプッシュ配信を行うための Server Streaming 通信について見ていきます。

f:id:xin9le:20170604153542p:plain

Step.1 - サービス定義

まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<ServerStreamingResult<int>> Repeat(int value, int count);
    }
}

ポイントは以下の通りですが、Unary 通信のときと比べ戻り値以外の違いはありません。

  • IService<T> インターフェースを実装する
  • IService<T> の型引数には自身のインターフェースを入れる
  • メソッドの戻り値を Task<ServerStreamingResult<T>> にする

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Linq;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<ServerStreamingResult<int>> Repeat(int value, int count)
        {
            Console.WriteLine($"(value, count) = ({value}, {count})");

            //--- WriteAsync するたびにレスポンスが返る
            var streaming = this.GetServerStreamingContext<int>();
            foreach (var x in Enumerable.Repeat(value, count))
                await streaming.WriteAsync(x);

            //--- 完了信号を返す
            return streaming.Result();
        }
    }
}

ポイントは以下の 4 点です。

  • GetServerStreamingContext<T> からストリーミング通信するためのコンテキストを取得
  • WriteAsync メソッドで値をクライアント側に返す (= プッシュ配信)
  • Result メソッドで完了通知を送る
  • プッシュ配信はサーバー側のメソッドが呼び出されてから終了するまでの間に行う

SignalR などでの開発をしたことがある方は、ここでひとつ疑問に思うかもしれません。たとえば「Unary 通信をしている途中で Server Streaming で複数のユーザーに結果を返したいときはどうしたら良いのか」などです。今回は説明を割愛しますが、ちょこっと手間をかければ可能です。これについては別途解説したいと思います。

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- サーバーが WriteAsync すると ForEachAsync が動く
            //--- サーバーから完了信号が送られると ForEachAsync が終了する
            var streaming = await client.Repeat(3, 5);
            await streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Result : {x}"));

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

サーバーからの送信されたメッセージは ResponseStream から流れてきます。標準ではこれを MoveNext / Current で読み出すのですが、これは結構に煩雑な書き方です。なので MagicOnion では簡略化した方法として ForEachAsync メソッドを用意しています。ForEachAsync メソッドを利用すれば、await が完了するまでストリームを読み続けることができます。

実行してみる

実行すると以下のような結果が得られます。思い通りの動きをしていますね。

f:id:xin9le:20170606011225p:plain