読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Rx入門 (12) - 非同期処理のシーケンス化

Rx

今回は、同期メソッドを非同期に実行し、それをIObservable<T>シーケンスとして受ける方法や、非同期処理として提供されているメソッドをIObservable<T>シーケンスに変換する方法ついて見ていきたいと思います。これが使えるようになれば、きっと非同期処理も怖くなくなります!

非同期シーケンスの生成

Rxには非同期シーケンスを作成する方法がいくつか用意されています。まず最初に、Observable.ToAsyncメソッドを利用した、通常の同期メソッドから非同期シーケンスを作成する方法を紹介します。以下にそのサンプルを示します。

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample19_ToAsync
{
   static class Program
   {
       static void Main()
       {
           //--- 分かりにくいので型を明記 & 分割して記述
           Func<IObservable<Unit>> asyncFunc = Observable.ToAsync(() =>
           {
               Console.WriteLine("lamda has been called");
               Thread.Sleep(1000); //--- 長い処理のつもり
           });
           IObservable<Unit> sequence = asyncFunc(); //--- 非同期処理開始!
           sequence.SubscribeTracer("A");
           Thread.Sleep(2000); //--- 非同期処理が終わるのを待機
           sequence.SubscribeTracer("B");
       }
 
       static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
       {
           Console.WriteLine("----- {0} : Subscribe Before -----", name);
           var unsbscriber = source.Subscribe
           (
               value => Console.WriteLine("{0} : OnNext({1})", name, value),
               ()    => Console.WriteLine("{0} : OnCompleted", name)
           );
           Console.WriteLine("----- {0} : Subscribe After -----", name);
           return unsbscriber;
       }
   }
}
 
//—– 結果(例)
/*
lamda has been called
----- A : Subscribe Before -----
----- A : Subscribe After -----
A : OnNext(System.Reactive.Unit)
A : OnCompleted
----- B : Subscribe Before -----
B : OnNext(System.Reactive.Unit)
B : OnCompleted
----- B : Subscribe After -----
*/

Observable.ToAsyncメソッドは、デリゲートを引数としてIObservable<T>を返すデリゲートを生成します。引数に渡すこのデリゲートが非同期的に実行されるメソッドです。ここでは単純なラムダ式を渡しています。戻り値のデリゲートを実行することで非同期処理が開始され、その処理に対するIObservable<T>シーケンスが取得できるようになります。IObservable<T>シーケンスが取得できれば、あとはいつも通りSubscribeメソッドで終了時の処理を記述するだけです。

非同期シーケンスの挙動

非同期シーケンスは、OnCompletedメソッドが呼び出されるまでを非同期処理として表します。非同期処理実行中にSubscribeメソッドが呼び出され完了通知先が決定している場合 (購読者A) は、OnCompletedメソッドによる完了通知が行われる直前に戻り値をOnNextメソッドで通知します。また、すでに非同期処理が完了している状態でSubscribeメソッドが呼び出された場合 (購読者B) は、即座にOnNextメソッドとOnCompletedメソッドが呼び出されます。このときOnNextメソッドで通知されるのは、非同期処理終了時に発行されたキャッシュされた戻り値となります。

上記のサンプルで言うと、Aは非同期処理実行中に、Bは非同期処理終了後にSubscribeされたものです。

Unit構造体

突如IObservable<Unit>として出てきたのはSystem.Reactive名前空間に定義されているUnit構造体です。この型は、このオブジェクトが「何でもない」ことを表します。つまり、意味的にはvoidと同じです。今回のサンプルでは、非同期処理の戻り値はありません。しかし、IObservable<T>IObserver<T>を用いたオブザーバーパターンの規約では、OnNextメソッドを利用して何らか戻り値としての情報を送信しなければなりません。ジェネリック型としてvoid型を定義できないという制約もあるため、void型を表す別の型を用意し、それを通知するようになっています。

引数と戻り値を利用する

非同期処理に引数でデータを渡したり非同期処理から戻り値を受ける場合は、Observable.ToAsyncメソッドオーバーロードを利用すると良いです。以下にサンプルを示します。非同期処理の実行時に引数を与えて実行し、非同期処理の結果として戻り値を返すだけの簡単なものです。

using System;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample20_ReturnableToAsync
{
    class Program
    {
        static void Main()
        {
            Observable.ToAsync<double, double, double>((x, y) =>
            {
                Thread.Sleep(1000);
                return Math.Pow(x, y);
            })
            .Invoke(2, 4) //--- InvokeメソッドでもOK
            .Subscribe
            (
                value => Console.WriteLine("OnNext({0})", value),
                ()    => Console.WriteLine("OnCompleted")
            );
            Thread.Sleep(2000);
        }
    }
}
 
//----- 結果
/*
OnNext(16)
OnCompleted
*/

最初の例ではデリゲートを直接「()」で実行しましたが、上記のようにInvokeメソッドを利用して書くこともできます。分かりやすいと思う方をお好みでご利用ください。

また、Observable.ToAsyncメソッドオーバーロードは非常に多く、引数が16個もあるメソッドにまで対応しているようです。(そんなに引数が多いのは設計としてオカシイ気がしないでもないので、個人的には存在意義が不明ですが...)

BeginXxx/EndXxxからの生成

よくあるBeginXxx/EndXxxメソッドのペアによる非同期処理からIObservable<T>シーケンスを生成する場合は、Observable.FromAsyncPatternメソッドを利用します。以下に簡単なサンプルを示します。

using System;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample21_FromAsyncPattern
{
    class Program
    {
        static void Main()
        {
            Func<double, double, double> AsyncAtan2 = (x, y) =>
            {
                Console.WriteLine("function has been called");
                Thread.Sleep(1000); //--- 長い処理のつもり
                return Math.Atan2(y, x);
            };
            Observable.FromAsyncPattern<double, double, double>(AsyncAtan2.BeginInvoke, AsyncAtan2.EndInvoke)
            .Invoke(1, Math.Sqrt(3)) //--- これもToAsync同様Invokeで実行するタイプ
            .Select(radian => radian * 180 / Math.PI) //--- [rad]から[deg]へ変換
            .Subscribe
            (
                degree => Console.WriteLine("OnNext({0})", degree),
                ()     => Console.WriteLine("OnCompleted")
            );
            Thread.Sleep(2000);
        }
    }
}
 
//----- 結果
/*
function has been called
OnNext(60)
OnCompleted
*/

上記のように、IAsyncResultインターフェースを隠蔽したスマートな形で記述することができます。ここでは簡単のためにBeginInvoke/EndInvokeを利用しましたが、例えばStreamクラスのBeginRead/EndReadメソッドなどを指定することもできます。

最も簡単な記述方法

Observable.ToAsyncメソッドは、戻り値がデリゲートということもあり非同期処理の開始タイミングを柔軟に制御することが可能です。ですが、特に開始のタイミングを制御する必要がない場合はObservable.Startメソッドを利用すると楽です。以下に簡単なサンプルを示します。

using System;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample22_Start
{
    class Program
    {
        static void Main()
        {
            Observable.Start(() =>
            {
                Console.WriteLine("lamda has been called");
                Thread.Sleep(1000);
            })
            .Subscribe
            (
                value => Console.WriteLine("OnNext({0})", value),
                ()    => Console.WriteLine("OnCompleted")
            );
            Thread.Sleep(2000);
        }
    }
}
 
//----- 結果
/*
lamda has been called
OnNext(System.Reactive.Unit)
OnCompleted
*/

すでにお分かりかもしれませんが、Observable.StartメソッドはObservable.ToAsync(...).Invoke()のショートカットです。内容が単純であるならば、これを使うと良いと思います。

非同期処理の開始タイミングをSubscribe実行時にする

上で「Observable.ToAsyncメソッドは非同期処理の開始タイミングを柔軟に制御可能」と書きましたが、Invokeメソッドで非同期処理を開始してからしかSubscribeメソッドの呼び出しができないという問題があります。非同期処理の開始が必ずSubscribeより先になって困るケースがあるかもしれません。Subscribeメソッド実行時まで非同期処理を遅延させたいという場合は、以下のようにObservable.DeferメソッドObservable.Startメソッドを組み合わせると良いです。

using System;
using System.Reactive.Linq;
using System.Threading;
 
namespace Sample23_DeferStart
{
    static class Program
    {
        static void Main()
        {
            var sequence = Observable.Defer(() => Observable.Start(() =>
            {
                Console.WriteLine("lamda has been called");
                Thread.Sleep(500);
                return 12345;
            }));
            sequence.SubscribeTracer("A");
            sequence.SubscribeTracer("B");
            Thread.Sleep(1000); //--- 非同期処理の完了待ち
        }
 
        static IDisposable SubscribeTracer<T>(this IObservable<T> source, string name)
        {
            Console.ReadLine(); //--- キー押下で開始
            Console.WriteLine("----- {0}. Subscribe -----", name);
            return source.Subscribe
            (
                value => Console.WriteLine("{0}. OnNext({1})", name, value),
                ()    => Console.WriteLine("{0}. OnCompleted", name)
            );
        }
    }
}
 
//----- 結果
/*
----- A : Subscribe -----
lamda has been called
A : OnNext(12345)
A : OnCompleted
 
----- B : Subscribe -----
lamda has been called
B : OnNext(12345)
B : OnCompleted
*/

Observable.Deferメソッドは、引数で指定したデリゲートをSubscribeメソッドが呼び出された時に実行します。この機能を利用することで、非同期処理の開始をSubscribe実行時まで遅延させることができます。この組み合わせパズルのような書き方も、Rxの特徴のひとつかもしれません。

参考記事

今回の内容も多くの先人が記事を残しています。どれも参考になるので、ぜひ眺めてみてください。

次回予告

これまで3回に渡って時間/イベント/非同期処理をIObservable<T>に変換する方法について見てきました。一旦IObservable<T>になってしまえば、あとはLINQの記法を使って宣言的に、やりたいことをやりたいように記述できます。それがイベントであっても、非同期処理であっても!実に気持ちが良いですね!ただし、非同期処理なので動作しているスレッドがUIスレッドとは異なることには注意が必要です。このあたりは、また次の機会に触れてみたいと思います。次回はIObservable<T>の重要な性質であるHotとColdについて見ていきます。