http://users.livejournal.com/_xacid_/ ([identity profile] http://users.livejournal.com/_xacid_/) wrote in [personal profile] juan_gandhi 2014-05-18 12:42 am (UTC)

Was inspired with your code. Hope you will enjoy it:
  import java.util.concurrent.Executors
  import concurrent._, concurrent.duration._

  case class Progress[A, B](f: A => Future[B],
    completed: Seq[A] = Seq.empty,
    failed: Seq[A] = Seq.empty,
    skipped: Seq[A] = Seq.empty,
    stopped: Boolean = false) {

    // Just for more fun
    type Chunk = Seq[(A, Future[B])]
    def success(a: A) = copy(completed = completed :+ a)
    def fail(a: A) = copy(failed = failed :+ a)
    def skip(a: Seq[A]) = copy(skipped = skipped ++ a)
    def fold(e: A Either A) = e.fold(fail, success)

    // Stop if whole chunk failed
    def stop(q: Chunk) = Future.find(q.unzip._2)(
      _ => true).map(_.fold(true)(_ => false))

    def join(q: Chunk) = Future.sequence(q.map {
      case (a, b) => b.map(_ => Right(a))
        .recover { case _ => Left(a) }
    }).map(e => (this /: e)(_ fold _))

    def check(q: Chunk) = stop(q).flatMap(t =>
      join(q).map(_.copy(stopped = t)))

    def ++(q: Seq[A]) = if (stopped)
      Future(skip(q)) else check(q.zip(q.map(f)))

    // Split to chunks with patience size
    def apply(patience: Int, q: Seq[A]) =
      (Future(this) /: q.grouped(patience))(
        (x, y) => x.flatMap(_ ++ y))
  }

  def progress[A, B](patience: Int)(q: Seq[A])(f: A => Future[B])(
    implicit ex: ExecutionContext) = Progress[A, B](f)(patience, q)

  // So let's go ...
  val threadsCount = 5 // Throughput
  val xs = Executors.newFixedThreadPool(threadsCount)
  implicit val ex = ExecutionContext.fromExecutor(xs)

  val res = progress(patience = 10)(1 to 1000 toList) {
    case x if x >= 700 && x <= 777 => Future.failed(new Exception)
    case x => Future.successful(x)
  }

  println(Await.result(res, 1 second))

  xs.shutdownNow()

Post a comment in response:

This account has disabled anonymous posting.
If you don't have an account you can create one now.
HTML doesn't work in the subject.
More info about formatting