tags:

views:

857

answers:

8

Background:

I have a sequence of contiguous, time-stamped data. The data-sequence has gaps in it where the data is not contiguous. I want create a method to split the sequence up into a sequence of sequences so that each subsequence contains contiguous data (split the input-sequence at the gaps).

Constraints:

  • The return value must be a sequence of sequences to ensure that elements are only produced as needed (cannot use list/array/cacheing)
  • The solution must NOT be O(n^2), probably ruling out a Seq.take - Seq.skip pattern (cf. Brian's post)
  • Bonus points for a functionally idiomatic approach (since I want to become more proficient at functional programming), but it's not a requirement.

Method signature

let groupContiguousDataPoints (timeBetweenContiguousDataPoints : TimeSpan) (dataPointsWithHoles : seq<DateTime * float>) : (seq<seq< DateTime * float >>)= ...

On the face of it the problem looked trivial to me, but even employing Seq.pairwise, IEnumerator<_>, sequence comprehensions and yield statements, the solution eludes me. I am sure that this is because I still lack experience with combining F#-idioms, or possibly because there are some language-constructs that I have not yet been exposed to.

// Test data
let numbers = {1.0..1000.0}
let baseTime = DateTime.Now
let contiguousTimeStamps = seq { for n in numbers ->baseTime.AddMinutes(n)}

let dataWithOccationalHoles = Seq.zip contiguousTimeStamps numbers |> Seq.filter (fun (dateTime, num) -> num % 77.0 <> 0.0) // Has a gap in the data every 77 items

let timeBetweenContiguousValues = (new TimeSpan(0,1,0))

dataWithOccationalHoles |> groupContiguousDataPoints timeBetweenContiguousValues |> Seq.iteri (fun i sequence -> printfn "Group %d has %d data-points: Head: %f" i (Seq.length sequence) (snd(Seq.hd sequence)))
+1  A: 

You seem to want a function that has signature

(`a -> bool) -> seq<'a> -> seq<seq<'a>>

I.e. a function and a sequence, then break up the input sequence into a sequence of sequences based on the result of the function.

Caching the values into a collection that implements IEnumerable would likely be simplest (albeit not exactly purist, but avoiding iterating the input multiple times. It will lose much of the laziness of the input):

let groupBy (fun: 'a -> bool) (input: seq) =
  seq {
    let cache = ref (new System.Collections.Generic.List())
    for e in input do
      (!cache).Add(e)
      if not (fun e) then
        yield !cache
        cache := new System.Collections.Generic.List()
    if cache.Length > 0 then
     yield !cache
  }

An alternative implementation could pass cache collection (as seq<'a>) to the function so it can see multiple elements to chose the break points.

Richard
Richard, I was hoping to be able to avoid using a cache for the inner sequences.
Treefrog
Also, the inner-most let appears to be scoped only in the if statement. Did you intend to make cache mutable?
Treefrog
@Treefrog: oops yes, it should be a ref List<'a>, will correct that.
Richard
@Treefrog: I don't think this can be done without caching,: seq<'a> is an interface, you need a concrete type of which to yield instances.
Richard
Well, you could use a nested `seq` workflow instead.
Alexey Romanov
Alexey, could you elaborate on how one would use a nested seq workflow?
Treefrog
To use an inner seq workflow while retaining the single iteration of the input would require directly using an `IEnumerable<T>` from the seq (i.e. go to the `IEnumerator<T>` underlying the seq and call `GetEnumerator()` directly). In effect creating a `Seq.takeWhile` that allows another `takeWhile` to be used on the remainder of the sequence.
Richard
@Richard Right. Haskell's `span` does this for free, but with `seq` you have a difficulty.
Alexey Romanov
+1  A: 

A Haskell solution, because I don't know F# syntax well, but it should be easy enough to translate:

type TimeStamp = Integer -- ticks
type TimeSpan  = Integer -- difference between TimeStamps

groupContiguousDataPoints :: TimeSpan -> [(TimeStamp, a)] -> [[(TimeStamp, a)]]

There is a function groupBy :: (a -> a -> Bool) -> [a] -> [[a]] in the Prelude:

The group function takes a list and returns a list of lists such that the concatenation of the result is equal to the argument. Moreover, each sublist in the result contains only equal elements. For example,

group "Mississippi" = ["M","i","ss","i","ss","i","pp","i"]

It is a special case of groupBy, which allows the programmer to supply their own equality test.

It isn't quite what we want, because it compares each element in the list with the first element of the current group, and we need to compare consecutive elements. If we had such a function groupBy1, we could write groupContiguousDataPoints easily:

groupContiguousDataPoints maxTimeDiff list = groupBy1 (\(t1, _) (t2, _) -> t2 - t1 <= maxTimeDiff) list

So let's write it!

groupBy1 :: (a -> a -> Bool) -> [a] -> [[a]]
groupBy1 _    []            = [[]]
groupBy1 _    [x]           = [[x]]
groupBy1 comp (x : xs@(y : _))
  | comp x y  = (x : firstGroup) : otherGroups
  | otherwise = [x] : groups
  where groups@(firstGroup : otherGroups) = groupBy1 comp xs

UPDATE: it looks like F# doesn't let you pattern match on seq, so it isn't too easy to translate after all. However, this thread on HubFS shows a way to pattern match sequences by converting them to LazyList when needed.

UPDATE2: Haskell lists are lazy and generated as needed, so they correspond to F#'s LazyList (not to seq, because the generated data is cached (and garbage collected, of course, if you no longer hold a reference to it)).

Alexey Romanov
Alexey, you are working on an input-list, and producing an output of list of lists. As I explained in my question, I need to operate on a sequence of sequences rather than a list of lists, since in F# sequences are generated as needed, as opposed to lists that are immediately constructed in memory (which is a problem for very large data-sets)
Treefrog
+2  A: 

I translated Alexey's Haskell to F#, but it's not pretty in F#, and still one element too eager.

I expect there is a better way, but I'll have to try again later.

let N = 20
let data =  // produce some arbitrary data with holes
    seq {
        for x in 1..N do
            if x % 4 <> 0 && x % 7 <> 0 then
                printfn "producing %d" x
                yield x
    }

let rec GroupBy comp (input:LazyList<'a>) : LazyList<LazyList<'a>> = 
    LazyList.delayed (fun () ->
    match input with
    | LazyList.Nil -> LazyList.cons (LazyList.empty()) (LazyList.empty())
    | LazyList.Cons(x,LazyList.Nil) -> 
        LazyList.cons (LazyList.cons x (LazyList.empty())) (LazyList.empty())
    | LazyList.Cons(x,(LazyList.Cons(y,_) as xs)) ->
        let groups = GroupBy comp xs
        if comp x y then
            LazyList.consf 
                (LazyList.consf x (fun () -> 
                    let (LazyList.Cons(firstGroup,_)) = groups
                    firstGroup)) 
                (fun () -> 
                    let (LazyList.Cons(_,otherGroups)) = groups
                    otherGroups)
        else
            LazyList.cons (LazyList.cons x (LazyList.empty())) groups)

let result = data |> LazyList.of_seq |> GroupBy (fun x y -> y = x + 1)
printfn "Consuming..."
for group in result do
    printfn "about to do a group"
    for x in group do
        printfn "  %d" x
Brian
Brian, I when trying to FSI your code I get the error-message below, even though I have FSharp.PowerPack.dll referenced. (I can even find LazyList in the PowerPack using the object browser) "The type 'LazyList' is not defined. A construct with this name was found in FSharp.PowerPack.dll, which contains some modules and types that were implicitly referenced in some previous versions of F#. You may need to add an explicit reference to this DLL in order to compile this code."
Treefrog
FSI cannot see references in the project; you need to say #r "FSharp.PowerPack.dll";; in the FSI window to get that reference.
Brian
@Brian: Thanks, #r does the trick :-)
Treefrog
A: 

Below is some code that does what I think you want. It is not idiomatic F#.

(It may be similar to Brian's answer, though I can't tell because I'm not familiar with the LazyList semantics.)

But it doesn't exactly match your test specification: Seq.length enumerates its entire input. Your "test code" calls Seq.length and then calls Seq.hd. That will generate an enumerator twice, and since there is no caching, things get messed up. I'm not sure if there is any clean way to allow multiple enumerators without caching. Frankly, seq<seq<'a>> may not be the best data structure for this problem.

Anyway, here's the code:

type State<'a> = Unstarted | InnerOkay of 'a | NeedNewInner of 'a | Finished

// f() = true means the neighbors should be kept together
// f() = false means they should be split
let split_up (f : 'a -> 'a -> bool) (input : seq<'a>) =
    // simple unfold that assumes f captured a mutable variable
    let iter f = Seq.unfold (fun _ -> 
        match f() with
        | Some(x) -> Some(x,())
        | None -> None) ()

    seq {
        let state = ref (Unstarted)
        use ie = input.GetEnumerator()

        let innerMoveNext() = 
            match !state with
            | Unstarted -> 
                if ie.MoveNext()
                then let cur = ie.Current
                     state := InnerOkay(cur); Some(cur)
                else state := Finished; None 
            | InnerOkay(last) ->
                if ie.MoveNext()
                then let cur = ie.Current
                     if f last cur
                     then state := InnerOkay(cur); Some(cur)
                     else state := NeedNewInner(cur); None
                else state := Finished; None
            | NeedNewInner(last) -> state := InnerOkay(last); Some(last)
            | Finished -> None 

        let outerMoveNext() =
            match !state with
            | Unstarted | NeedNewInner(_) -> Some(iter innerMoveNext)
            | InnerOkay(_) -> failwith "Move to next inner seq when current is active: undefined behavior."
            | Finished -> None

        yield! iter outerMoveNext }


open System

let groupContigs (contigTime : TimeSpan) (holey : seq<DateTime * int>) =
    split_up (fun (t1,_) (t2,_) -> (t2 - t1) <= contigTime) holey


// Test data
let numbers = {1 .. 15}
let contiguousTimeStamps = 
    let baseTime = DateTime.Now
    seq { for n in numbers -> baseTime.AddMinutes(float n)}

let holeyData = 
    Seq.zip contiguousTimeStamps numbers 
        |> Seq.filter (fun (dateTime, num) -> num % 7 <> 0)

let grouped_data = groupContigs (new TimeSpan(0,1,0)) holeyData


printfn "Consuming..."
for group in grouped_data do
    printfn "about to do a group"
    for x in group do
        printfn "  %A" x
Gabriel
I think that your use of the `use` keyword is causing the problems with enumerating your sequences twice. Off hand, I'm not sure if there's an easy way to dispose of the enumerator correctly while still allowing multiple traversals.
kvb
@kvb, can you elaborate? I have not tried running this code, but at a glance it looks ok to me. Is there a repro that fails?
Brian
It seems that the problems people are encountering with this and other solutions (iterating the second seq before the first has been iterated completely) comes from a mis-specification or underspecification of the original problem: It asks for no caching. Therefore, if the consumer begins consuming the 2nd seq before it finishes consuming the 1st, what is the producer (this code we're all trying to write) supposed to yield for the 2nd seq? ...
Gabriel
... If the 2nd seq yields the current element and moves on, then the 1st seq is now invalid (ask yourself, what should it (1st seq) yield if the consumer then resumes iterating it?). If the 2nd doesn't yield the current element, what should it do instead?
Gabriel
Basically, seq<seq<_>> allows the consumer to do things (like skip uncompleted inner seqs) that are make no sense given the nature of the underlying data and the requirement that it not be cached.
Gabriel
+1  A: 

(EDIT: This suffers from a similar problem to Brian's solution, in that iterating the outer sequence without iterating over each inner sequence will mess things up badly!)

Here's a solution that nests sequence expressions. The imperitave nature of .NET's IEnumerable<T> is pretty apparent here, which makes it a bit harder to write idiomatic F# code for this problem, but hopefully it's still clear what's going on.

let groupBy cmp (sq:seq<_>) =
  let en = sq.GetEnumerator()
  let rec partitions (first:option<_>) =
    seq {
      match first with
      | Some first' ->             //'
        (* The following value is always overwritten; 
           it represents the first element of the next subsequence to output, if any *)
        let next = ref None

        (* This function generates a subsequence to output, 
           setting next appropriately as it goes *)
        let rec iter item = 
          seq {
            yield item
            if (en.MoveNext()) then
              let curr = en.Current
              if (cmp item curr) then
                yield! iter curr
              else // consumed one too many - pass it on as the start of the next sequence
                next := Some curr
            else
              next := None
          }
        yield iter first' (* ' generate the first sequence *)
        yield! partitions !next (* recursively generate all remaining sequences *)
      | None -> () // return an empty sequence if there are no more values
    }
  let first = if en.MoveNext() then Some en.Current else None
  partitions first

let groupContiguousDataPoints (time:TimeSpan) : (seq<DateTime*_> -> _) = 
  groupBy (fun (t,_) (t',_) -> t' - t <= time)
kvb
kvb, I am impressed with how functional you managed to make this (using only one ref cell). I will study it to improve my grasp of functional programming (The recursion makes it a little hard for me to follow). Thanks for your effort!
Treefrog
Ha, I was just about to comment on the issues similar to Brian's solution :-)This is turning into a real brain-twister (not Brian-twister).
Treefrog
A: 

Ok, here's an answer I'm not unhappy with.

(EDIT: I am unhappy - it's wrong! No time to try to fix right now though.)

It uses a bit of imperative state, but it is not too difficult to follow (provided you recall that '!' is the F# dereference operator, and not 'not'). It is as lazy as possible, and takes a seq as input and returns a seq of seqs as output.

let N = 20
let data =  // produce some arbitrary data with holes
    seq {
        for x in 1..N do
            if x % 4 <> 0 && x % 7 <> 0 then
                printfn "producing %d" x
                yield x
    }
let rec GroupBy comp (input:seq<_>) = seq {
    let doneWithThisGroup = ref false
    let areMore = ref true
    use e = input.GetEnumerator()
    let Next() = areMore := e.MoveNext(); !areMore
    // deal with length 0 or 1, seed 'prev'
    if not(e.MoveNext()) then () else
    let prev = ref e.Current
    while !areMore do
        yield seq {
            while not(!doneWithThisGroup) do
                if Next() then
                    let next = e.Current 
                    doneWithThisGroup := not(comp !prev next)
                    yield !prev 
                    prev := next
                else
                    // end of list, yield final value
                    yield !prev
                    doneWithThisGroup := true } 
        doneWithThisGroup := false }
let result = data |> GroupBy (fun x y -> y = x + 1)
printfn "Consuming..."
for group in result do
    printfn "about to do a group"
    for x in group do
        printfn "  %d" x
Brian
Brian, this is what I was looking for :-)My own attempt at solving the problem used a very similar approach (nested sequence comprehensions), but produced erratic results. At first I thought that this was due to sequence comprehension closures all capturing the same ref cell, but I just now discovered that the error were due to erroneous test data. I seem to have made multiple calls to "DateTime.Now" where only one was intended, causing subsequent DateTime comparisons to fail.BTW - The "if not(e.MoveNext()) then () else ..." appears to be equivalent to the simpler "if e.MoveNext() then..."?
Treefrog
The more I use sequence expressions, the less I understand them... Why does `Seq.length (GroupBy (fun _ _ -> true) [1])` go into an infinite loop?
kvb
Also, there seems to be no need to declare GroupBy "rec" since it is not recursive :-)
Treefrog
I am also getting an infinite loop in the "while !areMore do". It is as if the "yield seq" statement is never entered.
Treefrog
Yup; this solution is totally wrong, argh. If the consumer demands elements of the outer seq, but does not consume elements of the inner seq, for example, the effects never happen and no progress is ever made consuming the original list.
Brian
Keep in mind that seq { ... } is an expression that always returns immediately. Until someone gets the enumerator of the resultant seq object and calls MoveNext, it doesn't run any of the code inside the seq block. My solution presumes the consumer will entirely enumerate each inner seq before MoveNext-ing to the next outer one.
Brian
+1  A: 

Okay, trying again. Achieving the optimal amount of laziness turns out to be a bit difficult in F#... On the bright side, this is somewhat more functional than my last attempt, in that it doesn't use any ref cells.

let groupBy cmp (sq:seq<_>) =
  let en = sq.GetEnumerator()
  let next() = if en.MoveNext() then Some en.Current else None
  (* this function returns a pair containing the first sequence and a lazy option indicating the first element in the next sequence (if any) *)
  let rec seqStartingWith start =
    match next() with
    | Some y when cmp start y ->
        let rest_next = lazy seqStartingWith y // delay evaluation until forced - stores the rest of this sequence and the start of the next one as a pair
        seq { yield start; yield! fst (Lazy.force rest_next) }, 
          lazy Lazy.force (snd (Lazy.force rest_next))
    | next -> seq { yield start }, lazy next
  let rec iter start =
    seq {
      match (Lazy.force start) with
      | None -> ()
      | Some start -> 
          let (first,next) = seqStartingWith start
          yield first
          yield! iter next
    }
  Seq.cache (iter (lazy next()))
kvb
This doesn't dispose the enumerator. At a glance, you could probably do that in the 'else' branch of next().
Brian
I get an exception with the following (using VS2010 beta 1): "error FS0193: internal error: the module/namespace 'Microsoft.FSharp.Control' from compilation unit 'FSharp.Core' did not contain the val 'Lazy`1.Force.1'" Any ideas?
Treefrog
@Treefrog - I don't have VS2010 on this computer, but I don't get that error using F# 1.9.6.16... The "internal error" bit makes it look like a compiler bug to me; maybe report it to [email protected] and see what they say?
kvb
+1  A: 

I think this does what you want

dataWithOccationalHoles 
|> Seq.pairwise
|> Seq.map(fun ((time1,elem1),(time2,elem2)) -> if time2-time1 = timeBetweenContiguousValues then 0, ((time1,elem1),(time2,elem2)) else 1, ((time1,elem1),(time2,elem2)) )
|> Seq.scan(fun (indexres,(t1,e1),(t2,e2)) (index,((time1,elem1),(time2,elem2))) ->  (index+indexres,(time1,elem1),(time2,elem2))  ) (0,(baseTime,-1.0),(baseTime,-1.0))
|> Seq.map( fun (index,(time1,elem1),(time2,elem2)) -> index,(time2,elem2) )
|> Seq.filter( fun (_,(_,elem)) -> elem <> -1.0)
|> PSeq.groupBy(fst)
|> Seq.map(snd>>Seq.map(snd))

Thanks for asking this cool question

jlezard