読者です 読者をやめる 読者になる 読者になる

xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

非同期メソッド入門 (11) - Rxとの相互運用

Rx (=Reactive Extensions) は、イベントや非同期処理をLINQの形で統一的に記述できる準標準 (=.NET Frmaeworkに標準搭載されていないMicrosoft純正の) ライブラリです。これまでRxは.NET Framework 3.5 / .NET Framework 4環境下では非同期処理をするための最適解でしたが、C# 5.0でasync/awaitという強力な非同期構文が言語サポートになったこともあり、今後Rxとどう付き合って行くかというのはぜひ把握しておきたい内容です。RxについてはRx入門も合わせて読んでいただけると幸いです。

Rx v2に含まれる.NET Framework 4.5用のdllには、IObservable<T>Taskとの相互運用機能が追加されています。今回は、このRxと非同期メソッドの相互運用性について見て行きます。

TaskをIObservable<T>に変換

Task<T>をIObservable<T>に変換するには、System.Reactive.Threading.Tasks名前空間にあるToObservable拡張メソッドを利用します。以下にその利用例を示します。

//--- Taskの準備
var task = new Task<string>(() =>
{
    Thread.Sleep(2000);
    return "あいうえお";
});
            
//--- IObservable<T>に変換して通知を受ける準備
task.ToObservable().Subscribe
(
    value => Console.WriteLine("OnNext = {0}", value),
    error => Console.WriteLine("OnError = {0}", error.Message),
    ()    => Console.WriteLine("OnCompleted")
);

//--- Task実行
task.Start();

/*
OnNext = あいうえお
OnCompleted
*/

上記の例では、Taskの実行よりも前にIObservable<T>への変換と通知を受ける準備を行っていますが、以下のように順序を入れ替えても問題ありません。つまり、TaskがIObservable<T>の変換よりも先に完了してしまっていても構わないということです。

Task<string>.Run(() =>
{
    Thread.Sleep(2000);
    return "あいうえお";
})
.ToObservable()
.Subscribe
(
    value => Console.WriteLine("OnNext = {0}", value),
    error => Console.WriteLine("OnError = {0}", error.Message),
    ()    => Console.WriteLine("OnCompleted")
);

また、戻り値のないTask型からIObservable<T>に変換した場合は、Tは「何も表さない」を意味するUnit型になります。

IObservable<T>をTaskに変換

IObservable<T>をTask<T>に変換する場合はToTask拡張メソッドを利用します。先に紹介したToObservable拡張メソッドと同様の名前空間/クラスに定義されています。以下にその利用例を示します。

var task = Observable.Range(1, 10)
         .Where(x => x % 2 == 0)
         .Select(x => x * x)
         .ToTask();
Console.WriteLine("Result = {0}", task.Result);

/*
Result = 100
*/

上記の例では、1から始まる連続した10個の整数値 (=Observable.Range) のうち、偶数のものだけ取り出し (=Where)、2乗したもの (=Select) を取得するようになっていますが、結果は最後の1つである100だけになっています。これは、ToTaskメソッドの内部でAsyncSubject<T>を利用していることが理由です。AsyncSubject<T>の挙動はRx入門 (6) - Subject<T>の亜種でも紹介していますので、参考にしてください。

では、IObservable<T>が値をひとつも発行しなかった場合はどうなるでしょうか?この場合はTask<T>からInvalidOperationException例外が返されます。

try
{
    var task = Observable.Empty<int>().ToTask();
    Console.WriteLine("Result = {0}", task.Result);
}
catch (AggregateException ex)
{
    foreach (var inner in ex.InnerExceptions)
    {
        Console.WriteLine("Type = {0}", inner.GetType());
        Console.WriteLine("Message = {0}", inner.Message);
    }
}

/*
Type = System.InvalidOperationException
Message = Sequence contains no elements.
*/

IObservable<T>はawaitable

awaitするためには、インスタンスメソッド/拡張メソッドを問わずGetAwaiterメソッドがあればよいのでした (参照 : 非同期メソッド入門 (8) - コンパイラ要件)。Task型はawait可能なので、IObservable<T>についてもToTask拡張メソッドを経由すればawaitできることわかると思います。しかしやはりイチイチToTaskと書くのは面倒ですよね。ということで、Rx v2のライブラリにはGetAwaiter拡張メソッドが用意されています。

var sequence = ...;  //--- IObservable<T>な何か
var result   = await sequence.Where(...).Select(...);

この場合も、最後の値が結果として返ってきます。

非同期メソッドとRxの使い分け

Rxは時間軸上の値をひとまとまりとして捉えます。IEnumerable<T>に対するLINQをイベントや非同期処理にも拡張したものです。Rxを非同期処理ライブラリとして見たとき、その性質上1つ以上の値を返すことができます。そして、メソッドチェインの形でを統一的に射影/抽出/合成などができます。対して非同期メソッドは単一の値しか扱えず、かつ射影などの操作はできませんが、同期メソッドのような直感的な非同期操作を提供します。以下はこれらの使い分けの目安です。

開発環境 目安
.NET Framework 3.5
Silverlight 4
ほぼほぼRx一択。プロジェクトの都合などでRxを利用できない場合はBackgroundWorkerやThreadPoolを利用。
.NET Framework 4
Silverlight 5
Microsoft.Bcl.Asyncを推奨。複数の値の通知が必要だったり、LINQによる射影/抽出/合成などの操作が必要な場合はRx。Taskで事足りるような場合はTaskで。
.NET Framework 4.5
WinRT
戻り値が単一の値で良いケースではasync/awaitを率先して利用。

async/awaitによってRxがいらない子になったかというとそうではないと思うので、上手に使い分けたいですね。