読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (14) - Cold to Hot変換

Rx

Push型配信の特徴は情報の分配 (同じデータを複数の購読者に送信すること) です。しかし、前回の内容の通り、ColdなObservableでは分配をすることができません。Rxの旨みを最大限に引き出すためには、ColdなObservableをHotなObservableに変換する必要があります。今回はこの変換の方法と挙動について触れてみたいと思います。

変換のイメージ

ColdなObservableは購読対象者と1対1の関係を持ちます。以前も出てきた「山から湧き出る水」イメージで言えば、湧き出た水が1本の川 (購読者に続く道 = Subscribe) となって下っているような状態です。HotなObservableは1対多の関係を持つので、詰まるところHotなObservableに変換するということは「川を複数の支流に分岐」させれば良いわけです。

Mountain

ここでポイントになるのは、「1本の川を分岐して支流を作るための猶予」を作ってあげることです。分岐工事中、一滴の水も漏らすわけにはいきません。そのために、工事前にまず水を貯めておくダムを準備します。ダムで水を堰き止めている間に支流を作り、支流が完成したらダムを解放して水を流す、と言ったイメージです。

これらを実現するのが、IConnectableObservable<T>インターフェースとそれを返す変換メソッドです。以降、これらについて詳しく見ていきます。

変換メソッドとその挙動

IConnectableObservable<T>は、IObservable<T>の機能に加えConnectメソッドを持っています。変換メソッドからIConnectableObservable<T>が返された段階では未接続 (ダムが解放されていない) 状態です。この間にSubscribe (支流作成) を行い、流す先が確定したらConnect (ダムを解放) します。

Observable.Publish

まず、最も基本的なCold to Hot変換メソッドであるPublishメソッドを紹介します。以下に簡単なサンプルを示します。

using System;
using System.Reactive.Linq;
 
namespace Sample27_Publish
{
    static class Program
    {
        static void Main()
        {
            //--- ↓↓ IConnectableObservable<int>が返される
            var sequence = Observable.Range(1, 3).Publish();
            sequence.SubscribeTracer("A");
            sequence.SubscribeTracer("B");
            sequence.Connect(); //--- 放流!
            sequence.SubscribeTracer("C");
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.WriteLine("----- {0} : Subscribe Before -----", name);
            var unsbscriber = source.Subscribe
            (
                value => Console.WriteLine("{0} : OnNext({1})", name, value),
                ()    => Console.WriteLine("{0} : OnCompleted", name)
            );
            Console.WriteLine("----- {0} : Subscribe After -----", name);
            return unsbscriber;
        }
    }
}
 
//----- 結果
/*
----- A : Subscribe Before -----
----- A : Subscribe After -----
----- B : Subscribe Before -----
----- B : Subscribe After -----
A : OnNext(1)
B : OnNext(1)
A : OnNext(2)
B : OnNext(2)
A : OnNext(3)
B : OnNext(3)
A : OnCompleted
B : OnCompleted
----- C : Subscribe Before -----
C : OnCompleted
----- C : Subscribe After -----
*/

Cold ObservableであるObservable.Rangeメソッドに対してPublishメソッドをチェインしています。ここでは省略のためにvarを使っていますが、戻り値としてはIConnectableObservable<int>が返っています。ここからConnectメソッドを呼び出して放流するまでの間がHot Observableとなります。

結果にもあるように、Subscribeメソッドを呼び出した段階では値が流れていません。そしてConnectメソッドを呼び出したとき、すべての値が一気にが流れています。このとき、値はA、Bの両方に配信されていることが確認できます。また、Connectメソッド呼び出し後にSubscribeした場合 (C) はOnCompletedしか呼び出されません。(すでにすべての値が流出してダムが空になっているので)

Observable.Publish(initialValue)

Publishメソッドには、引数に初期値を受けるものがあります。その例を以下に示します。Mainメソッド以外は同じなので割愛します。

static void Main()
{
    //--- 初期値はDoを通らず、かつSubscribe時に流れる
    var sequence = Observable.Range(1, 3)
                 .Do(value => Console.WriteLine("Do({0})", value))
                 .Publish(-1); //--- 初期値を指定
    sequence.SubscribeTracer("A");
    sequence.SubscribeTracer("B");
    sequence.Connect();
    sequence.SubscribeTracer("C");
}
 
//----- 結果
/*
----- A : Subscribe Before -----
A : OnNext(-1)
----- A : Subscribe After -----
----- B : Subscribe Before -----
B : OnNext(-1)
----- B : Subscribe After -----
Do(1)
A : OnNext(1)
B : OnNext(1)
Do(2)
A : OnNext(2)
B : OnNext(2)
Do(3)
A : OnNext(3)
B : OnNext(3)
A : OnCompleted
B : OnCompleted
----- C : Subscribe Before -----
C : OnCompleted
----- C : Subscribe After -----
*/

コメントと結果の通り、Subscribe実行時に引数で指定した初期値が渡っています。Publishメソッドが堰き止めるのはPublishメソッドより前から流れてくるものだけで、初期値は対象外です。また、初期値はPublishメソッドから流れてくるので、Publishメソッドのひとつ前のDoメソッドを通過することはありません。初期値が渡ってくること以外は、引数なしのPublishメソッドと同じです。

ここで初めて利用したDoメソッドは、Subscribeメソッドと同じく流れてくる値を受けて処理を行う関数です。流れてきた値をそのまま値を通過させる点がSubscribeメソッドとの違いです。シーケンスの途中でトレースなどの処理を挟み込むのに便利です。

Observable.Replay

Observable.Replayメソッドは、Connectメソッド呼び出し後にSubscribeした場合に、Connectメソッド呼び出し前に流した値を再生するものです。以下にサンプルを示します。

static void Main()
{
    var sequence = Observable.Range(1, 3).Replay();
    sequence.SubscribeTracer("A");
    sequence.SubscribeTracer("B");
    sequence.Connect();
    sequence.SubscribeTracer("C");
}
 
//----- 結果
/*
----- A : Subscribe Before -----
----- A : Subscribe After -----
----- B : Subscribe Before -----
----- B : Subscribe After -----
A : OnNext(1)
B : OnNext(1)
A : OnNext(2)
B : OnNext(2)
A : OnNext(3)
B : OnNext(3)
A : OnCompleted
B : OnCompleted
----- C : Subscribe Before -----
C : OnNext(1)
C : OnNext(2)
C : OnNext(3)
C : OnCompleted
----- C : Subscribe After -----
*/

Publishメソッドの違いは1点で、購読者A、Bに流した値を購読者Cにも配信しているところです。上記のサンプルではすべての値を再配信しましたが、これ以外にも再生する値の個数を制限したり、直近の一定時間内に配信したものだけを再生するといった機能を持つオーバーロードがあります。

Observable.PublishLast

Observable.PublishLastメソッドは、その名の通り最新の値のみを配信するものです。以下にサンプルを示します。

static void Main()
{
    var sequence = Observable.Range(1, 3).PublishLast();
    sequence.SubscribeTracer("A");
    sequence.SubscribeTracer("B");
    sequence.Connect();
    sequence.SubscribeTracer("C");
}
 
//----- 結果
/*
----- A : Subscribe Before -----
----- A : Subscribe After -----
----- B : Subscribe Before -----
----- B : Subscribe After -----
A : OnNext(3)
A : OnCompleted
B : OnNext(3)
B : OnCompleted
----- C : Subscribe Before -----
C : OnNext(3)
C : OnCompleted
----- C : Subscribe After -----
*/

これはこれまでのサンプルと違い、Connectメソッドの呼び出し前に登録した購読者A、Bに対しても最新の値のみを通知します。購読者Cについては言わずもがなです。また、Rx入門 (12) - 非同期処理のシーケンス化で見た非同期処理の挙動とそっくりです。

Observable.Multicast

最後に紹介するのは、Cold to Hot変換の中でも最も汎用的なメソッドであるObservable.Multicastメソッドです。このメソッドは引数にISubject<TSource, TResult>インターフェースを取ります。ですので、以前Rx入門 (5) - Subjectの利用Rx入門 (6) - Subjectの亜種で紹介した各種Subject<T>を与えることができます。以下にその例を示します。

var s0 = Observable.Range(1, 3);
var s1 = s0.Multicast(new Subject<int>());           //--- Publish()
var s2 = s0.Multicast(new BehaviorSubject<int>(-1)); //--- Publish(-1)
var s3 = s0.Multicast(new ReplaySubject<int>());     //--- Replay()
var s4 = s0.Multicast(new AsyncSubject<int>());      //--- PublishLast()

上記のコメントにもありますが、実は先に紹介した変換メソッドはMulticastメソッドに各種Subject<T>を与えたものです。「全部ショートカットメソッドがあるならMulticastなんて要らないじゃないか!」と思いたくもなりますが、カスタムSubjectに対する汎用的な道を残しているのだと思います。(どのようなケースでカスタマイズが必要になるのかは不明ですが...)

次回予告

今回はColdなObservableをHotなObservableに変換するメソッドについて見てきました。ますます便利な感じがしてきましたね!次回はスケジューラーによるスレッドの切り替えについて触れてみようと思います。一応次回で入門の連載はおしまいのつもりです。(その後にのんびり落ち穂拾いを楽しむつもりですが...w)