xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (16) - 複数ユーザーへのプッシュ配信

前回は Unary 通信中にサーバープッシュをする方法を解説しました。しかしそれは 1 つのクライアントに対してプッシュ配信する例だったので、今回は複数クライアントに対してプッシュする方法について見ていきます。

勘のいい方にはパッと分かるかもしれませんが、前回静的キャッシュしていた StreamingContextRepository<TService> を複数キャッシュすればいいだけですね!

サーバー側

サービス側の実装を複数クライアントの接続をキャッシュできるように書き換えます。安直に実装すれば ConcurrentDictionary を利用することになりますが、MagicOnion はこれをラップして便利に扱えるような機能を提供しています。それが下記の例にある StreamingContextGroup<TKey, TService> です。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- StreamingContextRepository をコレクション管理してくれるヤツ
        private static StreamingContextGroup<string, IChatApi> cache = new StreamingContextGroup<string, IChatApi>();

        public async UnaryResult<Nil> Join()
        {
            var context = this.GetConnectionContext();
            cache.Add(context.ConnectionId, new StreamingContextRepository<IChatApi>(context));  // コレクションに追加
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            var context = this.GetConnectionContext();
            cache.Get(context.ConnectionId).Dispose();  // 切断して
            cache.Remove(context.ConnectionId);  // 削除
            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- コレクションで管理されている全クライアントに送信
            Console.WriteLine(text);
            await cache.BroadcastAllAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            var context = this.GetConnectionContext();
            var repo = cache.Get(context.ConnectionId);
            var result = await repo.RegisterStreamingMethod(this, this.OnReceive);  // サーバーストリーミング登録
            return result;
        }
    }
}

StreamingContextGroup は全ユーザーに送信する BroadcastAllAsync 以外にも、対象を絞って送信するメソッドも用意されています。用途に応じて使い分けてください。

また注意が必要なのが、StreamingContextGroup.Remove メソッドが内部で StreamingContextRepository.Dispose メソッドを呼び出さないことです。キッチリ自前で Dispose を呼び出し、切断の管理をしましょう。

クライアント側

クライアント側の実装は特に変更なく、デモ用に微調整している程度です。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            var name = Console.ReadLine();  // 分かりやすいように名前を入力
            var context = new ChannelContext(channel, () => name);
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            await client.Join();
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine(x));
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Send($"from {name}");
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Unjoin();
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

上記を実行してみると、以下のような感じの結果を得ることができます。ちゃんと複数人にメッセージが飛んでいますね!

f:id:xin9le:20180321010156p:plain

gRPC / MagicOnion 入門 (15) - Unary 通信中にプッシュ配信

gRPC におけるサーバーからの Push 通知は Server Streaming 通信を用いて行う、というのを以前解説しました。

ですが、この方法では Server Streaming 専用の API を呼び出さないと Push 通知が行われません。一般的には、何か単発の API (gRPC だと Unary) 呼び出しを起点として Push 通知したいというケースがほとんどです。このようなことをしようとした場合、素の gRPC だと結構大変な実装をしなければならないのですが、MagicOnion は簡単に実現するための機能を搭載しているので安心です。

今回はそんな Unary 通信中に Push 配信するための実装方法について見ていきます。

サーバー側

チャット風なものを作る想定で、API の定義を以下のような感じにします。参加 / 退出 / 送信 / 受信の API があるイメージですね。

using System.Threading.Tasks;
using MagicOnion;
using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    public interface IChatApi : IService<IChatApi>
    {
        UnaryResult<Nil> Join();  // チャットルームへの参加
        UnaryResult<Nil> Unjoin();  // チャットルームから退出
        UnaryResult<Nil> Send(string message);  // メッセージ送信
        Task<ServerStreamingResult<string>> OnReceive();  // push 配信によるメッセージ受信
    }
}

下はその実装例です。短い割に結構複雑...な印象かもしれません。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- static に ServerStreamingContext をキャッシュ
        private static StreamingContextRepository<IChatApi> cache;

        public async UnaryResult<Nil> Join()
        {
            //--- ストリーミング情報をキャッシュする入れ物を作る
            var context = this.GetConnectionContext();
            cache = new StreamingContextRepository<IChatApi>(context);

            Console.WriteLine($"{context.ConnectionId} is joined.");
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            //--- ストリーミングを切断
            cache.Dispose();
            cache = null;

            //--- ここはメッセージを出したいだけなので重要じゃない
            var context = this.GetConnectionContext();
            Console.WriteLine($"{context.ConnectionId} is unjoined.");

            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- キャッシュしているストリーミング情報 (OnReceive) を使って push 配信
            Console.WriteLine($"ChatApi.Send : {text}");
            await cache.WriteAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            //--- このメソッド (OnReceive) のストリーミング情報を登録
            Console.WriteLine($"Start receiving...");
            var result = await cache.RegisterStreamingMethod(this, this.OnReceive);

            //--- ここは cache.Dispose() されるまで通りません!
            Console.WriteLine($"Stop receiving...");
            return result;
        }
    }
}

ここで重要なのが StreamingContextRepository<TService> という静的キャッシュ機構です。なぜこのようなキャッシュが必要かというと、そもそも gRPC の Server Streaming 接続は Server Streaming API (今回の例でいうと OnReceive) が終わるまでの間 push 配信が可能だからです。裏を返せば OnReceive メソッドが終了したら push 配信はできなくなります

という基本/前提があるので、別 API から push 配信をするためには以下の 2 点を守る必要が出てきます。

  • ストリーミング API (OnReceive) を終了させないで待機させる
  • ストリーミング接続情報を静的キャッシュして、別 API (Send) からアクセスできるようにする

そして、これらの要件をサポートしているのが StreamingContextRepository<TService> です。先の OnReceive メソッド内に記述されている RegisterStreamingMethod でストリーミング接続を登録/記憶させます。このメソッドは StreamingContextRepository.Dispose (今回の例でいうと Unjoin) が呼び出されるまで待機し続けるため、ひとつ目の要件を満たすことができます。こうやって待機させている間に、静的キャッシュを介してサーバー push を行うというわけです。

クライアント側

難しいのはサーバー側だけで、クライアント側の実装は簡単です。サーバー側の API を叩く順番だけ気を付けましょう。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- いつもの前準備
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var context = new ChannelContext(channel, () => "xin9le");
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            //--- チャットルームに参加
            Console.WriteLine("Step.1 : Join");
            await client.Join();

            //--- メッセージ受信の登録
            Console.WriteLine("Step.2 : Register receiving");
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Received : {x}"));

            //--- メッセージを投げてみる
            Console.WriteLine("Step.3 : Send 1st message");
            await client.Send("あいうえお");

            Console.WriteLine("Step.4 : Send 2nd message");
            await client.Send("ABCDE");

            //--- チャットルームから退出
            Console.WriteLine("Step.5 : Unjoin");
            await client.Unjoin();

            //--- ちゃんと終わるの待ちましょう
            Console.WriteLine("Step.6 : Waiting for completion");
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

実装したものを実行してみると、以下のようなログが表示されます。呼び出される順序を確認してみてください。

f:id:xin9le:20180319013751p:plain

私がこれを勉強したとき、腑に落ちるまで結構時間がかかりました...!けれど、gRPC の Server Streaming の特性とそれをサポートする StreamingContextRepository を押さえられれば大丈夫です。

gRPC / MagicOnion 入門 (14) - 接続ユーザーを特定する

gRPC で通信を行う際、サーバー側でアクセスしてきているユーザーを特定したいケースは多々あります。これを実現する最も基本的で素朴な方法が、gRPC / MagicOnion 入門 (10) - ヘッダーの利用 で解説したヘッダーにユーザー固有の ID を埋め込むことです。

そのままやろうとすると結構手間な実装になりますが、MagicOnion はこれを簡単に実現できるよう ConnectionId の概念をサポートしてくれています。今回はその機能について見ていきます。

クライアントから ConnectionId を送信する

クライアントから ConnectionId を送信する場合、これまで紹介してきた方法と若干違う方法で初期化する必要があります。以下のように ChannelContext でラップしつつ、通信を行います。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            //--- ChannelContext でチャンネルとユーザー固有の ID をラップ
            var connectionId = "なにかしらユーザー固有のID";
            var context = new ChannelContext(channel, () => connectionId);
            await context.WaitConnectComplete();  // 接続待ち

            //--- API クライアントを生成して通信
            var client = context.CreateClient<ISampleApi>();
            var result = await client.Sample();
            Console.ReadLine();
        }
    }
}

サーバー側で ConnectionId を取り出す

サーバー側で ConnectionId を取り出すのは非常に簡単で、以下のようにします。

using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async UnaryResult<Nil> Sample()
        {
            //--- こんな感じで取り出せます
            //--- たったこれだけ!
            var connectionId = this.GetConnectionContext().ConnectionId;
            return Nil.Default;
        }
    }
}

この ConnectionId の仕組みも、実はヘッダーを利用して行われています。MagicOnion が面倒な部分を上手くラップしてくれていることもあり、簡単に使えるのでオススメです。

ImageMagick でお手軽 TGA → PNG 変換

業務で .tga で納品される大量のファイルを一括して .png に変換したいということがあったので、ImageMagick Converter を使ってチャチャっとやってみた系のメモです。

ImageMagick をダウンロード

下記サイトからダウンロードできます。各々の OS 環境に合わせてダウンロードしてください。Windows 環境の場合は portable 版を落とすのがお手軽でよさそう。

f:id:xin9le:20180313004617p:plain

バッチ処理で一括変換

ダウンロードした ImageMagick の .zip ファイルを解凍して、中にある convert.exe を利用しましょう。特定フォルダに含まれる .tga ファイルを一括で .png にする場合、例えば以下のようなバッチファイルになるでしょう*1

@echo off

set CONVERTER=ImageMagick-7.0.7-26-portable-Q16-x64\convert.exe
set SOURCE=<tgaのあるフォルダパス>
set TARGET=<pngの出力先フォルダパス>

for /r %SOURCE% %%A in (*.tga) do (
    echo %%A
    %CONVERTER% %%A %TARGET%\%%~nA.png
)

一度作ってしまえば、あとはフォルダに入れて叩くだけ!また、この手法を使えば別の拡張子間での変換もお手軽にできるはずです。

*1:パスやフォルダは適宜読み替えてください

ASP.NET Core SignalR を試す

これまで ASP.NET Core には SignalR が提供されていませんでしたが、最近ようやく利用できるようになってきたということで実際に動かして試してみました。今回は、空プロジェクトから簡易チャット機能を動作させるところまでを紹介します。

以下に公式ブログに getting started がありますが、空プロジェクトからの開始ではないのでよりシンプルにしてみます。

Step.1 : 環境構築

まず、以下のセットアップを行いましょう。

Step.2 : プロジェクトの準備

次に Visual Studio で ASP.NET Core テンプレートから空プロジェクトを作成します。

f:id:xin9le:20180312011235p:plain

f:id:xin9le:20180312011620p:plain

続いて、npm (Node Package Manager) を用いてクライアント実装で利用する JavaScript ライブラリを取得します。以下のコマンドを入力しましょう。

npm install @aspnet/signalr 

上記コマンド実行によってインストールされた .js ファイル群を wwwroot 配下にコピーします。手コピが面倒ですが、そういうものということにしましょう。

コピー フォルダパス
C:\Users\<ユーザー名>\node_modules\@aspnet\signalr\dist\browser
<プロジェクトフォルダ>\wwwroot\libs\signalr

最終的なプロジェクト構造は以下のようになる想定です。

f:id:xin9le:20180312040050p:plain

Step.3 : Hub (サーバー側) を作成

チャット機能のサーバー側実装として Hub を作成します。Hub は MVC でいうところの Controller に当たる部分です。クライアントからのリクエストを処理し、接続されているユーザーにデータを返します。今回は簡易チャット機能なので、以下のような実装をします。

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;

namespace SignalRSample.Hubs
{
    public class ChatHub : Hub
    {
        public Task Broadcast(string message)
        {
            //--- 接続されている全ユーザーにメッセージを配信
            var timestamp = DateTime.Now.ToString();
            return this.Clients.All.SendAsync("Receive", message, timestamp);
        }
    }
}

続いて、ASP.NET Core アプリケーションで SignalR の機能を有効にします。Startup.cs に以下のようなコードを記述します。

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using SignalRSample.Hubs;

namespace SignalRSample
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSignalR();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseDefaultFiles();
            app.UseStaticFiles();
            app.UseSignalR(x => x.MapHub<ChatHub>("/chat"));
        }
    }
}

Step 4 : クライアント側を作成

メインとなるページ (index.html) を wwwroot 直下に作成します。中身は以下のような感じにします。

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>SignalR Sample</title>
</head>
<body>
    <form>
        <input type="text" id="message" />
        <button type="submit" id="button">Send</button>
    </form>
    <div id="messages"></div>

    <script src="libs/signalr/signalr.js"></script>
    <script src="scripts/chat.js"></script>
</body>
</html>

次に、サーバーと接続してデータのやりとりをする処理を実装します。ここでは wwwroot\scripts\chat.js として作成します。

//--- ChatHub とのコネクションを生成
const connection = new signalR.HubConnection('/chat');

//--- 受信したときの処理
connection.on('Receive', (message, timestamp) => {
    const item = document.createElement('li');
    item.innerHTML = "<div>" + timestamp + " - " + message + "</div>";
    document.getElementById('messages').appendChild(item);
});

//--- ボタンをクリックしたらデータを送信
document.getElementById('button').addEventListener('click', event => {
    const message = document.getElementById('message').value;
    connection.invoke('Broadcast', message).catch(e => console.log(e));
    event.preventDefault();
});

//--- 接続を確立
connection.start().catch(e => console.log(e));

Step.5 : 実行

実装が完了したので実行してみましょう。複数のブラウザ間でメッセージのやり取りができることが確認できると思います。

f:id:xin9le:20180312035722p:plain

細かい説明は省略していますが、とりあえず Hello World ができれば大きな一歩!