読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (9) - Where/Selectの自作

前回はLINQの記述を用いて、流れてくるデータをフィルタリング (Where) したり、変換 (Select) したりしました。今回はこれらを自作することで、より理解を深めてみようと思います。

実装へのアプローチ

IObservable<T>IObserver<T>に情報を流すというのが、Rxで最も重要な基本です。伝達は常にこの1対多のペアで行われます。そして、下のわかりにくい図にあるように、WhereやSelectはこの間に差し込まれます。

WhereSelect

注目すべきは、IObservable<T>から流れてくる情報をWhere/Selectの中で一旦IObserver<T>として受け取り、フィルタリングや変換をしたのち、再度IObservable<T>にして後続に繋げていることです。つまり、Where/Select自身が受信者であり、送信者になる必要があるということです。「IObservable<T>はIObserver<T>に対して発信する」という基本に従えば、このような分解になることは想像できると思います。

もうひとつ重要なポイントがあります。それは、Where/Selectが情報を貰う相手 (Source) と、情報を流す相手 (Observer) をそれぞれ知る必要があるということです。通常はSubscribeでIObserver<T>が登録されるまで情報を流す相手は確定しませんが、これをうまく解決するのがのちに紹介するAnonymousObservable<T>です。

これらのポイントを押さえながら実装してみましょう。

事前準備

Where/Selectの実装の下準備として、AnonymousObserver<T>とAnonymousObservable<T>を用意します。AnonymousObserver<T>は、以前「Rx入門 (4) - IObserverの省略」でも紹介した、情報通知時に実行されるデリゲートからIObserver<T>を生成するための汎用クラスです。以下にコードを示します。

using System;
 
namespace Sample12_ImplementWhereSelect
{
    /// <summary>
    /// 情報の受信者としての機能を提供します。
    /// </summary>
    /// <typeparam name="T">受信するデータの型</typeparam>
    class AnonymousObserver<T> : IObserver<T>
    {
        private readonly Action onCompleted        = null;
        private readonly Action<Exception> onError = null;
        private readonly Action<T> onNext          = null;
 
        /// <summary>
        /// コンストラクタ
        /// </summary>
        /// <param name="onNext">OnNextのときに呼び出されるデリゲート</param>
        /// <param name="onError">OnErrorのときに呼び出されるデリゲート</param>
        /// <param name="onCompleted">OnCompletedのときに呼び出されるデリゲート</param>
        public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            if (onNext == null)      throw new ArgumentNullException("onNext");
            if (onError == null)     throw new ArgumentNullException("onError");
            if (onCompleted == null) throw new ArgumentNullException("onCompleted");
            this.onNext      = onNext;
            this.onError     = onError;
            this.onCompleted = onCompleted;
        }
 
        /// <summary>
        /// プロバイダーがプッシュベースの通知の送信を完了したことをオブザーバーに通知します。
        /// </summary>
        public void OnCompleted()
        {
            this.onCompleted();
        }
 
        /// <summary>
        /// プロバイダーでエラー状態が発生したことをオブザーバーに通知します。
        /// </summary>
        /// <param name="error">エラーに関する追加情報を提供するオブジェクト</param>
        public void OnError(Exception error)
        {
            this.onError(error);
        }
 
        /// <summary>
        /// オブザーバーに新しいデータを提供します。
        /// </summary>
        /// <param name="value">現在の通知情報</param>
        public void OnNext(T value)
        {
            this.onNext(value);
        }
    }
}

次はAnonymousObservable<T>です。考え方はAnonymousObserver<T>と同様です。Subscribeメソッドが呼び出されたときに実行するべきデリゲートから、IObservable<T>を生成する汎用クラスです。個人的には、LINQスタイルの拡張メソッドを実装する上での最大のポイントがこれだと思います。以下にその実装を示します。

using System;
 
namespace Sample12_ImplementWhereSelect
{
    /// <summary>
    /// 情報通知のプロバイダーとしての機能を提供します。
    /// </summary>
    /// <typeparam name="T">受信するデータの型</typeparam>
    class AnonymousObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> subscribe = null;
 
        /// <summary>
        /// コンストラクタ
        /// </summary>
        /// <param name="subscribe">Subscribeのときに呼び出されるデリゲート</param>
        public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
        {
            if (subscribe == null)
                throw new ArgumentNullException("subscribe");
            this.subscribe = subscribe;
        }
 
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <param name="observer">通知を受け取るオブジェクト</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return this.subscribe(observer);
        }
    }
}

Whereの実装

下準備ができたので、Whereメソッドの実装を開始します。独自実装なので、ここではメソッド名をMyWhereとします。Whereメソッドのシグネチャは以下の通りで、判定条件のデリゲートを引数にとりIObservable<T>を返す、簡単な拡張メソッドです。

public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)

Part 1

メソッド内部の実装を行なっていきます。まず、冒頭の図にもあるように、IObservable<T>型の戻り値の実態としてAnonymousObservable<T>を返すようにします。AnonymousObservable<T>のコンストラクタに渡すデリゲートは、Subscribe実行時に呼び出されるものです。ラムダ式の引数になっているobserverが後続の受信者となります。ラムダ式の内部では、クロージャのおかげでsourceとobserverの両方にアクセスすることができます。これが非常に重要なのですが、それを可能にしているのがAnonymousObservable<T>というわけです。とても上手いです。

public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    //--- AnonymousObservable<T>を返す
    return new AnonymousObservable<T>(observer =>
    {
        //--- ここはSubscribe実行時に呼び出される
        //--- ここでsourceとobserverに触ることができる!
        return null; //--- とりあえずnull
    });
}

Part 2

Part 1で作成したSubscribeラムダ式の内部を実装していきます。ラムダ式の内部ではsourceから発信される情報を受信しなければなりませんが、ここでも冒頭の図にあるように、AnonymousObserver<T>が受信するようにします。ラムダ式の戻り値はIDisposableなので、sourceのSubscribeメソッドの戻り値をそのまま返すようにします。

AnonymousObserver<T>のコンストラクタには、OnNext/OnError/OnCompleted時に呼び出されるデリゲートを指定します。この引数として入れるデリゲートの中でobserverの対応するメソッドを呼び出すようにします。このデリゲートの中に独自の処理も入れることができますが、とりあえずここではobserverの各メソッドをそのまま呼び出すようにします。

public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new AnonymousObservable<T>(observer =>
    {
        //--- sourceからの発信をAnonymousObserver<T>で受ける
        return source.Subscribe(new AnonymousObserver<T>(value =>
        {
            //--- ここはOnNext実行時に呼び出される
            //--- ここでobserverへの送信前に処理を差し込むことができる
            observer.OnNext(value);
        },
        observer.OnError,       //--- OnErrorと
        observer.OnCompleted)); //--- OnCompletedは何もせず素通し
    });
}

Part 3

Part 2で作成したAnonymousObserver<T>のOnNextのラムダ式の中で、判定条件をクリアしたとき場合だけobserver.OnNextを呼び出すようにします。ここまででほぼWhereメソッドとして挙動は完成しました。

public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new AnonymousObservable<T>(observer =>
    {
        return source.Subscribe(new AnonymousObserver<T>(value =>
        {
            //--- 条件に一致したときだけobserverに値を流す
            if (predicate(value))
                observer.OnNext(value);
        },
        observer.OnError,
        observer.OnCompleted));
    });
}

Part 4

最後に、フィルタリング条件判定中に例外が発生した場合の対応を行います。例外が発生した場合はobserverにエラー通知を行うようにしたいので、例外を捕捉したらOnErrorメソッドを呼び出すようにします。また、それ以降は情報の受信を行わないようにしたいので、sourceとAnonymousObserver<T>の関連付けを解除するようにします。解除にはSubscribeメソッドの戻り値を利用する必要がありますが、ここでもクロージャを利用して実装することができます。ちなみに例外処理を行うのはフィルタリング条件の判定デリゲートのみで、後続への通知であるobserver.OnNextメソッドは対象としません。

public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new AnonymousObservable<T>(observer =>
    {
        //--- OnNext内で関連付け解除ができるようにする (クロージャ万歳!
        IDisposable disposer = null;
        disposer = source.Subscribe(new AnonymousObserver<T>(value =>
        {
            try
            {
                //--- フィルタリング処理中の発生し得る例外に対応
                if (!predicate(value))
                    return;
            }
            catch (Exception error)
            {
                observer.OnError(error); //--- 例外が飛んだ場合はOnErrorを呼び出し
                disposer.Dispose();  //--- 関連付けを解除する
                return;
            }
            observer.OnNext(value); //--- こっちは例外処理をしない
        },
        observer.OnError,
        observer.OnCompleted));
        return disposer;
    });
}

完成形

ここまででWhereメソッドの実装はおしまいです。以下に完成形を示します。AnonymousObserver<T>は、「Rx入門 (4) - IObserverの省略」で紹介したデリゲートを受け取るSubscribeメソッドに置き換えることができるので、下記ではそのようにしています。また、SelectメソッドはWhereメソッドとほとんど同じなので説明は割愛します。

using System;
 
namespace Sample12_ImplementWhereSelect
{
    static class ObservableHelper
    {
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="onNext">OnNextのときに呼び出されるデリゲート</param>
        /// <param name="onError">OnErrorのときに呼び出されるデリゲート</param>
        /// <param name="onCompleted">OnCompletedのときに呼び出されるデリゲート</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public static IDisposable MySubscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            return source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
        }
 
        /// <summary>
        /// 指定の条件を用いてフィルタリングを行います。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="predicate">通過条件</param>
        /// <returns>結果のプロバイダー</returns>
        public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            return new AnonymousObservable<T>(observer =>
            {
                IDisposable disposer = null;
                disposer = source.MySubscribe(value =>
                {
                    try
                    {
                        if (!predicate(value))
                            return;
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        disposer.Dispose();
                        return;
                    }
                    observer.OnNext(value);
                },
                observer.OnError,
                observer.OnCompleted);
                return disposer;
            });
        }
 
        /// <summary>
        /// 指定の変換器を用いてデータの変換を行います。
        /// </summary>
        /// <typeparam name="T">変換前のデータの型</typeparam>
        /// <typeparam name="TResult">変換後のデータの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="selector">データ変換関数</param>
        /// <returns>結果のプロバイダー</returns>
        public static IObservable<TResult> MySelect<T, TResult>(this IObservable<T> source, Func<T, TResult> selector)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                IDisposable disposer = null;
                disposer = source.MySubscribe(value =>
                {
                    var result = default(TResult);
                    try
                    {
                        result = selector(value);
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        disposer.Dispose();
                        return;
                    }
                    observer.OnNext(result);
                },
                observer.OnError,
                observer.OnCompleted);
                return disposer;
            });
        }
    }
}

ここまで、順序立ててWhere/Selectメソッドの内部実装について説明してきました。Rxライブラリの中で本当に同じ実装が行われているかは不確かですが、いろいろなケースで挙動を確認してみてもすべて同じ結果になりますので、ほぼほぼ同じ実装だろうと推測します。

参考記事

今回の記事は@neueccさんの下記のblog記事の一部と全く同じ内容です。「自身の勉強のため」ということで、パクり & 1年半遅れではありますが記事にさせていただきました。下記の記事は本当に参考になるので、ぜひご覧ください。

次回予告

今回はWhere/Selectの内部実装について確認しました。非常に混乱しますが、分かってくると「上手いなー」と感嘆できるのではないかと思います。次回は一定時間間隔で情報を発行するIObservable<T>について見て行きたいと思います。