ちょっとこのあたりが複雑に感じたので、一度整理しておく。まぁ全部「公式読めば書いてあるよ」で OK なんだけど、それではあまりにマッチョすぎるしわたしがあとから参照したいしみたいな感じ。
IOManager とはなにか
Akka でIOを扱うときには、 IOManager
を通じて行うと良い。実際の IO の煩わしいことを隠蔽してくれる。IOManager
は IOManagerActor
というアクターへのリファレンスであり、実際にActしてくれるのは IOManagerActor
のようだ。IOManager
は手動で止めたりする必要はなく、管理する IO が無くなれば勝手に止まってくれるらしい。賢い。
IOManager を通じてソケットを作るには、IOManager#connect
や IOManager#listen
を呼び出す。それぞれ SocketHandle
や ServerHandle
を返す。また、このとき同時に「作られた socket に対する IO の結果をどのアクターにメッセージを送るか」を指定してやることができる。例としてはこんな感じ
val serverActor = context.sysetm.actorOf[Props[ServerActor]] val serversocket:ServerHanlde = IOManager(context.system).listen("localhost", port)(serverActor)
こんな感じでサーバーソケットを作ると、serverActor
に対して各種イベント(新しいクライアントが接続してきたよ、とか)がメッセージで送られてくるようになる。
こんな感じ。
class ServerActor extends Actor { def receive = { // 新しくクライアントが接続してきたらこのメッセージが受信される case IO.NewClient(serverSocket: ServerHandle) => // ここでserverSocketに対してなんかする。典型的な処理はacceptだと思う } }
かんたんにまとめると、IOManager
を通じてソケットを作ると、そのソケットに対する直接の操作は IOManagerActor
がやってくれて、わたしたちは自分で作ったアクターでその結果を受信して種々の処理を行えばよい、ということになる。IOManagerActor
からのメッセージには ServerHandle
や SocketHandle
、あるいは「ソケットからデータが読み込まれたよ」というイベントの場合はそのデータの内容などが入っているので、それに対して accept
やら write
やらの処理を行えば良い。
詳しくは 公式のリファレンス を見ると良いと思う。
では ServerHanlde/SocketHandle とはなにか
普通のネットワークプログラミングに慣れていると、この ServerHandle/SocketHandle
ってのはソケットそのものか、あるいはそれを wrap したものなのかな、と思ってしまうのだけれど、実はこれはそうではなくて、ほんどうにただの「ハンドル」である。たとえば、SocketHandle
を通じてソケットに何かを書き込む例を見てみよう。
val byteString = ByteString("some string") socketHandle.asWritable.write(byteString)
これでソケットに対して byteString
の内容が書かれる。では socketHandle.asWritable.write()
したときになにが起こっているのだろうか。じつは、このメソッドがやっていることは「ソケットにbyteStringを書き込む」という操作ではなくて、「IOManager
に対して、この socketHandle
に紐づけられたチャンネルにbyteStringを書き込んでくれと頼む」ことである。つまり、socket への実際の書き込みは IOManager
により行われることになるわけだ。
なんでこんなことになってるのかというと、おそらく、これでアプリケーションの開発者はソケットの管理(ソケットが閉じてるとか開いてるとか)を IOManager に任せることができるといのが一点。それと、ネットワーク越しにアクター同士で協調作業を行うときに、socket そのものはネットワーク越しに渡せないので、ネットワーク越しに「こいつにこれ書き込んでよ」というメッセージを送り、実際の IO は実際に socket を持っているマシンが行うということを実現するためにこのようになっている、というのが一点、なのではないかと思う(まだここはちゃんと調べてないので真偽不明、識者によりツッコミを期待)。
まとめると、ServerHandle や SocketHandle はソケットそのものではなく、そのソケットを管理しているアクターに対して「これ書き込んで」とか「acceptして」とかそういうメッセージを送ってくれるやつである。
同じく詳細は 公式のリファレンス を参照のこと。
じゃあ IO.Iteratee ってなんなの
IO.Iteratee
が出てきていないけれど、それもそのはず、じつは IO.Iteratee
は IOManager
たちと一緒に使うと便利ではあるけれど、直接はなんの関係もない。IO.Itereatee
というのは、「IOの結果に対してどういう操作をするかを先に定義しておいて、IO.Iteratee
に対して実際にデータが流し込まれるまでその操作を遅延する仕組み」である。
REPLで実験してみよう。
scala> import akka.actor.IO import akka.actor.IO scala > import akka.util.ByteString scala> val take4 = IO take(4) res0: akka.actor.IO.Iteratee[akka.util.ByteString] = Next(<function1>) scala>
まずは import しておく。(とうぜんakkaがインストールされていることが前提ですよ)そして、そのあとに IO take(4) で簡単な Iteratee を作っている。この操作は「IOから4バイト読み込む」ではなく、「IOから4バイト読み込むという操作を定義したIO.Iterateeを作る」という操作であることに注意してほしい。実際にこの段階では読み込みはなされていない(実際、IO待ちでブロックせずに、すぐにプロンプトが返ってきている)。
では今度は、「この4バイト読み込むという操作のあと、それを文字列にしてプリントする」という操作を定義してみよう。
まずは byteString を受け取り、その内容を String にして println する関数を定義する。
val printBytes = {bytes:ByteString => println(bytes.decodeString("US-ASCII"))} printBytes: akka.util.ByteString => Unit = <function1>
そしたら、take4 に対する map 操作を行う。IO.Iteratee#map(f: A => B)
は、「"レシーバの IO.Iteratee
の結果得られる値に対して f を呼び出す"という操作を定義したIO.Iteratee
」を返す。ちょうどリストに対する map 操作が、「リストに格納されているに対して f を呼び出した結果をリストに格納して返す」のと同じような感じだ。
scala> val take4AndPrintThat = take4.map(printBytes) take4AndPrintThat: akka.actor.IO.Iteratee[Unit] = Next(<function1>)
おなじことを下のように表現することもできる。
scala> val take4andPrintThat = for { | bytes <- IO.take(4) | string = bytes.decodeString("US-ASCII") | } yield println(string) take4andPrintThat: akka.actor.IO.Iteratee[Unit] = Next(<function1>)
Scala の for はちょっと特殊なので慣れないと読みにくいかもしれないが、やってることは上でやった map と一緒である。
さて、これで、「4バイト読み込んで、その結果をprintする」という操作が定義されたIO.Iteratee
が作り出せた。
では、このIO.Iterateeに対して、実際にデータを書き込んでいってみよう。書き込みは apply
でできる。
scala> take4andPrintThat(IO.Chunk(ByteString("abcdefg"))) abcd res0: (akka.actor.IO.Iteratee[Unit], akka.actor.IO.Input) = (Done(()),Chunk(ByteString(101, 102, 103)))
"abcdefg"
を書き込んだ結果、4バイトが取り出されて表示された。IO.Iteratee
は immutable なオブジェクトであるため、「残った3バイトを内部に保持する」みたいなことはしない。これだとちょっと使い勝手がわるい。ので、残ったバイトの管理などをやってくれるものでこの IO.Iteratee
を一枚 wrap する。それが IO.IterateeRef
だ。
IO.IterateeRef でもう何も煩わしいことはなくなる
ここでは IO.IterateeRefAsync
というのを使うけれど、それの為には実行コンテキストが必要になるので、まずは下準備として、今回は import scala.concurrent.ExecutionContext.Implicits.global
で暗黙の実行コンテキストを作っておく。Akkaと一緒に使う場合には明示的にakkaのsystem.contextを使うべきだろう。
scala> import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
さて、これで準備が整ったので、まずはIterateeRefを取得する。
scala> val irefAsync = IO.IterateeRef.async(take4andPrintThat) irefAsync: akka.actor.IO.IterateeRefAsync[Unit] = akka.actor.IO$IterateeRefAsync@f57f347
明示的に実行コンテキストを指定したいときには IO.IterateeRef.async(take4andPrintThat)(context)
とすれば良い。
ではこのirefAsyncに対してデータを書き込んで行ってみよう。
scala> irefAsync(IO.Chunk(ByteString("abcde"))) scala> abcd
take4andPrint
で定義された操作が非同期で実行されたため、先にプロンプトが表示されたのが見て取れるかと思う。さて、今、take4の残り1バイト(e)はどこへ行っただろうか。irefAsyncの中に残っているはずである。試してみよう
scala> irefAsync(IO.Chunk(ByteString("fgh"))) scala> irefAsync(IO.Chunk(ByteString("fghg"))) scala> irefAsync(IO.Chunk(ByteString("fgha")))
!?!?何も表示されない!? それもそのはず、irefAsync
は内部に take4andPrintThat
を持っているが、この操作はすでに行われてしまった。「次にどうするか」が定義されていないので、そのあとにどんどんデータが書き込まれても、なにも起こらない。これでは使い物にならない。
このような事態を避けるために、今度は、「永遠に同じ操作を繰り返すという操作を定義したIO.Iteratee」を作り出してみよう。
scala> val loopIteratee = IO.repeat(take4andPrintThat) loopIteratee: akka.actor.IO.Iteratee[Unit] = Next(<function1>)
そしてこれを参照した IO.IterateeRef
を作る。
scala> val loopIrefAsync = IO.IterateeRef.async(loopIteratee) loopIrefAsync: akka.actor.IO.IterateeRefAsync[Unit] = akka.actor.IO$IterateeRefAsync@4f758781
ではこれに対して書き込みを行ってみよう。
scala> loopIrefAsync(IO.Chunk(ByteString("abcdef"))) scala> abcd scala> loopIrefAsync(IO.Chunk(ByteString("ghij"))) scala> efgh scala> loopIrefAsync(IO.Chunk(ByteString("k"))) scala> loopIrefAsync(IO.Chunk(ByteString("l"))) scala> ijkl
おおーーーー便利!
というわけで、まとめると
IO.Iteratee
は「IOに対してどういう操作を行うか」が定義されたものIO.Iteratee
に対してデータを書き込むと、あらかじめ定義された通りにそのデータに対して操作を行うIO.Iteratee
は immutable な object であるため、「のこったデータ」とか「足りなかったデータ」を内部に保持してくれたりはしない。- それだと「細かいチャンクをどんどん書き込んでいくぜ」みたいなときにめっちょ不便なので、普通は
IO.IterateeRef
に包んで使う IO.IterateeRef
は書き込まれたデータを保持してくれて、そのデータに対して順次内部のIO.Iteratee
を適用していくIO.IterateeRef
は「一度使ったIO.Iteratee
を何度も再利用」みたいなことはしないので、そういうことがしたければ、「この操作を何度も繰り返すぜ」とあらかじめ定義しておいたIO.Iteratee
を用意してあげる必要がある
そんなところでしょうか。
実際の例
手前味噌だが、猫型チャットサーバー や前回前々回のエントリーがこれらの技術を組み合わせたミニマルな例となっているので、実例としてはそれを見ると良いと思う。
追記
@neko_gata_s doc.akka.io/docs/akka/2.2-… akka2.2のドキュメントでは既にOld IO扱いになっていて、akka.ioパッケージが別に展開されています。また、akka.actor.IOがdeprecatedになるようです…。
— mather ɹǝɥʇɐɯさん (@mather314) 2013年6月4日
@mather314 さんがこのようなことを教えてくれました。ありがとうございます。
泣いてないよ!!!!! もうすぐこれdeprecatedになるそうなので、新しく学ぶひとは下のやつ見たほうがいいかも!!!!!! 泣いてない!!!!!!