views:

42

answers:

1

I see two functions mergeio and nmergeio in Data.Concurrent, but I can't find any examples of how they work.

Has anyone worked with these before? My hope is that I can use these to get a function like "parMapM".

+2  A: 
import Control.Concurrent (mergeIO, nmergeIO)

main = do
  xs <- mergeIO (map (*2) [1..10])
                (map (+3) [100..110])
  print xs

  xs <- nmergeIO [ map (\x->x*x) [1..10]
                 , map (\x->x+x) [1..10]
                 ]
  print $ maximum xs

Output:

[2,4,103,6,104,8,105,10,106,12,107,14,108,16,109,18,110,20,111,112,113]
100

The internal order may differ depending on how quickly each thread pitches back results.

Writing parMapM is a little tricky, but the result is nice:

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Word
import System.IO

import qualified Data.ByteString as BS

main :: IO ()
main = do
  xs <- parMapM (reverse . show) $ replicate 4 (readFromNet 5)
  print xs

We'll read from /dev/urandom as a standin:

readFromNet :: Int -> IO [Word8]
readFromNet n = do
  h <- openFile "/dev/urandom" ReadMode 
  let go :: Int -> IO [Word8]
      go 0 = return []
      go remaining = do s <- BS.head <$> BS.hGet h 1
                        ss <- go (remaining-1)
                        return (s:ss)
  go n

Finally the gory bits:

parMapM :: (a -> b) -> [IO [a]] -> IO [b]
parMapM f as = do
  kids <- newMVar []
  answers <- atomically $ newTVar []
  forM_ as $ \ a ->
    do mvar <- newEmptyMVar
       curkids <- takeMVar kids
       putMVar kids (mvar:curkids)
       let ax = do xs <- a
                   atomically $ do sofar <- readTVar answers
                                   writeTVar answers (sofar ++ xs)
       forkIO (ax `finally` putMVar mvar ())
  waitForChildren kids
  atomically $ map f <$> readTVar answers
  where
    waitForChildren kids = do ks <- takeMVar kids
                              case ks of
                                [] -> return ()
                                m:ms -> do
                                  putMVar kids ms
                                  takeMVar m
                                  waitForChildren kids

It works by having the children write their answers to a TVar while the main thread waits for the children to signal their completion.

Unfortunately, the results are "chunky" because readFromNet is unaware of communication issues, so we get all values from a given thread at once. If you don't mind making them get their hands dirty, you could do so as in the following:

main :: IO ()
main = do
  let threads = 3
      nbytes  = 10
      total   = nbytes * threads
  byte <- newEmptyMVar
  let thr = forkIO $ readFromNetwork nbytes byte
      go 0 = return []
      go n = do b <- takeMVar byte
                bs <- go (n-1)
                return (b:bs)
  sequence_ $ replicate threads thr
  values <- map (reverse . show) <$> go total
  print values

Then the worker looks like

readFromNetwork :: Int -> MVar Word8 -> IO ()
readFromNetwork n var = do
  -- or something...
  h <- openFile "/dev/urandom" ReadMode 
  let go 0 = return ()
      go remaining = do s <- BS.hGet h 1
                        putMVar var (BS.head s)
                        go (remaining-1)
  go n
Greg Bacon
Thanks! So I guess it doesn't work with IO actions, e.g. mergeIO (map getFromNetwork [...])
Bill
@Bill Both mergeIO and nmergeIO take pure lists — [a] rather than IO [a] — so trying to use an action such as getFromNetwork won't typecheck.
Greg Bacon