iteratee Concurrency for free

Michael Baikov manpacket at gmail.com
Thu Jul 7 04:35:57 BST 2011


If you have a long Iteratee chain in which each link takes
approximately the same (reasonably large) time to process an incoming
chunk before feeding it further, and those links do not share any
complex state between them, you can gain some concurrency for free
just by applying several transformers.

Just by applying one parI I was able to get a ~2x performance boost in
threaded mode in a program which first does some complicated parsing
and then does some complicated dumping of the results.

The second program I tested this on gained only ~40% performance,
because its chain was broken into uneven links.

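To show where the speed-up comes from, here is a minimal sketch in plain
base, without the iteratee library. The names slowStage, runSequential and
runPipelined are made up for this illustration and are not part of the
module below; the second link simply runs in its own thread and receives
its input through a one-slot MVar.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)

-- A hypothetical "slow link": takes about 'us' microseconds per item.
slowStage :: Int -> (a -> b) -> a -> IO b
slowStage us f x = threadDelay us >> return (f x)

-- Sequential chain: each item passes through both links before the
-- next item is started.
runSequential :: (a -> IO b) -> (b -> IO c) -> [a] -> IO [c]
runSequential s1 s2 = mapM (\x -> s1 x >>= s2)

-- Pipelined chain: the second link runs in its own thread, so it can
-- work on item n while the first link already works on item n+1.
-- Sketch only: exceptions in the forked thread are not propagated.
runPipelined :: (a -> IO b) -> (b -> IO c) -> [a] -> IO [c]
runPipelined s1 s2 xs = do
    handoff <- newEmptyMVar   -- one-slot buffer between the two links
    results <- newEmptyMVar   -- final result list from the second link
    _ <- forkIO $ do
        ys <- replicateM (length xs) (takeMVar handoff >>= s2)
        putMVar results ys
    forM_ xs $ \x -> s1 x >>= putMVar handoff
    takeMVar results
```

With n items and two links that each take time t per item, the sequential
version needs about 2*n*t while the pipelined one needs about (n+1)*t,
which is where a boost close to 2x for evenly sized links would come from.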



First, let's import some stuff:

> module Data.Iteratee.Parallel ( psequence_ , psequence , parE , parI ) where

> import Control.Monad.IO.Class
> import Control.Monad.Trans.Class
> import Data.Iteratee as I hiding (mapM_, zip, filter)
> import qualified Data.ListLike as LL

> import Control.Concurrent
> import Control.Exception
> import Control.Monad (join, zipWithM)
> import Data.Maybe (catMaybes)
> import Data.Time.Clock (getCurrentTime, diffUTCTime)

> -- | Transform a regular Iteratee into a parallel composable one, introducing one extra step of delay
> -- EEEEx - time spent in Enumerator working on x'th packet
> -- IIIIx - time spent in Iteratee working on x'th packet
> -- z - last packet, y = (z-1)'th packet

This diagram looks awful in a variable-width font :)

> -- regular   Iteratee: EEEE0 - IIII0,  EEEE1 - IIII1,  EEEE2 - IIII2        .. EEEEz -> IIIIz
> -- parallel  Iteratee: EEEE0,  EEEE1,  EEEE2,       .. EEEEz
> --                          \_ IIII0\_ IIII1\_      .. IIIIy\__ IIIIz
> parI :: (Nullable s) => Iteratee s IO a -> Iteratee s IO a -- {{{
> parI = liftI . firstStep
>     where
>
>         -- first step: here we fork a separate thread for the next chain and at the
>         -- same time ask for more data from the previous chain
>         firstStep iter chunk = do
>             var <- liftIO newEmptyMVar
>             _   <- sideStep var chunk iter
>             liftI $ go var
>
>         -- somewhere in the middle: we get the iteratee from the previous step,
>         -- feed it some new data, ask for more data and start
>         -- more processing in a separate thread
>         go var chunk@(Chunk _) = do
>             iter <- liftIO $ takeMVar var
>             _    <- sideStep var chunk iter
>             liftI $ go var
>
>         -- final step: no more data, so we need to inform our consumer about it
>         go var e = do
>             iter <- liftIO $ takeMVar var
>             join . lift $ enumChunk e iter
>
>         -- forks away from the main computation, returns results via MVar
>         sideStep var chunk iter = liftIO . forkIO $ runIter iter onDone onCont
>             where
>                 onDone a s = putMVar var $ idone a s
>                 onCont k _ = runIter (k chunk) onDone onFina
>                 onFina k e = putMVar var $ icont k e
> -- }}}

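The hand-off in parI can be modelled without the iteratee types. The
sketch below is only an analogy under my own assumptions: Consumer is a
hypothetical stand-in for an iteratee (feed it Just a chunk, or Nothing for
end of stream), and parConsumer, sumC and runList are illustrative names,
not library functions. Unlike parI, it handles end of stream inline and
drops a chunk that arrives after the consumer has finished.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical stand-in for an iteratee: answers each input with either
-- a final result or a continuation.
newtype Consumer a r = Consumer
    { feed :: Maybe a -> IO (Either r (Consumer a r)) }

-- Wrap a consumer so that each chunk is processed in a forked thread
-- while the caller goes back to fetch the next chunk.
-- Sketch only: exceptions in the forked thread are not propagated.
parConsumer :: Consumer a r -> Consumer a r
parConsumer c0 = Consumer first
  where
    -- first chunk: nothing is pending yet, so start the worker directly
    first Nothing  = feed c0 Nothing
    first (Just x) = do
        var <- newEmptyMVar
        _ <- forkIO (feed c0 (Just x) >>= putMVar var)
        return (Right (Consumer (go var)))

    -- later chunks: wait for the previous step, then fork the next one
    go var mx = do
        prev <- takeMVar var
        case (prev, mx) of
            (Left r, _)        -> return (Left r)   -- already finished
            (Right c, Nothing) -> feed c Nothing    -- end of stream
            (Right c, Just x)  -> do
                _ <- forkIO (feed c (Just x) >>= putMVar var)
                return (Right (Consumer (go var)))

-- Example consumer: sums its input until end of stream.
sumC :: Int -> Consumer Int Int
sumC acc = Consumer step
  where
    step Nothing  = return (Left acc)
    step (Just x) = return (Right (sumC (acc + x)))

-- Drive a consumer over a list, then signal end of stream.
runList :: Consumer a r -> [a] -> IO (Maybe r)
runList c []     = either Just (const Nothing) <$> feed c Nothing
runList c (x:xs) = feed c (Just x) >>= either (return . Just) (`runList` xs)
```

For example, runList (parConsumer (sumC 0)) [1..10] yields Just 55, the
same result as the unwrapped consumer. The wrapper changes only when the
work happens.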

> -- | Transform a regular Enumeratee into a parallel composable one, introducing one extra step of delay, see 'parI'
> parE :: (Nullable s1, Nullable s2) => Enumeratee s1 s2 IO r -> Enumeratee s1 s2 IO r -- {{{
> parE outer inner = parI (outer inner)
> -- }}}


> -- | Enumerate a list of iteratees over a single stream simultaneously
> -- and discard the results. Each iteratee runs in a separate forkIO thread;
> -- all errors from the iteratees are passed up.
> psequence_ = I.sequence_ . map parI


> -- | Enumerate a list of iteratees over a single stream simultaneously
> -- and keep the results. Each iteratee runs in a separate forkIO thread;
> -- all errors from the iteratees are passed up.
> psequence = I.sequence . map parI

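The fan-out idea behind psequence can be sketched in plain base as well.
This is a simplified illustration, not the library's behaviour: fanOut is a
made-up name, and it waits for every worker before returning, whereas
psequence (through parI) only waits for a worker when the next chunk
arrives.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hand the same chunk to every worker, each in its own thread, and
-- collect the results in the original order.
-- Sketch only: exceptions in the workers are not propagated.
fanOut :: [a -> IO b] -> a -> IO [b]
fanOut workers chunk = do
    vars <- mapM start workers
    mapM takeMVar vars
  where
    start w = do
        var <- newEmptyMVar
        _ <- forkIO (w chunk >>= putMVar var)
        return var
```

Results come back in the order of the worker list because each worker
writes to its own MVar, regardless of which thread finishes first.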


> -- some tests -- {{{


The tests look ugly, but that's because I modified them a bit from the
original code I wrote.

> data FeedPacket = FeedPacket Int deriving (Show)
>
> _unusedOK :: [IO ()]
> _unusedOK = [testSeq, testParE, testParI]
>
> testParE :: IO ()
> testParE = bm $ mkTestPackets >>= processList (slowChain . slowChain $ dumpAny "")
>
> testParI :: IO ()
> testParI = bm $ mkTestPackets >>= processList (slowChain . slowChain $ parI $ slowDumpAny "")
>
> testSeq :: IO ()
> testSeq = bm $ mkTestPackets >>= processList ( is)
>     where
>         is = psequence_ [i1, i2]
>         i1 = joinI . slowEnum $ dumpAny "first "
>         i2 = joinI . slowEnum $ dumpAny "last  "
>
> bm :: IO a -> IO ()
> bm action = do
>     before <- getCurrentTime
>     _      <- action
>     after  <- getCurrentTime
>     print $ after `diffUTCTime` before
>
> --  some test helpers {{{
>
> slowEnum :: Enumeratee [FeedPacket] [FeedPacket] IO a
> slowEnum iter = liftI $ go iter
>     where
>         go k c@(Chunk _) = do
>             liftIO $ threadDelay (1000000 :: Int)
>             k' <- liftIO $ enumChunk c k
>             liftI $ go k'
>         go k e = idone k e
>
> mkTestPackets :: IO ([FeedPacket])
> mkTestPackets =  return $ map FeedPacket [1..10]
>
>
> processList :: Iteratee [FeedPacket] IO () -> [FeedPacket] -> IO ()
> processList iter [] = enumEof iter >> return ()
> processList iter (p:ps) = do
>     putStrLn $ "<<< " ++ show p
>     enumChunk (Chunk [p]) iter >>= flip processList ps
>
> dumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> dumpAny str = liftI go
>     where
>         go c@(Chunk _) = liftIO (putStrLn $ str ++ ">>> " ++ show c) >> liftI go
>         go e = idone () e
>
>
> slowDumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> slowDumpAny str = liftI go
>     where
>         go c@(Chunk _) = do
>             liftIO $ threadDelay (1000000 :: Int)
>             liftIO (putStrLn $ str ++ ">>> " ++ show c)
>             liftI go
>         go e = idone () e
>
> slowChain :: Iteratee [FeedPacket] IO a -> Iteratee [FeedPacket] IO a
> slowChain = joinI . parE slowEnum



> -- }}}
> -- }}}


