xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (23) - メソッドチェイン中の例外

今回はメソッドチェイン中に例外が発生した場合について見ていきます。これまでの連載中は基本的に例外は起こらないものとして解説してきたので、OnErrorメソッドはほとんど使っていませんでした。とは言え例外発生時の挙動について知っておくことは重要なので、今回取り上げてみることにしました。

サンプルコード

まず、メソッドチェイン中に例外が発生する簡単なサンプルを以下に示します。ものすごく意図的なサンプルになっていますが、そこは目をつぶってください。

using System;
using System.Reactive.Linq;
 
namespace Sample42_ErrorOnMethodChain
{
    class Program
    {
        static void Main()
        {
            Observable.Range(1, 10)
            .Where(value =>
            {
                Console.WriteLine("Where({0})", value);
                if (value >= 3)
                    throw new Exception("エラー発生!");
                return value <= 5;
            })
            .Select(value =>
            {
                Console.WriteLine("\\tSelect({0})", value);
                return value * value;
            })
            .Subscribe
            (
                value => Console.WriteLine("\\t\\tOnNext({0})", value),
                error => Console.WriteLine("\\t\\tOnError({0})", error.Message),
                ()    => Console.WriteLine("\\t\\tOnCompleted")
            );
        }
    }
}
 
//----- 結果
/*
Where(1)
        Select(1)
                OnNext(1)
Where(2)
        Select(2)
                OnNext(4)
Where(3)
                OnError(エラー発生!)
*/

Whereメソッドでエラーが発生したとき、Subscribeメソッドに渡したOnErrorが処理されていることが分かります。このとき、Selectメソッドがスキップされていることと、OnCompletedが実行されずにシーケンスが終了していることも確認できます。

内部実装で確認する

上記の挙動はRx入門 (9) - Where/Selectの自作のWhere/Selectの内部実装を見ていただければ分かると思います。内部実装を以下に再掲します。

using System;
 
namespace Sample12_ImplementWhereSelect
{
    static class ObservableHelper
    {
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="onNext">OnNextのときに呼び出されるデリゲート</param>
        /// <param name="onError">OnErrorのときに呼び出されるデリゲート</param>
        /// <param name="onCompleted">OnCompletedのときに呼び出されるデリゲート</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public static IDisposable MySubscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            return source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
        }
 
        /// <summary>
        /// 指定の条件を用いてフィルタリングを行います。
        /// </summary>
        /// <typeparam name="T">データの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="predicate">通過条件</param>
        /// <returns>結果のプロバイダー</returns>
        public static IObservable<T> MyWhere<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            return new AnonymousObservable<T>(observer =>
            {
                IDisposable disposer = null;
                disposer = source.MySubscribe(value =>
                {
                    try
                    {
                        if (!predicate(value))
                            return;
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        disposer.Dispose();
                        return;
                    }
                    observer.OnNext(value);
                },
                observer.OnError,
                observer.OnCompleted);
                return disposer;
            });
        }
 
        /// <summary>
        /// 指定の変換器を用いてデータの変換を行います。
        /// </summary>
        /// <typeparam name="T">変換前のデータの型</typeparam>
        /// <typeparam name="TResult">変換後のデータの型</typeparam>
        /// <param name="source">対象プロバイダー</param>
        /// <param name="selector">データ変換関数</param>
        /// <returns>結果のプロバイダー</returns>
        public static IObservable<TResult> MySelect<T, TResult>(this IObservable<T> source, Func<T, TResult> selector)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                IDisposable disposer = null;
                disposer = source.MySubscribe(value =>
                {
                    var result = default(TResult);
                    try
                    {
                        result = selector(value);
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        disposer.Dispose();
                        return;
                    }
                    observer.OnNext(result);
                },
                observer.OnError,
                observer.OnCompleted);
                return disposer;
            });
        }
    }
}

Where/SelectメソッドをMyWhere/MySelectメソッドに置き換えたと思ってください。MyWhereメソッドの引数に渡されたデリゲートの評価はtryで囲われています。ここで発生した例外はすべて捕捉され、関連付けられている購読者のOnErrorメソッドを呼び出し、シーケンスとの関連付けを切っています。ここで呼び出されたOnErrorメソッドは、MySelectメソッドのOnErrorメソッドです。そしてMySelectメソッドのOnErrorメソッドはSubscribeメソッドのOnErrorメソッドを呼び出すようになっています。このように、発生した例外は以降最後のSubscribeメソッドまで素通しするようになっているため、途中のメソッドに渡したデリゲートが評価されないというわけです。また、例外発生時にシーケンスとの接続を強制的に解除しているため、OnCompletedメソッドが呼び出されずに終了します。

IObservable<T>シーケンスの終了

Rxでは、IObservable<T>シーケンスの終了はOnCompletedもしくはOnErrorが発生したとき、と決まっています。Rxのライブラリ内ではすべてそうなっているので、独自の拡張メソッドを作る場合もそのルールに従う必要があります。なぜこのようなルールになっているかは連載:Reactive Extensions(Rx)入門 - 第2回 イベント・プログラミングとRxに分かりやすく書いてあるので、ぜひ参考にしてください。