!!!!!!この記事は古い情報を扱っています!!!!!!
Akka 2.3 からは Pipeline はなくなっています。この記事の内容は古いです。気をつけてください
前回までのあらすじ
自信満々で日本一わかりやすい Akka の IOManager と SocketHandle(やServerHandle)、それに IO.Iteratee と IO.IterateeRef というブログ記事を書いたしんぺい a.k.a. 猫型蓄音機。しかし彼を待ち受けていたのは、「それ 2.2 から deprecated になるよ」という無慈悲な現実であった。Akka 2.2 から使われることになる akka.io と Pipeline という新しい敵が待ち受ける中、しんぺいは日本一わかりやすい記事を書けるのだろうか?
という感じでした。それでは書いていきます。Akkaのバージョンは2.2.0-RC1です。
IOManager の役割をするひとが IO(Tcp) とか IO(Udp) とかになった
これはけっこう簡単。前回の記事 を見ればすぐにわかると思います。大きな変更点として、今まではメソッド呼び出しで connect や listen をおこなっていたけれど、2.2 からは IO(Tcp) に対して Connect メッセージや Bind メッセージを送る形になったというのと、socketHandleなどを経由せずに直接IOアクターにコマンドを送る方式になりました。これにより、実際の IO は裏に隠れた IO アクターがやってくれているんだなということが一目でわかるようになり、非常にいい感じですね。
IO.Iteratee や IO.IterateeRef が担っていた役割は Pipeline に取って代わられた
「入力されたやつをごにょごにょしてアプリで使える形にする」のに便利に使える仕組みが IO.Iteratee や IO.IterateeRef でありましたが、こんどはそれは Pipeline に取って代わられました。この記事では Pipeline について重点的に説明します。
Pipeline はどんな問題を解決するのか
さて、ネットワークプログラミングでめんどくさいことのうちのひとつに、「パケットはそのままアプリで使いたいわけではない」というのがあります。クライアントから送られてきたパケットはたんなるバイト列です。そのバイト列を、論理的な単位で分け(行指向の通信プロトコルならば CRLF で分けることになるでしょう)、さらにその内容をパースしてアプリケーションロジックの中で使いやすい、抽象度の高いオブジェクトの形にしてあげる必要が出てきたりするでしょう。
あるいは、アプリケーションからクライアントに何かメッセージを送りたい、というとき、アプリケーションでは抽象度の高いオブジェクトで表現していたものを、ネットワークに流すためにバイト列にシリアライズする必要がありますね。
こういう部分を担う役割をするのが、Pipeline です。Pipeline に関連する重要な登場人物は、「パイプライン」と「パイプラインステージ」です。
Pipeline の概要
図で説明しましょう。
緑のパイプみたいなやつが、「パイプライン」です。そしてこのパイプラインを2本合わせたものが、パイプラインステージです。黄色い四角で囲まれてる部分ですね。
左のパイプが「イベント」が通ってくるパイプラインです。イベントとは何でしょうか。それは、「アプリケーションの外からシステムに入ってきたデータ」のことです。今回で言えば、クライアントから送られてきた生のデータが「イベント」として扱われ、左のパイプに入ってきます。パイプラインの中では、このイベントとして送られてきたデータに対して、なんらかの変換処理を行います。変換処理を行われたデータは、そのままパイプの外に出て行きます。
ちなみに、この変換部分ではデータをバッファすることもできるので、「イベントで送られてきたデータが『きりのいいところ』(たとえば改行コードとか)まで溜まるまで待って、溜まったデータをひとつのものに変換して外に出す」みたいなことができます。これは言い換えると、「イベントとしてひとつのデータが入ってきたからといって、ひとつの変換されたデータが外に出て行くとは限らない」ということです。100個イベントとして入ってきたデータをひとつにつなぎ合わせてから外に出す、みたいなことも可能、あるいは、ひとつのイベントで入ってきたデータをたくさんのデータに変換して外に出すということも可能、ということです。
右のパイプは「コマンド」が通って行くパイプです。コマンドとは何でしょうか。コマンドというのは、「アプリケーションの中から外に出したいデータ」です。イベントと同じく、アプリケーションがこのパイプラインに対して「コマンド」としてデータを書き込むと、パイプラインの中で変換処理が走り、変換されたデータがパイプの外に出て行きます。
このとき注意してほしいのが、この変換処理はイベントとコマンドで「逆のこと」をする必要があるということです。例えば、イベント側のパイプが String
を Seq[String]
に変換する処理を行っているならば、コマンド側のパイプはSeq[String]
を String
に変換する処理を行う必要があります。
ちなみに、この「パイプラインステージ」は、複数をがっちゃんこすることが可能です。
たとえば、「イベントが通るパイプが ByteString
--> Seq[String]
の変換を行い、コマンドが通るパイプがその逆を行う」パイプラインステージの上に、「イベントが通るパイプが Seq[String]
--> SomeObject
の変換を行い、コマンドが通るパイプが その逆の変換を行う」パイプラインステージをつなげることができます。
この場合、アプリケーションの外から入ってきた ByteString
はまずSeq[String]
に変換され、その結果が次のパイプラインステージに渡され、Seq[String]
から SomeObject
に変換されます。アプリケーションが外に出そうとする「コマンド」についても、逆のことが行われるわけですね。
さて、こうしてパイプラインの中で何が起こっているのかを見てきましたが、それでは最終的に変換が終わってパイプラインから出ていくデータは、どこに届くのでしょうか? あるいは、そもそもパイプラインにデータを流し込むにはどうすればいいのでしょうか? パイプラインステージには、そのためのインターフェイスが用意されています。つまり、出来上がったパイプラインステージにデータを流し込むためのメソッドと、パイプラインから出てくるデータをハンドルするためのハンドラを設定する方法が用意されています。具体的な例はあとで見てみましょう。
では、このパイプライン、実際にはどんなふうに使えばいいのでしょうか?その例を見てみましょう。例えば、今ここに ByteString
を SomeObject
に変換/ SomeObject
を ByteString
に変換するパイプを持ったパイプラインステージがあるとします。わたしたちは、IOワーカーから受け取ったクライアントからのデータをこのパイプラインのイベント側のパイプの入り口に流し込んでやれば、いい感じに変換されたデータがパイプラインの外に出てきて、ハンドラに渡されます。ハンドラに渡された段階ではもうアプリケーションで使いやすいデータの形になっているので、あとはどうぞ、よしなに処理してください。あるいは、クライアントになんかデータを送りたいなーと思ったら、コマンド側のパイプに対してデータを流し込んでやります。そうすれば、バイト列となったデータがパイプラインから出てきてハンドラに渡されます。ハンドラではもうバイト列になっているデータを、IOワーカーに対して書き込んであげればいいでしょう。
では、あとは実際のコードを見てみましょう。
実際のコードの説明
このコードは、2行毎にエコーバックする特殊なエコーサーバーです。
ちょっと長いですね。まずはパイプラインの定義から見てみましょうか。PipelineStage.scala
がそれです。
二つクラスがありますが、ByteStringStage
というクラスは、細切れだったり大きすぎたりする ByteString
を受け取って、二行毎に分けて外に出すイベント側のパイプ(eventPipeline
というメソッドがそれです)と、二行分のByteString
を受け取って最後に改行を付与されたByteString
に変換するコマンド側のパイプ(commandPipeline
というメソッドです)を持ったクラスです。
イベント側で変換したデータをパイプの外に渡すためには、eventPipeline
でIterable[Left[T]]
を返します。なぜIterable
に包まれているかわかりますか? それは、「一度のイベントで入ってきたデータがひとつに変換されるとは限らない」からです。今回はgots.map(Left(_))
で Iterable[Left[ByteString]]
を返していますね。「ひとつのイベントに対してはひとつしか返さないんだけど」みたいな場合には、このクラスに渡されてきている「PipelineContext
」という型のオブジェクトを使ってctx.singleEvent(byteString)
みたいなふうにすることも可能です。言葉を変えれば、ctx.singleEvent(value: T)
は 中身がひとつのIterable[Left[T]]
を返す、ということです。
コマンド側も同じように、commandPipeline
でIterable[Right[T]]
を返すように定義してあげればOKです。
もうひとつのクラス、TwoLinesStage
というのは、このByteStringPipeline
の上に乗っけることを意図したクラスです。二行分のByteString
をTwoLine
クラスのオブジェクトに変換するイベント側のパイプラインと、その逆を行うコマンド側のパイプラインを持っています。内容は簡単なので解説は要らないでしょう。
さて、これで、ByteString
を TwoLines
に変換したり、その逆を行うためのpipeLineができました。ではこれを使ってみましょう。使っている部分はClient.scala
です。
めちゃめちゃ丁寧にコメント書き込んであるので、ちょっとわかりにくいところだけ捕捉します。
39行目と44行目、Success(twoLines: TwoLines)
や Success(bytes: ByteString)
でパイプラインから出てきたデータを受け取っていますね。実は、パイプラインから出てきたデータは、Try[T]
に包まれています。これは、パイプライン内でなんらかの例外が起こるった場合にその例外をこのハンドラで捕捉できるためでしょう。今回はサンプルなので例外を無視していますが、適切にハンドラとパターンマッチを定義してあげることで、パイプライン処理中に起こった例外をここでハンドルしてあげることができます。
TcpPipelineHandler を使おう
さて、しかしこのPIpeline、一番下の部分の出口は IO ワーカーにまるっと処理を委譲しているだけだし、一番上の出口も自分でハンドリングしているだけですね。なんかこういうの定型っぽいです。Akkaにはこういう定型っぽい部分を自分でハンドリングしなくてもいいような仕組みが備わっています。それが「TcpPipelineHandler」です。
もう燃え尽きたので、TcpPipelineHandlerを使ったコードの例を貼るのでそれ見てください。コメント丁寧に書いたので多分このサンプルみればわかると思います。
TcpReadWtiteAdapter という Akka が用意してくれている PipelineStage を一番下に組み込んでいるのがポイントです。ちなみに、この下に SslTlsSupport という PipelineStage や BackPressureBuffer というPIpelineStageを差し込むことで、SSLやバックプレッシャーを透過的に扱うことができるようです。すご〜い。