読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (15) - スケジューラの利用

Rx

今回で入門記事の連載も (一応名ばかりの) 最後です。今回はスケジューラの利用について見ていきます。残念ながら、これについてはだいぶ自信がありません。今わかっている範囲、そうだろうと思っている範囲で書きます。

スケジューラ

スケジューラとは、処理をいつ/どこで実行するのかを振り分ける役割を持ったものです。スケジューラが間に入ることで、スレッドの切り替えや実行順序の変更などを柔軟に行うことができます。もちろん、Rxはこのスケジューラ上で動作しています。

RxのスケジューラはISchedulerインターフェースで表され、このインターフェースを実装したクラスがいくつも用意されています。以下にいくつか挙げてみます。

スケジューラの種類 説明
CurrentThreadScheduler 現在実行中のスレッド上で処理を行います。処理はキューに登録されたものから順に処理されます。
ImmediateScheduler 現在実行中のスレッド上で処理を行います。処理は即座に実行されます。
NewThreadScheduler 処理をそれぞれ別スレッドで行います。
EventLoopScheduler 指定されたスレッド上で処理を行います。
ThreadPoolScheduler 処理をスレッドプール上で行います。
TaskPoolScheduler 指定されたTaskFactoryを利用して処理を行います。
SynchronizationContextScheduler 指定されたSynchronizationContextに同期して処理を行います。
ControlScheduler 指定されたWindows Formsコントロールのあるスレッドのメッセージループに同期して処理を行います。
DispatcherScheduler 指定されたDispatcherに同期して処理を行います。

上記のように、実行順序を変えたり実行スレッドを同期させたりするためのスケジューラがあります。中にはTaskPoolSchedulerのようにTPLが利用できる環境でのみ使えるものや、ControlSchedulerのようにWindows Formsでしか使えないもの、DispatcherSchedulerのようにWPF/Silverlightでしか使えないものもあるので注意してください。また、いくつかは静的なSchedulerクラスにインスタンス取得プロパティが用意されているので、それを利用すると良いです。

スケジューラの指定

これまでの記事で紹介してきたIObservable<T>のファクトリーメソッドにも、ISchedulerインターフェースを指定できるオーバーロードが含まれるものが多数あります。例えば以下のように指定します。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample32_Scheduler
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Thread ID = {0}\\tMain", Thread.CurrentThread.ManagedThreadId);
            Observable.Range(1, 3, Scheduler.ThreadPool) //--- ThreadPool上で発信
            .Subscribe
            (
                value => Console.WriteLine("Thread ID = {0}\\tOnNext({1})", Thread.CurrentThread.ManagedThreadId, value),
                ()    => Console.WriteLine("Thread ID = {0}\\tOnCompleted", Thread.CurrentThread.ManagedThreadId)
            );
            Console.ReadLine(); //--- 非同期実行になるので終了を待機
        }
    }
}
 
//----- 結果
/*
Thread ID = 1   Main
Thread ID = 3   OnNext(1)
Thread ID = 4   OnNext(2)
Thread ID = 5   OnNext(3)
Thread ID = 5   OnCompleted
*/

この例では、メインスレッドとObservable.Rangeメソッドから発行される値のスレッドが違うことが確認できます。

暗黙に設定されるスケジューラ

IObservable<T>のファクトリーメソッドには、ISchedulerインターフェースを指定しないオーバーロードや、指定できないメソッドがあります。これらは、各メソッドの内部で暗黙にスケジューラが設定されています。例えば以下のような感じです。

ファクトリーメソッド 設定されるスケジューラ
Observable.Range CurrentThreadScheduler
Observable.Timer ThreadPoolScheduler
Observable.ToAsync ThreadPoolScheduler

基本的に、非同期処理用のメソッドThreadPoolSchedulerが、それ以外はCurrentThreadSchedulerが指定されていると考えて良いと思います。

CurrentThreadSchedulerとImmediateSchedulerの違い

CurrentThreadSchedulerImmediateSchedulerはどちらも現在実行中のスレッドで動作します。ふたつの違いは何でしょうか?挙動を知るための簡単なサンプルを示します。

using System;
using System.Reactive.Concurrency;
 
namespace Sample33_CurrentThreadVsImmediate
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("----- CurrentThread -----");
            ScheduleTasks(Scheduler.CurrentThread);
            Console.WriteLine("----- Immediate -----");
            ScheduleTasks(Scheduler.Immediate);
        }
 
        static void ScheduleTasks(IScheduler scheduler)
        {
            Action third = () =>
            {
                Console.WriteLine("#3");
            };
            Action second = () =>
            {
                Console.WriteLine("#2 : Start");
                scheduler.Schedule(third);
                Console.WriteLine("#2 : End");
            };
            Action first = () =>
            {
                Console.WriteLine("#1 : Start");
                scheduler.Schedule(second);
                Console.WriteLine("#1 : End");
            };
            scheduler.Schedule(first);
        }
    }
}
 
//----- 結果
/*
----- CurrentThread -----
#1 : Start
#1 : End
#2 : Start
#2 : End
#3
----- Immediate -----
#1 : Start
#2 : Start
#3
#2 : End
#1 : End
*/

結果より、CurrentThreadSchedulerの方は自身の処理が完了してから次の処理を始めていることが分かります。 (実際の内部実装はわかりませんが) Scheduleメソッドが呼び出されるたびに指定されたデリゲートをキューに登録し、それを順番に消化しているような動きです。片や、ImmediateSchedulerはScheduleメソッドに指定されたデリゲートを即時に実行しています。Scheduleメソッドの中でデリゲートをInvokeしているだけだと思われます。

スレッドの切り替え

Rx入門 (10) - 時間のシーケンス化Rx入門 (12) - 非同期処理のシーケンス化でも少し触れましたが、非同期処理中の通知や結果を受けてUIコンポーネントにアクセスする場合には注意が必要です。UIコンポーネントはUIスレッド上からしかアクセスできないためです。しかし、Observable.Timerなどの通知をそのまま購読した場合、OnNextなどはタイマーのスレッドで実行されます。つまり、そのままではUIコンポーネントにアクセスできないので、実行スレッドを切り替えてやる必要があるというわけです。そして、このスレッド切り替えを行うメソッドObservable.ObserveOnメソッドです。

以下にWPFで作成したデジタル時計のサンプルを示します。まずはXAMLでUIを作ります。時間を表示するためのTextBlockを置いているだけです。

<Window x:Class="Sample34_ObserveOn.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="Sample34_ObserveOn" Height="300" Width="300">
    <TextBlock Name="display" HorizontalAlignment="Center" VerticalAlignment="Center" />
</Window>

次に、コードビハインドにあるコンストラクタでタイマーを実行します。このとき、ObserveOnメソッドをチェインとして挟み込みます。

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Windows;
 
namespace Sample34_ObserveOn
{
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            this.InitializeComponent();
            Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
            .ObserveOn(SynchronizationContext.Current) //--- 以降はUIスレッドに同期
            .Subscribe(_ => this.display.Text = DateTime.Now.ToString());
        }
    }
}

たったこれだけで完成です。ObserveOnメソッドにはスケジューラではなくSynchronizationContextを指定していますが、内部でSynchronizationContextSchedulerが生成されている (と思われる) ので、結局は同じことです。サンプル中のコメントにも書いてありますが、ObserveOnメソッドは、「以降の処理を指定のスケジューラ、もしくは同期コンテキスト上で実行させる」ものです。こんなに簡単にスレッドの切り替えができるなんて、魔法のようですね。

WPF/Silverlightにおける別記法

WPF/Silverlightの場合、DispatcherSchedulerを利用したスレッドの切り替えが可能です。これには、NuGetもしくはダウンロードサイトからRx-WPF (System.Reactive.Windows.Threading.dll) を取得し、参照設定に追加する必要があります。これにより、次のような記述もできるようになります。

.ObserveOnDispatcher()
.ObserveOn(App.Current.Dispatcher)
.ObserveOn(DispatcherScheduler.Instance)
.ObserveOn(new DispatcherScheduler(App.Current.Dispatcher))

Windows Formsにおける別記法

WPF/Silverlightだけでなく、Windows Formsにも同じように別解があります。これも同様に、NuGetもしくはダウンロードサイトからRx-WinForms (System.Reactive.Windows.Forms.dll) を取得し、参照設定に追加します。これにより、Controlオブジェクトに対して同期させる記述ができるようになります。

.ObserveOn(this) //--- ここでのthisはForm
.ObserveOn(new ControlScheduler(this))

スケジューラを介することによるパフォーマンス低下

スケジューラは高い柔軟性を確保できる反面、パフォーマンスが悪いのも事実です。以下のサンプルは、単純にEnumerable.RangeメソッドObservable.Rangeメソッドで100万回ループしたものです。

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
 
namespace Sample35_ObservableRangePerformance
{
    static class Program
    {
        static void Main()
        {
            int count      = 1000000;
            var collection = Enumerable.Range(1, count);
            var sequence   = Observable.Range(1, count);
            var stopwatch  = new Stopwatch();
 
            stopwatch.Restart();
            foreach (var value in collection);
            stopwatch.Stop();
            Console.WriteLine("Enumerable.Range : {0} [ms]", stopwatch.ElapsedMilliseconds);
 
            stopwatch.Restart();
            sequence.Subscribe();
            stopwatch.Stop();
            Console.WriteLine("Observable.Range : {0} [ms]", stopwatch.ElapsedMilliseconds);
        }
    }
}
 
//----- 結果(例)
/*
Enumerable.Range : 11 [ms]
Observable.Range : 6436 [ms]
*/

1計測結果の通り、Observable.Rangeメソッドの方が600倍程遅いのがわかります。分配やスケジューリングのメリットと、パフォーマンスが悪いというデメリットを十分に検討した上で使う必要がありそうです

参考記事

英語は読めないので参考記事に挙げておきながら本人あまり読んでないですが、きっと良い手助けになると思います。またパフォーマンスの問題については神様達が書いているので、そちらもぜひ参考にしてください。