Lambdaカクテル

ウェブアプリケーションエンジニアです.玉石混淆です.

私立プログラミングキャンプに行ってScala+PlayでWebSocket通信してみた

どうも。Windymeltです。 僕は今はてなサマーインターンで京都に滞在しており、id:masawada私立プログラミングキャンプのお誘いをしてくれて僕も参加してきたのでそのレポートです。

私立プログラミングキャンプはてなオフィスで行われたため迷わず会場に来ることができました。会場では大量のレッドブルサイボウズウォーターが配給?されてメンバーの戦意を高揚させていました。

このキャンプ、ひとたび自己紹介が終わればいきなり作業に突入するタイプのイベントのようです。事前に何もする目標を決めていなかったので恐怖以外のなにものでもありませんでしたが、なんと同じScalaユーザであるUAdachiさんに遭遇したためScalaプログラミングをすることにしました。わいわい。

今回の技術的目標はScalaによるWebSocketを利用したXFD(eXtreme Frrdback Device)の作成でした。自前の開発環境が諸事情により使えないためTypesafe Activatorを利用して開発しました。UAdachiさんはサーバサイドに強い方ということなのでJenkinsサーバの建築などを行なっていただきました。

XFDを知らない方の為に説明すると、これはテストの結果などをちょっと楽しく通知しようというエクストリームプログラミング的な楽しみの一つです。電子工作やギミックなどを活用し、大々的にテストの結果をチームに共有することで修正の機運を高めようという試みです。もちろん、ギミックを見てひとときの癒しを得るという非常に良い効能があります。興味を持たれた方はArduino で Jenkins の XFD を作る - haradakiro's blogなどを参照なさると良いと思います。

今回作成したXFDは電子工作を使用しない、ブラウザに色でビルド状況を通知するシンプルなものでしたが、WebSocketやPlay Frameworkなどのイカした機能を用いて実装しています。もうCometなんか扱わなくても良い喜びがあります。

会場を貸してくださった株式会社はてなさん、レッドブルサイボウズウォーターの配給を下さったサイボウズさん、ありがとうございました。

実装解説

開発環境はActivator 1.2.10 Akka 2.3.4 Play 2.3.3 Scala 2.11.1です。

まずWebSocketを扱うURLとWebSocketを扱うjavascriptを送り出すURL、そしてJenkinsがWebhookとして叩くURLが必要になるため、conf/routesにルーティングの記述を行います。

GET     /xfd/status                 controllers.Application.xfd
GET     /xfd/ws                     controllers.Application.xfdWs
GET    /xfd/jenkinsnotifyget          controllers.Application.jenkinsNotify(status, job_name, build_number)

Jenkinsが叩くURLは本来はPOSTであるべきでしたが、うまく実装できずに時間が不足してきたためGETを用いて実装しています。

続いて、リクエストを処理するためのControllerをapp/controllers/Application.scalaに実装します。まずは全コードを載せ、続いて細かい説明を行います。

package controllers

import play.api._
import play.api.mvc._
import play.api.data.Form
import play.api.data.Forms._
import play.api.libs.json.Json
import play.api.libs.iteratee._
import play.api.Play.current
import scala.concurrent.duration.DurationInt
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import play.api.libs.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import models._

object Application extends Controller {
    
  val jenkinsReceiver = Akka.system.actorOf(Props[JenkinsReceiver])

  def xfd = Action {
      Ok(views.html.iphone())
  }
  
  def xfdWs = WebSocket.async { request =>
      implicit val timeout: akka.util.Timeout = 1 minutes
      val connecting = jenkinsReceiver ? Connect("joining")
      connecting.mapTo[(Iteratee[String, _], Enumerator[String])]
  }
  
  def jenkinsNotify(status: String, job_name: String, build_number: String) = Action { implicit request =>
      val query = Map("status" -> status, "job_name" -> job_name, "build_number" -> build_number)
      // throw jenkinsnotify
      jenkinsReceiver ! JenkinsNotify(query)
      Ok("ok")
  }
}

case class Connect(msg:String)
case class JenkinsNotify(msg: Map[String, String])
case class Broadcast(msg: String)

class JenkinsReceiver extends Actor {
    val (enumerator, channel) = Concurrent.broadcast[String]
    def receive = {
        case Connect(_) =>
            val iteratee = Iteratee.foreach[String] { message =>
                self ! Broadcast("user said " + message)
            }.map { _ =>
                self ! Broadcast("user left")
            }
            sender ! (iteratee, enumerator)
        case JenkinsNotify(msg) =>
            msg match {
                case query: Map[String, String] => 
                // write parsing and broadcast code here
                println(query)
                self ! Broadcast("{ \"status\":\"" + query("status") + "\", \"name\":\"" + query("job_name") + "\", \"no\":\"" + query("build_number") + "\" }")
                case default =>
                    println("unrecognized message: " + default)
            }
        case Broadcast(msg) =>
            channel.push(msg);
    }
}

コードの中盤でViewを呼び出している部分があります。これはroute/xfd/statusGETが投げられた時に呼ばれるアクションとして定義されています。

  def xfd = Action {
      Ok(views.html.iphone())
  }

Playではリクエストに対するアクションをActionというDSL的な囲いの中で定義します。アクションの内容は、200 OKviews.html.iphone()というViewを呼び出すものです。views.html.iphoneに対応するViewは、app/views/iphone.scala.htmlに、以下のように定義されています。

@()

@main("xfd") {
    main?
    <section id="message"></section>
    <script src="/assets/javascripts/ws.js"></script>
}

簡単に説明すると、最上行は引数がないことを示しており、途中の行ではmainテンプレートを"xfd"という引数で呼び出しています。mainはhtmlタグやheadタグなどの定義を共通して行う為に定義してあるものです。次にメッセージ表示のためのsectionタグが置かれ、次に/assets/javascripts/ws.jsというスクリプトが呼ばれています。このスクリプトはクライアントサイドで動作するもので、後程説明します。

app/controllers/Application.scalaに戻りましょう。

case class Connect(msg:String)
case class JenkinsNotify(msg: Map[String, String])
case class Broadcast(msg: String)

これらのケースクラスは、Actorに処理を投げるときにデータのカプセルとして使います。Akkaでのメッセージパッシングは通例ケースクラスを用いて行います。ConnectはWebSocketの接続時に、JankinsNotifyはJenkinsから飛んできたWebhookを渡すときに中間のデータホルダとして、Broadcastは接続している全てのWebSocketにメッセージを投げるときに使います。

JenkinsからのWebhookの受付部分を見てみましょう。

  def jenkinsNotify(status: String, job_name: String, build_number: String) = Action { implicit request =>
      val query = Map("status" -> status, "job_name" -> job_name, "build_number" -> build_number)
      // throw jenkinsnotify
      jenkinsReceiver ! JenkinsNotify(query)
      Ok("ok")
  }
}

受け取ったクエリパラメータをJenkinsNotifyクラスにカプセル化してJenkinsReceiverに渡しています。

WebSocketの受付部分を見てみましょう。

  val jenkinsReceiver = Akka.system.actorOf(Props[JenkinsReceiver])

  def xfd = Action {
      Ok(views.html.iphone())
  }
  
  def xfdWs = WebSocket.async { request =>
      implicit val timeout: akka.util.Timeout = 1 minutes
      val connecting = jenkinsReceiver ? Connect("joining")
      connecting.mapTo[(Iteratee[String, _], Enumerator[String])]
  }

xfdメソッドはもう見ましたが、その前後を見ていきます。まずJenkinsReceiverクラスで定義されたアクターを作成してjenkinsReceiverという名前に割り当てています。ScalaでActorを使うのに必要なAkkaなどはPlay Frameworkで半自動的に定義さているので、特に別途の設定を行う必要が無ければそのまま使うことができます。

xfdWsメソッドを見てみましょう。このメソッドはクライアントがWebSocketで接続を開こうとするときに呼ばれるコードです。WebSocketの通信はActionでは処理できないため、ActionではなくWebSocket.asyncで処理しているのが分かると思います。requestが渡ってきますが、処理には必要でないので使っていません。

メソッド1行目ではアクターにメッセージを投げた時にどのくらい応答が無ければタイムアウトとしてエラーを返すかを定義しています。WebSocketのタイムアウトとは関係のない定義です。そもそもWebSocketにタイムアウトはありません。たぶん。

メソッド2行目でjenkinsReceiverにConnectケースクラスを投げて返答させます。jenkinsReceiver ? Connect("joining")jenkinsReceiver.ask(Connect("joining"))の構文糖で、import akka.pattern.askすることで使えるようになります。またaskは非同期で行われるので処理は次に進みます。

メソッド3行目でメッセージの返答を(Iteratee[String, _], Enumerator[String])という型で待ち受けるよう定義し、それをブロックの戻り値にしています。WebSocket.asyncFuture[(Iteratee[String, _], Enumerator[String])]を受けることになります。(WebSocket.asyncはdeprecatedになっているようです。tryAcceptを使えと怒られました。これはFuture[Either[Result, (Iteratee[A, _], Enumerator[A])]]を受けます。)

ここでIterateeEnumeratorという厄介な概念が登場するのですが、Iterateeは入力を担当し、Enumeratorは出力を担当しています。ストリーミングなどの複雑な処理をを容易に行うために利用されているようなのですが、僕も良く分かっていません。

いよいよアクターに移ります。

class JenkinsReceiver extends Actor {
    val (enumerator, channel) = Concurrent.broadcast[String]
    def receive = {
        case Connect(_) =>
            val iteratee = Iteratee.foreach[String] { message =>
                self ! Broadcast("user said " + message)
            }.map { _ =>
                self ! Broadcast("user left")
            }
            sender ! (iteratee, enumerator)
        case JenkinsNotify(msg) =>
            msg match {
                case query: Map[String, String] => 
                // write parsing and broadcast code here
                println(query)
                self ! Broadcast("{ \"status\":\"" + query("status") + "\", \"name\":\"" + query("job_name") + "\", \"no\":\"" + query("build_number") + "\" }")
                case default =>
                    println("unrecognized message: " + default)
            }
        case Broadcast(msg) =>
            channel.push(msg);
    }
}

val (enumerator, channel) = Concurrent.broadcast[String]の部分では、WebSocketに接続している全てのクライアントに対して送信するためのチャンネルを取得しています。書き方次第では個別に通信できそうなのですが、このあたりの仕組みは後ほど調べたいと思います。

def receive ...の部分はメッセージを受信した時の処理をパターンマッチを用いて記述しています。Connect(_)を受け取った時は"foreachでメッセージを受けて自分に対してBroadcastを送り、mapで相手が接続を終了した時はBroadcasttを投げる"という処理をiterateeとして定義しています。これを送信元にenumeratorと組にして返しています。

JenkinsNotifyを受け取った時はmsg: Map[String, String]をケースクラスから取り出し、selfに対してmsgをJSONに変換したものをBroadcastメッセージに載せて送信しています。

さて、Broadcastメッセージを受け取った時は接続している全てのWebSocketに対してメッセージを送信しています。

クライアント側のjavascriptではJSONを受信した時にその内容で画面の色を変える操作を行なっています。メッセージが送られてくるとws.onmessageが呼ばれます。

var ws = new WebSocket("ws://5384a561.ngrok.com/xfd/ws");

ws.onopen = function () {
    $('body').text('opened');
};

ws.onmessage = function (message) {
    var parsed = JSON.parse(message.data);
    if (parsed.status == 'running') $('body').attr('style', 'background-color: cyan');
    if (parsed.status == 'success') $('body').attr('style', 'background-color: green');
    if (parsed.status == 'failed') $('body').attr('style', 'background-color: red');
};

ws.onerror = function (evt) {
    $('#message').append(' Error occurred: ' + evt + "\n");
};

これによりJenkinsのビルドの状況をWabhookするとクライアントに状況が通知されるシステムが出来ました。