iteratee Strange behaviour eneeCheckIfDone

Conrad Parker conrad at metadecks.org
Wed Jul 27 09:25:33 BST 2011


Hi John,

Michael and I have worked through his proposed eneeCheckIfDone*
variants this afternoon, and we would like to propose the following
updated functions: eneeCheckIfDonePass, eneeCheckIfDoneIgnore,
eneeCheckIfDoneHandle, unfoldConvStreamCheck.

A version of unfoldConvStream that can handle seeking should just be a
matter of replacing the call to eneeCheckIfDone with
eneeCheckIfDonePass or eneeCheckIfDoneHandle. We could also do the
same for variants of convStream. However if we go down this path, it
seems we would have 4 versions of each of these *convStream functions,
as well as the 4 versions of eneeCheckIfDone.

An alternative would be to add a parameter to unfoldConvStream of the
same type as eneeCheckIfDonePass:

unfoldConvStreamCheck ::
 (Monad m, Nullable s) =>
  (((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
Iteratee elo m (Iteratee eli m a)) -> Enumeratee elo eli m a)
  -> (acc -> Iteratee s m (acc, s'))
  -> acc
  -> Enumeratee s s' m a
unfoldConvStreamCheck checkDone f acc0 = checkDone (check acc0)
  where
    check acc k = isStreamFinished >>=
                    maybe (step acc k) (idone (icont k) . EOF . Just)
    step acc k = f acc >>= \(acc', s') ->
                    checkDone (check acc') . k . Chunk $ s'

such that an enumeratee that passes on seek exceptions can be built
with a call to:

unfoldConvStreamCheck eneeCheckIfDonePass

In order to allow partial applications of eneeCheckIfDoneHandle to be
passed to unfoldConvStreamCheck, we must reverse the order of the
first two arguments (compared to the earlier proposal):

-- | The same as eneeCheckIfDonePass, with one extra argument: a
handler which is used
-- to process any exceptions in a separate method.
eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
    ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee elo
m (Iteratee eli m a))
    -> ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
Iteratee elo m (Iteratee eli m a))
    -> Enumeratee elo eli m a
eneeCheckIfDoneHandle h f inner = Iteratee $ \od oc ->
    let onDone x s = od (idone x s) (Chunk empty)
        onCont k Nothing  = runIter (f k Nothing) od oc
        onCont k (Just e) = runIter (h k e)       od oc
    in runIter inner onDone onCont

Then the remaining variants can be rewritten in terms of this function:

eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
Iteratee elo m (Iteratee eli m a))
    -> Enumeratee elo eli m a
eneeCheckIfDonePass f inner = eneeCheckIfDoneHandle (\k e -> f k (Just e))

eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
Iteratee elo m (Iteratee eli m a))
    -> Enumeratee elo eli m a
eneeCheckIfDoneIgnore f = eneeCheckIfDoneHandle (\k _ -> f k Nothing)


These functions allow us to build stream-processing applications with
multiple, layered enumeratees, where the controlling application
provides an iteratee that may initiate seeking. Seek exceptions are
passed from the inner iteratee outwards to fileDriverRandomFd, as you
have also suggested.

One feature of this approach, making use of the extensible exceptions,
is that each layer of the enumeratee stack can provide a custom "seek"
exception, eg. using different units. Enumeratees can then convert
seek requests from inner units like "time" or "video frames" to outer
units such as "file offset", such that dealing with seek tables etc.
can be isolated into the enumeratee that deals with generating frames,
rather than exposed to the application.

Michael also suggests that custom exceptions can be used to initiate
other control requests such as cache flushing, and these functions
would allow such messages to also be propagated.

A remaining issue would be whether it is also useful to provide
similar seekable versions of breakE, mapChunks and the various library
functions that use mapChunks.

cheers,

Conrad.

On 7 July 2011 18:50, Michael Baikov <manpacket at gmail.com> wrote:
> On Thu, Jul 7, 2011 at 6:10 PM, John Lato <jwlato at gmail.com> wrote:
>> Hi Michael,
>> Thanks for this, more comments inline.
>>
>> On Thu, Jul 7, 2011 at 3:59 AM, Michael Baikov <manpacket at gmail.com> wrote:
>>>
>>> First let's import some things, which will be used later
>>>
>>> > import Data.Iteratee as I
>>> > import Data.Iteratee.Char as I
>>> > import Data.Iteratee.IO as I
>>> > import Control.Monad.IO.Class
>>> > import Control.Monad.Trans.Class
>>> > import Control.Exception
>>> > import Control.Monad (when)
>>> > import Data.Char (toUpper)
>>>
>>>
>>> And then let's define some Iteratees
>>>
>>> This one just dumps all it gets from input
>>>
>>> > dump = printLinesUnterminated
>>>
>>> This one performs one seek and then dumps everything else
>>>
>>> > dumpAndSeek = I.seek 6 >> dump
>>>
>>> Let's define some Enumeratees
>>>
>>> This one - using regular mapChunks (and eneeCheckIfDone) (actually we
>>> can use streamMap, but mapChunks's  type signature looks better)
>>>
>>> > upStream :: Enumeratee String String IO a
>>> > upStream = mapChunks (map toUpper)
>>>
>>> This one - with my mapChunks (and modified eneeCheckIfDone)
>>>
>>> > upStream' :: Enumeratee String String IO a
>>> > upStream' = mapChunks' (map toUpper)
>>>
>>> And it's time to do some test. File "hello.txt" contains message
>>> "Hello world!!!!\n\n"
>>>
>>> > test1 = enumFileRandom 1 "hello.txt" dump
>>>
>>> As expected: Hello world!!!!
>>>
>>> > test2 = enumFileRandom 1 "hello.txt" dumpAndSeek
>>>
>>> world!!!!
>>>
>>> > test3 = enumFileRandom 1 "hello.txt" (joinI $ upStream dump)
>>>
>>> HELLO WORLD!!!!
>>>
>>> > test4 = enumFileRandom 1 "hello.txt" (joinI $ upStream dumpAndSeek)
>>>
>>> throwErr in eneeCheckIfDone - so it just hangs forever.
>>> Unexpected behaviour.
>>
>> This is indeed a bug.
>>
>>>
>>> > test5 = enumFileRandom 1 "hello.txt" (joinI $ upStream' dumpAndSeek)
>>>
>>> And with modified version - it works fine.
>>> WORLD!!!!
>>>
>>> > test6 = enumFileRandom 1 "hello.txt" (joinI $ upStream (I.seek 6 >>
>>> > stream2list)) >>= run >>= print
>>>
>>> hangs forever
>>
>> This looks like the same bug, since 'upStream' is defined in terms of
>> 'mapChunks', which in turn is defined with 'eneeCheckIfDone'.
>
> It is the same bug, I just wanted to show it one more :)
> I found mine version of this bug in the middle of huge multi-threaded
> haskell program which takes ages to run so i decided to provide you
> with a nice and simple version :)
>
>>
>>>
>>> > test7 = enumFileRandom 1 "hello.txt" (joinI $ upStream' (I.seek 6 >>
>>> > stream2list)) >>= run >>= print
>>>
>>> "WORLD!!!!\n\n"
>>>
>>> I don't see why it must behave differently when I am applying a simple
>>> transformation to the stream.
>>> And if I am misunderstanding something - what is the proper way to
>>> dump file contents from 6'th byte
>>> to the and while applying map upCase to it. With iteratees.
>>
>> I would put the seek outside of the enumeratee stream.  Or, since you know
>> you're using ASCII characters, use drop instead.
>
> Sure, that will work, but again - this is very simplified problem. In
> the real world you need to use several layers of transformations and
> decision to do seek is made in top most iteratee. So we need to be
> able to pass exeptions as transparently as possible.
>
>
>>> test8 = enumFileRandom 1 "hello.txt" (I.seek 6 >> joinI (upStream
>>> stream2list)) >>= run >>= print
>>
>>>
>>> And my modified implementation - it uses
>>> eneeCheckIfDonePass (icont . step)
>>> instead of
>>> eneeCheckIfDone (liftI . go)
>>>
>>> > mapChunks' :: (Monad m, NullPoint s) => (s -> s') -> Enumeratee s s' m a
>>> > mapChunks' f = eneeCheckIfDonePass (icont . step)
>>> >     where
>>> >         step k (Chunk xs)     = eneeCheckIfDonePass (icont . step) . k .
>>> > Chunk $ f xs
>>> >         step k str@(EOF mErr) = idone (k $ EOF mErr) str
>>>
>>>
>>> eneeCheckIfDonePass - does not tries to handle any exceptions, just
>>> passes them to
>>> the parent Enumeratee/Enumerator
>>>
>>>
>>> > eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>> > elo m (Iteratee eli m a))
>>> >     -> Enumeratee elo eli m a
>>> > eneeCheckIfDonePass f inner = Iteratee $ \od oc ->
>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>> >         onCont k e = runIter (f k e) od oc
>>> >     in runIter inner onDone onCont
>>>
>>> eneeCheckIfDoneHandle - Has a separate handler for exception, so user
>>> can decide if
>>> he wants to handle the exception or pass it to the partent.
>>>
>>> > eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>> > elo m (Iteratee eli m a))
>>> >     -> ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee
>>> > elo m (Iteratee eli m a))
>>> >     -> Enumeratee elo eli m a
>>> > eneeCheckIfDoneHandle f h inner = Iteratee $ \od oc ->
>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>> >         onCont k Nothing = runIter (f k Nothing) od oc
>>> >         onCont k (Just e) = runIter (h k e)      od oc
>>> >     in runIter inner onDone onCont
>>>
>>> eneeCheckIfDoneIgnore - Ignores all exceptions
>>>
>>> > eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>> > elo m (Iteratee eli m a))
>>> >     -> Enumeratee elo eli m a
>>> > eneeCheckIfDoneIgnore f inner = Iteratee $ \od oc ->
>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>> >         onCont k _ = runIter (f k Nothing) od oc
>>> >     in runIter inner onDone onCont
>>
>> I need to spend a little more time reviewing these, but they all seem like
>> useful alternatives.  Sometimes it makes sense for the stream transformer
>> (enumeratee) to handle an exception, sometimes not.  In particular, seeking
>> would have to be passed up to the handle enumerator.
>> Unfortunately it's not quite that simple in all cases.   If there isn't a
>> 1-1 correspondence between elements of the inner stream and the outer
>> stream, how should seek behave?  Should it attempt to seek in the inner
>> stream (which may not be possible), or just pass it up and assume you know
>> what you're doing?  The second is much easier to implement, but I think the
>> former would be more useful.
>> Thoughts?
>
> Sure. You just need to create several different seeks and probably
> rename I.seek to something more specific, like fileHandleSeek or
> something like that. So if you want to go to specific point in file
> (and you know that your enumeratee's chain can handle such seek - you
> sends an exception named (FileSeek Offset). If you want to go to
> specific time frame - you just fire up another exception - (TimeSeek
> TimeOffset) and handle it in the appropriate place. You can use drop
> for stream of chars if you are not doing any transformations, but if
> each chunk takes 1 second to process and you need to drop 1 million of
> them...
>
> So just create several types of exceptions, place handlers in
> reasonable places, handle those exceptions that you can and pass then
> further if you can't and create several different seek functions.
>
>> John
>
> _______________________________________________
> Iteratee mailing list
> Iteratee at projects.haskell.org
> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>



More information about the Iteratee mailing list