juan_gandhi: (VP)
[personal profile] juan_gandhi
Guys, I do appreciate your opinions a lot.

  val patience = 10

  case class UploadProgress(uploaded: Set[File] = Set.empty,
                            failed:   Set[Result[File]] = Set.empty,
                            badLuckStreak: Int = 0) {
    def errors = Result.traverse(failed)

    def +(file: File) = {
      if (badLuckStreak > patience) this  // out of patience: silently skip the rest
      else uploadOneFile(file) match {
        case Good(file) => UploadProgress(uploaded + file, failed, 0)  // success resets the streak
        case bad        => UploadProgress(uploaded, failed + bad, badLuckStreak + 1)
      }
    }
  }

  def uploadScheduledFiles(): UploadProgress = {
    if (!client.isAlive) UploadProgress(Set.empty, Set(Result.error("Upload server is dead, sorry")), 0)
    else (UploadProgress() /: listFilesForUpload)(_ + _)
  }


What happens here: we keep uploading until a streak of failures exhausts our patience, which probably means the server is dead.
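
For illustration, a minimal usage sketch under the same assumptions (File, Result, client, listFilesForUpload, uploadOneFile as above); the reporting here is just an example, not part of the original code.

  val progress = uploadScheduledFiles()
  println(s"uploaded ${progress.uploaded.size} file(s), ${progress.failed.size} failure(s)")
  if (progress.badLuckStreak > patience)
    println("gave up early: the streak of bad luck exceeded our patience")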

Date: 2014-05-17 04:03 am (UTC)
From: [identity profile] thedeemon.livejournal.com
>Guys, I do appreciate our opinions a lot.

We appreciate ours too. ;)

Date: 2014-05-17 04:54 am (UTC)
From: [identity profile] thedeemon.livejournal.com
I thought it was just a typo there ("our" vs. "your").
Edited Date: 2014-05-17 05:03 am (UTC)

Date: 2014-05-17 05:11 am (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
omfg; yes, a typo. That's how we live, damn it.

Date: 2014-05-17 04:57 am (UTC)
From: [identity profile] jdevelop.livejournal.com
I would create a set of futures and then wait for all of them to complete, with a certain timeout,

if not using Akka actors.
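
A minimal sketch of that approach with plain Scala futures, assuming the listFilesForUpload and uploadOneFile from the post (uploadOneFile taken as a blocking call wrapped in a Future); the 5-minute timeout is only an illustrative value.

  import scala.concurrent._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  // wrap each blocking upload in a Future, then block once on the whole batch
  val uploads: List[Future[Result[File]]] =
    listFilesForUpload.toList.map(file => Future(uploadOneFile(file)))

  val results = Await.result(Future.sequence(uploads), 5.minutes)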

Date: 2014-05-17 05:11 am (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
Omfg. Right. I was planning that.
But actually, I was planning to just use a parallel collection; isn't that enough?

Also, judging by the IQ level of the server, I've abandoned the idea of sending them all in one fell swoop, in a single request.
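
For comparison, a minimal sketch of the parallel-collection variant, assuming the same listFilesForUpload, uploadOneFile and Good as in the post; it blocks the calling thread until every upload has been attempted and has no early stop on a failure streak.

  // .par fans the uploads out over the default fork-join pool; .seq gathers the results back
  val results = listFilesForUpload.toList.par.map(uploadOneFile).seq
  val (ok, bad) = results.partition { case Good(_) => true; case _ => false }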

Date: 2014-05-17 11:01 am (UTC)
From: [identity profile] http://users.livejournal.com/_xacid_/
I have my doubts about parallel collections for this case. Futures should be much better. Even better than actors.

Date: 2014-05-17 03:23 pm (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
I'll try it with futures. Right, no communication is needed here at all; just shove it in and sit back whistling.

Date: 2014-05-18 12:42 am (UTC)
From: [identity profile] http://users.livejournal.com/_xacid_/
I was inspired by your code. Hope you'll enjoy this:
  import java.util.concurrent.Executors
  import concurrent._, concurrent.duration._

  case class Progress[A, B](f: A => Future[B],
    completed: Seq[A] = Seq.empty,
    failed: Seq[A] = Seq.empty,
    skipped: Seq[A] = Seq.empty,
    stopped: Boolean = false)(implicit ec: ExecutionContext) {

    // Just for more fun
    type Chunk = Seq[(A, Future[B])]
    def success(a: A) = copy(completed = completed :+ a)
    def fail(a: A) = copy(failed = failed :+ a)
    def skip(a: Seq[A]) = copy(skipped = skipped ++ a)
    def fold(e: A Either A) = e.fold(fail, success)

    // Stop if the whole chunk failed
    def stop(q: Chunk) = Future.find(q.unzip._2)(
      _ => true).map(_.fold(true)(_ => false))

    def join(q: Chunk) = Future.sequence(q.map {
      case (a, b) => b.map(_ => Right(a))
        .recover { case _ => Left(a) }
    }).map(e => (this /: e)(_ fold _))

    def check(q: Chunk) = stop(q).flatMap(t =>
      join(q).map(_.copy(stopped = t)))

    def ++(q: Seq[A]) = if (stopped)
      Future(skip(q)) else check(q.zip(q.map(f)))

    // Split into chunks of `patience` size
    def apply(patience: Int, q: Seq[A]) =
      (Future(this) /: q.grouped(patience))(
        (x, y) => x.flatMap(_ ++ y))
  }

  def progress[A, B](patience: Int)(q: Seq[A])(f: A => Future[B])(
    implicit ex: ExecutionContext) = Progress[A, B](f).apply(patience, q)

  // So let's go ...
  val threadsCount = 5 // Throughput
  val xs = Executors.newFixedThreadPool(threadsCount)
  implicit val ex = ExecutionContext.fromExecutor(xs)

  val res = progress(patience = 10)((1 to 1000).toList) {
    case x if x >= 700 && x <= 777 => Future.failed(new Exception)
    case x => Future.successful(x)
  }

  println(Await.result(res, 1.second))

  xs.shutdownNow()
Edited Date: 2014-05-18 12:46 am (UTC)

Date: 2014-05-18 12:51 am (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
Wow, thanks a lot! The logic I was thinking about, implemented beautifully.

Date: 2014-05-18 01:42 pm (UTC)
From: [identity profile] http://users.livejournal.com/_xacid_/
Of course, not everything in my implementation is perfect: at the very least, it doesn't start new tasks until the current chunk is completely finished, even when everything works fine without any failures.
But as a first-draft step, IMHO, it isn't so bad :) It can always be improved further.

Date: 2014-05-17 11:07 am (UTC)
From: [identity profile] http://users.livejournal.com/_xacid_/
There will be just one problem: you can't cancel uncompleted futures when the time is out.
So instead of creating the whole bunch of tasks at once, it would be better to create a stream of chunks of them.
IMHO.

Date: 2014-05-17 08:08 am (UTC)
From: [identity profile] shabunc.livejournal.com
Just in case.

Do you know about this place?
http://codereview.stackexchange.com

I enjoy reading posts like this, don't get me wrong; it's just that the audience there is obviously much wider.

Date: 2014-05-17 03:24 pm (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
Thanks a lot. I hadn't heard of it. I'll use it.

Date: 2014-05-18 12:11 am (UTC)
From: [identity profile] mstone.livejournal.com
+1 for futures or any other machinery for asynchronous programming in your language/platform.

If there are N files to upload, start N asynchronous upload tasks; the continuation of each task should check the result and update the corresponding item in the collection of N results. After all N tasks are started, call an asynchronous wait_all/when_all on the collection of started tasks.

Bonus point if the platform's library provides an asynchronous implementation of HTTP POST: it's naturally IO-bound, so it shouldn't block any threads. Double bonus if it supports cooperative cancellation.

This pattern is a first-class citizen in C#/.NET these days, with full library support for asynchronous I/O and compiler support for automatically rewriting code into continuation-passing style. There should be something for Scala too (the first link of https://www.google.com/search?q=async+await+scala seems relevant).
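
A rough Scala sketch of that pattern, assuming a hypothetical non-blocking uploadAsync: File => Future[Unit] (an asynchronous HTTP POST of a single file) in place of the blocking uploadOneFile; the scala-async library would let the same thing be written with async/await, but plain combinators are enough here.

  import scala.concurrent._
  import scala.concurrent.ExecutionContext.Implicits.global

  // start one asynchronous task per file and collect every outcome, successful or failed
  def uploadAll(files: Seq[File]): Future[Seq[Either[Throwable, File]]] =
    Future.traverse(files) { file =>
      uploadAsync(file)
        .map(_ => Right(file): Either[Throwable, File])
        .recover { case e => Left(e) }
    }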

Date: 2014-05-18 12:29 am (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
This is the kind of solution I was thinking about.
The problem is, I have to stop them all if n of them report failure... well, is that a problem? Probably not.

Date: 2014-05-18 12:53 am (UTC)
From: [identity profile] http://users.livejournal.com/_xacid_/
IMHO, it's better and simpler not to start them all before n of them report success :)

Date: 2014-05-18 09:27 am (UTC)
From: [identity profile] mstone.livejournal.com
A semaphore? Set it to n initially. The continuation of each upload task decrements it if the task failed. Another task wakes up when (if) the semaphore hits 0 and triggers the cancellation token for all the still-running upload tasks.
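
A rough sketch of the same idea with plain Scala futures, which have no cancellation token of their own: an AtomicInteger stands in for the semaphore, a shared flag makes not-yet-started uploads bail out early, and the decrementing continuation itself trips the flag instead of a separate watcher task. uploadAsync is the same hypothetical non-blocking upload as above, and a failed Future is treated as a failed upload.

  import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
  import scala.concurrent._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.Failure

  val patience = 10                              // how many failures we tolerate overall
  val failuresLeft = new AtomicInteger(patience) // the "semaphore"
  val gaveUp = new AtomicBoolean(false)

  def guardedUpload(file: File): Future[Unit] =
    if (gaveUp.get) Future.failed(new Exception(s"skipped $file: gave up"))
    else uploadAsync(file).andThen {
      case Failure(_) =>
        if (failuresLeft.decrementAndGet() <= 0) gaveUp.set(true)  // "semaphore" hit zero
    }

Uploads that are already in flight still run to completion; with plain futures, true mid-flight cancellation needs support from the HTTP client itself, as noted elsewhere in the thread.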

Date: 2014-05-18 03:50 pm (UTC)
From: [identity profile] ivan-gandhi.livejournal.com
Right; the perfect case for semaphore...
