Lambdaカクテル

Common LISPが好きなWeb屋さんです 自宅サーバやフロントエンドもできます

ScalaでAkkaのSTMを使う その1(Agents)

Akkaでアプリケーションを開発していて、ついにSTMを使う機会があったので、メモを兼ねてこの記事を書きます。
AkkaのDocumentationを訳しました。

STMとは

AkkaにおけるSTM

Akkaが利用するScalaSTMは近いうちにScalaに統合される。
Akkaでは、ActorとAgents/Transactorsが統合されている。

Agents

AkkaにおけるAgentsの考えはClojureのagentsにヒントを得ている。
Agentsは独立した状況における非同期的な値の更新を可能としている。
Agentsは存在する間、単一のストレージに束縛され、その状態はアクションの結果によってのみ変更することができる。
UpdateアクションはAgentの状態に非同期でapplyされ、新たなAgentの状態となる値を返す関数である。
Agentの状態はイミュータブルである必要がある。

Agentsへの更新処理こそ非同期であるものの、Agentの状態は常に即時的に、あらゆるスレッドから事前の通知無く(getやapplyを用いて)読み取ることができる。

Agentsはreactiveである。あらゆるAgentsの更新処理はスレッドプールのスレッドにより次々と(interleaved)処理される。どの時点でも、おのおののAgentは一つまでのsendアクションしか実行しない。他のスレッドからagentに割り当てられたアクションは送信された順に処理されるが、他のソースの同じagentに割り当てられたアクションと交ぜこぜになる可能性がある。
enclosedなトランザクション中でAgentが使われた場合、agentはそのトランザクションに参加(perticipate)する。AgentsはScala STMと統合されている - トランザクション中で生成される全ての割り当て(dispatches)はトランザがコミットするまで保持され、トランザクションをやり直したり中止したりすると破棄される。

Agentを生成して止めてみる

Agentは、初期値を/Agent(value)/のように渡して宣言することで生成される:

import akka.agent.Agent

val agent = Agent(5)

Agentを生成するには暗黙のActorSystemが(内部で利用するActorを生成するために)必要となる。
暗黙のスコープにActorSystemを置くこともできる:

import akka.actor.ActorSystem
import akka.agent.Agent

implicit val system = ActorSystem("app")

val agent = Agent(5)

もしくは、Agentを作成する時に明示的に渡してもよい:

import akka.actor.ActorSystem
import akka.agent.Agent

implicit val system = ActorSystem("app")

val agent = Agent(5)(system)

Agentは/close/を呼ぶまで動き続ける。そうすることでgarbage collectionの対象になる。(何らかの方法であなたがAgentを保持しない限り)

agent.close()

Agentを更新する

現在の状態をもとに新たな値を生む関数か、単に新たな値を送信することでAgentを更新することができる。Agentは新たな値か関数をアトミックかつ非同期的に適用する。更新はfire-forgetルールに基き、とりあえず値・関数が適用されることだけが保障される。いつ更新が適用されるかの保障は無いが、シングルスレッドからのAgentへの割り当ては順番通りに発生する。値や関数はsend関数を呼ぶことで更新される。

// 値を送信する
agent send 7

// 関数を送信する
agent send (_ + 1)
agent send (_ * 2)

それ自身のスレッドで関数を割り当てて状態を更新することもできます。これは相互作用するスレッドプールでは役には立たないが、長い間の利用やブロッキング処理では利用される。これはsenfOffメソッドで行うことができる。
sendOffを利用してもsendを利用しても、割り当ては順番通りに実行される。

// 時間のかかる処理やブロックする関数をsendOffする
agent sendOff (longRunningOrBlockingFunction)

Agentの値を読み出す

Agentはこのようにカッコと一緒に付けて呼び出すことでdereferenceできる(Agentの値を得られる):

val result = agent()

getメソッドを使ってもよい:

val result = agent.get

Agentの現在の値を得ても、何らのメッセージパッシングや発生も伴わない。
つまりAgentへの更新は非同期ですが、Agentの状態の読み出しは同期的だ。

Agentの値を待つ

キューされた送信が全て完了するまで待つこともできる。awaitで可能です。

import scala.concurrent.duration._
import akka.util.Timeout

implicit val timeout = Timeout(5 seconds)

val result = agent.await
<||
この値のFutureを得ることもできる。これは現在キューされた更新が完了した時点で完了する:
>|scala|
import scala.concurrent.Await

implicit val timeout = Timeout(5 seconds)
val future = agent.future

val result = Await.result(future, timeout.duration)

トランザクショナルAgents

enclosedなトランザクション中でAgentが使われた場合、agentはそのトランザクションに参加(perticipate)する。トランザクション中でAgentに送信するとAgentへの割り当てはトランザクションかコミットするまで保持されるか、トランザクションが中止された時点で破棄される。ここに例を置く:

import akka.agent.Agent
import scala.concurrent.duration._
import akka.util.Timeout
import scala.concurrent.stm._

def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = { atomic { txn ⇒

    if (from.get < amount) false
    else {

      from send (_ - amount)
      to send (_ + amount)
      true

} }

}

val from = Agent(100)
val to = Agent(20)
val ok = transfer(from, to, 50)

implicit val timeout = Timeout(5 seconds)
val fromValue = from.await // -> 50

val toValue = to.await // -> 70

モナディックな用法

Agentはモナディックでもあるので、for内包を用いて操作を連結することができる。モナディックな用法では、新たなAgentが元のAgentを触らずに放置して生成される。そういうわけで古い値(Agents)がそのまま使える。『永続的』とも呼ばれる。
モナディックな用法の例:

val agent1 = Agent(3)
val agent2 = Agent(5)

// uses foreach

varresult=0
for(valueagent1){

  result = value + 1
}

// uses map

valagent3= for(valueagent1)yieldvalue+1// or using map directly

val agent4 = agent1 map (_ + 1)

// uses flatMap

valagent5= for{value1agent1 value2agent2

} yield value1 + value2

原本: Akka documentation (Typesafe Inc.)