今回はRxの特徴のひとつであるLINQスタイルでの記述について触れてみます。みんな大好きなLINQ、これができると世界がまた変わります。
情報伝播のイメージ
今回はLINQの中でも特に基本的なWhere/Select文を取り上げてみます。前回の「山から湧き出る水」イメージと組み合わせれば、「山 (IObservable<T>) から水 (情報) が湧き出て、川となって流れて (OnNext) おり、不純物を取り除き (Where)、殺菌/消毒のために微量の塩素を混ぜて流し (Select)、それを汲み上げる (Subscribe)」と言った感じでしょうか。(例に漏れず、勝手なイメージです)
メソッド形式
上記のイメージと合致させたメソッド形式のサンプルを下記に示します。1から10までのデータが流れているうち、Whereメソッドで偶数のものだけを抽出し、Selectメソッドで2乗してからSubscribeへデータを流しています。LINQをご存知の方なら「言わずもがな」と言った感じです。
using System; using System.Reactive.Linq; namespace Sample09_MethodChainLinq { class Program { static void Main() { Observable.Range(1, 10) .Where(value => value % 2 == 0) .Select(value => value * 2) .Subscribe ( value => Console.WriteLine("OnNext({0})", value), () => Console.WriteLine("OnCompleted") ); } } } //----- 結果 /* OnNext(4) OnNext(8) OnNext(12) OnNext(16) OnNext(20) OnCompleted */
重要なのは、IObservable<T>ではデータが上から順に流れていることです。片やIEnumerable<T>によるLINQの場合は、MoveNext/Curretメソッドを利用して芋づる式にデータを取得します。この違いは非常に重要なので、デバッガなどで双方確認してみると良いと思います。
上図は、@neueccさんのblog記事 (Reactive Extensions for .NET (Rx) メソッド探訪第7回:IEnumerable vs IObservable) からご本人の許可を得て拝借させていただきました。ありがとうございます。
クエリ式形式
メソッド形式の記述だけでなく、もちろんのクエリ式による記述も可能です。下記のサンプルは、名前空間の表記を除いて先に紹介したサンプルとIL (中間言語) レベルで同等なものです。
using System; using System.Reactive.Linq; namespace Sample10_QueryExpressionLinq { class Program { static void Main() { (from value in Observable.Range(1, 10) where value % 2 == 0 select value * 2) .Subscribe ( value => Console.WriteLine("OnNext({0})", value), () => Console.WriteLine("OnCompleted") ); } } } //----- 結果 /* OnNext(4) OnNext(8) OnNext(12) OnNext(16) OnNext(20) OnCompleted */
ただし、クエリ式においてはIEnumerable<T>で使えるすべての記法が使えるわけではありません。クエリ式の記法はコンパイラがメソッド形式に変換しているので、対応するメソッドがないとコンパイルエラーになるためです。ですので、対応する拡張メソッドを自作すれば、クエリ式形式での記述にも幅が広がるはずです。(本当にできるかどうかは未確認ですみません...というかできるならメソッド用意されてる気もするけど...)
挙動の確認
次に、Where/Selectメソッドの挙動を確認してみます。Where/Selectメソッドの中にコンソール出力を挿入したり、購読を解除したりしているだけの簡単なサンプルを下記に示します。
using System; using System.Reactive.Linq; using System.Reactive.Subjects; namespace Sample11_AnalyzeWhereSelect { static class Program { static void Main() { var subject = new Subject<int>(); var sequence1 = subject.Where(value => { Console.WriteLine("Where({0})", value); return value % 2 == 0; }); var sequence2 = sequence1.Select(value => { Console.WriteLine("\\tSelect({0})", value); return value * value; }); var disposer = sequence2.Subscribe(value => { Console.WriteLine("\\t\\tOnNext({0})", value); }); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); Console.WriteLine("Dispose Before"); disposer.Dispose(); Console.WriteLine("Dispose After"); subject.OnNext(4); subject.OnCompleted(); subject.OnNext(5); } } } //----- 結果 /* Where(1) Where(2) Select(2) OnNext(4) Where(3) Dispose Before Dispose After */
上記の結果から分かるように、Whereで条件に合わなかったものはSelectに流れていません。逆に、Whereで条件に適合したものはWhere → Select → Subscribeと順に流れていることが確認できます。また、Disposeメソッドを呼び出して購読を解除してからは、データを通知してもWhereにすら情報が流れません。これはSelectとSubscribeの間の購読を解除するだけでなく、最も根元であるSubject<T>とWhereの購読も解除していることを示しています。
参考記事
今回の内容も、@okazuki先生の下記のblog記事でも解説されています。ぜひ、合わせてお読みください。
また、RxのライブラリにはWhere/Selectなどを含むIObservable<T>のシーケンスに対する多数のメソッドが含まれています。具体的にはSystem.Reactive.dllのSystem.Reactive.Linq名前空間にあるObservableクラスに定義されています。あまりに数が多過ぎて、ここでは到底説明しきれません。が、流石@okazuki先生。「Reactive Extensions再入門」で、すべて網羅しようとしてるんじゃないかと思うくらいのボリュームで解説しています。脱帽です。
次回予告
今回は、LINQスタイルを用いたIObservable<T>シーケンスの記述について見てきました。次回は、より理解を深めるためにWhere/Selectメソッドを自作してみたいと思います。