I have an iterator (actually a Source.getLines) that's reading an infinite stream of data from a URL. Occasionally the iterator throws a java.io.IOException when there is a connection problem. In such situations, I need to reconnect and restart the iterator. I want this to be seamless, so that the iterator looks like a normal iterator to the consumer but restarts itself underneath as necessary.

For example, I'd like to see the following behavior:

scala> import java.io.IOException
import java.io.IOException

scala> val iter = restartingIterator(() => new Iterator[Int]{
  var i = -1
  def hasNext = {
    if (this.i < 3) {
      true
    } else {
      throw new IOException
    }
  }
  def next = {
    this.i += 1
    i
  }
})
res0: ...

scala> iter.take(6).toList
res1: List[Int] = List(0, 1, 2, 3, 0, 1)

I have a partial solution to this problem, but it will fail on some corner cases (e.g. an IOException on the first item after a restart) and it's pretty ugly:

import java.io.IOException

def restartingIterator[T](getIter: () => Iterator[T]) = new Iterator[T] {
  var iter = getIter()
  def hasNext = {
    try {
      iter.hasNext
    } catch {
      case e: IOException => {
        this.iter = getIter() // reconnect once...
        iter.hasNext          // ...but an IOException here still escapes
      }
    }
  }
  def next() = {
    try {
      iter.next()
    } catch {
      case e: IOException => {
        this.iter = getIter() // same single retry as in hasNext
        iter.next()
      }
    }
  }
}

I keep feeling like there's a better solution to this, maybe some combination of Iterator.continually and util.control.Exception or something like that, but I couldn't figure one out. Any ideas?
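The composition the question hints at can be sketched as follows. This is a minimal sketch, assuming Scala 2.10 or later (for `Option.flatten`); `RestartingIterator` and `untilIOException` are hypothetical names. Each connection is wrapped so that an IOException simply ends it, and `Iterator.continually(getIter())` supplies fresh connections that `flatMap` stitches together. Two behaviors differ from the other versions on this page: it also reconnects when a connection ends normally, and an IOException thrown by `getIter()` itself is not caught. As with any retry-forever scheme, `hasNext` never returns if every new connection fails immediately.

```scala
import java.io.IOException
import scala.util.control.Exception.catching

object RestartingIterator {
  // Catch only IOException; anything else still propagates to the consumer
  private val ioCatch = catching(classOf[IOException])

  // Hypothetical helper: yields elements of `it` until it is exhausted or
  // hasNext/next throws an IOException, then reports hasNext == false.
  def untilIOException[T](it: Iterator[T]): Iterator[T] =
    Iterator
      .continually(ioCatch.opt(if (it.hasNext) Some(it.next()) else None).flatten)
      .takeWhile(_.isDefined) // None means "exhausted or failed"
      .map(_.get)

  // An endless supply of fresh connections, each cut off at its first IOException
  def restartingIterator[T](getIter: () => Iterator[T]): Iterator[T] =
    Iterator.continually(getIter()).flatMap(it => untilIOException(it))
}
```

With the test iterator from the question, `RestartingIterator.restartingIterator(() => ...).take(6).toList` yields `List(0, 1, 2, 3, 0, 1)`.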

A:

There is a better solution, the Iteratee:

http://apocalisp.wordpress.com/2010/10/17/scalaz-tutorial-enumeration-based-io-with-iteratees/

Here, for example, is an enumerator that restarts when it encounters an exception.

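import java.io.{BufferedReader, IOException}
import scalaz.IterV
import scalaz.IterV._
import scalaz.effects.IO

def enumReader[A](r: => BufferedReader, it: IterV[String, A]): IO[IterV[String, A]] = {
  val tmpReader = r // evaluate the by-name reader once: a fresh connection per (re)start
  def loop: IterV[String, A] => IO[IterV[String, A]] = {
    case i@Done(_, _) => IO { i }
    case Cont(k) =>
      // Read one line, turning an IOException into a value instead of throwing it
      IO { try Right(tmpReader.readLine) catch { case e: IOException => Left(e) } } flatMap {
        case Left(_)     => enumReader(r, it)     // failure: restart with the original iteratee
        case Right(null) => IO { k(EOF[String]) } // end of stream: signal EOF
        case Right(s)    => loop(k(El(s)))        // feed the line and keep going
      }
  }
  loop(it)
}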

The inner loop advances the Iteratee, but the outer function still holds on to the original. Since Iteratee is a persistent data structure, to restart you just have to call the function again.

I'm passing the Reader by name here so that r is essentially a function that gives you a fresh (restarted) reader. In practice you will want to bracket this more carefully (close the existing reader on exception).
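To make the persistence argument concrete without scalaz, here is a minimal plain-Scala sketch. `MiniIteratee`, `Iter`, `Done`, `Cont`, `takeN` and `enumerate` are hypothetical names, deliberately simpler than scalaz's `IterV` (no `Empty` input, no leftover input in `Done`, no `IO`). The enumerator feeds elements from a fresh source; on an IOException it calls itself again with the original iteratee, which the failed attempt could not have modified because it is immutable. Restarting from the original discards whatever the failed attempt consumed (passing the current iteratee instead would resume), so with the question's test iterator, which only ever yields four items per connection, `takeN(6)` would never finish.

```scala
import java.io.IOException

object MiniIteratee {
  // An immutable iteratee: either finished with a result, or waiting for
  // the next input (Some(element), or None for end of input).
  sealed trait Iter[E, A]
  final case class Done[E, A](result: A) extends Iter[E, A]
  final case class Cont[E, A](k: Option[E] => Iter[E, A]) extends Iter[E, A]

  // An iteratee that collects the first n elements.
  def takeN[E](n: Int, acc: Vector[E] = Vector.empty[E]): Iter[E, Vector[E]] =
    if (n <= 0) Done[E, Vector[E]](acc)
    else Cont[E, Vector[E]] {
      case Some(e) => takeN(n - 1, acc :+ e)
      case None    => Done[E, Vector[E]](acc)
    }

  // Enumerator: feeds a fresh source into `it`; on IOException, starts over
  // with a new source and the ORIGINAL `it`.
  def enumerate[E, A](source: () => Iterator[E], it: Iter[E, A]): Iter[E, A] = {
    val src = source()
    @annotation.tailrec
    def loop(cur: Iter[E, A]): Option[Iter[E, A]] = cur match {
      case d @ Done(_) => Some(d)
      case Cont(k) =>
        val input =
          try Right(if (src.hasNext) Some(src.next()) else None)
          catch { case e: IOException => Left(e) }
        input match {
          case Left(_)     => None          // connection failed
          case Right(None) => Some(k(None)) // end of input: signal EOF
          case Right(el)   => loop(k(el))   // feed the element and continue
        }
    }
    loop(it) match {
      case Some(result) => result
      case None         => enumerate(source, it) // restart from the original iteratee
    }
  }
}
```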

Apocalisp
Interesting article, but it doesn't really talk about handling exceptions. Can you elaborate on how you would use scalaz Iteratees to handle my problem?
Steve
I stared at this for 15 minutes, but I still can't wrap my head around it, which I guess means it's probably not a good idea for me to write such code even if/when I do figure it out...
Steve
The article explains it. The code basically says: To feed from a Reader to an Iteratee, check if it's done accepting input. If it is, just return it. If it's expecting more input, it will have a function `k` for accepting input. Read a line from the reader and assign it to `s`. If we get an exception, restart the whole enumeration. If we got a null line, signal to the Iteratee that we have reached EOF. Otherwise feed `s` to `k` and loop.
Apocalisp
Thanks, that explanation helps much more than the article did. I probably wouldn't use this approach though - it really obscures the control flow in my eyes.
Steve
Right, it abstracts over the control flow. But the data flow is very clear. You are taking some enumeration of Strings and producing an A, somehow. The "what" is explicit, but the "how" is an implementation detail.
Apocalisp
A:

This is fairly close to your version, but uses scala.util.control.Exception:

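def restartingIterator[T](getIter: () => Iterator[T]) = new Iterator[T] {
  import java.io.IOException
  import util.control.Exception.catching
  // Reconnect only on IOException (allCatch would also swallow e.g. NoSuchElementException)
  private[this] val ioCatch = catching(classOf[IOException])
  private[this] var i = getIter()
  private[this] def replace(): Unit = i = getIter()
  def hasNext: Boolean = ioCatch.opt(i.hasNext).getOrElse { replace(); hasNext }
  def next(): T = ioCatch.opt(i.next()).getOrElse { replace(); next() }
}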

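This is not tail recursive, because the recursive call sits inside the by-name argument to getOrElse (effectively a closure) rather than in tail position, but that can be fixed by using a slightly more verbose version: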

def restartingIterator2[T](getIter: () => Iterator[T]) = new Iterator[T] {
  import java.io.IOException
  import util.control.Exception.catching
  private[this] val ioCatch = catching(classOf[IOException])
  private[this] var i = getIter()
  private[this] def replace(): Unit = i = getIter()
  // The recursive call is now the last expression, so @tailrec compiles it to a loop
  @annotation.tailrec def hasNext: Boolean = {
    val v = ioCatch.opt(i.hasNext)
    if (v.isDefined) v.get else { replace(); hasNext }
  }
  @annotation.tailrec def next(): T = {
    val v = ioCatch.opt(i.next())
    if (v.isDefined) v.get else { replace(); next() }
  }
}
huynhjl
Yeah, making it recursive solves the corner case I was a little worried about. I imagine I could get pretty much the same behavior by changing the second "iter.hasNext" and "iter.next" in my solution to "this.hasNext" and "this.next" and adding the tailrec annotations. I was kind of hoping there was a simpler solution based on composition somehow, though.
Steve
A: 

Here's an answer that doesn't work, but feels like it should:

import java.io.IOException

def restartingIterator[T](getIter: () => Iterator[T]): Iterator[T] = {
  new Traversable[T] {
    // Push each item to f; on IOException, reconnect by running foreach again
    def foreach[U](f: T => U): Unit = {
      try {
        for (item <- getIter()) {
          f(item)
        }
      } catch {
        case e: IOException => this.foreach(f)
      }
    }
  }.toIterator
}

I think this very clearly describes the control flow, which is great.

This code will throw a StackOverflowError in Scala 2.8.0 because of a bug in Traversable.toStream. Even with that bug fixed, it still won't work for my use case: toIterator calls toStream, which stores all items in memory, and since this foreach never finishes on an infinite stream, it would never return.

I'd love to be able to define an Iterator by just writing a foreach method, but there doesn't seem to be any easy way to do that.
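Turning a push-style `foreach` into a pull-style `Iterator` without buffering everything generally needs a second thread. A minimal sketch, assuming one background thread per iterator is acceptable; `ForeachToIterator` and `fromForeach` are hypothetical names. The producer thread runs the traversal and hands items over one at a time through a `java.util.concurrent.SynchronousQueue`, so at most one item is in flight. Caveats: if the consumer stops early, the producer stays blocked forever (it is a daemon thread, so it won't keep the JVM alive), and an exception from the traversal ends the iterator instead of reaching the consumer.

```scala
import java.util.concurrent.SynchronousQueue

object ForeachToIterator {
  def fromForeach[T](traverse: (T => Unit) => Unit): Iterator[T] = {
    // Hand-off queue with no capacity: put() blocks until the consumer take()s.
    // Some(item) carries an element; None marks the end of the traversal.
    val queue = new SynchronousQueue[Option[T]]()

    val producer = new Thread(new Runnable {
      def run(): Unit =
        try traverse(t => queue.put(Some(t)))
        finally queue.put(None) // also signals the end if traverse throws
    })
    producer.setDaemon(true) // don't keep the JVM alive if the consumer stops early
    producer.start()

    new Iterator[T] {
      private[this] var fetched = false // has `current` been taken from the queue?
      private[this] var current: Option[T] = None

      def hasNext: Boolean = {
        if (!fetched) { current = queue.take(); fetched = true }
        current.isDefined
      }

      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("next on exhausted iterator")
        fetched = false
        current.get
      }
    }
  }
}
```

Wrapping the restarting foreach from above (applied to the question's test iterator) as `fromForeach(restartingForeach).take(6).toList` yields `List(0, 1, 2, 3, 0, 1)` without materializing the stream.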

Steve