I'm a Haskell beginner and thought this would be a good exercise. I have an assignment where I need to read a file in thread A, process the file's lines in threads B_i, and then output the results in thread C.
I have implemented this much already, but one of the requirements is that we cannot assume the entire file fits into memory. I was hoping that lazy IO and the garbage collector would take care of this for me, but alas, the memory usage keeps rising and rising.
The reader thread (A) reads the file with readFile, which is then zipped with line numbers and wrapped in Just. These zipped lines are then written to a Control.Concurrent.Chan. Each consumer thread B has its own channel.
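To make the reader's data flow concrete, here is a minimal sketch of the numbering and fan-out steps described above. The names numberLines and fanOut are hypothetical and only for illustration; my actual producer is in the snippets further down.

```haskell
import Control.Concurrent.Chan

-- Same shape as the Line type in the snippets: (line number, payload).
type Line = (Int, Maybe String)

-- Number the lines from 1 and wrap each one in Just.
-- The result is lazy: a line is only produced when it is demanded.
numberLines :: String -> [Line]
numberLines = zip [1 ..] . map Just . lines

-- Write every line to every consumer channel, preserving line order.
-- (Hypothetical helper; each consumer owns exactly one of the channels.)
fanOut :: [Chan Line] -> [Line] -> IO ()
fanOut chans = mapM_ (\l -> mapM_ (`writeChan` l) chans)
```

With this shape, something like `readFile path >>= fanOut chans . numberLines` would feed all consumers. Note that writeChan never blocks, so nothing here slows the reader down if the consumers fall behind.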
Each consumer reads its own channel when it has data, and if the regex matches, the line is written to that consumer's own output channel wrapped in a Maybe (the output channels are lists held in an MVar).
The printer checks the output channel of each of the B threads. If none of the results for a line is Nothing, the line is printed. Since at this point there should be no references left to the older lines, I thought the garbage collector would be able to release them, but alas, I seem to be wrong.
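The printing rule can be written as a small pure function. This is a sketch of the rule only, assuming the printer has already taken the head of every consumer's output for the same input line; lineToPrint is a hypothetical name that does not appear in my program.

```haskell
import Data.Maybe (isNothing)

-- Same shape as the Line type in the snippets: (line number, payload).
type Line = (Int, Maybe String)

-- Given the head result of every consumer for one input line, return the
-- text to print, or Nothing if any consumer produced Nothing for it.
lineToPrint :: [Line] -> Maybe String
lineToPrint [] = Nothing
lineToPrint heads@((_, firstResult) : _)
  | any (isNothing . snd) heads = Nothing
  | otherwise = firstResult
```

For example, `lineToPrint [(3, Just "foo"), (3, Nothing)]` gives Nothing, so line 3 would be skipped.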
The .lhs file is here: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs
So the question is: how do I limit the memory usage, or allow the garbage collector to reclaim the lines?
Snippets as requested. Hopefully the indentation isn't too badly destroyed :)
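One thing I do know is that Control.Concurrent.Chan is unbounded, so writeChan never blocks. Whether that is the actual cause of the growth in my program is an assumption I have not verified, but a bounded channel would at least cap how far the reader can run ahead. Below is a minimal sketch built only from base (Chan plus QSem); BoundedChan, newBoundedChan, writeBounded and readBounded are hypothetical names, and the sketch is not safe against asynchronous exceptions.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)

-- Hypothetical wrapper: a Chan plus a semaphore counting the free slots.
data BoundedChan a = BoundedChan QSem (Chan a)

-- Create a channel that holds at most `capacity` items (capacity >= 0).
newBoundedChan :: Int -> IO (BoundedChan a)
newBoundedChan capacity = BoundedChan <$> newQSem capacity <*> newChan

-- Blocks while the channel already holds `capacity` unread items.
writeBounded :: BoundedChan a -> a -> IO ()
writeBounded (BoundedChan slots chan) x = waitQSem slots >> writeChan chan x

-- Takes one item and frees a slot so a blocked writer can continue.
readBounded :: BoundedChan a -> IO a
readBounded (BoundedChan slots chan) = do
  x <- readChan chan
  signalQSem slots
  return x
```

If the reader used writeBounded, it would stall once a consumer is `capacity` lines behind, instead of queueing the rest of the file in memory.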

    data Global = Global {done :: MVar Bool, consumers :: Consumers}
    type Done = Bool
    type Linenum = Int
    type Line = (Linenum, Maybe String)
    type Output = MVar [Line]
    type Input = Chan Line
    type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
    type State a = ReaderT Global IO a

    producer :: [Input] -> FilePath -> State ()
    producer c p = do
      liftIO $ Main.log "Starting producer"
      d <- asks done
      f <- liftIO $ readFile p
      mapM_ (\l -> mapM_
              (liftIO . flip writeChan l) c)
        $ zip [1..] $ map Just $ lines f
      liftIO $ modifyMVar_ d (return . not)

    printer :: State ()
    printer = do
      liftIO $ Main.log "Starting printer"
      c <- (fmap (map (snd . snd) . M.elems)
            (asks consumers >>= liftIO . readMVar))
      uniq' c
      where head' :: Output -> IO Line
            head' ch = fmap head (readMVar ch)
            tail' = mapM_ (liftIO . flip modifyMVar_
                            (return . tail))
            cont ch = tail' ch >> uniq' ch
            printMsg ch = readMVar (head ch) >>=
                          liftIO . putStrLn . fromJust . snd . head
            cempty :: [Output] -> IO Bool
            cempty ch = fmap (any id)
                        (mapM (fmap ((==) 0 . length) . readMVar) ch)
            {- Return True if any consumer's head result is Nothing -}
            uniq :: [Output] -> IO Bool
            uniq ch = fmap (any id . map (isNothing . snd))
                      (mapM (liftIO . head') ch)
            uniq' :: [Output] -> State ()
            uniq' ch = do
              d <- consumersDone
              e <- liftIO $ cempty ch
              if not e
                then do
                  u <- liftIO $ uniq ch
                  if u then cont ch else do
                    liftIO $ printMsg ch
                    cont ch
                else unless d $ uniq' ch

Edit: Added snippets