package scalaz
package concurrent

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ConcurrentLinkedQueue
import Scalaz._

/**
 * A mailbox-backed message processor.
 *
 * Messages sent with `!` are appended to an internal queue and handled one at
 * a time by `e`. Each handling step is submitted through an `Effect[Unit]`
 * built with the implicit `strategy`, so where and when a step runs is decided
 * by that strategy (presumably the scalaz `Strategy`/`Effect` abstractions —
 * their definitions are not part of this file).
 *
 * The `suspended` flag is `true` while no drain step is scheduled; it is
 * flipped to `false` by whichever sender wins the compare-and-set in `work`,
 * which ensures at most one drain chain is scheduled at a time.
 *
 * Failure handling: if `e` throws, the throwable is passed to `onError` and
 * processing continues with the next message. If `onError` itself throws, the
 * throwable propagates to the strategy and no further step is scheduled.
 *
 * @tparam A the type of messages accepted by this actor
 */
sealed trait Actor[A] {
  // true  => no drain step is currently scheduled
  // false => a drain step is scheduled or running
  private val suspended = new AtomicBoolean(true)

  // Pending messages, in arrival order.
  private val mbox = new ConcurrentLinkedQueue[A]

  /**
   * Schedules a drain step when there is at least one queued message and the
   * actor is currently suspended; otherwise yields a no-op thunk.
   */
  private def work =
    if (mbox.isEmpty) () => ()
    else if (suspended.compareAndSet(true, false)) act ! (())
    else () => ()

  /** This actor viewed as an `Effect[A]`: running the effect sends the message. */
  val toEffect: Effect[A] = effect[A]((a) => this ! a)

  /**
   * Enqueues `a` and makes sure a drain step is scheduled.
   *
   * `ConcurrentLinkedQueue.offer` is documented to never return `false`, so the
   * retry through `toEffect` is a defensive fallback only.
   *
   * @param a the message to deliver
   */
  def !(a: A) = if (mbox offer a) work else toEffect ! a

  /** Alias for `!`, allowing an actor to be applied like a function. */
  def apply(a: A) = this ! a

  // One drain step: handle a single message and reschedule, or suspend when
  // the mailbox is empty.
  private val act: Effect[Unit] = effect((u: Unit) => {
    val m = mbox.poll
    if (m != null) {
      try e(m)
      catch {
        case ex: Throwable => onError(ex)
      }
      // Reschedule outside the try so that a failure which `onError` handled
      // does not leave the actor with `suspended == false` and nothing
      // scheduled, which would strand every queued and future message.
      act ! u
    } else {
      suspended.set(true)
      // A sender may have enqueued between `poll` and `set(true)` and lost the
      // compare-and-set in `work`; re-check so that message is not stranded.
      work
    }
  })

  /** The message handler. */
  val e: A => Unit

  /** The strategy used to build this actor's effects. */
  implicit val strategy: Strategy

  /** Invoked with any throwable raised by `e`. */
  val onError: Throwable => Unit
}

/** Constructors and conversions for [[Actor]]. */
trait Actors {
  /**
   * Creates an actor with an explicit error handler.
   *
   * The abstract members are supplied through an early initializer because
   * `Actor`'s body builds its effects (which take the implicit `strategy`)
   * during construction.
   *
   * @param err called with any throwable raised by `c`
   * @param c   the message handler
   * @param s   the strategy used to run the actor's effects
   */
  def actor[A](err: Throwable => Unit, c: A => Unit)(implicit s: Strategy): Actor[A] = new {
    val e = c
    implicit val strategy = s
    val onError = err
  } with Actor[A]

  /**
   * Creates an actor whose error handler rethrows the throwable.
   *
   * @param c the message handler
   * @param s the strategy used to run the actor's effects
   */
  def actor[A](c: A => Unit)(implicit s: Strategy): Actor[A] =
    actor[A]((t: Throwable) => throw t, c): Actor[A]

  /** Views an actor as a plain `A => Unit` function that sends its argument. */
  implicit def ActorFrom[A](a: Actor[A]): A => Unit = a ! _
}