読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (13) - HotとCold

Rx

IObservable<T>には、HotとColdと呼ばれる2つの性質があります。実はこれまで触れてきた記事のサンプルにもHotなものとColdなものが出てきていました。これらの性質の違いを把握することはRxを利用する上で非常に重要です。今回はこれらについて見ていきます。

Hot Observable

まずはHot Observableと呼ばれる性質についてです。Hotな性質とは、同一のIObservable<T>シーケンスに関連付けたすべてのIObserver<T>に対して一度に同じ値を通知することを言います。

HotObservable

Hotな性質を持つものの代表はイベント関連です。以下にObservable.FromEventメソッドを利用したサンプルを示します。まず、Rx入門 (11) - イベントのシーケンス化のサンプルでも挙げた、キー入力が行われる度にKeyPressイベントを発行するだけの簡単なクラスを作成します。Ctrl + Qが押下されたらループを抜けるようになっています。

using System;
 
namespace Sample24_HotObservable
{
    class EventNotifier
    {
        public event Action<ConsoleKey> KeyPress = null;
        public void ObserveKeyInput()
        {
            while (true)
            {
                var info = Console.ReadKey(true);
                if (info.Modifiers == ConsoleModifiers.Control)
                if (info.Key == ConsoleKey.Q)
                    return;
 
                var handler = this.KeyPress;
                if (handler != null)
                    handler(info.Key);
            }
        }
    }
}

次に、上で作成した型のKeyPressイベントをIObservable<T>シーケンス化したものに対して、オブザーバーAだけを関連付けた場合とオブザーバーAとBを関連付けた場合の挙動を確認します。

using System;
using System.Reactive.Linq;
 
namespace Sample24_HotObservable
{
    static class Program
    {
        static void Main()
        {
            var notifier = new EventNotifier();
            var sequence = Observable.FromEvent<ConsoleKey>
            (
                handler => notifier.KeyPress += handler,
                handler => notifier.KeyPress -= handler
            );
            sequence.SubscribeTracer("A");
            notifier.ObserveKeyInput();
            sequence.SubscribeTracer("B"); //--- Subscribe時には何も通知されない
            notifier.ObserveKeyInput();    //--- キー押下時、A/Bとも同じ値が通知される
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.WriteLine("----- {0} : Subscribe Before -----", name);
            var disposer = source.Subscribe(value => Console.WriteLine("{0} : OnNext({1})", name, value));
            Console.WriteLine("----- {0} : Subscribe After -----", name);
            return disposer;
        }
    }
}
 
//----- 結果(例)
/*
----- A : Subscribe Before -----
----- A : Subscribe After -----
A : OnNext(F1)
A : OnNext(F2)
----- B : Subscribe Before -----
----- B : Subscribe After -----
A : OnNext(Enter)
B : OnNext(Enter)
A : OnNext(Spacebar)
B : OnNext(Spacebar)
*/

上記のサンプルでは、「F1 ⇒ F2 ⇒ Ctrl + Q ⇒ Enter ⇒ Space ⇒ Ctrl + Q」と入力しています。オブザーバーA、Bの両方を接続している場合に注目すると、キー押下時に同時に両方のOnNext処理が反応しています。オブザーバーAだけを接続している場合は言わずもがなです。

最初の繰り返しになりますが、一度の通知アクションで関連付けてあるすべてのIObserver<T>に対して同一の値を送信するものがHot Observableになります。イメージとしては、イベントの発生をそのまま待機しているという感じです。

Cold Observable

次にCold Observableと呼ばれる性質についてです。Coldな性質とは、IObservable<T>シーケンスに関連付けたそれぞれのIObserver<T>に対して個別に値を通知することを言います。

ColdObservable

例外はあるものの、ColdなものはSubscribeメソッドが呼び出されると同時に値を発行するものが多いです。代表的なファクトリーメソッドには、Observable.ReturnメソッドやObservable.Rangeメソッドなどがあります。以下にそのサンプルを示します。

using System;
using System.Reactive.Linq;
 
namespace Sample25_ColdObservable
{
    static class Program
    {
        static void Main()
        {
            var sequence = Observable.Range(1, 3);
            sequence.SubscribeTracer("A");
            sequence.SubscribeTracer("B");
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.WriteLine("----- {0} : Subscribe Before -----", name);
            var disposer = source.Subscribe
            (
                value => Console.WriteLine("{0} : OnNext({1})", name, value),
                ()    => Console.WriteLine("{0} : OnCompleted", name)
            );
            Console.WriteLine("----- {0} : Subscribe After -----", name);
            return disposer;
        }
    }
}
 
//----- 結果
/*
----- A : Subscribe Before -----
A : OnNext(1)
A : OnNext(2)
A : OnNext(3)
A : OnCompleted
----- A : Subscribe After -----
----- B : Subscribe Before -----
B : OnNext(1)
B : OnNext(2)
B : OnNext(3)
B : OnCompleted
----- B : Subscribe After -----
*/

上記のサンプルでは同一の変数に対してSubscribeしていますが、関連付けを行ったそれぞれのオブザーバーに対して即座にすべての値が送信されています。Hot Observableと違い、シーケンスに対してオブザーバーBの接続を追加してもオブザーバーAに値が発行されることはありません。

また、Observable.Timerメソッドは「一定時間ごとに値を通知するイベント」のようなものなのでHotのように思いますが、実はColdです。以下にRx入門 (10) - 時間のシーケンス化で紹介したObservable.Timerメソッドのサンプルを再掲します。

using System;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample13_Timer
{
    class Program
    {
        static void Main()
        {
            var timer = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1));
            Console.WriteLine("A : 3秒後に1秒間隔のタイマーが開始されます");
            var disposerA = timer.Subscribe(value => Console.WriteLine("A : OnNext({0})", value));
            Thread.Sleep(1500);    //--- 1.5秒タイミングをずらす
            Console.WriteLine("\\tB : 3秒後に1秒間隔のタイマーが開始されます");
            var disposerB = timer.Subscribe(value => Console.WriteLine("\\tB : OnNext({0})", value));
            Console.ReadLine();    //--- 何か押したらAを終了
            disposerA.Dispose();
            Console.ReadLine();    //--- 何か押したらBを終了
            disposerB.Dispose();
            Console.ReadLine();    //--- 何も流れないことを確認
        }
    }
}
 
//----- 結果
/*
A : 3秒後に1秒間隔のタイマーが開始されます
        B : 3秒後に1秒間隔のタイマーが開始されます
A : OnNext(0)
A : OnNext(1)
        B : OnNext(0)
A : OnNext(2)
        B : OnNext(1)
A : OnNext(3)
        B : OnNext(2)
 
        B : OnNext(3)
        B : OnNext(4)
        B : OnNext(5)
*/

Observable.Timerメソッドから返されたシーケンスを複数回の購読する場合の動きに注目してください。結果より、AのタイマーもBのタイマーも値が0から始まっていることが確認できます。これは、Subscribeメソッド呼び出し時にObservable.Timerの内部でタイマーが2つ生成されていることを示しています。タイマーAとタイマーBは相互に関係がなく、それぞれ個別に値が発行されていることがわかります。

両方の性質を持つもの

中にはHotとColdの両方の性質を持つものもあります。AsyncSubject<T>Observable.ToAsyncメソッドなどの非同期系のものは、その代表です。以下にAsyncSubject<T>を利用したサンプルを示します。

using System;
using System.Reactive.Subjects;
 
namespace Sample26_HotAndColdObservable
{
    static class Program
    {
        static void Main()
        {
            var subject = new AsyncSubject<int>();
            subject.OnNext(1);
            subject.SubscribeTracer("A");
            subject.OnNext(2);
            subject.SubscribeTracer("B");
            Console.WriteLine("----- OnCompleted Before -----");
            subject.OnCompleted(); //--- ここまではHot、これより後はCold
            Console.WriteLine("----- OnCompleted After -----");
            subject.OnNext(3);
            subject.SubscribeTracer("C");
            subject.OnNext(3);
            subject.SubscribeTracer("D");
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.WriteLine("----- {0} : Subscribe Before -----", name);
            var unsbscriber = source.Subscribe
            (
                value => Console.WriteLine("{0} : OnNext({1})", name, value),
                ()    => Console.WriteLine("{0} : OnCompleted", name)
            );
            Console.WriteLine("----- {0} : Subscribe After -----", name);
            return unsbscriber;
        }
    }
}
 
//----- 結果
/*
----- A : Subscribe Before -----
----- A : Subscribe After -----
----- B : Subscribe Before -----
----- B : Subscribe After -----
----- OnCompleted Before -----
A : OnNext(2)
A : OnCompleted
B : OnNext(2)
B : OnCompleted
----- OnCompleted After -----
----- C : Subscribe Before -----
C : OnNext(2)
C : OnCompleted
----- C : Subscribe After -----
----- D : Subscribe Before -----
D : OnNext(2)
D : OnCompleted
----- D : Subscribe After -----
*/

AsyncSubject<T>は、OnCompletedメソッドが呼び出されるまでが非同期処理中を表します。非同期処理中にシーケンスへの接続が行われたものはすべて、非同期処理終了時に同じ値が通知されます。上記の例ではオブザーバーAとオブザーバーBが該当します。非同期処理終了後にシーケンスへの接続が行われた場合は、その都度戻り値と完了通知が送信されます。サンプルではオブザーバーCとオブザーバーDが該当します。つまり、非同期処理終了まではHot Observable、それ以降はCold Observableとなります。

このように、状態に応じて性質が変わるものが存在することを覚えておきましょう。

IEnumerable<T>における性質の違い

「なんで同じIObservable<T>なのに2つも性質があるの?」と思ってしまいそうですが、きっとみなさんも大好きだと思われるIEnumerable<T>にも、実は無限リスト (yield) によるものと配列やリストによるものに性質の違いがあります。以下にそのサンプルを示します。

using System;
using System.Collections.Generic;
using System.Linq;
 
namespace ConsoleApplication
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("----- 無限リスト -----");
            Console.WriteLine(GetYieldEnumerable().First());
            Console.WriteLine("----- 配列/リスト -----");
            Console.WriteLine(GetListAsEnumerable().First());
        }
 
        static IEnumerable<int> GetYieldEnumerable()
        {
            Console.WriteLine("GetYieldEnumerable : 1");
            yield return 1;
            Console.WriteLine("GetYieldEnumerable : 2"); //--- 呼び出されない
            yield return 2;
        }
 
        static IEnumerable<int> GetListAsEnumerable()
        {
            var list = new List<int>();
            Console.WriteLine("GetListAsEnumerable : 1");
            list.Add(1);
            Console.WriteLine("GetListAsEnumerable : 2"); //--- 呼び出される
            list.Add(2);
            return list;
        }
    }
}
 
//----- 結果
/*
----- 無限リスト -----
GetYieldEnumerable : 1
1
----- 配列/リスト -----
GetListAsEnumerable : 1
GetListAsEnumerable : 2
1
*/

上記のように結果が異なるのは、GetYieldEnumerableメソッドの方が "yield return 1;" までしかイテレーションが進んでいないのに対し、GetListAsEnumerableメソッドの方はすべての値をリストに入れて返し、そのリストの要素をイテレーションしているからです。

参考記事

本内容については、以下も合わせてお読みください。特に後者の@neueccさんの記事にはSilverlightを利用したサンプルが付いているので、より理解が深まると思います。(ってゆーか、本入門記事では13回目でHotとColdを扱っているのに、@okazukiさんの記事では5回目なんですね...orz)

次回予告

今回はIObservable<T>の重要な性質であるHotとColdについて見てきました。Rx/Push型配信の特徴のひとつに分配があることをRx入門 (3) - Push型とPull型で書きましたが、今回確認した通りColdなObservableでは分配ができません。「それじゃ困る!」ということでColdなObservableををHotなObservableに変換する機能が用意されています。次回はこのCold to Hot変換について触れてみたいと思います。