I see two functions, mergeIO and nmergeIO, in Control.Concurrent, but I can't find any examples of how they work.
Has anyone worked with these before? My hope is that I can use them to build a function like "parMapM".

    import Control.Concurrent (mergeIO, nmergeIO)

    main = do
      xs <- mergeIO (map (*2) [1..10])
                    (map (+3) [100..110])
      print xs
      xs <- nmergeIO [ map (\x->x*x) [1..10]
                     , map (\x->x+x) [1..10]
                     ]
      print $ maximum xs

Output:

    [2,4,103,6,104,8,105,10,106,12,107,14,108,16,109,18,110,20,111,112,113]
    100

The order in which the two lists are interleaved may differ from run to run, depending on how quickly each thread delivers its results.
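
If mergeIO and nmergeIO are not available in the version of base you are using, a similar effect can be sketched with a Chan. This is only a minimal sketch under stated assumptions, not the library's implementation: mergeViaChan is a hypothetical name, and unlike a lazy merge it collects every element before returning, so it only suits finite lists.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Exception (evaluate, finally)
import Control.Monad (forM_)

-- Hypothetical helper (not part of base): merge any number of finite lists
-- by letting one thread per list push its elements into a shared channel.
mergeViaChan :: [[a]] -> IO [a]
mergeViaChan lists = do
  chan <- newChan
  forM_ lists $ \xs ->
    forkIO $
      -- Force each element to weak head normal form in the producer thread,
      -- then send it. `finally` sends the end marker even if evaluation throws.
      mapM_ (\x -> evaluate x >>= writeChan chan . Just) xs
        `finally` writeChan chan Nothing
  collect chan (length lists)

-- Read from the channel until every producer has sent its Nothing end marker.
collect :: Chan (Maybe a) -> Int -> IO [a]
collect _ 0 = return []
collect chan pending = do
  item <- readChan chan
  case item of
    Nothing -> collect chan (pending - 1)
    Just x  -> (x :) <$> collect chan pending
```

Calling `mergeViaChan [map (*2) [1..10], map (+3) [100..110]]` returns all 21 elements. Each list keeps its own relative order, but how the lists interleave depends on thread scheduling.
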
Writing parMapM is a little tricky, but the result is nice:

    import Control.Applicative
    import Control.Concurrent
    import Control.Concurrent.STM
    import Control.Exception
    import Control.Monad
    import Data.Word
    import System.IO
    import qualified Data.ByteString as BS

    main :: IO ()
    main = do
      xs <- parMapM (reverse . show) $ replicate 4 (readFromNet 5)
      print xs

We'll read from /dev/urandom as a stand-in for the network:

    readFromNet :: Int -> IO [Word8]
    readFromNet n =
      -- withFile closes the handle when we are done, even on an exception
      withFile "/dev/urandom" ReadMode $ \h -> do
        let go :: Int -> IO [Word8]
            go 0 = return []
            go remaining = do s  <- BS.head <$> BS.hGet h 1
                              ss <- go (remaining-1)
                              return (s:ss)
        go n

Finally the gory bits:

    parMapM :: (a -> b) -> [IO [a]] -> IO [b]
    parMapM f as = do
      kids <- newMVar []                  -- one completion MVar per child
      answers <- atomically $ newTVar []  -- shared accumulator for results
      forM_ as $ \ a ->
        do mvar <- newEmptyMVar
           curkids <- takeMVar kids
           putMVar kids (mvar:curkids)
           let ax = do xs <- a
                       atomically $ do sofar <- readTVar answers
                                       writeTVar answers (sofar ++ xs)
           -- signal completion even if the action throws
           forkIO (ax `finally` putMVar mvar ())
      waitForChildren kids
      atomically $ map f <$> readTVar answers
      where
        -- block on each child's MVar in turn until none are left
        waitForChildren kids = do ks <- takeMVar kids
                                  case ks of
                                    [] -> return ()
                                    m:ms -> do
                                      putMVar kids ms
                                      takeMVar m
                                      waitForChildren kids

It works by having the children write their answers to a TVar while the main thread waits for the children to signal their completion.
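
The same signalling idea can be sketched without the shared TVar by giving each child its own MVar that carries the result as well as the completion signal. This is a hedged variant rather than the code above: parMapOrdered, spawn, and await are hypothetical names, and the behaviour differs in that results come back in the order of the input list and a child's exception is re-thrown in the caller.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)

-- Hypothetical variant: run every action in its own thread, then gather the
-- results in input order and map f over the concatenation.
parMapOrdered :: (a -> b) -> [IO [a]] -> IO [b]
parMapOrdered f actions = do
  boxes   <- mapM spawn actions   -- start all children first
  results <- mapM await boxes     -- then wait for them one by one
  return (map f (concat results))

-- Fork one child; its MVar is filled with either the result or the exception.
spawn :: IO [a] -> IO (MVar (Either SomeException [a]))
spawn action = do
  box <- newEmptyMVar
  _ <- forkIO (try action >>= putMVar box)
  return box

-- Block until the child has finished; re-throw its exception, if any.
await :: MVar (Either SomeException [a]) -> IO [a]
await box = do
  outcome <- takeMVar box
  case outcome of
    Left err -> throwIO err
    Right xs -> return xs
```

Because a filled MVar doubles as the completion signal, no separate list of children is needed. The trade-off is that the caller sees nothing until the first child in the list has finished.
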
Unfortunately, the results are "chunky": readFromNet knows nothing about how its values are communicated, so we get all values from a given thread at once. If you don't mind making the workers get their hands dirty, they can hand over each value as soon as it is read, as in the following:

    main :: IO ()
    main = do
      let threads = 3
          nbytes  = 10
          total   = nbytes * threads
      byte <- newEmptyMVar
      let thr = forkIO $ readFromNetwork nbytes byte
          go 0 = return []
          go n = do b <- takeMVar byte
                    bs <- go (n-1)
                    return (b:bs)
      sequence_ $ replicate threads thr
      values <- map (reverse . show) <$> go total
      print values

Then the worker looks like

    readFromNetwork :: Int -> MVar Word8 -> IO ()
    readFromNetwork n var =
      -- /dev/urandom again stands in for a real network source;
      -- withFile closes the handle when the worker is done
      withFile "/dev/urandom" ReadMode $ \h -> do
        let go 0 = return ()
            go remaining = do s <- BS.hGet h 1
                              putMVar var (BS.head s)
                              go (remaining-1)
        go n