読者です 読者をやめる 読者になる 読者になる

Akka(2.2) のネットワークサーバーを Graceful に shutdown

こんな感じでどうでしょうね。

解説はコメントを参照のこと。

import akka.actor._
import akka.actor.Terminated
import akka.io._
import akka.io.IO
import akka.util.ByteString
import java.net.InetSocketAddress
import scala.collection.immutable.{HashSet}
import sun.misc.{SignalHandler, Signal}

case object GracefulShutdown

object EchoServer {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem.create
    val server = system.actorOf(Props[EchoServerHandler])

    // SIGTERMを受け取ったらGraceful shutdown
    Signal.handle(new Signal("TERM"), new SignalHandler {
      def handle(sig: Signal) = server ! GracefulShutdown
    })
  }
}

class EchoServerHandler extends Actor with ActorLogging{
  import scala.language.postfixOps

  IO(Tcp)(context.system) ! Tcp.Bind(self, new InetSocketAddress("localhost", 9876))

  // 処理中のクライアントを入れておくhashSet
  var clients = HashSet.empty[ActorRef]

  def receive = {
    case Tcp.Bound(localAddress) =>
      context.become(bound(sender))
  }

  def bound(serverConnection: ActorRef): Receive = {
    // クライアントからの接続があった
    case _: Tcp.Connected =>

      // clientHandlerに管理任せる
      val clientConnection = sender
      val clientHandler = context.actorOf(Props(new ClientHandler(clientConnection)))

      // clientHandlerを処理中リストに保持しておく
      clients += clientHandler
      // clientHandlerの終了を watch しておく
      context watch clientHandler


    // client の処理が終わった
    case Terminated(clientHandler) =>
      // 処理中リストから抜く
      clients -= clientHandler

    // GracefulShutdownメッセージを受け取った
    case GracefulShutdown =>
      // bindされてるsocketをunbindして、
      // 新しいクライアントからの接続を止める
      serverConnection ! Tcp.Unbind

    // 無事に Unbind された
    case Tcp.Unbound =>
      // 処理中のクライアントがなければ即時systemを落とす
      // あればshutdownフェーズに入る
      if (clients.isEmpty) {
        import scala.language.postfixOps
        context.system.shutdown
      } else {
        context.become(shutdown)
      }
  }

  // shutdownフェーズ
  def shutdown: Receive = {
    // clientの処理が終わったら
    // 処理中リストから抜いて行って
    // 全部終わったらsystemを落とす
    case Terminated(clientHandler) =>
      clients -= clientHandler
      if (clients.isEmpty) {
        context.system.shutdown
      }
  }
}

class ClientHandler(connection: ActorRef) extends Actor with ActorLogging {

  val init = TcpPipelineHandler.withLogger(log,
    new StringByteStringAdapter("utf-8") >>
      new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), includeDelimiter = false) >>
      new TcpReadWriteAdapter)

  val pipelineHandler = context.actorOf(TcpPipelineHandler.props(init, connection, self))
  connection ! Tcp.Register(pipelineHandler)
  context watch pipelineHandler

  def receive = {
    // データを受信したらエコーバックしてコネクション閉じる
    case init.Event(data) =>
      sender ! init.Command(data+"\r\n")
      sender ! TcpPipelineHandler.Management(Tcp.Close)

    // TCPコネクションが切れたら自分も止まる
    case Terminated(`pipelineHandler`) =>
      context stop self
  }
}