読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

ReactiveSignalR

Happy Merry X'mas!! C# Advent Calendar 2013、23日目担当の@xin9leです。誰が何と言おうと23日目です。先の3連休を高校の同級生と夜通しカタンで遊び過ぎたせいで担当記事の公開が間に合わなかったなんてことは...あるようなないような...。関係各位、誠に申し訳ありません...><。

今年のOne ASP.NET Advent Calendar 2013では4回も登場し、うち3回がSignalRの記事と現在SignalR全力プッシュ中です。12日目に担当した記事にDynamicSignalRというのがありますが、そのときに「もう少しマシな解法」と書いたヤツが今回ご紹介するReactiveSignalRです。ちなみにRx (= Reactive Extensions) についてはある程度の理解がある前提で書いております。その点はご了承下さい。Rxについて学習したいと思われる方は「Rx入門」をご一読頂けると幸いです。すでに古い内容になっている箇所もありますが、考え方は十分に身に付けられるのではないかと思います。

ソースコードとNuGetパッケージ

ReactiveSignalRのソースコードはGitHubで公開しています。今回の記事投稿のために急いで準備したこともあり、まだ全然体裁などを整えられておりません。ReadMeなどについては今後順次対応して行きたいと思っています。

ReactiveSignalR

また、まだβ版ではありますがNuGetパッケージも公開しています。まだちゃんとテストできていないですが、ある程度は動くとは思います。本当はリリース前パッケージとして公開したかったのですが、その設定の仕方を調べる時間すらなく...。もし使ってみてバグなど発見したらご連絡ください。ちなみに.NET Client版はSilverlightと.NET 4を除くPCLとして、Server版は.NET 4.5以上で動作するようになっています。

ReactiveSignalR for .NET Client
ReactiveSignalR for Server

Onメソッドのシーケンス化

SignalRはPub/Subスタイルの通信ライブラリです。最初にSignalRを見たときに感じたのが「Rxと同じ考え方なんかー」でした。同じ考え方ならいつかきっとどこかでRxとコラボするだろうと思っていました。それから実務でSignalRを使うようになったとき、面倒と思うことが出てきました。最も面倒だと思ったのが利用頻度の極めて高いOnメソッドです。以下はWPFなどのGUIアプリケーション上における典型的な利用例です。

var context = SynchronizationContext.Current;  //--- UIスレッドの同期コンテキストを記憶
this.MyHub.On<string>("Receive", message =>
{
    //--- 何かの条件で更新するかどうかを決めたり
    if (!condition())
        return;

    //--- Onメソッドは非UIスレッドでの処理なのでスレッドを戻す
    context.Post(_ =>
    {
        this.display.Text = message;
    }, null);
});

これの面倒な点はUIスレッドをイチイチ戻す処理が入ることです。ネストするのがすこぶる煩わしい。On~と表現されていることからも分かるようにまさにサーバーが発信源のイベント処理なので、これはIObservable<T>のイベントシーケンスとしてみなすことができます。一旦Rxに乗せてしまえばスレッド間の移動や更新判定はお手の物です。ReactiveSignalRではOnメソッドをIObservable<T>として扱うための拡張メソッドを用意しており、それを利用することで以下のようなシンプルで宣言的な記述ができるようになります。

this.MyHub.On<string>("Receive")
.Where(x => condition())
.ObserveOn(SynchronizationContext.Current)  //--- UIスレッドに戻す
.Subscribe(x => this.display.Text = x);

以下のように第2引数にtrueを指定すれば同期コンテキストをキャプチャして動作します。これにより煩わしいスレッドの切り替えとはオサラバできます。この引数を省略した場合の既定値はfalseですが、これは元々の挙動に準ずるようにしているためです。

this.MyHub.On<string>("Receive", true)
.Where(x => condition())  //--- 最初からUIスレッド上で実行されるのでスレッド切り替えの手間がない
.Subscribe(x => this.display.Text = x);

Rxに乗っかっているので、もちろんBufferThrottleなどを使った更新の間引きもできるようになります。たったこれだけの変化ですが、使い勝手は大幅に向上します。SignalRとRxの相性が非常にいいことを実感頂けるでしょう。

Connectionが持つイベントのシーケンス化

SignalRサーバーとの永続的な接続はConnectionクラスを利用して行われます。もう少し具体的に言うとPersistentConnectionへの接続にはConnectionクラスが、Hubへの接続にはConnectionクラスから派生したHubConnectionクラスが利用されます。このベースとなっているConnectionクラスにはサーバーとの接続に関するイベントがいくつか用意されていますが、これらをIObservable<T>シーケンスに変換する拡張メソッドを用意しています。メソッド名はすべて「イベント名 + AsObservable」です。

イベント名 変換する拡張メソッド名 説明
Closed ClosedAsObservable サーバーとの接続が切れたときに発生
Error ErrorAsObservable エラーが検出されたときに発生
Received ReceivedAsObservable サーバーからの送られてきたデータを受信したときに発生
Reconnected ReconnectedAsObservable タイムアウト後に正常に再接続したときに発生
Reconnecting ReconnectingAsObservable エラー発生後に再接続を開始すると発生
StateChanged StateChangedAsObservable 接続状態が変化したときに発生
this.Connection = new Connection("http://localhost:12345/MyConnection");
this.Connection.ReceivedAsObservable(true)  //--- これも引数にtrueを入れるだけで呼び出し元スレッドに戻す
.Subscribe(x => this.display.Text = x);

ちなみに、SignalRには標準でConnectionクラスの拡張メソッドしてAsObservableが提供されています。これはReceivedイベント時にOnNextを、Errorイベント時にOnErrorを、Closedイベント時にOnCompletedを発生させるIObservable<T>シーケンスを生成します。ただしこの機能はPCL環境下では利用できません。ReactiveSignalRが提供するイベントシーケンス群はこれをある程度補完する機能にもなるでしょう。

匿名HubPipelineModule

SignalRのサーバー側にはHubパイプラインという機能があります。これはHubに対するアクセスの前後やエラー発生をフックして処理するためのものです。通常、Hubパイプラインを利用した処理をしようとしたらHubPipelineModuleクラスを継承したクラスを作って対応することになります。例えば以下のような感じです。

//--- HubPipelineModuleクラスを継承したクラスを作成 (メソッドの実装は省略)
public class MyHubPipelineModule : HubPipelineModule
{
    protected override bool OnBeforeAuthorizeConnect(HubDescriptor hubDescriptor, IRequest request){}
    protected override bool OnBeforeConnect(IHub hub){}
    protected override void OnAfterConnect(IHub hub){}
    protected override bool OnBeforeReconnect(IHub hub){}
    protected override void OnAfterReconnect(IHub hub){}
    protected override bool OnBeforeDisconnect(IHub hub){}
    protected override void OnAfterDisconnect(IHub hub){}
    protected override bool OnBeforeIncoming(IHubIncomingInvokerContext context){}
    protected override object OnAfterIncoming(object result, IHubIncomingInvokerContext context){}
    protected override bool OnBeforeOutgoing(IHubOutgoingInvokerContext context){}
    protected override void OnAfterOutgoing(IHubOutgoingInvokerContext context){}
    protected override void OnIncomingError(ExceptionContext exceptionContext, IHubIncomingInvokerContext invokerContext){}
}

//--- アプリケーション起動時にGlobalHostに設定
public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        GlobalHost.HubPipeline.AddModule(new MyHubPipelineModule());
        app.MapSignalR();
    }
}

これではちょっとした処理を入れたい場合でもイチイチ派生クラスを作らなければなりません。これは煩わしいので外部からデリゲートを設定できるようにしました。

GlobalHost.HubPipeline.AddModule(new AnonymousHubPipelineModule
(
    //--- インスタンス生成時に外部からデリゲートを設定
    onBeforeIncoming: context =>
    {
        if (context.MethodDescriptor.Hub.HubType == typeof(MyHub))
        {
            //--- 特定のHubに対する処理をしてみたり
        }
        return true;
    }
));

また、特定のHubに対してのみ処理を差し込みたい場合もあるかと思います。そんなときに毎度×2メソッド毎にどのHubかを判定するのは極めて煩わしいです。その解決策としてAnonymousHubPipelineModuleにはGenerics版も用意されていて、型パラメーターを指定するだけで特定のHubに対する処理だけを記述できるようになっています。

GlobalHost.HubPipeline.AddModule(new AnonymousHubPipelineModule<MyHub>
(
    onBeforeIncoming: context =>
    {
        //--- MyHubのメソッド呼び出し前処理
        return true;
    }
));

HubPipelineModuleをイベントシーケンス化

上で匿名HubPipelineModuleを紹介しましたが、Hubパイプラインのイベントだけを処理したい場合もあるかもしれません。そのようなときのためにReactiveHubPipelineModuleを用意しました。HubPipelineModuleの各メソッドに対応するIObservable<T>シーケンスが取得できるので、Rxを利用したイベント処理が容易に実現できます。これも自前フィルタリングなしで特定のHubに対する処理だけを実現できるよう、Generics版が提供されています。

var module = new ReactiveHubPipelineModule<MyHub>();  //--- MyHubに対する処理のみ
module.AfterIncoming.Subscribe(x =>
{
    //--- MyHubのメソッド呼び出しの後処理
});
GlobalHost.HubPipeline.AddModule(module);  //--- 登録

HubのActionFilter属性

One ASP.NET Advent Calendar 2013の19日目の記事でASP.NET MVCやASP.NET Web APIのActionFilter属性を利用したサンプルを紹介しました。ActionFilter属性は、MVCやWeb APIのアクションメソッドの呼び出し前後に処理を差し込むための機能です。アクションに対して属性として明示的に設定できるのが非常に分かりやすいです。SignalRのHubについて考えてみると、外部からメソッド呼び出しされるのは同じなのに提供されている機能はHubPipelineModuleのみ。これは同じ機能が提供されていてもイイのではないかと思って作成しました。使い方はMVCやWeb APIとほとんど同じですが、前準備だけは忘れないようにしましょう。

public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        //--- これがないと動きません
        GlobalHost.HubPipeline.AddModule(new ActionFilterModule());
        app.MapSignalR();
    }
}
public class MyActionFilterAttribute : ActionFilterAttribute
{
    //--- メソッド呼び出しの前処理
    public override bool OnBeforeIncoming(IHubIncomingInvokerContext context)
    {
        return base.OnBeforeIncoming(context);
    }

    //--- メソッド呼び出しの後処理
    public override void OnAfterIncoming(object result, IHubIncomingInvokerContext context)
    {
        base.OnAfterIncoming(result, context);
    }

    //--- エラー発生時
    public override void OnIncomingError(ExceptionContext exceptionContext, IHubIncomingInvokerContext invokerContext)
    {
        base.OnIncomingError(exceptionContext, invokerContext);
    }
}
public class MyHub : Hub
{
    [MyActionFilter]
    public void Send(string message)
    {
        this.Clients.All.Receive(message);
    }
}

今後の予定/目標

最初はOnメソッドのRx化だけだった機能ですが、ここ数日でいろいろ実装して結構便利なものができたのではないかと思っています。ReactiveSignalRとかナントカ言っちゃってますが、結局Reactive感があるのはクライアント側だけでサーバー側は全然です。一応サーバー側も含めたRx実装にSignalR.Reactiveなるものがあるのですが、なんかちょっとシックリ来ていません。なんでもかんでもRxと組み合わせれば良いわけじゃないので、今後良い案が浮かべば順次追加して行こうと思っています。とりあえずボンヤリと考えている案は以下のような感じです。他にもすでにバグってるかもと思っている箇所もあるので、忘れないうちに対応してしまいたいところです。

  • クライアント実装のJavaScript/TypeScript版を提供 (SignalRのjs版の記法とか全然見てないので、もしかしたら不要かも?)
  • ReactiveなHubっぽい何か

作ったものの機能解説ばかりで長くなってしまいましたが、今年のC# Advent Calendarの記事はここまで!来年も良い年になりますように!Happy Merry X'mas!!