読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (7) - IObservable<T>の生成

Rx入門 (4) - IObserver<T>の省略」では、受信側であるIObserver<T>を省略した書き方について取り上げました。その記事の冒頭で「送信側はさておき...」と送信側のことは棚に上げていましたが、そうは言ってもやはり送信側もIObservable<T>をいちいち実装するのは手間であり、現実的ではありません。そこで今回は、IObservable<T>を簡単に生成する方法について見ていきます。

解決へのアプローチ

オブザーバーパターンは、「山 (IObservable<T>) から水 (情報) が湧き出て、川となって流れて (OnNext) おり、それを汲み上げる (Subscribe)」という風にイメージすることができます。(非常に個人的なイメージです)

Mountain

このイメージ通りにコードを記述することできればより理解し易い形となりますが、重要なのは「山から水が湧き出る」という箇所をどう記述するかということです。「水の湧き出方」は山ごとに異なり、それぞれパターンがあります。つまり、この「水の湧き出る山」をパターン毎に作ってやれば良いわけです。ということは、パターン別にIObservable<T>を生成するファクトリーメソッドがあれば良さそうです。

単純な定型句

前回までにSubject<T>とその亜種について学びました。これらを利用することで状態の通知に関する動作検証を簡単に行うことができますが、応用すればIObservable<T>としての挙動も書くことができます。ここでは、それらを使って特に単純で典型的な定型句と思われるものを実装してみます。

using System;
using System.Linq;
using System.Reactive.Subjects;
 
namespace Sample08_ObservableFactory
{
    /// <summary>
    /// IObservable<T>のファクトリーメソッドを提供します。
    /// </summary>
    static class ObservableFactory
    {
        /// <summary>
        /// 何もしないシーケンスを生成します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<T> Never<T>()
        {
            return new Subject<T>();
        }
 
        /// <summary>
        /// 完了のみを通知するシーケンスを生成します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<T> Empty<T>()
        {
            var subject = new Subject<T>();
            subject.OnCompleted();
            return subject;
        }
 
        /// <summary>
        /// 指定の値を1度だけ通知するシーケンスを生成します。
        /// </summary>
        /// <param name="value">通知する値</param>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<int> Return(int value)
        {
            var subject = new ReplaySubject<int>();
            subject.OnNext(value);
            subject.OnCompleted();
            return subject;
        }
 
        /// <summary>
        /// 指定されたエラーのみを通知するシーケンスを生成します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="error">通知する例外</param>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<T> Throw<T>(Exception error)
        {
            var subject = new Subject<T>();
            subject.OnError(error);
            return subject;
        }
 
        /// <summary>
        /// 指定した範囲内の整数のシーケンスを生成します。
        /// </summary>
        /// <param name="start">シーケンス内の最初の整数の値</param>
        /// <param name="count">生成する連続した整数の数</param>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<int> Range(int start, int count)
        {
            var subject = new ReplaySubject<int>();
            foreach (var value in Enumerable.Range(start, count))
                subject.OnNext(value);
            subject.OnCompleted();
            return subject;
        }
 
        /// <summary>
        /// 繰り返される1つの値を含むシーケンスを生成します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="element">繰り返される値</param>
        /// <param name="count">生成されたシーケンスで値を繰り返す回数</param>
        /// <returns>生成されたシーケンス</returns>
        public static IObservable<T> Repeat<T>(T element, int count)
        {
            var subject = new ReplaySubject<T>();
            foreach (var item in Enumerable.Repeat(element, count))
                subject.OnNext(item);
            subject.OnCompleted();
            return subject;
        }
    }
}

上記で作成したファクトリーメソッドを利用して、次のコードを実行してみます。IObservable<T>の生成補助とIObserver<T>の省略のおかげで、非常に簡潔に記述できていることがわかります。

using System;
 
namespace Sample08_ObservableFactory
{
   static class Program
   {
       static void Main()
       {
           ObservableFactory.Never<double>().SubscribeTracer("Never");
           ObservableFactory.Empty<string>().SubscribeTracer("Empty");
           ObservableFactory.Return(3).SubscribeTracer("Return");
           ObservableFactory.Throw<char>(new Exception()).SubscribeTracer("Throw");
           ObservableFactory.Range(5, 3).SubscribeTracer("Range");
           ObservableFactory.Repeat(2, 4).SubscribeTracer("Repeat");
       }
 
       static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
       {
           Console.WriteLine("----- {0} : Subscribe Before -----", name);
           var disposer = source.Subscribe
           (
               value => Console.WriteLine("{0} : OnNext({1})", name, value),
               error => Console.WriteLine("{0} : OnError({1})", name, error),
               ()    => Console.WriteLine("{0} : OnCompleted", name)
           );
           Console.WriteLine("----- {0} : Subscribe After -----\\n\\n", name);
           return disposer;
       }
   }
}
 
//----- 結果
/*
----- Never : Subscribe Before -----
----- Never : Subscribe After -----
 
 
----- Empty : Subscribe Before -----
Empty : OnCompleted
----- Empty : Subscribe After -----
 
 
----- Return : Subscribe Before -----
Return : OnNext(3)
Return : OnCompleted
----- Return : Subscribe After -----
 
 
----- Throw : Subscribe Before -----
Throw : OnError(System.Exception: 種類 ‘System.Exception’ の例外がスローされました。)
----- Throw : Subscribe After -----
 
 
----- Range : Subscribe Before -----
Range : OnNext(5)
Range : OnNext(6)
Range : OnNext(7)
Range : OnCompleted
----- Range : Subscribe After -----
 
 
----- Repeat : Subscribe Before -----
Repeat : OnNext(2)
Repeat : OnNext(2)
Repeat : OnNext(2)
Repeat : OnNext(2)
Repeat : OnCompleted
----- Repeat : Subscribe After -----
*/

Rx標準機能を利用する

今回は学習のためにIObservable<T>のファクトリーメソッドをいくつか自作しましたが、当然ながらこのようなメソッドはRxのライブラリに標準搭載されています。定義されているところは、System.Reactive.Linq名前空間にあるObservableクラスです。ここでは紹介していない便利なメソッドも多数存在しますので、ぜひ積極的に利用してみてください。

参考記事

@okazuki先生が書いている下記のblog記事では、圧倒的なボリュームでファクトリーメソッドを紹介しています。使い方を知る上で非常に参考になるので、合わせて読むことを推奨します。

次回予告

今回はIObservable<T>の生成を簡単にするファクトリーメソッドについて見てきました。次回はLINQスタイルでの記述について触れてみたいと思います。どんどん面白くなってきましたね。