xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (16) - 複数ユーザーへのプッシュ配信

前回は Unary 通信中にサーバープッシュをする方法を解説しました。しかしそれは 1 つのクライアントに対してプッシュ配信する例だったので、今回は複数クライアントに対してプッシュする方法について見ていきます。

勘のいい方にはパッと分かるかもしれませんが、前回静的キャッシュしていた StreamingContextRepository<TService> を複数キャッシュすればいいだけですね!

サーバー側

サービス側の実装を複数クライアントの接続をキャッシュできるように書き換えます。安直に実装すれば ConcurrentDictionary を利用することになりますが、MagicOnion はこれをラップして便利に扱えるような機能を提供しています。それが下記の例にある StreamingContextGroup<TKey, TService> です。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- StreamingContextRepository をコレクション管理してくれるヤツ
        private static StreamingContextGroup<string, IChatApi> cache = new StreamingContextGroup<string, IChatApi>();

        public async UnaryResult<Nil> Join()
        {
            var context = this.GetConnectionContext();
            cache.Add(context.ConnectionId, new StreamingContextRepository<IChatApi>(context));  // コレクションに追加
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            var context = this.GetConnectionContext();
            cache.Get(context.ConnectionId).Dispose();  // 切断して
            cache.Remove(context.ConnectionId);  // 削除
            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- コレクションで管理されている全クライアントに送信
            Console.WriteLine(text);
            await cache.BroadcastAllAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            var context = this.GetConnectionContext();
            var repo = cache.Get(context.ConnectionId);
            var result = await repo.RegisterStreamingMethod(this, this.OnReceive);  // サーバーストリーミング登録
            return result;
        }
    }
}

StreamingContextGroup は全ユーザーに送信する BroadcastAllAsync 以外にも、対象を絞って送信するメソッドも用意されています。用途に応じて使い分けてください。

また注意が必要なのが、StreamingContextGroup.Remove メソッドが内部で StreamingContextRepository.Dispose メソッドを呼び出さないことです。キッチリ自前で Dispose を呼び出し、切断の管理をしましょう。

クライアント側

クライアント側の実装は特に変更なく、デモ用に微調整している程度です。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            var name = Console.ReadLine();  // 分かりやすいように名前を入力
            var context = new ChannelContext(channel, () => name);
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            await client.Join();
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine(x));
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Send($"from {name}");
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Unjoin();
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

上記を実行してみると、以下のような感じの結果を得ることができます。ちゃんと複数人にメッセージが飛んでいますね!

f:id:xin9le:20180321010156p:plain