iteratee Strange behaviour eneeCheckIfDone

John Lato jwlato at gmail.com
Fri Aug 26 10:37:45 BST 2011


I pushed some additions to the HEAD repo several days ago, but I'm not
sure that they're actually what we want.  My two concerns:

1.  I think that this

> type EnumerateeHandler eli elo m a = (Stream eli -> Iteratee eli m a)
>      -> SomeException -> Iteratee elo m (Iteratee eli m a)

should be

> type EnumerateeHandler e eli elo m a = (Stream eli -> Iteratee eli m a)
>     -> e -> Iteratee elo m (Iteratee eli m a)

with the constraint (Exception e), to parallel the exception handlers
in Control.Exception.  Of course this places the onus of pattern
matching on the Enumeratee rather than the handler.

I'm pretty sure this is the right thing to do here.
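
A minimal sketch of what "the onus of pattern matching" could look like,
using only Control.Exception from base. `SeekRequest` and `dispatch` are
hypothetical names invented for this illustration, not part of iteratee;
`dispatch` stands in for the matching an enumeratee would do before calling
a handler typed on a concrete exception `e`.

```haskell
import Control.Exception

-- Hypothetical exception type, invented for this sketch.
newtype SeekRequest = SeekRequest Integer
  deriving Show

instance Exception SeekRequest

-- The caller (standing in for the enumeratee) does the matching:
-- if the SomeException wraps the type the handler expects, run the
-- handler; otherwise fall back to the untyped path.
dispatch :: Exception e
         => (e -> r)              -- handler typed on a concrete exception
         -> (SomeException -> r)  -- fallback for everything else
         -> SomeException
         -> r
dispatch handler fallback se = maybe (fallback se) handler (fromException se)
```

With these definitions, `dispatch (\(SeekRequest n) -> n) (const (-1))`
applied to `toException (SeekRequest 6)` selects the typed handler, while
any other exception (say `toException (ErrorCall "boom")`) takes the
fallback, which is the same selection-by-type that `catch` and `Handler`
perform in Control.Exception.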

2.  I think the implementation for unfoldConvStreamCheck needs to
be a bit more complicated than originally proposed.  Currently I have
this:

unfoldConvStreamCheck
  :: (Monad m, Nullable elo, Monoid elo)
  => (((Stream eli -> Iteratee eli m a)
        -> Maybe SomeException
        -> Iteratee elo m (Iteratee eli m a)
      )
      -> Enumeratee elo eli m a
     )
  -> (acc -> Iteratee elo m (acc, eli))
  -> acc
  -> Enumeratee elo eli m a
unfoldConvStreamCheck checkDone f acc0 = checkDone (check acc0)
  where
    check acc k mX = isStreamFinished >>=
                   maybe (step acc k mX) (idone (icont k mX) . EOF . Just)
    step acc k Nothing = f acc >>= \(acc', s') ->
                  (checkDone (check acc') . k $ Chunk s')
    step acc k (Just ex) = throwRecoverableErr ex $ \str' ->
      let i = f acc >>= \(acc', s') ->
                           (checkDone (check acc') . k $ Chunk s')
      in joinIM $ enumChunk str' i

The `step` function needs to check the exception, because otherwise an
exception would only be propagated when the outer stream is terminated
(isStreamFinished returns a wrapped exception).

Working with this makes me suspect that the type of
`throwRecoverableErr` is wrong.  There are a few instances where I've
needed to manually wrap an Iteratee into a `Stream -> Iteratee` just
for this case, and it's easy to get wrong.  Compare to `seek`, which
is currently this:

-- |Seek to a position in the stream
seek :: (Monad m, NullPoint s) => FileOffset -> Iteratee s m ()
seek o = throwRecoverableErr (toException $ SeekException o) (const identity)

Obviously the stream initially passed after a seek is discarded.  This
only works because `enumFromCallbackCatch` passes an empty chunk after
an exception is handled, specifically to deal with `const identity`
returned by seek.
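
To make the hazard concrete, here is a toy model (an assumption-laden
sketch: `Iter`, `collect`, `seekThen`, `feed` and `resumeWith` are invented
for illustration and are not the library's types or functions). The
resumption is a `Stream -> Iter`, and a seek-shaped iteratee built with
`const` silently drops whatever stream the driver resumes with.

```haskell
-- Toy model for illustration; these are not the iteratee library's types.
data Stream = Chunk String | EOF
  deriving (Eq, Show)

data Iter a
  = Done a
  | Cont (Stream -> Iter a)
  | Recoverable String (Stream -> Iter a)  -- error message plus resumption

-- Accumulate every chunk until EOF.
collect :: Iter String
collect = go ""
  where
    go acc = Cont $ \s -> case s of
      Chunk c -> go (acc ++ c)
      EOF     -> Done acc

-- Shaped like `seek`: raise a recoverable error, then ignore whatever
-- stream the driver resumes with.
seekThen :: Iter a -> Iter a
seekThen next = Recoverable "seek" (const next)

-- Feed streams in order; Nothing if input runs out or an error is unhandled.
feed :: Iter a -> [Stream] -> Maybe a
feed (Done a) _        = Just a
feed (Cont k) (s : ss) = feed (k s) ss
feed _        _        = Nothing

-- A driver that "handles" the error and resumes with `first`.
resumeWith :: Stream -> [Stream] -> Iter a -> Maybe a
resumeWith first rest (Recoverable _ k) = feed (k first) rest
resumeWith first rest it                = feed it (first : rest)
```

In this model, resuming `seekThen collect` with an empty first chunk
followed by `Chunk "world"` and `EOF` yields `Just "world"`, but resuming
with `Chunk "wor"` followed by `Chunk "ld"` and `EOF` yields `Just "ld"`:
the first chunk is lost. That is why the driver has to cooperate by sending
an empty chunk, and why a resumption that is just an iteratee rather than a
`Stream -> Iteratee` might be harder to misuse.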

tl;dr - this is slightly tricky, and I'd appreciate a review of this
implementation of unfoldConvStreamCheck before I push a new 0.8.x
release.  Also `throwRecoverableErr` is likely to change for 0.9.

Thanks,
John

On Wed, Aug 17, 2011 at 10:20 AM, John Lato <jwlato at gmail.com> wrote:
> Hi Conrad,
>
> Apologies for the latency over the past few weeks, been busy with
> personal stuff.
>
> Current plan is to add the enee* functions and push a new minor
> release over the next few days.
>
> I'll probably push an 0.9 release not long after (within 2 weeks?),
> which changes the semantics of enee*-dependent functions and a few
> other changes, on the grounds that the semantic changes are very
> useful and I don't want to delay them any longer.
>
> Changes to the iteratee type itself will wait for 0.10, which will be
> a bit longer.  Will possibly include more explicit exception types and
> implementation changes, this needs more research.
>
> Best,
> John
>
> On Wed, Aug 17, 2011 at 8:54 AM, Conrad Parker <conrad at metadecks.org> wrote:
>> John,
>>
>> what are your current thoughts about these enee* updates?
>>
>> Conrad.
>>
>> On 28 July 2011 08:40, Conrad Parker <conrad at metadecks.org> wrote:
>>> On 28 July 2011 01:21, John Lato <jwlato at gmail.com> wrote:
>>>> Hi Conrad,
>>>>
>>>> Thanks very much for this.  Unfortunately I won't be able to really
>>>> think about it until next week, but a few preliminary thoughts...
>>>>
>>>> Is it possible to implement these changes as part of the 0.8.* line
>>>> (by the PVP), or would a bump to 0.9 be necessary (or desirable)?
>>>
>>> If only the suggested four functions and one type alias were added
>>> then I think it would be fine for the 0.8 line as nothing is
>>> removed and no existing behavior is changed.
>>>
>>> I did also note that new functions or changed behavior could be
>>> introduced for breakE, mapChunks etc.; this changed behavior
>>> could be delayed for a later release. Perhaps it is worth introducing
>>> a cleaner abstraction for an application to indicate how it
>>> wants to handle errors in a new major version.
>>>
>>>> If it is a major version bump, then what else should go in the major
>>>> update?  Here are a few things I've been considering that also require
>>>> a major bump:
>>>>
>>>> 1.  Support for monad-control (mostly implemented already)
>>>> 2.  Stream type tagging (e.g. to indicate that a stream supports
>>>> seeking).  Not implemented, and I'm not sure how I would implement it,
>>>> I need to re-read "data types a la carte"...
>>>> 3.  Change the iteratee implementation; I'm thinking of doing
>>>> something a bit closer to a CPS-transform of Oleg's current
>>>> IterateeM.hs.  Untested, but probably will offer better performance.
>>>> Any iteratees built from "liftI", "icont", or "idone" would need to be
>>>> re-written though; it's a major change.
>>>> 4.  Some relatively minor changes to enumFromCallback and IO-related stuff.
>>>>
>>>> (I'm sure I'm forgetting something)
>>>>
>>>> 1) and 4) can be done relatively easily with low impact.  3) is simple
>>>> from my end, but would be a major breaking change.  I really don't
>>>> know how much effort 2) would be.
>>>>
>>>> Anyway, if it is a major update, I suggest doing the enee* updates
>>>> with 1) and 4), and holding off on 2) and 3) for several months.
>>>> Alternatively I could wait and do the whole batch of changes at once,
>>>> but it would be several months before it's ready.
>>>
>>> We'd be happy with the enee* updates in a 0.8 release, as much because
>>> I like to encourage getting upstream acceptance for changes we plan to
>>> use in production code :)
>>>
>>> cheers,
>>>
>>> Conrad.
>>>
>>>>
>>>> Best,
>>>> John
>>>>
>>>> On Wed, Jul 27, 2011 at 9:25 AM, Conrad Parker <conrad at metadecks.org> wrote:
>>>>> Hi John,
>>>>>
>>>>> Michael and I have worked through his proposed eneeCheckIfDone*
>>>>> variants this afternoon, and we would like to propose the following
>>>>> updated functions: eneeCheckIfDonePass, eneeCheckIfDoneIgnore,
>>>>> eneeCheckIfDoneHandle, unfoldConvStreamCheck.
>>>>>
>>>>> A version of unfoldConvStream that can handle seeking should just be a
>>>>> matter of replacing the call to eneeCheckIfDone with
>>>>> eneeCheckIfDonePass or eneeCheckIfDoneHandle. We could also do the
>>>>> same for variants of convStream. However if we go down this path, it
>>>>> seems we would have 4 versions of each of these *convStream functions,
>>>>> as well as the 4 versions of eneeCheckIfDone.
>>>>>
>>>>> An alternative would be to add a parameter to unfoldConvStream of the
>>>>> same type as eneeCheckIfDonePass:
>>>>>
>>>>> unfoldConvStreamCheck ::
>>>>>  (Monad m, Nullable s) =>
>>>>>  (((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>>>> Iteratee elo m (Iteratee eli m a)) -> Enumeratee elo eli m a)
>>>>>  -> (acc -> Iteratee s m (acc, s'))
>>>>>  -> acc
>>>>>  -> Enumeratee s s' m a
>>>>> unfoldConvStreamCheck checkDone f acc0 = checkDone (check acc0)
>>>>>  where
>>>>>    check acc k = isStreamFinished >>=
>>>>>                    maybe (step acc k) (idone (icont k) . EOF . Just)
>>>>>    step acc k = f acc >>= \(acc', s') ->
>>>>>                    checkDone (check acc') . k . Chunk $ s'
>>>>>
>>>>> such that an enumeratee that passes on seek exceptions can be built
>>>>> with a call to:
>>>>>
>>>>> unfoldConvStreamCheck eneeCheckIfDonePass
>>>>>
>>>>> In order to allow partial applications of eneeCheckIfDoneHandle to be
>>>>> passed to unfoldConvStreamCheck, we must reverse the order of the
>>>>> first two arguments (compared to the earlier proposal):
>>>>>
>>>>> -- | The same as eneeCheckIfDonePass, with one extra argument: a
>>>>> -- handler which is used to process any exceptions in a separate method.
>>>>> eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>>>>    ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee elo
>>>>> m (Iteratee eli m a))
>>>>>    -> ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>>>> Iteratee elo m (Iteratee eli m a))
>>>>>    -> Enumeratee elo eli m a
>>>>> eneeCheckIfDoneHandle h f inner = Iteratee $ \od oc ->
>>>>>    let onDone x s = od (idone x s) (Chunk empty)
>>>>>        onCont k Nothing  = runIter (f k Nothing) od oc
>>>>>        onCont k (Just e) = runIter (h k e)       od oc
>>>>>    in runIter inner onDone onCont
>>>>>
>>>>> Then the remaining variants can be rewritten in terms of this function:
>>>>>
>>>>> eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>>>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>>>> Iteratee elo m (Iteratee eli m a))
>>>>>    -> Enumeratee elo eli m a
>>>>> eneeCheckIfDonePass f = eneeCheckIfDoneHandle (\k e -> f k (Just e)) f
>>>>>
>>>>> eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>>>>    ((Stream eli -> Iteratee eli m a) -> Maybe SomeException ->
>>>>> Iteratee elo m (Iteratee eli m a))
>>>>>    -> Enumeratee elo eli m a
>>>>> eneeCheckIfDoneIgnore f = eneeCheckIfDoneHandle (\k _ -> f k Nothing) f
>>>>>
>>>>>
>>>>> These functions allow us to build stream-processing applications with
>>>>> multiple, layered enumeratees, where the controlling application
>>>>> provides an iteratee that may initiate seeking. Seek exceptions are
>>>>> passed from the inner iteratee outwards to fileDriverRandomFd, as you
>>>>> have also suggested.
>>>>>
>>>>> One feature of this approach, making use of the extensible exceptions,
>>>>> is that each layer of the enumeratee stack can provide a custom "seek"
>>>>> exception, eg. using different units. Enumeratees can then convert
>>>>> seek requests from inner units like "time" or "video frames" to outer
>>>>> units such as "file offset", such that dealing with seek tables etc.
>>>>> can be isolated into the enumeratee that deals with generating frames,
>>>>> rather than exposed to the application.
>>>>>
>>>>> Michael also suggests that custom exceptions can be used to initiate
>>>>> other control requests such as cache flushing, and these functions
>>>>> would allow such messages to also be propagated.
>>>>>
>>>>> A remaining issue would be whether it is also useful to provide
>>>>> similar seekable versions of breakE, mapChunks and the various library
>>>>> functions that use mapChunks.
>>>>>
>>>>> cheers,
>>>>>
>>>>> Conrad.
>>>>>
>>>>> On 7 July 2011 18:50, Michael Baikov <manpacket at gmail.com> wrote:
>>>>>> On Thu, Jul 7, 2011 at 6:10 PM, John Lato <jwlato at gmail.com> wrote:
>>>>>>> Hi Michael,
>>>>>>> Thanks for this, more comments inline.
>>>>>>>
>>>>>>> On Thu, Jul 7, 2011 at 3:59 AM, Michael Baikov <manpacket at gmail.com> wrote:
>>>>>>>>
>>>>>>>> First let's import some things, which will be used later
>>>>>>>>
>>>>>>>> > import Data.Iteratee as I
>>>>>>>> > import Data.Iteratee.Char as I
>>>>>>>> > import Data.Iteratee.IO as I
>>>>>>>> > import Control.Monad.IO.Class
>>>>>>>> > import Control.Monad.Trans.Class
>>>>>>>> > import Control.Exception
>>>>>>>> > import Control.Monad (when)
>>>>>>>> > import Data.Char (toUpper)
>>>>>>>>
>>>>>>>>
>>>>>>>> And then let's define some Iteratees
>>>>>>>>
>>>>>>>> This one just dumps all it gets from input
>>>>>>>>
>>>>>>>> > dump = printLinesUnterminated
>>>>>>>>
>>>>>>>> This one performs one seek and then dumps everything else
>>>>>>>>
>>>>>>>> > dumpAndSeek = I.seek 6 >> dump
>>>>>>>>
>>>>>>>> Let's define some Enumeratees
>>>>>>>>
>>>>>>>> This one - using regular mapChunks (and eneeCheckIfDone) (actually we
>>>>>>>> can use streamMap, but mapChunks's  type signature looks better)
>>>>>>>>
>>>>>>>> > upStream :: Enumeratee String String IO a
>>>>>>>> > upStream = mapChunks (map toUpper)
>>>>>>>>
>>>>>>>> This one - with my mapChunks (and modified eneeCheckIfDone)
>>>>>>>>
>>>>>>>> > upStream' :: Enumeratee String String IO a
>>>>>>>> > upStream' = mapChunks' (map toUpper)
>>>>>>>>
>>>>>>>> And it's time to run some tests. File "hello.txt" contains the message
>>>>>>>> "Hello world!!!!\n\n"
>>>>>>>>
>>>>>>>> > test1 = enumFileRandom 1 "hello.txt" dump
>>>>>>>>
>>>>>>>> As expected: Hello world!!!!
>>>>>>>>
>>>>>>>> > test2 = enumFileRandom 1 "hello.txt" dumpAndSeek
>>>>>>>>
>>>>>>>> world!!!!
>>>>>>>>
>>>>>>>> > test3 = enumFileRandom 1 "hello.txt" (joinI $ upStream dump)
>>>>>>>>
>>>>>>>> HELLO WORLD!!!!
>>>>>>>>
>>>>>>>> > test4 = enumFileRandom 1 "hello.txt" (joinI $ upStream dumpAndSeek)
>>>>>>>>
>>>>>>>> throwErr in eneeCheckIfDone - so it just hangs forever.
>>>>>>>> Unexpected behaviour.
>>>>>>>
>>>>>>> This is indeed a bug.
>>>>>>>
>>>>>>>>
>>>>>>>> > test5 = enumFileRandom 1 "hello.txt" (joinI $ upStream' dumpAndSeek)
>>>>>>>>
>>>>>>>> And with modified version - it works fine.
>>>>>>>> WORLD!!!!
>>>>>>>>
>>>>>>>> > test6 = enumFileRandom 1 "hello.txt" (joinI $ upStream (I.seek 6 >>
>>>>>>>> > stream2list)) >>= run >>= print
>>>>>>>>
>>>>>>>> hangs forever
>>>>>>>
>>>>>>> This looks like the same bug, since 'upStream' is defined in terms of
>>>>>>> 'mapChunks', which in turn is defined with 'eneeCheckIfDone'.
>>>>>>
>>>>>> It is the same bug, I just wanted to show it once more :)
>>>>>> I found my version of this bug in the middle of a huge multi-threaded
>>>>>> Haskell program which takes ages to run, so I decided to provide you
>>>>>> with a nice and simple version :)
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> > test7 = enumFileRandom 1 "hello.txt" (joinI $ upStream' (I.seek 6 >>
>>>>>>>> > stream2list)) >>= run >>= print
>>>>>>>>
>>>>>>>> "WORLD!!!!\n\n"
>>>>>>>>
>>>>>>>> I don't see why it must behave differently when I am applying a simple
>>>>>>>> transformation to the stream.
>>>>>>>> And if I am misunderstanding something - what is the proper way to
>>>>>>>> dump file contents from the 6th byte
>>>>>>>> to the end while applying map upCase to it, with iteratees?
>>>>>>>
>>>>>>> I would put the seek outside of the enumeratee stream.  Or, since you know
>>>>>>> you're using ASCII characters, use drop instead.
>>>>>>
>>>>>> Sure, that will work, but again - this is a very simplified problem. In
>>>>>> the real world you need to use several layers of transformations and
>>>>>> the decision to seek is made in the topmost iteratee. So we need to be
>>>>>> able to pass exceptions as transparently as possible.
>>>>>>
>>>>>>
>>>>>>>> test8 = enumFileRandom 1 "hello.txt" (I.seek 6 >> joinI (upStream
>>>>>>>> stream2list)) >>= run >>= print
>>>>>>>
>>>>>>>>
>>>>>>>> And my modified implementation - it uses
>>>>>>>> eneeCheckIfDonePass (icont . step)
>>>>>>>> instead of
>>>>>>>> eneeCheckIfDone (liftI . go)
>>>>>>>>
>>>>>>>> > mapChunks' :: (Monad m, NullPoint s) => (s -> s') -> Enumeratee s s' m a
>>>>>>>> > mapChunks' f = eneeCheckIfDonePass (icont . step)
>>>>>>>> >     where
>>>>>>>> >         step k (Chunk xs)     = eneeCheckIfDonePass (icont . step) . k .
>>>>>>>> > Chunk $ f xs
>>>>>>>> >         step k str@(EOF mErr) = idone (k $ EOF mErr) str
>>>>>>>>
>>>>>>>>
>>>>>>>> eneeCheckIfDonePass - does not try to handle any exceptions, just
>>>>>>>> passes them to
>>>>>>>> the parent Enumeratee/Enumerator
>>>>>>>>
>>>>>>>>
>>>>>>>> > eneeCheckIfDonePass :: (Monad m, NullPoint elo) =>
>>>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>>>> > elo m (Iteratee eli m a))
>>>>>>>> >     -> Enumeratee elo eli m a
>>>>>>>> > eneeCheckIfDonePass f inner = Iteratee $ \od oc ->
>>>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>>>> >         onCont k e = runIter (f k e) od oc
>>>>>>>> >     in runIter inner onDone onCont
>>>>>>>>
>>>>>>>> eneeCheckIfDoneHandle - Has a separate handler for exceptions, so the user
>>>>>>>> can decide whether
>>>>>>>> to handle the exception or pass it to the parent.
>>>>>>>>
>>>>>>>> > eneeCheckIfDoneHandle :: (Monad m, NullPoint elo) =>
>>>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>>>> > elo m (Iteratee eli m a))
>>>>>>>> >     -> ((Stream eli -> Iteratee eli m a) -> SomeException -> Iteratee
>>>>>>>> > elo m (Iteratee eli m a))
>>>>>>>> >     -> Enumeratee elo eli m a
>>>>>>>> > eneeCheckIfDoneHandle f h inner = Iteratee $ \od oc ->
>>>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>>>> >         onCont k Nothing = runIter (f k Nothing) od oc
>>>>>>>> >         onCont k (Just e) = runIter (h k e)      od oc
>>>>>>>> >     in runIter inner onDone onCont
>>>>>>>>
>>>>>>>> eneeCheckIfDoneIgnore - Ignores all exceptions
>>>>>>>>
>>>>>>>> > eneeCheckIfDoneIgnore :: (Monad m, NullPoint elo) =>
>>>>>>>> >     ((Stream eli -> Iteratee eli m a) -> Maybe SomeException -> Iteratee
>>>>>>>> > elo m (Iteratee eli m a))
>>>>>>>> >     -> Enumeratee elo eli m a
>>>>>>>> > eneeCheckIfDoneIgnore f inner = Iteratee $ \od oc ->
>>>>>>>> >     let onDone x s = od (idone x s) (Chunk empty)
>>>>>>>> >         onCont k _ = runIter (f k Nothing) od oc
>>>>>>>> >     in runIter inner onDone onCont
>>>>>>>
>>>>>>> I need to spend a little more time reviewing these, but they all seem like
>>>>>>> useful alternatives.  Sometimes it makes sense for the stream transformer
>>>>>>> (enumeratee) to handle an exception, sometimes not.  In particular, seeking
>>>>>>> would have to be passed up to the handle enumerator.
>>>>>>> Unfortunately it's not quite that simple in all cases.   If there isn't a
>>>>>>> 1-1 correspondence between elements of the inner stream and the outer
>>>>>>> stream, how should seek behave?  Should it attempt to seek in the inner
>>>>>>> stream (which may not be possible), or just pass it up and assume you know
>>>>>>> what you're doing?  The second is much easier to implement, but I think the
>>>>>>> former would be more useful.
>>>>>>> Thoughts?
>>>>>>
>>>>>> Sure. You just need to create several different seeks and probably
>>>>>> rename I.seek to something more specific, like fileHandleSeek or
>>>>>> something like that. So if you want to go to a specific point in a file
>>>>>> (and you know that your enumeratee chain can handle such a seek), you
>>>>>> send an exception named (FileSeek Offset). If you want to go to a
>>>>>> specific time frame, you just fire up another exception, (TimeSeek
>>>>>> TimeOffset), and handle it in the appropriate place. You can use drop
>>>>>> for a stream of chars if you are not doing any transformations, but if
>>>>>> each chunk takes 1 second to process and you need to drop 1 million of
>>>>>> them...
>>>>>>
>>>>>> So just create several types of exceptions, place handlers in
>>>>>> reasonable places, handle those exceptions that you can, pass them
>>>>>> further if you can't, and create several different seek functions.
>>>>>>
>>>>>>> John
>>>>>>
>>>>>> _______________________________________________
>>>>>> Iteratee mailing list
>>>>>> Iteratee at projects.haskell.org
>>>>>> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


