import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

/** Immutable bookkeeping for running `f` over a sequence of inputs in chunks.
  *
  * Inputs are processed chunk by chunk. Every input of a chunk is passed to `f`
  * up front (so the futures of one chunk run concurrently), and the next chunk
  * is only started once the current one has completed. When every future of a
  * chunk fails, the run is marked as `stopped` and all remaining inputs are
  * recorded as `skipped` without calling `f`.
  *
  * @param f         asynchronous operation applied to each input
  * @param completed inputs whose future succeeded, in processing order
  * @param failed    inputs whose future failed, in processing order
  * @param skipped   inputs that were never attempted because the run was stopped
  * @param stopped   `true` once a whole chunk has failed
  * @tparam A input type
  * @tparam B result type produced by `f` (results themselves are not retained)
  */
case class Progress[A, B](f: A => Future[B],
                          completed: Seq[A] = Seq.empty,
                          failed: Seq[A] = Seq.empty,
                          skipped: Seq[A] = Seq.empty,
                          stopped: Boolean = false) {

  /** A chunk of inputs paired with the futures already started for them. */
  type Chunk = Seq[(A, Future[B])]

  /** Records `a` as successfully completed. */
  def success(a: A): Progress[A, B] = copy(completed = completed :+ a)

  /** Records `a` as failed. */
  def fail(a: A): Progress[A, B] = copy(failed = failed :+ a)

  /** Records all of `a` as skipped. */
  def skip(a: Seq[A]): Progress[A, B] = copy(skipped = skipped ++ a)

  /** Records a single outcome: `Left` is a failed input, `Right` a completed one. */
  def fold(e: Either[A, A]): Progress[A, B] = e.fold(fail, success)

  /** Decides whether processing must stop after chunk `q`.
    *
    * @return a future holding `true` when no future in the chunk succeeded
    */
  def stop(q: Chunk)(implicit ex: ExecutionContext): Future[Boolean] =
    // Future.find yields None only if no future completes successfully.
    Future.find(q.map(_._2))(_ => true).map(_.isEmpty)

  /** Waits for every future of chunk `q` and folds the outcomes into this progress.
    *
    * @return a future holding the progress extended with the chunk's outcomes
    */
  def join(q: Chunk)(implicit ex: ExecutionContext): Future[Progress[A, B]] = {
    val outcomes: Seq[Future[Either[A, A]]] = q.map { case (a, result) =>
      // Any failure of the item's future classifies the item as failed,
      // so a single failing item never fails the whole run.
      result.map[Either[A, A]](_ => Right(a)).recover { case _ => Left(a) }
    }
    Future.sequence(outcomes).map(_.foldLeft(this)((acc, outcome) => acc.fold(outcome)))
  }

  /** Joins chunk `q` and sets `stopped` according to [[stop]]. */
  def check(q: Chunk)(implicit ex: ExecutionContext): Future[Progress[A, B]] =
    stop(q).flatMap(shouldStop => join(q).map(_.copy(stopped = shouldStop)))

  /** Processes one chunk of inputs.
    *
    * If the run is already stopped the inputs are only recorded as skipped and
    * `f` is not invoked; otherwise `f` is applied to every input of the chunk.
    */
  def ++(q: Seq[A])(implicit ex: ExecutionContext): Future[Progress[A, B]] =
    if (stopped) Future.successful(skip(q))
    else check(q.zip(q.map(f)))

  /** Processes `q` in consecutive chunks of at most `patience` elements.
    *
    * @param patience chunk size (passed to `grouped`)
    * @param q        all inputs to process
    * @return a future holding the final progress
    */
  def apply(patience: Int, q: Seq[A])(implicit ex: ExecutionContext): Future[Progress[A, B]] =
    q.grouped(patience).foldLeft(Future.successful[Progress[A, B]](this)) { (acc, chunk) =>
      acc.flatMap(_ ++ chunk)
    }
}

/** Runs `f` over `q` in chunks of `patience` elements, starting from an empty [[Progress]].
  *
  * @param patience chunk size; a chunk in which every future fails stops the run
  * @param q        inputs to process
  * @param f        asynchronous operation applied to each input
  * @param ex       execution context used to combine the futures
  * @return a future holding the final progress
  */
def progress[A, B](patience: Int)(q: Seq[A])(f: A => Future[B])(
    implicit ex: ExecutionContext): Future[Progress[A, B]] =
  Progress[A, B](f).apply(patience, q)

// So let's go ...
val threadsCount = 5 // Throughput
val xs = Executors.newFixedThreadPool(threadsCount)
// Explicit type so the implicit is visible regardless of definition order.
implicit val ex: ExecutionContext = ExecutionContext.fromExecutor(xs)

val res = progress(patience = 10)((1 to 1000).toList) {
  case x if x >= 700 && x <= 777 => Future.failed[Int](new Exception)
  case x                         => Future.successful(x)
}

// Always release the pool, even if the await times out or fails.
try println(Await.result(res, 1.second))
finally xs.shutdownNow()
Other options:
no subject