iteratee Strange behaviour eneeCheckIfDone

Conrad Parker conrad at metadecks.org
Wed Aug 17 08:54:50 BST 2011


John,

what are your current thoughts about these enee* updates?

Conrad.

On 28 July 2011 08:40, Conrad Parker <conrad at metadecks.org> wrote:
> On 28 July 2011 01:21, John Lato <jwlato at gmail.com> wrote:
>> Hi Conrad,
>>
>> Thanks very much for this.  Unfortunately I won't be able to really
>> think about it until next week, but a few preliminary thoughts...
>>
>> Is it possible to implement these changes as part of the 0.8.* line
>> (by the PVP), or would a bump to 0.9 be necessary (or desirable)?
>
> If only the suggested four functions and one type alias were added
> then I think it would be fine for the 0.8 line as nothing is
> removed and no existing behavior is changed.
>
> I did also note that new functions or changed behavior could be
> introduced for breakE, mapChunks etc.; this changed behavior
> could be delayed for a later release. Perhaps it is worth introducing
> a cleaner abstraction for an application to indicate how it
> wants to handle errors in a new major version.
>
>> If it is a major version bump, then what else should go in the major
>> update?  Here are a few things I've been considering that also require
>> a major bump:
>>
>> 1.  Support for monad-control (mostly implemented already)
>> 2.  Stream type tagging (e.g. to indicate that a stream supports
>> seeking).  Not implemented, and I'm not sure how I would implement it,
>> I need to re-read "data types a la carte"...
>> 3.  Change the iteratee implementation; I'm thinking of doing
>> something a bit closer to a CPS-transform of Oleg's current
>> IterateeM.hs.  Untested, but probably will offer better performance.
>> Any iteratees built from "liftI", "icont", or "idone" would need to be
>> re-written though; it's a major change.
>> 4.  Some relatively minor changes to enumFromCallback and IO-related stuff.
>>
>> (I'm sure I'm forgetting something)
>>
>> 1) and 4) can be done relatively easily with low impact.  3) is simple
>> from my end, but would be a major breaking change.  I really don't
>> know how much effort 2) would be.
>>
>> Anyway, if it is a major update, I suggest doing the enee* updates
>> with 1) and 4), and holding off on 2) and 3) for several months.
>> Alternatively I could wait and do the whole batch of changes at once,
>> but it would be several months before it's ready.
>
> We'd be happy with the enee* updates in a 0.8 release, as much because
> I like to encourage getting upstream acceptance for changes we plan to
> use in production code :)
>
> cheers,
>
> Conrad.
>
>>
>> Best,
>> John
>>
>> On Wed, Jul 27, 2011 at 9:25 AM, Conrad Parker <conrad at metadecks.org> wrote:
>>> Hi John,
>>>
>>> Michael and I have worked through his proposed eneeCheckIfDone*
>>> variants this afternoon, and we would like to propose the following
>>> updated functions: eneeCheckIfDonePass, eneeCheckIfDoneIgnore,
>>> eneeCheckIfDoneHandle, unfoldConvStreamCheck.
>>>
>>> A version of unfoldConvStream that can handle seeking should just be a
>>> matter of replacing the call to eneeCheckIfDone with
>>> eneeCheckIfDonePass or eneeCheckIfDoneHandle. We could also do the
>>> same for variants of convStream. However if we go down this path, it
>>> seems we would have 4 versions of each of these *convStream functions,
>>> as well as the 4 versions of eneeCheckIfDone.
>>>
>>> An alternative would be to add a parameter to unfoldConvStream of the
>>> same type as eneeCheckIfDonePass:
>>>
>>> unfoldConvStreamCheck ::
>>>  (Monad m, Nullable s) =>
>>>  (((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>> Iteratee elo m (Iteratee eli m a)) -> Enumeratee elo eli m a)
>>>  -> (acc -> Iteratee s m (acc, s'))
>>>  -> acc
>>>  -> Enumeratee s s' m a
>>> unfoldConvStreamCheck checkDone f acc0 = checkDone (check acc0)
>>>  where
>>>    check acc k = isStreamFinished >>=
>>>                    maybe (step acc k) (idone (icont k) . EOF . Just)
>>>    step acc k = f acc >>= \(acc', s') ->
>>>                    checkDone (check acc') . k . Chunk $ s'
>>>
>>> such that an enumeratee that passes on seek exceptions can be built
>>> with a call to:
>>>
>>> unfoldConvStreamCheck eneeCheckIfDonePass
>>>
>>> In order to allow partial applications of eneeCheckIfDoneHandle to be
>>> passed to unfoldConvStreamCheck, we must reverse the order of the
>>> first two arguments (compared to the earlier proposal):
>>>
>>> -- | The same as eneeCheckIfDonePass, with one extra argument: a
>>> handler which is used
>>> -- to process any exceptions in a separate method.
>>> eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>>    ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee elo
>>> m (Iteratee eli m a))
>>>    -> ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>> Iteratee elo m (Iteratee eli m a))
>>>    -> Enumeratee elo eli m a
>>> eneeCheckIfDoneHandle h f inner = Iteratee $ \od oc ->
>>>    let onDone x s = od (idone x s) (Chunk empty)
>>>        onCont k Nothing  = runIter (f k Nothing) od oc
>>>        onCont k (Just e) = runIter (h k e)       od oc
>>>    in runIter inner onDone onCont
>>>
>>> Then the remaining variants can be rewritten in terms of this function:
>>>
>>> eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>> Iteratee elo m (Iteratee eli m a))
>>>    -> Enumeratee elo eli m a
>>> eneeCheckIfDonePass f inner = eneeCheckIfDoneHandle (\k e -> f k (Just e))
>>>
>>> eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>> Iteratee elo m (Iteratee eli m a))
>>>    -> Enumeratee elo eli m a
>>> eneeCheckIfDoneIgnore f = eneeCheckIfDoneHandle (\k _ -> f k Nothing)
>>>
>>>
>>> These functions allow us to build stream-processing applications with
>>> multiple, layered enumeratees, where the controlling application
>>> provides an iteratee that may initiate seeking. Seek exceptions are
>>> passed from the inner iteratee outwards to fileDriverRandomFd, as you
>>> have also suggested.
>>>
>>> One feature of this approach, making use of the extensible exceptions,
>>> is that each layer of the enumeratee stack can provide a custom "seek"
>>> exception, eg. using different units. Enumeratees can then convert
>>> seek requests from inner units like "time" or "video frames" to outer
>>> units such as "file offset", such that dealing with seek tables etc.
>>> can be isolated into the enumeratee that deals with generating frames,
>>> rather than exposed to the application.
>>>
>>> Michael also suggests that custom exceptions can be used to initiate
>>> other control requests such as cache flushing, and these functions
>>> would allow such messages to also be propagated.
>>>
>>> A remaining issue would be whether it is also useful to provide
>>> similar seekable versions of breakE, mapChunks and the various library
>>> functions that use mapChunks.
>>>
>>> cheers,
>>>
>>> Conrad.
>>>
>>> On 7 July 2011 18:50, Michael Baikov <manpacket at gmail.com> wrote:
>>>> On Thu, Jul 7, 2011 at 6:10 PM, John Lato <jwlato at gmail.com> wrote:
>>>>> Hi Michael,
>>>>> Thanks for this, more comments inline.
>>>>>
>>>>> On Thu, Jul 7, 2011 at 3:59 AM, Michael Baikov <manpacket at gmail.com> wrote:
>>>>>>
>>>>>> First let's import some things, which will be used later
>>>>>>
>>>>>> > import Data.Iteratee as I
>>>>>> > import Data.Iteratee.Char as I
>>>>>> > import Data.Iteratee.IO as I
>>>>>> > import Control.Monad.IO.Class
>>>>>> > import Control.Monad.Trans.Class
>>>>>> > import Control.Exception
>>>>>> > import Control.Monad (when)
>>>>>> > import Data.Char (toUpper)
>>>>>>
>>>>>>
>>>>>> And then let's define some Iteratees
>>>>>>
>>>>>> This one just dumps all it gets from input
>>>>>>
>>>>>> > dump = printLinesUnterminated
>>>>>>
>>>>>> This one performs one seek and then dumps everything else
>>>>>>
>>>>>> > dumpAndSeek = I.seek 6 >> dump
>>>>>>
>>>>>> Let's define some Enumeratees
>>>>>>
>>>>>> This one - using regular mapChunks (and eneeCheckIfDone) (actually we
>>>>>> can use streamMap, but mapChunks's  type signature looks better)
>>>>>>
>>>>>> > upStream :: Enumeratee String String IO a
>>>>>> > upStream = mapChunks (map toUpper)
>>>>>>
>>>>>> This one - with my mapChunks (and modified eneeCheckIfDone)
>>>>>>
>>>>>> > upStream' :: Enumeratee String String IO a
>>>>>> > upStream' = mapChunks' (map toUpper)
>>>>>>
>>>>>> And it's time to do some test. File "hello.txt" contains message
>>>>>> "Hello world!!!!\n\n"
>>>>>>
>>>>>> > test1 = enumFileRandom 1 "hello.txt" dump
>>>>>>
>>>>>> As expected: Hello world!!!!
>>>>>>
>>>>>> > test2 = enumFileRandom 1 "hello.txt" dumpAndSeek
>>>>>>
>>>>>> world!!!!
>>>>>>
>>>>>> > test3 = enumFileRandom 1 "hello.txt" (joinI $ upStream dump)
>>>>>>
>>>>>> HELLO WORLD!!!!
>>>>>>
>>>>>> > test4 = enumFileRandom 1 "hello.txt" (joinI $ upStream dumpAndSeek)
>>>>>>
>>>>>> throwErr in eneeCheckIfDone - so it just hangs forever.
>>>>>> Unexpected behaviour.
>>>>>
>>>>> This is indeed a bug.
>>>>>
>>>>>>
>>>>>> > test5 = enumFileRandom 1 "hello.txt" (joinI $ upStream' dumpAndSeek)
>>>>>>
>>>>>> And with modified version - it works fine.
>>>>>> WORLD!!!!
>>>>>>
>>>>>> > test6 = enumFileRandom 1 "hello.txt" (joinI $ upStream (I.seek 6 >>
>>>>>> > stream2list)) >>= run >>= print
>>>>>>
>>>>>> hangs forever
>>>>>
>>>>> This looks like the same bug, since 'upStream' is defined in terms of
>>>>> 'mapChunks', which in turn is defined with 'eneeCheckIfDone'.
>>>>
>>>> It is the same bug, I just wanted to show it one more :)
>>>> I found mine version of this bug in the middle of huge multi-threaded
>>>> haskell program which takes ages to run so i decided to provide you
>>>> with a nice and simple version :)
>>>>
>>>>>
>>>>>>
>>>>>> > test7 = enumFileRandom 1 "hello.txt" (joinI $ upStream' (I.seek 6 >>
>>>>>> > stream2list)) >>= run >>= print
>>>>>>
>>>>>> "WORLD!!!!\n\n"
>>>>>>
>>>>>> I don't see why it must behave differently when I am applying a simple
>>>>>> transformation to the stream.
>>>>>> And if I am misunderstanding something - what is the proper way to
>>>>>> dump file contents from 6'th byte
>>>>>> to the and while applying map upCase to it. With iteratees.
>>>>>
>>>>> I would put the seek outside of the enumeratee stream.  Or, since you know
>>>>> you're using ASCII characters, use drop instead.
>>>>
>>>> Sure, that will work, but again - this is very simplified problem. In
>>>> the real world you need to use several layers of transformations and
>>>> decision to do seek is made in top most iteratee. So we need to be
>>>> able to pass exeptions as transparently as possible.
>>>>
>>>>
>>>>>> test8 = enumFileRandom 1 "hello.txt" (I.seek 6 >> joinI (upStream
>>>>>> stream2list)) >>= run >>= print
>>>>>
>>>>>>
>>>>>> And my modified implementation - it uses
>>>>>> eneeCheckIfDonePass (icont . step)
>>>>>> instead of
>>>>>> eneeCheckIfDone (liftI . go)
>>>>>>
>>>>>> > mapChunks' :: (Monad m, NullPoint s) => (s -> s') -> Enumeratee s s' m a
>>>>>> > mapChunks' f = eneeCheckIfDonePass (icont . step)
>>>>>> >     where
>>>>>> >         step k (Chunk xs)     = eneeCheckIfDonePass (icont . step) . k .
>>>>>> > Chunk $ f xs
>>>>>> >         step k str@(EOF mErr) = idone (k $ EOF mErr) str
>>>>>>
>>>>>>
>>>>>> eneeCheckIfDonePass - does not tries to handle any exceptions, just
>>>>>> passes them to
>>>>>> the parent Enumeratee/Enumerator
>>>>>>
>>>>>>
>>>>>> > eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>> > elo m (Iteratee eli m a))
>>>>>> >     -> Enumeratee elo eli m a
>>>>>> > eneeCheckIfDonePass f inner = Iteratee $ \od oc ->
>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>> >         onCont k e = runIter (f k e) od oc
>>>>>> >     in runIter inner onDone onCont
>>>>>>
>>>>>> eneeCheckIfDoneHandle - Has a separate handler for exception, so user
>>>>>> can decide if
>>>>>> he wants to handle the exception or pass it to the partent.
>>>>>>
>>>>>> > eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>> > elo m (Iteratee eli m a))
>>>>>> >     -> ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee
>>>>>> > elo m (Iteratee eli m a))
>>>>>> >     -> Enumeratee elo eli m a
>>>>>> > eneeCheckIfDoneHandle f h inner = Iteratee $ \od oc ->
>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>> >         onCont k Nothing = runIter (f k Nothing) od oc
>>>>>> >         onCont k (Just e) = runIter (h k e)      od oc
>>>>>> >     in runIter inner onDone onCont
>>>>>>
>>>>>> eneeCheckIfDoneIgnore - Ignores all exceptions
>>>>>>
>>>>>> > eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>> > elo m (Iteratee eli m a))
>>>>>> >     -> Enumeratee elo eli m a
>>>>>> > eneeCheckIfDoneIgnore f inner = Iteratee $ \od oc ->
>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>> >         onCont k _ = runIter (f k Nothing) od oc
>>>>>> >     in runIter inner onDone onCont
>>>>>
>>>>> I need to spend a little more time reviewing these, but they all seem like
>>>>> useful alternatives.  Sometimes it makes sense for the stream transformer
>>>>> (enumeratee) to handle an exception, sometimes not.  In particular, seeking
>>>>> would have to be passed up to the handle enumerator.
>>>>> Unfortunately it's not quite that simple in all cases.   If there isn't a
>>>>> 1-1 correspondence between elements of the inner stream and the outer
>>>>> stream, how should seek behave?  Should it attempt to seek in the inner
>>>>> stream (which may not be possible), or just pass it up and assume you know
>>>>> what you're doing?  The second is much easier to implement, but I think the
>>>>> former would be more useful.
>>>>> Thoughts?
>>>>
>>>> Sure. You just need to create several different seeks and probably
>>>> rename I.seek to something more specific, like fileHandleSeek or
>>>> something like that. So if you want to go to specific point in file
>>>> (and you know that your enumeratee's chain can handle such seek - you
>>>> sends an exception named (FileSeek Offset). If you want to go to
>>>> specific time frame - you just fire up another exception - (TimeSeek
>>>> TimeOffset) and handle it in the appropriate place. You can use drop
>>>> for stream of chars if you are not doing any transformations, but if
>>>> each chunk takes 1 second to process and you need to drop 1 million of
>>>> them...
>>>>
>>>> So just create several types of exceptions, place handlers in
>>>> reasonable places, handle those exceptions that you can and pass then
>>>> further if you can't and create several different seek functions.
>>>>
>>>>> John
>>>>
>>>> _______________________________________________
>>>> Iteratee mailing list
>>>> Iteratee at projects.haskell.org
>>>> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>>>>
>>>
>>
>>
>



More information about the Iteratee mailing list