juan_gandhi: (VP)
Juan-Carlos Gandhi ([personal profile] juan_gandhi) wrote2014-05-16 05:32 pm
Entry tags:

code critique?

Guys, I do appreciate your opinions a lot.

  val patience = 10

  case class UploadProgress(uploaded: Set[File] = Set.empty,
                            failed:   Set[Result[File]] = Set.empty,
                            badLuckStreak: Int = 0) {
    def errors = Result.traverse(failed)
    def +(file:File) = {
      if (badLuckStreak > patience) this
      else {
        uploadOneFile(file) match {
          case Good(file) => UploadProgress(uploaded + file, failed, 0)
          case bad => UploadProgress(uploaded, failed + bad, badLuckStreak + 1)
        }
      }
    }
  }

  def uploadScheduledFiles():UploadProgress = {
    if (!client.isAlive) UploadProgress(Set.empty, Set(Result.error("Upload server is dead, sorry")), 0)
    else {
      (UploadProgress() /: listFilesForUpload)(_ + _)
    }
  }


What happens here: we upload until we are out of patience with a streak of bad luck, probably meaning the server is dead.

[identity profile] ivan-gandhi.livejournal.com 2014-05-17 03:23 pm (UTC)(link)
Попробую с фьючерсами. Да, тут коммуникации никакой не надо; пихнул и сиди посвистывай.

[identity profile] http://users.livejournal.com/_xacid_/ 2014-05-18 12:42 am (UTC)(link)
Was inspired with your code. Hope you will enjoy it:
  import java.util.concurrent.Executors
  import concurrent._, concurrent.duration._

  case class Progress[A, B](f: A => Future[B],
    completed: Seq[A] = Seq.empty,
    failed: Seq[A] = Seq.empty,
    skipped: Seq[A] = Seq.empty,
    stopped: Boolean = false) {

    // Just for more fun
    type Chunk = Seq[(A, Future[B])]
    def success(a: A) = copy(completed = completed :+ a)
    def fail(a: A) = copy(failed = failed :+ a)
    def skip(a: Seq[A]) = copy(skipped = skipped ++ a)
    def fold(e: A Either A) = e.fold(fail, success)

    // Stop if whole chunk failed
    def stop(q: Chunk) = Future.find(q.unzip._2)(
      _ => true).map(_.fold(true)(_ => false))

    def join(q: Chunk) = Future.sequence(q.map {
      case (a, b) => b.map(_ => Right(a))
        .recover { case _ => Left(a) }
    }).map(e => (this /: e)(_ fold _))

    def check(q: Chunk) = stop(q).flatMap(t =>
      join(q).map(_.copy(stopped = t)))

    def ++(q: Seq[A]) = if (stopped)
      Future(skip(q)) else check(q.zip(q.map(f)))

    // Split to chunks with patience size
    def apply(patience: Int, q: Seq[A]) =
      (Future(this) /: q.grouped(patience))(
        (x, y) => x.flatMap(_ ++ y))
  }

  def progress[A, B](patience: Int)(q: Seq[A])(f: A => Future[B])(
    implicit ex: ExecutionContext) = Progress[A, B](f)(patience, q)

  // So let's go ...
  val threadsCount = 5 // Throughput
  val xs = Executors.newFixedThreadPool(threadsCount)
  implicit val ex = ExecutionContext.fromExecutor(xs)

  val res = progress(patience = 10)(1 to 1000 toList) {
    case x if x >= 700 && x <= 777 => Future.failed(new Exception)
    case x => Future.successful(x)
  }

  println(Await.result(res, 1 second))

  xs.shutdownNow()
Edited 2014-05-18 00:46 (UTC)

[identity profile] ivan-gandhi.livejournal.com 2014-05-18 12:51 am (UTC)(link)
Wow, thanks a lot! The logic I was thinking about implemented beautifully.

[identity profile] http://users.livejournal.com/_xacid_/ 2014-05-18 01:42 pm (UTC)(link)
Of course in my implementation not everything is perfect - at least it doesn't start new tasks until current chunk isn't finished completely, even if all works fine without any fails.
But as first draft step IMHO it isn't so bad :) Further it always can be improved more.