xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (4) - IObserver<T>の省略

前々回のオブザーバーパターンの例では、IObservable<T>IObserver<T>の具象クラスをそれぞれ実装しました。しかし、送信側はさておき、挙動が変わりやすい受信側がいちいちIObserver<T>を実装するのは現実的ではありません。そこで今回は、IObserver<T>の省略について見ていきます。

解決へのアプローチ

IObserver<T>は以下の3つのメソッドの実装が必要です。そして、これらの中にIObservable<T>の通知に呼応する処理が記述されます。とすれば、これらのメソッドの中に記述するはずの処理をデリゲートとして外部から渡すようなIObserver<T>の具象クラスがあれば、汎用クラスとして使いまわすことができるはずです。

メソッド 説明
OnNext 状態が変化したことを通知します
OnCompleted 状態変化の通知が完了したことを通知します
OnError 何らかのエラーが発生したことを通知します

IObservable<T>のSubscribeメソッドにデリゲートから生成したIObserver<T>の汎用具象クラスを渡せば良いのですが、もう一歩踏み込んで「IObserver<T>をSubscribeメソッドに渡す」という処理も隠蔽してしまいましょう。IObservable<T>を第1引数に取るSubscribe拡張メソッドを作成し、第2引数以降にデリゲートを渡すようにすれば、IObserver<T>の生成を隠蔽、省略することができます

サンプルで確認する

上記のアプローチを簡単なサンプルで確認してみましょう。まず、情報の発信源であるIObservable<T>の具象クラスのコード例を示します。手を抜いている箇所がありますが、今回のサンプルでは不要なので目をつぶってください。

using System;
using System.Collections.Generic;
 
namespace Sample02_AbbrevIObserver
{
    /// <summary>
    /// 情報の発信源としての機能を提供します。
    /// </summary>
    /// <typeparam name="T">発信するデータの型</typeparam>
    class Provider<T> : IObservable<T>
    {
        /// <summary>
        /// オブザーバーのコレクションを保持します。
        /// </summary>
        private readonly LinkedList<IObserver<T>> observers = new LinkedList<IObserver<T>>();
 
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <param name="observer">通知を受け取るオブジェクト</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (!this.observers.Contains(observer))
                this.observers.AddLast(observer);
            return null;    //— 怠惰!
        }
 
        /// <summary>
        /// 情報を通知します。
        /// </summary>
        /// <param name="value">通知する情報</param>
        public void Notify(T value)
        {
            if (value == null || value.Equals(default(T)))
            {
                foreach (var observer in this.observers)
                    observer.OnError(new ArgumentException());
            }
            else
            {
                foreach (var observer in this.observers)
                    observer.OnNext(value);
            }
        }
 
        /// <summary>
        /// 通知を終了します。
        /// </summary>
        public void Completed()
        {
            foreach (var observer in this.observers)
                observer.OnCompleted();
            this.observers.Clear();
        }
    }
}

次に、今回の肝であるデリゲートベースのIObserver<T>の汎用具象クラスを示します。コンストラクタで各メソッド呼び出し時に実行されるデリゲートを設定する、という簡単なものです。

using System;
 
namespace Sample02_AbbrevIObserver
{
    /// <summary>
    /// 情報の受信者としての機能を提供します。
    /// </summary>
    /// <typeparam name="T">受信するデータの型</typeparam>
    class Observer<T> : IObserver<T>
    {
        private readonly Action onCompleted        = null;
        private readonly Action<Exception> onError = null;
        private readonly Action<T> onNext          = null;
 
        /// <summary>
        /// コンストラクタ
        /// </summary>
        /// <param name="onNext">OnNextのときに呼び出されるデリゲート</param>
        /// <param name="onError">OnErrorのときに呼び出されるデリゲート</param>
        /// <param name="onCompleted">OnCompletedのときに呼び出されるデリゲート</param>
        public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            this.onNext      = onNext;
            this.onError     = onError;
            this.onCompleted = onCompleted;
        }
 
        /// <summary>
        /// プロバイダーがプッシュベースの通知の送信を完了したことをオブザーバーに通知します。
        /// </summary>
        public void OnCompleted()
        {
            if (this.onCompleted != null)
                this.onCompleted();
        }
 
        /// <summary>
        /// プロバイダーでエラー状態が発生したことをオブザーバーに通知します。
        /// </summary>
        /// <param name="error">エラーに関する追加情報を提供するオブジェクト</param>
        public void OnError(Exception error)
        {
            if (this.onError != null)
                this.onError(error);
        }
 
        /// <summary>
        /// オブザーバーに新しいデータを提供します。
        /// </summary>
        /// <param name="value">現在の通知情報</param>
        public void OnNext(T value)
        {
            if (this.onNext != null)
                this.onNext(value);
        }
    }
}

最後に、IObserver<T>の生成を隠蔽するためのSubscribe拡張メソッドを示します。デリゲートを受け取り、IObserver<T>の具象クラスを生成して登録しているだけです。

using System;
 
namespace Sample02_AbbrevIObserver
{
    static class ObservableHelper
    {
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="onNext">OnNextのときに呼び出されるデリゲート</param>
        /// <param name="onError">OnErrorのときに呼び出されるデリゲート</param>
        /// <param name="onCompleted">OnCompletedのときに呼び出されるデリゲート</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            return source.Subscribe(new Observer<T>(onNext, onError, onCompleted));
        }
    }
}

準備が整ったので、下記のコードを実行してみます。ポイントは、SubscribeメソッドにIObserver<T>の各メソッドに対応したデリゲートを渡しているところです。IObserver<T>が隠蔽され、簡潔に記述できていると思います。

using System;
 
namespace Sample02_AbbrevIObserver
{
    class Program
    {
        static void Main()
        {
            var provider = new Provider<string>();
            provider.Subscribe
            (
                value => Console.WriteLine("OnNext : {0}", value),
                error => Console.WriteLine("OnError : {0}", error.Message),
                ()    => Console.WriteLine("OnCompleted")
            );
            provider.Notify("Test Message 1");
            provider.Notify(null);
            provider.Completed();
            provider.Notify("Test Message 2");
        }
    }
}
 
//—– 結果
/*
OnNext : Test Message 1
OnError : 値が有効な範囲にありません。
OnCompleted
*/

Rx標準機能を利用する

上記のような素敵な汎用化ができましたが、プロジェクト毎にこれを自作するはナンセンスなのでライブラリ化すべきです。多くの方がすでに気付いているとは思いますが、Rxにはデリゲートを受け付けるSubscribe拡張メソッドが多く搭載されています。これらはSystem.Reactive.dllのSystem名前空間.aspx)に定義されていますので、積極的に利用してください。

public static IDisposable Subscribe<T>(this IObservable<T> source);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError);
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted);

コードの見方を変える隠蔽化

上記のように登録するものをIObserver<T>からデリゲートに変更することで、「Subscribeで観測者を登録する」から「Subscribeで値を受けて処理する」というようにコードの見方を変えることができます。これにより、コードの流れとデータの流れ方をうまく一致させられるようになるため、このような記述方法には非常に意味があります。Rxを勉強していて「Subscribeで処理する」とか「Subscribeに値が流れてくる」と言った表現が出てくるのは、このような背景があるからかと思います。

次回予告

今回はSubscribeメソッドでの登録をIObserver<T>からデリゲートに変更することで、記述の簡略化を行う方法について見てきました。また、コードの視点が変わることについても考察しました。次回はSubject<T>に見ていきたいと思います。