読者です 読者をやめる 読者になる 読者になる

日本一わかりやすいというほどでもない Akka2.2 の IO まわりと Pipeline まわり

programming Scala

!!!!!!この記事は古い情報を扱っています!!!!!!

Akka 2.3 からは Pipeline はなくなっています。この記事の内容は古いです。気をつけてください

前回までのあらすじ

自信満々で日本一わかりやすい Akka の IOManager と SocketHandle(やServerHandle)、それに IO.Iteratee と IO.IterateeRef というブログ記事を書いたしんぺい a.k.a. 猫型蓄音機。しかし彼を待ち受けていたのは、「それ 2.2 から deprecated になるよ」という無慈悲な現実であった。Akka 2.2 から使われることになる akka.io と Pipeline という新しい敵が待ち受ける中、しんぺいは日本一わかりやすい記事を書けるのだろうか?

という感じでした。それでは書いていきます。Akkaのバージョンは2.2.0-RC1です。

IOManager の役割をするひとが IO(Tcp) とか IO(Udp) とかになった

これはけっこう簡単。前回の記事 を見ればすぐにわかると思います。大きな変更点として、今まではメソッド呼び出しで connect や listen をおこなっていたけれど、2.2 からは IO(Tcp) に対して Connect メッセージや Bind メッセージを送る形になったというのと、socketHandleなどを経由せずに直接IOアクターにコマンドを送る方式になりました。これにより、実際の IO は裏に隠れた IO アクターがやってくれているんだなということが一目でわかるようになり、非常にいい感じですね。

IO.Iteratee や IO.IterateeRef が担っていた役割は Pipeline に取って代わられた

「入力されたやつをごにょごにょしてアプリで使える形にする」のに便利に使える仕組みが IO.Iteratee や IO.IterateeRef でありましたが、こんどはそれは Pipeline に取って代わられました。この記事では Pipeline について重点的に説明します。

Pipeline はどんな問題を解決するのか

さて、ネットワークプログラミングでめんどくさいことのうちのひとつに、「パケットはそのままアプリで使いたいわけではない」というのがあります。クライアントから送られてきたパケットはたんなるバイト列です。そのバイト列を、論理的な単位で分け(行指向の通信プロトコルならば CRLF で分けることになるでしょう)、さらにその内容をパースしてアプリケーションロジックの中で使いやすい、抽象度の高いオブジェクトの形にしてあげる必要が出てきたりするでしょう。

あるいは、アプリケーションからクライアントに何かメッセージを送りたい、というとき、アプリケーションでは抽象度の高いオブジェクトで表現していたものを、ネットワークに流すためにバイト列にシリアライズする必要がありますね。

こういう部分を担う役割をするのが、Pipeline です。Pipeline に関連する重要な登場人物は、「パイプライン」と「パイプラインステージ」です。

Pipeline の概要

図で説明しましょう。

パイプラインとパイプラインステージ

緑のパイプみたいなやつが、「パイプライン」です。そしてこのパイプラインを2本合わせたものが、パイプラインステージです。黄色い四角で囲まれてる部分ですね。

左のパイプが「イベント」が通ってくるパイプラインです。イベントとは何でしょうか。それは、「アプリケーションの外からシステムに入ってきたデータ」のことです。今回で言えば、クライアントから送られてきた生のデータが「イベント」として扱われ、左のパイプに入ってきます。パイプラインの中では、このイベントとして送られてきたデータに対して、なんらかの変換処理を行います。変換処理を行われたデータは、そのままパイプの外に出て行きます。

ちなみに、この変換部分ではデータをバッファすることもできるので、「イベントで送られてきたデータが『きりのいいところ』(たとえば改行コードとか)まで溜まるまで待って、溜まったデータをひとつのものに変換して外に出す」みたいなことができます。これは言い換えると、「イベントとしてひとつのデータが入ってきたからといって、ひとつの変換されたデータが外に出て行くとは限らない」ということです。100個イベントとして入ってきたデータをひとつにつなぎ合わせてから外に出す、みたいなことも可能、あるいは、ひとつのイベントで入ってきたデータをたくさんのデータに変換して外に出すということも可能、ということです。

右のパイプは「コマンド」が通って行くパイプです。コマンドとは何でしょうか。コマンドというのは、「アプリケーションの中から外に出したいデータ」です。イベントと同じく、アプリケーションがこのパイプラインに対して「コマンド」としてデータを書き込むと、パイプラインの中で変換処理が走り、変換されたデータがパイプの外に出て行きます。

このとき注意してほしいのが、この変換処理はイベントとコマンドで「逆のこと」をする必要があるということです。例えば、イベント側のパイプが StringSeq[String] に変換する処理を行っているならば、コマンド側のパイプはSeq[String]String に変換する処理を行う必要があります。

ちなみに、この「パイプラインステージ」は、複数をがっちゃんこすることが可能です。

たとえば、「イベントが通るパイプが ByteString --> Seq[String] の変換を行い、コマンドが通るパイプがその逆を行う」パイプラインステージの上に、「イベントが通るパイプが Seq[String] --> SomeObject の変換を行い、コマンドが通るパイプが その逆の変換を行う」パイプラインステージをつなげることができます。

この場合、アプリケーションの外から入ってきた ByteString はまずSeq[String] に変換され、その結果が次のパイプラインステージに渡され、Seq[String] から SomeObject に変換されます。アプリケーションが外に出そうとする「コマンド」についても、逆のことが行われるわけですね。

さて、こうしてパイプラインの中で何が起こっているのかを見てきましたが、それでは最終的に変換が終わってパイプラインから出ていくデータは、どこに届くのでしょうか? あるいは、そもそもパイプラインにデータを流し込むにはどうすればいいのでしょうか? パイプラインステージには、そのためのインターフェイスが用意されています。つまり、出来上がったパイプラインステージにデータを流し込むためのメソッドと、パイプラインから出てくるデータをハンドルするためのハンドラを設定する方法が用意されています。具体的な例はあとで見てみましょう。

では、このパイプライン、実際にはどんなふうに使えばいいのでしょうか?その例を見てみましょう。例えば、今ここに ByteStringSomeObject に変換/ SomeObjectByteStringに変換するパイプを持ったパイプラインステージがあるとします。わたしたちは、IOワーカーから受け取ったクライアントからのデータをこのパイプラインのイベント側のパイプの入り口に流し込んでやれば、いい感じに変換されたデータがパイプラインの外に出てきて、ハンドラに渡されます。ハンドラに渡された段階ではもうアプリケーションで使いやすいデータの形になっているので、あとはどうぞ、よしなに処理してください。あるいは、クライアントになんかデータを送りたいなーと思ったら、コマンド側のパイプに対してデータを流し込んでやります。そうすれば、バイト列となったデータがパイプラインから出てきてハンドラに渡されます。ハンドラではもうバイト列になっているデータを、IOワーカーに対して書き込んであげればいいでしょう。

では、あとは実際のコードを見てみましょう。

実際のコードの説明

このコードは、2行毎にエコーバックする特殊なエコーサーバーです。

ちょっと長いですね。まずはパイプラインの定義から見てみましょうか。PipelineStage.scala がそれです。

二つクラスがありますが、ByteStringStage というクラスは、細切れだったり大きすぎたりする ByteString を受け取って、二行毎に分けて外に出すイベント側のパイプ(eventPipelineというメソッドがそれです)と、二行分のByteStringを受け取って最後に改行を付与されたByteStringに変換するコマンド側のパイプ(commandPipelineというメソッドです)を持ったクラスです。

イベント側で変換したデータをパイプの外に渡すためには、eventPipelineIterable[Left[T]] を返します。なぜIterableに包まれているかわかりますか? それは、「一度のイベントで入ってきたデータがひとつに変換されるとは限らない」からです。今回はgots.map(Left(_))Iterable[Left[ByteString]] を返していますね。「ひとつのイベントに対してはひとつしか返さないんだけど」みたいな場合には、このクラスに渡されてきている「PipelineContext」という型のオブジェクトを使ってctx.singleEvent(byteString)みたいなふうにすることも可能です。言葉を変えれば、ctx.singleEvent(value: T) は 中身がひとつのIterable[Left[T]]を返す、ということです。

コマンド側も同じように、commandPipelineIterable[Right[T]]を返すように定義してあげればOKです。

もうひとつのクラス、TwoLinesStage というのは、このByteStringPipelineの上に乗っけることを意図したクラスです。二行分のByteStringTwoLineクラスのオブジェクトに変換するイベント側のパイプラインと、その逆を行うコマンド側のパイプラインを持っています。内容は簡単なので解説は要らないでしょう。

さて、これで、ByteStringTwoLines に変換したり、その逆を行うためのpipeLineができました。ではこれを使ってみましょう。使っている部分はClient.scalaです。

めちゃめちゃ丁寧にコメント書き込んであるので、ちょっとわかりにくいところだけ捕捉します。

39行目と44行目、Success(twoLines: TwoLines)Success(bytes: ByteString)パイプラインから出てきたデータを受け取っていますね。実は、パイプラインから出てきたデータは、Try[T]に包まれています。これは、パイプライン内でなんらかの例外が起こるった場合にその例外をこのハンドラで捕捉できるためでしょう。今回はサンプルなので例外を無視していますが、適切にハンドラとパターンマッチを定義してあげることで、パイプライン処理中に起こった例外をここでハンドルしてあげることができます。

TcpPipelineHandler を使おう

さて、しかしこのPIpeline、一番下の部分の出口は IO ワーカーにまるっと処理を委譲しているだけだし、一番上の出口も自分でハンドリングしているだけですね。なんかこういうの定型っぽいです。Akkaにはこういう定型っぽい部分を自分でハンドリングしなくてもいいような仕組みが備わっています。それが「TcpPipelineHandler」です。

もう燃え尽きたので、TcpPipelineHandlerを使ったコードの例を貼るのでそれ見てください。コメント丁寧に書いたので多分このサンプルみればわかると思います。

TcpReadWtiteAdapter という Akka が用意してくれている PipelineStage を一番下に組み込んでいるのがポイントです。ちなみに、この下に SslTlsSupport という PipelineStage や BackPressureBuffer というPIpelineStageを差し込むことで、SSLやバックプレッシャーを透過的に扱うことができるようです。すご〜い。