Hi All,

I'm performing streaming reads of an Amazon S3 object using a BufferedReader.

I need to do two things with this object:

  1. Pass it to a SuperCSV CSV reader
  2. Obtain the raw lines and keep them in a (Clojure) lazy sequence

Currently, I am having to use two different BufferedReaders: one as an argument to a SuperCSV CSV reader class and one to initialize the lazy sequence of raw lines. I'm effectively downloading the S3 object twice, which is expensive ($) and slow.

One of my colleagues pointed out that something analogous to the Unix "tee" command is what I'm looking for. A BufferedReader that could somehow be "split", so that each downloaded chunk of data is passed to both the lazy sequence and the CSV reader, would be useful.
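A tee-like reader of this kind can be sketched with proxy. This is only an illustration under assumptions: tee-reader is a hypothetical helper name, a StringReader stands in for the S3 stream, and a StringWriter stands in for whatever collects the raw text.

```clojure
(import '[java.io BufferedReader Reader StringReader StringWriter Writer])

(defn tee-reader
  "Hypothetical helper: returns a Reader that delegates to src and copies
  every character it hands out to the Writer sink."
  [^Reader src ^Writer sink]
  (proxy [Reader] []
    (read
      ([]
        (let [c (.read src)]
          ;; -1 signals end of stream and must not be copied
          (when-not (neg? c) (.write sink (int c)))
          c))
      ([cbuf]
        (.read this cbuf 0 (count cbuf)))
      ([cbuf off len]
        (let [n (.read src ^chars cbuf (int off) (int len))]
          ;; copy only the characters that were actually read
          (when (pos? n) (.write sink ^chars cbuf (int off) (int n)))
          n)))
    (close [] (.close src))))

;; Usage: one pass over the source feeds a line consumer and the raw copy.
(let [raw (StringWriter.)
      rdr (BufferedReader. (tee-reader (StringReader. "a,b\n1,2\n") raw))]
  [(doall (line-seq rdr)) (str raw)])
;; => [("a,b" "1,2") "a,b\n1,2\n"]
```

Because the BufferedReader on top reads in chunks, the sink can run ahead of what the consumer has seen so far, and an in-memory sink grows with the size of the object.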

I'm also investigating whether it would be possible to wrap the lazy sequence in a BufferedReader and pass that to SuperCSV. I've had Java heap space issues when passing very large lazy sequences to multiple consumers, so I'm somewhat worried about employing this solution.

Another solution is to download the file locally and then open two streams on that file. This defeats the original motivation behind streaming: allowing processing of the file to begin as soon as data starts arriving.
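For comparison, the local-file approach might look like the sketch below. The name spool-to-temp-file and the temp-file prefix are assumptions, and a StringReader again stands in for the S3 stream.

```clojure
(require '[clojure.java.io :as io])
(import '[java.io File StringReader])

(defn spool-to-temp-file
  "Hypothetical helper: copies everything from rdr into a temp file and
  returns that file."
  [rdr]
  (let [tmp (File/createTempFile "s3-object" ".csv")]
    (.deleteOnExit tmp)
    (with-open [w (io/writer tmp)]
      (io/copy rdr w))
    tmp))

;; Usage: download once, then open as many independent readers as needed.
(let [tmp (spool-to-temp-file (StringReader. "a,b\n1,2\n"))]
  (with-open [r1 (io/reader tmp)
              r2 (io/reader tmp)]
    [(doall (line-seq r1)) (doall (line-seq r2))]))
;; => [("a,b" "1,2") ("a,b" "1,2")]
```

Nothing can be processed until the copy has finished, which is exactly the drawback described above.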

The final solution, and one that I'd consider only if nothing else works, is implementing my own CSV reader that returns both the parsed CSV and the original unparsed line. If you've used a very solid CSV reader that can return both a Java Map of parsed CSV data and the original unparsed line, please let me know!

Thanks!

+2  A: 

I'd be inclined to go with creating a seq of lines from the network, and then hand that over to however many processes need to work on that seq; persistent data structures are cool that way. In the case of needing to turn a seq of strings into a Reader that you can hand off to the SuperCSV API, this seems to work:

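(import '[java.io Reader StringReader])

(defn concat-reader
  "Returns a Reader that reads from a sequence of strings."
  [lines]
  ;; srs holds one StringReader per string that is not yet exhausted.
  (let [srs (atom (map #(StringReader. %) lines))]
    (proxy [Reader] []
      (read 
        ([] 
          (if-let [^StringReader sr (first @srs)]
            (let [c (.read sr)]
              ;; At the end of the current string, move on to the next one.
              (if (and (neg? c) (swap! srs next))
                (.read this)
                c))
            -1))   ; nothing left (or empty input): report end of stream
        ([cbuf] 
          (.read this cbuf 0 (count cbuf)))
        ([cbuf off len]
          (if-let [^StringReader sr (first @srs)]
            (let [actual (.read sr cbuf off len)]
              (if (and (neg? actual) (swap! srs next))
                (.read this cbuf off len)
                actual))
            -1)))
      (close [] ))))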

E.g.

user=> (def r (concat-reader ["foo" "bar"]))
#'user/r
user=> (def cbuf (char-array 2))
#'user/cbuf
user=> (.read r cbuf)
2
user=> (seq cbuf)
(\f \o)
user=> (char (.read r))
\o
user=> (char (.read r))
\b
Alex Taggart
wwmorgan on #clojure clued me in on a solution that follows your advice: "I'd be inclined to go with creating a seq of lines from the network, and then hand that over to however many processes need to work on that seq." I used (line-seq (create-buffered-reader-for-s3-object "object-name.csv")) to obtain the lazy sequence, then wrapped each line in its own StringReader and passed that to its own CsvMapReader. I had to create a CsvMapReader per line rather than hand a single reader spanning the whole file to one CsvMapReader.
jkndrkn
My final solution looks something like this:
(let [lines (line-seq amazon-s3-reader)] [(map #(.read (CsvMapReader. (java.io.StringReader. line) CsvPreference/STANDARD_PREFERENCE))) lines])
The above code doesn't parse, but it outlines the general solution: it returns both the parsed CSV data and the raw lines, and lets me access both without weird interactions.
jkndrkn
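The shape of that outline, pairing each raw line with its parsed form, can be sketched in a runnable way as follows. Note the assumptions: parse-line is a hypothetical stand-in that naively splits on commas (no quoting or escaping) where the comment uses CsvMapReader, the first line is assumed to be the header, and a StringReader replaces the S3 reader.

```clojure
(require '[clojure.string :as str])
(import '[java.io BufferedReader StringReader])

(defn parse-line
  "Hypothetical stand-in for a real CSV parser: naive comma split."
  [header line]
  (zipmap header (str/split line #"," -1)))

(defn raw-and-parsed
  "Returns a lazy seq of maps holding both the raw line and its parsed form."
  [^BufferedReader rdr]
  (let [[header-line & data-lines] (line-seq rdr)
        header (str/split header-line #"," -1)]
    (map (fn [line] {:raw line :parsed (parse-line header line)})
         data-lines)))

(raw-and-parsed (BufferedReader. (StringReader. "name,qty\nfoo,1\nbar,2\n")))
;; => ({:raw "foo,1", :parsed {"name" "foo", "qty" "1"}}
;;     {:raw "bar,2", :parsed {"name" "bar", "qty" "2"}})
```

Parsing line by line assumes no record spans several lines, which quoted CSV fields containing newlines would violate.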
Ugh. Extensive testing of my solution revealed bandwidth consumption 4x higher than I wanted, rather than the 2x I was getting with two separate readers. I think the problem with seq'ing a network resource is that every place where the seq is consumed causes the data to be pulled down from the network again, which would explain the high bandwidth consumption I am seeing.
jkndrkn
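One thing that may be worth checking here: a Clojure lazy seq caches the elements it has already realized, so several consumers of the same seq value should not by themselves cause extra reads, whereas re-evaluating the expression that builds the seq (and opens a new reader) does. The sketch below illustrates this with a StringReader in place of the S3 stream. counted-line-seq is a hypothetical helper that mirrors line-seq but counts the lines it pulls from the reader; it does not diagnose the setup described above.

```clojure
(import '[java.io BufferedReader StringReader])

(defn counted-line-seq
  "Like line-seq, but increments the atom counter for every line pulled
  from rdr. Hypothetical helper for illustration only."
  [^BufferedReader rdr counter]
  (lazy-seq
    (when-let [line (.readLine rdr)]
      (swap! counter inc)
      (cons line (counted-line-seq rdr counter)))))

(defn new-reader []
  (BufferedReader. (StringReader. "a,b\n1,2\n")))

;; Two consumers of the SAME seq value: each line is read only once.
(let [n     (atom 0)
      lines (counted-line-seq (new-reader) n)]
  (doall (map count lines))
  (doall (map seq lines))
  @n)
;; => 2

;; Building the seq twice reads the source twice.
(let [n (atom 0)]
  (doall (counted-line-seq (new-reader) n))
  (doall (counted-line-seq (new-reader) n))
  @n)
;; => 4
```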
A: 

The solution was to use a single BufferedReader for all accesses and to reset() it every time it is passed into functionality that needs to read from the beginning.

jkndrkn
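A minimal sketch of the mark/reset pattern, not the poster's actual code: reset() only works after a prior mark(), and the read-ahead limit passed to mark() has to cover everything read before the reset. read-twice and the limit value are assumptions for illustration, and a StringReader stands in for the S3 stream.

```clojure
(import '[java.io BufferedReader StringReader])

(defn read-twice
  "Hypothetical helper: reads all lines from rdr, rewinds to the mark,
  and reads them again. limit is the read-ahead limit in characters."
  [^BufferedReader rdr limit]
  (.mark rdr (int limit))            ; remember the current position
  (let [first-pass (doall (line-seq rdr))]
    (.reset rdr)                     ; rewind to the marked position
    [first-pass (doall (line-seq rdr))]))

(read-twice (BufferedReader. (StringReader. "a,b\n1,2\n")) 1024)
;; => [("a,b" "1,2") ("a,b" "1,2")]
```

The BufferedReader has to keep everything between the mark and the reset in its buffer, so for a large object the read-ahead limit, and with it memory use, grows to the size of the data being re-read.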