xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (5) - Subject<T>の利用

前回IObserver<T>の生成を隠蔽し、より簡略化した記法を学びました。今回はさらにIObservable<T>も簡略化してみます。

一人二役のSubject<T>

Rxのライブラリ (System.Reactive.dll) にあるSystem.Reactive.Subjects名前空間の中に、Subject<T>というクラスがあります。これはIObservable<T>とIObserver<T>の両方を実装した、言わば一人二役なクラスです (実際にはISubject<T>を実装しています)。「ひとつのクラスでオブザーバーパターン?」と思われるかもしれませんが、まずはサンプルを見てください。

using System;
using System.Reactive.Subjects;
 
namespace Sample03_Subject
{
    static class Program
    {
        static void Main()
        {
            var subject   = new Subject<int>();
            var disposerA = subject.SubscribeTracer("A");
            subject.OnNext(1);
            var disposerB = subject.SubscribeTracer("B");
            subject.OnNext(2);
            disposerA.DisposeTracer("A");
            subject.OnNext(3);
            subject.OnCompleted();
            var disposerC = subject.SubscribeTracer("C");
            subject.OnNext(4);
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.WriteLine("----- {0} : Subscribe Before -----", name);
            var disposer = source.Subscribe
            (
                value => Console.WriteLine("{0} : OnNext({1})", name, value),
                ()    => Console.WriteLine("{0} : OnCompleted", name)
            );
            Console.WriteLine("----- {0} : Subscribe After -----", name);
            return disposer;
        }
 
        static void DisposeTracer(this IDisposable source, string name)
        {
            Console.WriteLine("----- {0} : Dispose Before -----", name);
            source.Dispose();
            Console.WriteLine("----- {0} : Dispose After -----", name);
        }
    }
}
 
//----- 結果
/*
----- A. Subscribe Before -----
----- A. Subscribe After -----
A. OnNext : 1
----- B. Subscribe Before -----
----- B. Subscribe After -----
A. OnNext : 2
B. OnNext : 2
----- A. Dispose Before -----
----- A. Dispose After -----
B. OnNext : 3
B. OnCompleted
----- C. Subscribe Before -----
C. OnCompleted
----- C. Subscribe After -----
*/

OnNextOnCompletedを呼び出すと、Subscribeメソッドで登録したデリゲートが実行されていることが確認できます。もちろん、Disposeで登録解除したものには通知が飛びませんし、OnCompletedが呼び出された後も通知は飛びません。また、サンプルには載せていませんが、OnErrorメソッドを呼び出すとSubscribeメソッドでの登録はすべて解除されます。

このように、Subject<T>を利用すると値の発行における挙動確認をお手軽に行うことができますので、ぜひ覚えておいてください。

Subject<T>の解剖

上のSubject<T>のサンプルを見て、「は?どーゆーこと?メダパニなう...」と思った方もいらっしゃるかもしれません。少なくとも私はそうでした。それは、前回紹介したIObserver<T>の省略が行われていることに気付いていなかったとこと、Subject<T>の内部実装を想像していなかったからでした。迷える仔羊がアハ体験できるように、Subject<T>の内部実装を見てみたいと思います。

以下に自前実装したSubject<T>を示します。大体下記のような実装になっているのではないかと推測します。

using System;
using System.Collections.Generic;
 
namespace Sample04_ImplementSubject
{
    /// <summary>
    /// 自作のSubjectを表します。
    /// </summary>
    /// <typeparam name="T">データの型</typeparam>
    class MySubject<T> : IObservable<T>, IObserver<T>
    {
        /// <summary>
        /// 購読を解除する機能を提供します。
        /// </summary>
        /// <typeparam name="U">データの型</typeparam>
        private class Unsubscriber<U> : IDisposable
        {
            /// <summary>
            /// オブザーバーのコレクションを保持します。
            /// </summary>
            private readonly LinkedList<IObserver<U>> observers = null;
 
            /// <summary>
            /// オブザーバーを保持します。
            /// </summary>
            private readonly IObserver<U> observer = null;
 
            /// <summary>
            /// コンストラクタ
            /// </summary>
            /// <param name="observers">オブザーバーを保持しているコレクション</param>
            /// <param name="observer">オブザーバー</param>
            public Unsubscriber(LinkedList<IObserver<U>> observers, IObserver<U> observer)
            {
                this.observers = observers;
                this.observer  = observer;
            }
 
            /// <summary>
            /// 使用していたリソースを解放します。
            /// </summary>
            public void Dispose()
            {
                if (this.observers.Contains(this.observer))
                    this.observers.Remove(this.observer);
            }
        }
 
        /// <summary>
        /// 完了したかどうかを保持します。
        /// </summary>
        private bool isCompleted = false;
 
        /// <summary>
        /// 発生した例外を保持します。
        /// </summary>
        private Exception error = null;
 
        /// <summary>
        /// オブザーバーのコレクションを保持します。
        /// </summary>
        private readonly LinkedList<IObserver<T>> observers = new LinkedList<IObserver<T>>();
 
        /// <summary>
        /// オブザーバーが通知を受け取ることをプロバイダーに通知します。
        /// </summary>
        /// <param name="observer">通知を受け取るオブジェクト</param>
        /// <returns>プロバイダーが通知の送信を完了する前に、オブザーバーが通知の受信を停止できるインターフェイスへの参照</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (!this.isCompleted)
            {
                if (!this.observers.Contains(observer))
                    this.observers.AddLast(observer);
            }
            else if (this.error == null)
            {
                observer.OnCompleted();
            }
            else
            {
                observer.OnError(error);
            }
            return new Unsubscriber<T>(this.observers, observer);
        }
 
        /// <summary>
        /// プロバイダーがプッシュベースの通知の送信を完了したことをオブザーバーに通知します。
        /// </summary>
        public void OnCompleted()
        {
            if (this.isCompleted)
                return;
 
            foreach (var observer in this.observers)
                observer.OnCompleted();
            this.observers.Clear();
            this.isCompleted = true;
        }
 
        /// <summary>
        /// プロバイダーでエラー状態が発生したことをオブザーバーに通知します。
        /// </summary>
        /// <param name="error">エラーに関する追加情報を提供するオブジェクト</param>
        public void OnError(Exception error)
        {
            if (this.isCompleted)
                return;
 
            foreach (var observer in this.observers)
                observer.OnError(error);
            this.observers.Clear();
            this.error       = error;
            this.isCompleted = true;
        }
 
        /// <summary>
        /// オブザーバーに新しいデータを提供します。
        /// </summary>
        /// <param name="value">現在の通知情報</param>
        public void OnNext(T value)
        {
            if (!this.isCompleted)
                foreach (var observer in this.observers)
                    observer.OnNext(value);
        }
    }
}

上記のように、OnNextなどの通知メソッドの中で、Subscribeで登録されたすべてのIObserver<T>の通知メソッドを呼び出しているのがポイントです。非常にうまいやり方ですが、ちょっと混乱しますね。

次回予告

今回はSubject<T>の使い方を確認しました。また、Subject<T>を自作し、その内部実装について見てみました。次回はSubject<T>の亜種について見て行きたいと思います。