xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (17) - 切断検知と自動再接続

gRPC はサーバーとクライアントが常時コネクションを張っている状態です。このコネクションが切断されたタイミングを検知して後処理や再接続処理をしたい、というのはよくパターンかと思います。実は、生の gRPC で切断検知をするのは実はかなり面倒です。MagicOnion はそのあたりを上手くラップ*1し、扱いやすい形として提供してくれています。

今回は、それらを利用した切断検知と再接続の手法について見ていきます。

クライアント側で切断を検知

例えば、サーバーがダウンしたりなどしてコネクションが切断されたことをクライアント側で検知する方法は以下のようにします。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
    var context = new ChannelContext(channel, () => "xin9le");

    //--- 切断検知を仕込む
    context.RegisterDisconnectedAction(() =>
    {
        Console.WriteLine("Disconnected");
    });

    //--- 接続待機
    await context.WaitConnectComplete();

    //--- 接続されてから 10 秒以内にサーバーを落とすと「Disconnected」と表示される
    await Task.Delay(10000);
}

ChannelContext.RegisterDisconnectedAction で切断のタイミングをフックすることができます!超簡単!

また、ここまで長らくお行儀悪く書いてこなかったのですが、ChannelContextDispose するのが良いです。以下の例のように Dispose しても切断検知が走ります。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
    var context = new ChannelContext(channel, () => "xin9le");

    //--- 切断検知を仕込む
    context.RegisterDisconnectedAction(() =>
    {
        Console.WriteLine("Disconnected");
    });

    //--- 接続待機
    await context.WaitConnectComplete();

    //--- 切断する
    Console.WriteLine("1");
    await Task.Delay(1000);
    Console.WriteLine("2");

    context.Dispose();  // 切断!

    Console.WriteLine("3");
    await Task.Delay(1000);
    Console.WriteLine("4");

    Console.ReadLine();
}

//--- 結果
/*
1
2
3
Disconnected
4
*/

チャンネルのシャットダウン

これまたお行儀悪くずっと書いてこなかったのですが、gRPC の Channel はプロセスを終了する前にシャットダウンすることが強く推奨されています。また、シャットダウンを検知して後処理を行うこともできるようになっています。以下のような感じです。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

    //--- シャットダウン検知
    channel.ShutdownToken.Register(() =>
    {
        Console.WriteLine("Shutdown");
    });

    //--- チャンネルをシャットダウン
    Console.WriteLine("1");
    await channel.ShutdownAsync();
    Console.WriteLine("2");

    Console.ReadLine();
}


//--- 結果
/*
1
Shutdown
2
*/

MagicOnion が提供する ChannelContext を利用している場合、終了処理は以下のような感じになると思います。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
    var context = new ChannelContext(channel, () => "xin9le");

    //--- シャットダウン検知 / 切断検知
    channel.ShutdownToken.Register(() =>
    {
        Console.WriteLine("Shutdown");
    });
    context.RegisterDisconnectedAction(() =>
    {
        Console.WriteLine("Disconnected");
    });

    //--- 接続待機
    await context.WaitConnectComplete();
    await Task.Delay(1000);

    //--- 切断する
    Console.WriteLine("1");
    context.Dispose();
    Console.WriteLine("2");
    await channel.ShutdownAsync();
    Console.WriteLine("3");

    Console.ReadLine();
}

//--- 結果
/*
1
2
Shutdown
Disconnected
3
*/

サーバー側で切断検知

サーバー側でも接続していたクライアントがいなくなったことを検知して後処理を行いたいケースはよくあります。サーバー側での検知は以下のように行います。

public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
{
    public async UnaryResult<Nil> Sample()
    {
        this.GetConnectionContext().ConnectionStatus.Register(() =>
        {
            Console.WriteLine("Disconnect detected!!");
        });
        return Nil.Default;
    }
}

ConnectionContext.ConnectionStatusCancellationToken 型になっていて、クライアントの切断が検知されたときに Cancel が発行される仕組みになっています。その Cancel に反応できるように Register メソッドで事前に処理を登録しておく感じです。

例えばクライアント側を以下のように実装したとすると、ChannelContext.Dispose を呼び出したタイミングでサーバー側で切断検知され「Disconnected detected!!」が表示されます。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
    var context = new ChannelContext(channel, () => "xin9le");

    await context.WaitConnectComplete();
    var client = context.CreateClient<ISampleApi>();
    await client.Sample();  // サーバー側で切断検知できるようにする
    await Task.Delay(1000);

    context.Dispose();  // ここを呼び出すとサーバー側で切断検知が走る
    await channel.ShutdownAsync();
    Console.ReadLine();
}

クライアントの自動再接続を行う

トンネルに入って出たときや、サービスの一時的なダウンから復旧した場合などは、自動的に再接続して復旧してほしいものです。そう言った処理も先の切断検知のタイミングを利用すれば実現できます。例えば以下のような感じです。

static async Task MainAsync()
{
    var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
    var context = new ChannelContext(channel, () => "xin9le");

    //--- 再接続処理
    context.RegisterDisconnectedAction(async () =>
    {
        Console.WriteLine("Reconnecting...");
        await context.WaitConnectComplete();  // 再接続待ち
        Console.WriteLine("Reconnected");
    });

    //--- 接続待機
    Console.WriteLine("Connecting...");
    await context.WaitConnectComplete();
    Console.WriteLine("Connected");

    //--- この待ち時間にサーバーを落としたり立ち上げたりしてみましょう
    await Task.Delay(30000);

    //--- 切断する
    Console.WriteLine("Shutdown");
    context.Dispose();
    await channel.ShutdownAsync();
    Console.ReadLine();
}

//--- 実行例
/*
Connecting...
Connected
Reconnecting...
Reconnected
Shutdown
*/

再接続には多少時間がかかりますが、自動でコネクションを復旧できるメリットは非常に大きいので是非実装にチャレンジしてみてください。

*1:Deplex Streaming を使って死活監視をしている

gRPC / MagicOnion 入門 (16) - 複数ユーザーへのプッシュ配信

前回は Unary 通信中にサーバープッシュをする方法を解説しました。しかしそれは 1 つのクライアントに対してプッシュ配信する例だったので、今回は複数クライアントに対してプッシュする方法について見ていきます。

勘のいい方にはパッと分かるかもしれませんが、前回静的キャッシュしていた StreamingContextRepository<TService> を複数キャッシュすればいいだけですね!

サーバー側

サービス側の実装を複数クライアントの接続をキャッシュできるように書き換えます。安直に実装すれば ConcurrentDictionary を利用することになりますが、MagicOnion はこれをラップして便利に扱えるような機能を提供しています。それが下記の例にある StreamingContextGroup<TKey, TService> です。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- StreamingContextRepository をコレクション管理してくれるヤツ
        private static StreamingContextGroup<string, IChatApi> cache = new StreamingContextGroup<string, IChatApi>();

        public async UnaryResult<Nil> Join()
        {
            var context = this.GetConnectionContext();
            cache.Add(context.ConnectionId, new StreamingContextRepository<IChatApi>(context));  // コレクションに追加
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            var context = this.GetConnectionContext();
            cache.Get(context.ConnectionId).Dispose();  // 切断して
            cache.Remove(context.ConnectionId);  // 削除
            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- コレクションで管理されている全クライアントに送信
            Console.WriteLine(text);
            await cache.BroadcastAllAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            var context = this.GetConnectionContext();
            var repo = cache.Get(context.ConnectionId);
            var result = await repo.RegisterStreamingMethod(this, this.OnReceive);  // サーバーストリーミング登録
            return result;
        }
    }
}

StreamingContextGroup は全ユーザーに送信する BroadcastAllAsync 以外にも、対象を絞って送信するメソッドも用意されています。用途に応じて使い分けてください。

また注意が必要なのが、StreamingContextGroup.Remove メソッドが内部で StreamingContextRepository.Dispose メソッドを呼び出さないことです。キッチリ自前で Dispose を呼び出し、切断の管理をしましょう。

クライアント側

クライアント側の実装は特に変更なく、デモ用に微調整している程度です。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            var name = Console.ReadLine();  // 分かりやすいように名前を入力
            var context = new ChannelContext(channel, () => name);
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            await client.Join();
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine(x));
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Send($"from {name}");
            await Task.Delay(10000);  // デモ用にちょっと待つ
            await client.Unjoin();
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

上記を実行してみると、以下のような感じの結果を得ることができます。ちゃんと複数人にメッセージが飛んでいますね!

f:id:xin9le:20180321010156p:plain

gRPC / MagicOnion 入門 (15) - Unary 通信中にプッシュ配信

gRPC におけるサーバーからの Push 通知は Server Streaming 通信を用いて行う、というのを以前解説しました。

ですが、この方法では Server Streaming 専用の API を呼び出さないと Push 通知が行われません。一般的には、何か単発の API (gRPC だと Unary) 呼び出しを起点として Push 通知したいというケースがほとんどです。このようなことをしようとした場合、素の gRPC だと結構大変な実装をしなければならないのですが、MagicOnion は簡単に実現するための機能を搭載しているので安心です。

今回はそんな Unary 通信中に Push 配信するための実装方法について見ていきます。

サーバー側

チャット風なものを作る想定で、API の定義を以下のような感じにします。参加 / 退出 / 送信 / 受信の API があるイメージですね。

using System.Threading.Tasks;
using MagicOnion;
using MessagePack;

namespace MagicOnionSample.ServiceDefinition
{
    public interface IChatApi : IService<IChatApi>
    {
        UnaryResult<Nil> Join();  // チャットルームへの参加
        UnaryResult<Nil> Unjoin();  // チャットルームから退出
        UnaryResult<Nil> Send(string message);  // メッセージ送信
        Task<ServerStreamingResult<string>> OnReceive();  // push 配信によるメッセージ受信
    }
}

下はその実装例です。短い割に結構複雑...な印象かもしれません。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class ChatApi : ServiceBase<IChatApi>, IChatApi
    {
        //--- static に ServerStreamingContext をキャッシュ
        private static StreamingContextRepository<IChatApi> cache;

        public async UnaryResult<Nil> Join()
        {
            //--- ストリーミング情報をキャッシュする入れ物を作る
            var context = this.GetConnectionContext();
            cache = new StreamingContextRepository<IChatApi>(context);

            Console.WriteLine($"{context.ConnectionId} is joined.");
            return Nil.Default;
        }

        public async UnaryResult<Nil> Unjoin()
        {
            //--- ストリーミングを切断
            cache.Dispose();
            cache = null;

            //--- ここはメッセージを出したいだけなので重要じゃない
            var context = this.GetConnectionContext();
            Console.WriteLine($"{context.ConnectionId} is unjoined.");

            return Nil.Default;
        }

        public async UnaryResult<Nil> Send(string text)
        {
            //--- キャッシュしているストリーミング情報 (OnReceive) を使って push 配信
            Console.WriteLine($"ChatApi.Send : {text}");
            await cache.WriteAsync(x => x.OnReceive, text);
            return Nil.Default;
        }

        public async Task<ServerStreamingResult<string>> OnReceive()
        {
            //--- このメソッド (OnReceive) のストリーミング情報を登録
            Console.WriteLine($"Start receiving...");
            var result = await cache.RegisterStreamingMethod(this, this.OnReceive);

            //--- ここは cache.Dispose() されるまで通りません!
            Console.WriteLine($"Stop receiving...");
            return result;
        }
    }
}

ここで重要なのが StreamingContextRepository<TService> という静的キャッシュ機構です。なぜこのようなキャッシュが必要かというと、そもそも gRPC の Server Streaming 接続は Server Streaming API (今回の例でいうと OnReceive) が終わるまでの間 push 配信が可能だからです。裏を返せば OnReceive メソッドが終了したら push 配信はできなくなります

という基本/前提があるので、別 API から push 配信をするためには以下の 2 点を守る必要が出てきます。

  • ストリーミング API (OnReceive) を終了させないで待機させる
  • ストリーミング接続情報を静的キャッシュして、別 API (Send) からアクセスできるようにする

そして、これらの要件をサポートしているのが StreamingContextRepository<TService> です。先の OnReceive メソッド内に記述されている RegisterStreamingMethod でストリーミング接続を登録/記憶させます。このメソッドは StreamingContextRepository.Dispose (今回の例でいうと Unjoin) が呼び出されるまで待機し続けるため、ひとつ目の要件を満たすことができます。こうやって待機させている間に、静的キャッシュを介してサーバー push を行うというわけです。

クライアント側

難しいのはサーバー側だけで、クライアント側の実装は簡単です。サーバー側の API を叩く順番だけ気を付けましょう。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- いつもの前準備
            Console.Title = "MagicOnionSample.Client";
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var context = new ChannelContext(channel, () => "xin9le");
            await context.WaitConnectComplete();
            var client = context.CreateClient<IChatApi>();

            //--- チャットルームに参加
            Console.WriteLine("Step.1 : Join");
            await client.Join();

            //--- メッセージ受信の登録
            Console.WriteLine("Step.2 : Register receiving");
            var streaming = await client.OnReceive();
            var receiveTask = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Received : {x}"));

            //--- メッセージを投げてみる
            Console.WriteLine("Step.3 : Send 1st message");
            await client.Send("あいうえお");

            Console.WriteLine("Step.4 : Send 2nd message");
            await client.Send("ABCDE");

            //--- チャットルームから退出
            Console.WriteLine("Step.5 : Unjoin");
            await client.Unjoin();

            //--- ちゃんと終わるの待ちましょう
            Console.WriteLine("Step.6 : Waiting for completion");
            await receiveTask;

            Console.ReadLine();
        }
    }
}

実行してみる

実装したものを実行してみると、以下のようなログが表示されます。呼び出される順序を確認してみてください。

f:id:xin9le:20180319013751p:plain

私がこれを勉強したとき、腑に落ちるまで結構時間がかかりました...!けれど、gRPC の Server Streaming の特性とそれをサポートする StreamingContextRepository を押さえられれば大丈夫です。

gRPC / MagicOnion 入門 (14) - 接続ユーザーを特定する

gRPC で通信を行う際、サーバー側でアクセスしてきているユーザーを特定したいケースは多々あります。これを実現する最も基本的で素朴な方法が、gRPC / MagicOnion 入門 (10) - ヘッダーの利用 で解説したヘッダーにユーザー固有の ID を埋め込むことです。

そのままやろうとすると結構手間な実装になりますが、MagicOnion はこれを簡単に実現できるよう ConnectionId の概念をサポートしてくれています。今回はその機能について見ていきます。

クライアントから ConnectionId を送信する

クライアントから ConnectionId を送信する場合、これまで紹介してきた方法と若干違う方法で初期化する必要があります。以下のように ChannelContext でラップしつつ、通信を行います。

using System;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);

            //--- ChannelContext でチャンネルとユーザー固有の ID をラップ
            var connectionId = "なにかしらユーザー固有のID";
            var context = new ChannelContext(channel, () => connectionId);
            await context.WaitConnectComplete();  // 接続待ち

            //--- API クライアントを生成して通信
            var client = context.CreateClient<ISampleApi>();
            var result = await client.Sample();
            Console.ReadLine();
        }
    }
}

サーバー側で ConnectionId を取り出す

サーバー側で ConnectionId を取り出すのは非常に簡単で、以下のようにします。

using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;
using MessagePack;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async UnaryResult<Nil> Sample()
        {
            //--- こんな感じで取り出せます
            //--- たったこれだけ!
            var connectionId = this.GetConnectionContext().ConnectionId;
            return Nil.Default;
        }
    }
}

この ConnectionId の仕組みも、実はヘッダーを利用して行われています。MagicOnion が面倒な部分を上手くラップしてくれていることもあり、簡単に使えるのでオススメです。

ImageMagick でお手軽 TGA → PNG 変換

業務で .tga で納品される大量のファイルを一括して .png に変換したいということがあったので、ImageMagick Converter を使ってチャチャっとやってみた系のメモです。

ImageMagick をダウンロード

下記サイトからダウンロードできます。各々の OS 環境に合わせてダウンロードしてください。Windows 環境の場合は portable 版を落とすのがお手軽でよさそう。

f:id:xin9le:20180313004617p:plain

バッチ処理で一括変換

ダウンロードした ImageMagick の .zip ファイルを解凍して、中にある convert.exe を利用しましょう。特定フォルダに含まれる .tga ファイルを一括で .png にする場合、例えば以下のようなバッチファイルになるでしょう*1

@echo off

set CONVERTER=ImageMagick-7.0.7-26-portable-Q16-x64\convert.exe
set SOURCE=<tgaのあるフォルダパス>
set TARGET=<pngの出力先フォルダパス>

for /r %SOURCE% %%A in (*.tga) do (
    echo %%A
    %CONVERTER% %%A %TARGET%\%%~nA.png
)

一度作ってしまえば、あとはフォルダに入れて叩くだけ!また、この手法を使えば別の拡張子間での変換もお手軽にできるはずです。

*1:パスやフォルダは適宜読み替えてください