xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (15) - Unary 通信中にプッシュ配信

gRPC におけるサーバーからの Push 通知は Server Streaming 通信を用いて行う、というのを以前解説しました。

ですが、この方法では Server Streaming 専用の API を呼び出さないと Push 通知が行われません。一般的には、何か単発の API (gRPC だと Unary) 呼び出しを起点として Push 通知したいというケースがほとんどです。このようなことをしようとした場合、素の gRPC だと結構大変な実装をしなければならないのですが、MagicOnion は簡単に実現するための機能を搭載しているので安心です。

今回はそんな Unary 通信中に Push 配信するための実装方法について見ていきます。

サーバー側

チャット風なものを作る想定で、API の定義を以下のような感じにします。参加 / 退出 / 送信 / 受信の API があるイメージですね。

using System.Threading.Tasks;
using MagicOnion;
using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    public interface IChatApi : IService<IChatApi>
    {
        UnaryResult<Nil> Join();  // チャットルームへの参加
        UnaryResult<Nil> Unjoin();  // チャットルームから退出
        UnaryResult<Nil> Send(string message);  // メッセージ送信
        Task<ServerStreamingResult<string>> OnReceive();  // push 配信によるメッセージ受信
    }
}

下はその実装例です。短い割に結構複雑...な印象かもしれません。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- static に ServerStreamingContext をキャッシュ
        private static StreamingContextRepository<IChatApi> cache;

        public async UnaryResult<Nil> Join()
        {
            //--- ストリーミング情報をキャッシュする入れ物を作る
            var context = this.GetConnectionContext();
            cache = new StreamingContextRepository<IChatApi>(context);

            Console.WriteLine($"{context.ConnectionId} is joined.");
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            //--- ストリーミングを切断
            cache.Dispose();
            cache = null;

            //--- ここはメッセージを出したいだけなので重要じゃない
            var context = this.GetConnectionContext();
            Console.WriteLine($"{context.ConnectionId} is unjoined.");

            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- キャッシュしているストリーミング情報 (OnReceive) を使って push 配信
            Console.WriteLine($"ChatApi.Send : {text}");
            await cache.WriteAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            //--- このメソッド (OnReceive) のストリーミング情報を登録
            Console.WriteLine($"Start receiving...");
            var result = await cache.RegisterStreamingMethod(this, this.OnReceive);

            //--- ここは cache.Dispose() されるまで通りません!
            Console.WriteLine($"Stop receiving...");
            return result;
        }
    }
}

ここで重要なのが StreamingContextRepository<TService> という静的キャッシュ機構です。なぜこのようなキャッシュが必要かというと、そもそも gRPC の Server Streaming 接続は Server Streaming API (今回の例でいうと OnReceive) が終わるまでの間 push 配信が可能だからです。裏を返せば OnReceive メソッドが終了したら push 配信はできなくなります

という基本/前提があるので、別 API から push 配信をするためには以下の 2 点を守る必要が出てきます。

  • ストリーミング API (OnReceive) を終了させないで待機させる
  • ストリーミング接続情報を静的キャッシュして、別 API (Send) からアクセスできるようにする

そして、これらの要件をサポートしているのが StreamingContextRepository<TService> です。先の OnReceive メソッド内に記述されている RegisterStreamingMethod でストリーミング接続を登録/記憶させます。このメソッドは StreamingContextRepository.Dispose (今回の例でいうと Unjoin) が呼び出されるまで待機し続けるため、ひとつ目の要件を満たすことができます。こうやって待機させている間に、静的キャッシュを介してサーバー push を行うというわけです。

クライアント側

難しいのはサーバー側だけで、クライアント側の実装は簡単です。サーバー側の API を叩く順番だけ気を付けましょう。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- いつもの前準備
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var context = new ChannelContext(channel, () => "xin9le");
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            //--- チャットルームに参加
            Console.WriteLine("Step.1 : Join");
            await client.Join();

            //--- メッセージ受信の登録
            Console.WriteLine("Step.2 : Register receiving");
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Received : {x}"));

            //--- メッセージを投げてみる
            Console.WriteLine("Step.3 : Send 1st message");
            await client.Send("あいうえお");

            Console.WriteLine("Step.4 : Send 2nd message");
            await client.Send("ABCDE");

            //--- チャットルームから退出
            Console.WriteLine("Step.5 : Unjoin");
            await client.Unjoin();

            //--- ちゃんと終わるの待ちましょう
            Console.WriteLine("Step.6 : Waiting for completion");
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

実装したものを実行してみると、以下のようなログが表示されます。呼び出される順序を確認してみてください。

f:id:xin9le:20180319013751p:plain

私がこれを勉強したとき、腑に落ちるまで結構時間がかかりました...!けれど、gRPC の Server Streaming の特性とそれをサポートする StreamingContextRepository を押さえられれば大丈夫です。