Basically I want to convert this:

def data(block: T => Unit)

to a Stream (dataToStream is a hypothetical function that does this conversion):

val dataStream: Stream[T] = dataToStream(data)

I suppose this problem could be resolved by continuations:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// a very naive solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it (reversed, because the list was built by prepending)
dataList.reverse.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here the black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know what the dataToStream function could look like?

Thanks, Dawid

+2  A: 

I still have to figure out how to do that myself. I suspect the answer lies somewhere here:

Edit: removed code that showed how to solve a different problem.

Edit2: Using the code at http://gist.github.com/580157 (initially posted at http://gist.github.com/574873), you can do this:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data does not take a block of code, but I think this is fine because, with continuations, the block can be handled by the caller. The code for Generator can be seen in the gist on GitHub.

huynhjl
Ehrm, didn't you solve a totally different problem than that of the OP? The OP's `data` function called the `block` function ten times and he wanted to turn that into a stream of ten elements. Your `data` function only calls `block` once.
sepp2k
@sepp2k, err, yes indeed. I guess continuations are necessary, then.
huynhjl
I tried to use code from this thread http://stackoverflow.com/questions/2201882/implementing-yield-yield-return-using-scala-continuations/3758084 but without success
Dawid Grzesiak
Yes, I tried that before. Unfortunately it doesn't resolve the problem because of CPS limitations. See the code http://gist.github.com/599575 It returns the error: type mismatch; found : Unit @scala.util.continuations.cpsParam[Unit,Unit] required: Unit data { i => yld(i) }
Dawid Grzesiak
@Dawid See the comment I added to that snippet.
Daniel
+3  A: 

Here's a simple solution that spawns a thread that consumes the data. It posts the data to a SynchronousQueue. A stream that pulls data from the queue is created and returned:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
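
For reference, here is a minimal, self-contained sketch of how the function above might be used with the `data` function from the question. The object name `GeneratorToStreamDemo` and the `setDaemon(true)` call are assumptions added for this sketch. The daemon flag only lets the JVM exit when the stream is abandoned part-way; it does not stop the producer thread, which stays blocked on `put`.

```scala
import java.util.concurrent.SynchronousQueue

object GeneratorToStreamDemo {
  // The callback-style producer from the question.
  def data(block: Int => Unit): Unit = for (i <- 0 to 10) block(i)

  // Same idea as the function above; marking the thread as a daemon
  // is an addition made for this sketch only.
  def generatorToStream[T](f: (T => Unit) => Unit): Stream[T] = {
    val queue = new SynchronousQueue[Option[T]]
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        f(x => queue.put(Some(x))) // hand each value over to the consumer
        queue.put(None)            // signal that the producer has finished
      }
    })
    producer.setDaemon(true)
    producer.start()
    Stream.continually(queue.take()).takeWhile(_.isDefined).map(_.get)
  }

  def main(args: Array[String]): Unit = {
    // Only the first three values are pulled; the producer then blocks on put(3).
    println(generatorToStream[Int](data).take(3).toList) // prints List(0, 1, 2)
  }
}
```

Each element costs a thread hand-off through the queue, so this is expected to be much slower than a plain loop.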
Geoff Reedy
Because of CPS limitations, this may be the only solution for Scala up to v2.8. Unfortunately, it is 170x slower than using a pure generator. See https://gist.github.com/a79c0a9669eea3d47eee
Dawid Grzesiak
+2  A: 

Here's a delimited continuations-based implementation, adapted from @Geoff Reedy's offering:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
pelotom
+4  A: 

EDITED: Modified the examples to show the laziness of traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

The toTraversable method will convert your data function into a Traversable collection. By itself, it's nothing huge, but you can convert this to a TraversableView which is lazy. Here's an example:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

The unfortunate nature of the take method is that it must go one past the last value generated to work correctly, but it will terminate early. The above code would look the same without the ".view" call. However, here's a more compelling example:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3
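
To make the early-termination idea concrete, here is a rough sketch of how a callback-only producer can be cut short without threads, by escaping from the callback with `scala.util.control.Breaks`. This is not the library's implementation of `take`. The helper `takeFirst` and the object name are hypothetical, and the sketch assumes the producer does not catch `Throwable` itself. Because it checks the count right after storing a value, it does not need to generate one extra element.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}

object EarlyExitDemo {
  // Same shape as the data function above.
  def data(f: Int => Unit): Unit = for (i <- 1 to 10) {
    println("Generating " + i)
    f(i)
  }

  // Hypothetical helper: collect the first n values, then abort the producer.
  def takeFirst[T](n: Int)(producer: (T => Unit) => Unit): List[T] = {
    val buf = ListBuffer.empty[T]
    if (n > 0) breakable {
      producer { x =>
        buf += x
        if (buf.size >= n) break() // unwinds the producer via a control-flow exception
      }
    }
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    // Prints "Generating 1", "Generating 2", "Generating 3", then 6.
    println(takeFirst[Int](3)(data).sum)
  }
}
```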

So in conclusion, I believe the collection you're looking for is TraversableView, which is easiest to create by making a Traversable and then calling "view" on it. If you really wanted the Stream type, here's a method that works in 2.8.0.final and will make a "Stream" without threads:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

The unfortunate nature of this method is that it will iterate over the entire traversable before making the stream. This also means all the values need to be buffered in memory. The only alternative is to resort to threads.

As an aside: This was the motivating reason to prefer Traversables as direct returns from scalax.io.File methods: "lines" "chars" and "bytes".

jsuereth
As you see, data is first evaluated and then converted to the Stream. So there is no laziness here.
Dawid Grzesiak
My point is that you can interact with the data as a "stream" if you use TraversableView. By requiring the type "Stream" you're limiting yourself. TraversableView *is* lazy.
jsuereth
If the TraversableView doesn't look lazy in the REPL, it's because the REPL calls "toString" on resulting expressions, and this causes the TraversableView to traverse the entire collection (displaying all the values). If you develop a function using TraversableView, you will see its laziness.
jsuereth
Hmm, it is really not a bad idea. Sometimes this solution will be sufficient (especially when you want to traverse all data in a row) and sometimes not. See http://gist.github.com/603569 Ideally the last example output should be interlaced as well. It is a pity that you can't make a Stream or Iterator for it, or rather you can, but it will evaluate all data first. If you have a Stream/Iterator, you can use two or more data streams in parallel. For example, take(3) from one Iterator, then take(10) from the other. Anyway it is a great and helpful piece of code!
Dawid Grzesiak
With threads, if you don't consume all the data, the thread is not stopped but left suspended. So it has drawbacks too...
Dawid Grzesiak
You forgot to call .view on the traversable. This will make it lazy and interleave the results. Without calling .view, all methods on collections are "eager" and will generate intermediate collections. In this case, your call to take is executing immediately. First call view then take.
jsuereth
@jsuereth In general, I sometimes wonder whether I should call the .view or the .toStream method. Which one is more efficient? I see that the .view result is strictly bound to the subject, for example Traversable and TraversableView. So all *View classes must be prepared in advance, which is done by the Scala creators.
Dawid Grzesiak
I would avoid calling toStream unless you are sure you can fit the entire collection in memory and wish to do so.
jsuereth
My question was separate from the presented solution. I mean, what are the pros and cons of calling .view and .toStream in general? Streams are lazy and memory efficient.
Dawid Grzesiak
Streams are lazy and memory efficient; however, the default implementation of toStream must buffer the entire collection before creating a stream. The Iterable interface *can* make toStream be appropriately lazy. Most collections extend Iterable, so in that case you are right, they are lazy. Streams, however, are not as memory efficient as you might think. They memoize; that is, they retain previously generated values. In this case view is more memory efficient in that it does not retain generated values.
jsuereth
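
The memoization point in the last comment can be illustrated with a small sketch (the object and method names are made up for this example). It counts how often the mapping function runs when a mapped Stream and a mapped view are each traversed twice.

```scala
object MemoizationDemo {
  // Returns (evaluations for the Stream, evaluations for the view).
  def counts(): (Int, Int) = {
    var streamEvals = 0
    var viewEvals = 0

    val s = Stream.range(0, 5).map { i => streamEvals += 1; i * 2 }
    val v = (0 until 5).view.map { i => viewEvals += 1; i * 2 }

    // Traverse each collection twice.
    s.foreach(_ => ()); s.foreach(_ => ())
    v.foreach(_ => ()); v.foreach(_ => ())

    (streamEvals, viewEvals)
  }

  def main(args: Array[String]): Unit = {
    // The Stream computes each element once and keeps it (5 evaluations);
    // the view recomputes on every traversal (10 evaluations).
    println(counts()) // prints (5,10)
  }
}
```

The Stream avoids recomputation, but every element stays reachable for as long as the head of the Stream is referenced. The view retains nothing and pays for that by recomputing.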