TcpPipelineHandlerで管理してるコネクションを切るの術

解決したい問題

Akka でネットワークサーバーを書く場合、エンコード/デコードの部分は Pipeline を利用し、TCPレイヤーの IO をその Pipeline に流し込む場合は TcpPipelineHandlerを利用するのが定石だと思いますが、ではサーバーがレスポンスを返したあとにすぐにコネクションを切ってしまいたいみたいな場合はどうすればいいでしょうか?

簡単なエコーサーバーを例に見ます。

class ServerHandler(port: Int) extends Actor with ActorLogging{
  IO(Tcp)(context.system) ! Tcp.Bind(self, new InetSocketAddress("localhost", port))

  def receive: Receive = {
    case Tcp.Bound(localAddress) =>
      context.become(bound(sender))

    case Tcp.CommandFailed(_: Tcp.Bind) =>
      log.error("bound failed...")
  }

  def bound(serverConnection: ActorRef): Receive = {
    case _: Tcp.Connected =>
      val clientConnection = sender
      context.actorOf(Props(new ClientHandler(clientConnection)))
  }
}

class ClientHandler(connection: ActorRef) extends Actor with ActorLogging{

  val init = TcpPipelineHandler.withLogger(log,
    new StringByteStringAdapter("utf-8") >>
      new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), includeDelimiter = false) >>
      new TcpReadWriteAdapter)

  val pipelineHandler = context.actorOf(TcpPipelineHandler.props(
    init, connection, self).withDeploy(Deploy.local))
  context watch pipelineHandler

  connection ! Tcp.Register(pipelineHandler)

  def receive: Receive = {
    case init.Event(string) =>
      pipelineHandler ! init.Command(string) // echo back ...(1)
      connection ! Tcp.Close // ...(2)
  }
}

どこがダメでしょうか。最後の

      pipelineHandler ! init.Command(string) // echo back ...(1)
      connection ! Tcp.Close // ...(2)

の部分です。このようにメッセージを送信した場合、(1)で送信したメッセージが pipelineHandler に処理される前に (2) が connection に処理される可能性があるため、echo back の前にコネクションが切られてしまう可能性があります。

ではこの問題はどうすれば解決するでしょうか?

解決策

      pipelineHandler ! init.Command(string)
      pipelineHandler ! TCPコネクション切ってねというメッセージ

同じアクターが同じアクターに送ったメッセージの処理順は保証されているので、上記のようにすれば問題ありません。

このあたりの順序の話の詳細については公式のドキュメントを参照すると良いでしょう。

では、どうやって pipelineHandler に対して「コネクションをhogehogeしてくれー!」って頼むのでしょうか。

そのためには、 TcpPipelineHandler.Management に Tcp.Command を wrap したものをメッセージとして送信してあげる必要があります。

      pipelineHandler ! init.Command(string) // echo back
      pipelineHandler ! TcpPipelineHandler.Management(Tcp.Close)

APIリファレンス読むと書いてあるんだけどドキュメントのほうには書いてなかったので調べるのに時間かかってしまった。

まとめ

pipelineHandler 経由で TCP の IO を行っている場合、直接 connection に Tcp.Close メッセージなどを送ると順序がおかしくなってしまう可能性があるので、 pipelineHandler に 対して TcpPipelineHandler.Management(Tcp.Close) を送ることで順序を保証しつつコネクションを切るのが良いでしょう。

なお、このコードは clientHandler を止める処理が書かれていないのでもりもりメモリが漏れます。TcpPipeilineHandler はコネクションが切れると Stop するようになっているので、clientHandler では pipelineHandler を watch しておいて、Terminated(pipelineHandler) が送られてきたら自分も Stop するようにしておくとなお良いのではないでしょうか。ただ今回のエントリの内容と関係ないので、その部分のコードは割愛してあります。