iteratee Strange behaviour eneeCheckIfDone

Conrad Parker conrad at metadecks.org
Thu Jul 28 01:40:10 BST 2011


On 28 July 2011 01:21, John Lato <jwlato at gmail.com> wrote:
> Hi Conrad,
>
> Thanks very much for this.  Unfortunately I won't be able to really
> think about it until next week, but a few preliminary thoughts...
>
> Is it possible to implement these changes as part of the 0.8.* line
> (by the PVP), or would a bump to 0.9 be necessary (or desirable)?

If only the suggested four functions and one type alias were added
then I think it would be fine for the 0.8 line as nothing is
removed and no existing behavior is changed.

I did also note that new functions or changed behavior could be
introduced for breakE, mapChunks etc.; this changed behavior
could be delayed for a later release. Perhaps it is worth introducing
a cleaner abstraction for an application to indicate how it
wants to handle errors in a new major version.

> If it is a major version bump, then what else should go in the major
> update?  Here are a few things I've been considering that also require
> a major bump:
>
> 1.  Support for monad-control (mostly implemented already)
> 2.  Stream type tagging (e.g. to indicate that a stream supports
> seeking).  Not implemented, and I'm not sure how I would implement it,
> I need to re-read "data types a la carte"...
> 3.  Change the iteratee implementation; I'm thinking of doing
> something a bit closer to a CPS-transform of Oleg's current
> IterateeM.hs.  Untested, but probably will offer better performance.
> Any iteratees built from "liftI", "icont", or "idone" would need to be
> re-written though; it's a major change.
> 4.  Some relatively minor changes to enumFromCallback and IO-related stuff.
>
> (I'm sure I'm forgetting something)
>
> 1) and 4) can be done relatively easily with low impact.  3) is simple
> from my end, but would be a major breaking change.  I really don't
> know how much effort 2) would be.
>
> Anyway, if it is a major update, I suggest doing the enee* updates
> with 1) and 4), and holding off on 2) and 3) for several months.
> Alternatively I could wait and do the whole batch of changes at once,
> but it would be several months before it's ready.

We'd be happy with the enee* updates in a 0.8 release, as much because
I like to encourage getting upstream acceptance for changes we plan to
use in production code :)

cheers,

Conrad.

>
> Best,
> John
>
> On Wed, Jul 27, 2011 at 9:25 AM, Conrad Parker <conrad at metadecks.org> wrote:
>> Hi John,
>>
>> Michael and I have worked through his proposed eneeCheckIfDone*
>> variants this afternoon, and we would like to propose the following
>> updated functions: eneeCheckIfDonePass, eneeCheckIfDoneIgnore,
>> eneeCheckIfDoneHandle, unfoldConvStreamCheck.
>>
>> A version of unfoldConvStream that can handle seeking should just be a
>> matter of replacing the call to eneeCheckIfDone with
>> eneeCheckIfDonePass or eneeCheckIfDoneHandle. We could also do the
>> same for variants of convStream. However if we go down this path, it
>> seems we would have 4 versions of each of these *convStream functions,
>> as well as the 4 versions of eneeCheckIfDone.
>>
>> An alternative would be to add a parameter to unfoldConvStream of the
>> same type as eneeCheckIfDonePass:
>>
>> unfoldConvStreamCheck ::
>>  (Monad m, Nullable s) =>
>>  (((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>> Iteratee elo m (Iteratee eli m a)) -> Enumeratee elo eli m a)
>>  -> (acc -> Iteratee s m (acc, s'))
>>  -> acc
>>  -> Enumeratee s s' m a
>> unfoldConvStreamCheck checkDone f acc0 = checkDone (check acc0)
>>  where
>>    check acc k = isStreamFinished >>=
>>                    maybe (step acc k) (idone (icont k) . EOF . Just)
>>    step acc k = f acc >>= \(acc', s') ->
>>                    checkDone (check acc') . k . Chunk $ s'
>>
>> such that an enumeratee that passes on seek exceptions can be built
>> with a call to:
>>
>> unfoldConvStreamCheck eneeCheckIfDonePass
>>
>> In order to allow partial applications of eneeCheckIfDoneHandle to be
>> passed to unfoldConvStreamCheck, we must reverse the order of the
>> first two arguments (compared to the earlier proposal):
>>
>> -- | The same as eneeCheckIfDonePass, with one extra argument: a
>> handler which is used
>> -- to process any exceptions in a separate method.
>> eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>    ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee elo
>> m (Iteratee eli m a))
>>    -> ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>> Iteratee elo m (Iteratee eli m a))
>>    -> Enumeratee elo eli m a
>> eneeCheckIfDoneHandle h f inner = Iteratee $ \od oc ->
>>    let onDone x s = od (idone x s) (Chunk empty)
>>        onCont k Nothing  = runIter (f k Nothing) od oc
>>        onCont k (Just e) = runIter (h k e)       od oc
>>    in runIter inner onDone onCont
>>
>> Then the remaining variants can be rewritten in terms of this function:
>>
>> eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>> Iteratee elo m (Iteratee eli m a))
>>    -> Enumeratee elo eli m a
>> eneeCheckIfDonePass f inner = eneeCheckIfDoneHandle (\k e -> f k (Just e))
>>
>> eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>> Iteratee elo m (Iteratee eli m a))
>>    -> Enumeratee elo eli m a
>> eneeCheckIfDoneIgnore f = eneeCheckIfDoneHandle (\k _ -> f k Nothing)
>>
>>
>> These functions allow us to build stream-processing applications with
>> multiple, layered enumeratees, where the controlling application
>> provides an iteratee that may initiate seeking. Seek exceptions are
>> passed from the inner iteratee outwards to fileDriverRandomFd, as you
>> have also suggested.
>>
>> One feature of this approach, making use of the extensible exceptions,
>> is that each layer of the enumeratee stack can provide a custom "seek"
>> exception, eg. using different units. Enumeratees can then convert
>> seek requests from inner units like "time" or "video frames" to outer
>> units such as "file offset", such that dealing with seek tables etc.
>> can be isolated into the enumeratee that deals with generating frames,
>> rather than exposed to the application.
>>
>> Michael also suggests that custom exceptions can be used to initiate
>> other control requests such as cache flushing, and these functions
>> would allow such messages to also be propagated.
>>
>> A remaining issue would be whether it is also useful to provide
>> similar seekable versions of breakE, mapChunks and the various library
>> functions that use mapChunks.
>>
>> cheers,
>>
>> Conrad.
>>
>> On 7 July 2011 18:50, Michael Baikov <manpacket at gmail.com> wrote:
>>> On Thu, Jul 7, 2011 at 6:10 PM, John Lato <jwlato at gmail.com> wrote:
>>>> Hi Michael,
>>>> Thanks for this, more comments inline.
>>>>
>>>> On Thu, Jul 7, 2011 at 3:59 AM, Michael Baikov <manpacket at gmail.com> wrote:
>>>>>
>>>>> First let's import some things, which will be used later
>>>>>
>>>>> > import Data.Iteratee as I
>>>>> > import Data.Iteratee.Char as I
>>>>> > import Data.Iteratee.IO as I
>>>>> > import Control.Monad.IO.Class
>>>>> > import Control.Monad.Trans.Class
>>>>> > import Control.Exception
>>>>> > import Control.Monad (when)
>>>>> > import Data.Char (toUpper)
>>>>>
>>>>>
>>>>> And then let's define some Iteratees
>>>>>
>>>>> This one just dumps all it gets from input
>>>>>
>>>>> > dump = printLinesUnterminated
>>>>>
>>>>> This one performs one seek and then dumps everything else
>>>>>
>>>>> > dumpAndSeek = I.seek 6 >> dump
>>>>>
>>>>> Let's define some Enumeratees
>>>>>
>>>>> This one - using regular mapChunks (and eneeCheckIfDone) (actually we
>>>>> can use streamMap, but mapChunks's  type signature looks better)
>>>>>
>>>>> > upStream :: Enumeratee String String IO a
>>>>> > upStream = mapChunks (map toUpper)
>>>>>
>>>>> This one - with my mapChunks (and modified eneeCheckIfDone)
>>>>>
>>>>> > upStream' :: Enumeratee String String IO a
>>>>> > upStream' = mapChunks' (map toUpper)
>>>>>
>>>>> And it's time to do some test. File "hello.txt" contains message
>>>>> "Hello world!!!!\n\n"
>>>>>
>>>>> > test1 = enumFileRandom 1 "hello.txt" dump
>>>>>
>>>>> As expected: Hello world!!!!
>>>>>
>>>>> > test2 = enumFileRandom 1 "hello.txt" dumpAndSeek
>>>>>
>>>>> world!!!!
>>>>>
>>>>> > test3 = enumFileRandom 1 "hello.txt" (joinI $ upStream dump)
>>>>>
>>>>> HELLO WORLD!!!!
>>>>>
>>>>> > test4 = enumFileRandom 1 "hello.txt" (joinI $ upStream dumpAndSeek)
>>>>>
>>>>> throwErr in eneeCheckIfDone - so it just hangs forever.
>>>>> Unexpected behaviour.
>>>>
>>>> This is indeed a bug.
>>>>
>>>>>
>>>>> > test5 = enumFileRandom 1 "hello.txt" (joinI $ upStream' dumpAndSeek)
>>>>>
>>>>> And with modified version - it works fine.
>>>>> WORLD!!!!
>>>>>
>>>>> > test6 = enumFileRandom 1 "hello.txt" (joinI $ upStream (I.seek 6 >>
>>>>> > stream2list)) >>= run >>= print
>>>>>
>>>>> hangs forever
>>>>
>>>> This looks like the same bug, since 'upStream' is defined in terms of
>>>> 'mapChunks', which in turn is defined with 'eneeCheckIfDone'.
>>>
>>> It is the same bug, I just wanted to show it one more :)
>>> I found mine version of this bug in the middle of huge multi-threaded
>>> haskell program which takes ages to run so i decided to provide you
>>> with a nice and simple version :)
>>>
>>>>
>>>>>
>>>>> > test7 = enumFileRandom 1 "hello.txt" (joinI $ upStream' (I.seek 6 >>
>>>>> > stream2list)) >>= run >>= print
>>>>>
>>>>> "WORLD!!!!\n\n"
>>>>>
>>>>> I don't see why it must behave differently when I am applying a simple
>>>>> transformation to the stream.
>>>>> And if I am misunderstanding something - what is the proper way to
>>>>> dump file contents from 6'th byte
>>>>> to the and while applying map upCase to it. With iteratees.
>>>>
>>>> I would put the seek outside of the enumeratee stream.  Or, since you know
>>>> you're using ASCII characters, use drop instead.
>>>
>>> Sure, that will work, but again - this is very simplified problem. In
>>> the real world you need to use several layers of transformations and
>>> decision to do seek is made in top most iteratee. So we need to be
>>> able to pass exeptions as transparently as possible.
>>>
>>>
>>>>> test8 = enumFileRandom 1 "hello.txt" (I.seek 6 >> joinI (upStream
>>>>> stream2list)) >>= run >>= print
>>>>
>>>>>
>>>>> And my modified implementation - it uses
>>>>> eneeCheckIfDonePass (icont . step)
>>>>> instead of
>>>>> eneeCheckIfDone (liftI . go)
>>>>>
>>>>> > mapChunks' :: (Monad m, NullPoint s) => (s -> s') -> Enumeratee s s' m a
>>>>> > mapChunks' f = eneeCheckIfDonePass (icont . step)
>>>>> >     where
>>>>> >         step k (Chunk xs)     = eneeCheckIfDonePass (icont . step) . k .
>>>>> > Chunk $ f xs
>>>>> >         step k str@(EOF mErr) = idone (k $ EOF mErr) str
>>>>>
>>>>>
>>>>> eneeCheckIfDonePass - does not tries to handle any exceptions, just
>>>>> passes them to
>>>>> the parent Enumeratee/Enumerator
>>>>>
>>>>>
>>>>> > eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>> > elo m (Iteratee eli m a))
>>>>> >     -> Enumeratee elo eli m a
>>>>> > eneeCheckIfDonePass f inner = Iteratee $ \od oc ->
>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>> >         onCont k e = runIter (f k e) od oc
>>>>> >     in runIter inner onDone onCont
>>>>>
>>>>> eneeCheckIfDoneHandle - Has a separate handler for exception, so user
>>>>> can decide if
>>>>> he wants to handle the exception or pass it to the partent.
>>>>>
>>>>> > eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>> > elo m (Iteratee eli m a))
>>>>> >     -> ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee
>>>>> > elo m (Iteratee eli m a))
>>>>> >     -> Enumeratee elo eli m a
>>>>> > eneeCheckIfDoneHandle f h inner = Iteratee $ \od oc ->
>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>> >         onCont k Nothing = runIter (f k Nothing) od oc
>>>>> >         onCont k (Just e) = runIter (h k e)      od oc
>>>>> >     in runIter inner onDone onCont
>>>>>
>>>>> eneeCheckIfDoneIgnore - Ignores all exceptions
>>>>>
>>>>> > eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>> > elo m (Iteratee eli m a))
>>>>> >     -> Enumeratee elo eli m a
>>>>> > eneeCheckIfDoneIgnore f inner = Iteratee $ \od oc ->
>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>> >         onCont k _ = runIter (f k Nothing) od oc
>>>>> >     in runIter inner onDone onCont
>>>>
>>>> I need to spend a little more time reviewing these, but they all seem like
>>>> useful alternatives.  Sometimes it makes sense for the stream transformer
>>>> (enumeratee) to handle an exception, sometimes not.  In particular, seeking
>>>> would have to be passed up to the handle enumerator.
>>>> Unfortunately it's not quite that simple in all cases.   If there isn't a
>>>> 1-1 correspondence between elements of the inner stream and the outer
>>>> stream, how should seek behave?  Should it attempt to seek in the inner
>>>> stream (which may not be possible), or just pass it up and assume you know
>>>> what you're doing?  The second is much easier to implement, but I think the
>>>> former would be more useful.
>>>> Thoughts?
>>>
>>> Sure. You just need to create several different seeks and probably
>>> rename I.seek to something more specific, like fileHandleSeek or
>>> something like that. So if you want to go to specific point in file
>>> (and you know that your enumeratee's chain can handle such seek - you
>>> sends an exception named (FileSeek Offset). If you want to go to
>>> specific time frame - you just fire up another exception - (TimeSeek
>>> TimeOffset) and handle it in the appropriate place. You can use drop
>>> for stream of chars if you are not doing any transformations, but if
>>> each chunk takes 1 second to process and you need to drop 1 million of
>>> them...
>>>
>>> So just create several types of exceptions, place handlers in
>>> reasonable places, handle those exceptions that you can and pass then
>>> further if you can't and create several different seek functions.
>>>
>>>> John
>>>
>>> _______________________________________________
>>> Iteratee mailing list
>>> Iteratee at projects.haskell.org
>>> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>>>
>>
>
>



More information about the Iteratee mailing list