@hackage streaming0.1.4.1

an elementary streaming prelude and general stream type.

Streaming.Prelude exports an elementary streaming prelude focused on a simple "source" or "producer" type, namely Stream (Of a) m r. This is a sort of effectful version of ([a],r) in which monadic action is interleaved between successive elements. The main module, Streaming, exports a much more general type, Stream f m r, which can be used to stream successive distinct steps characterized by any functor f, though we are here interested only in a limited range of cases.

The streaming-IO libraries have various devices for dealing with effectful variants of [a] or ([a],r). But it is only with the general type Stream f m r, or some equivalent, that one can envisage (for example) the connected streaming of their sorts of stream -- as one makes lists of lists in the Haskell Prelude and Data.List. One needs some such type if we are to express properly streaming equivalents of e.g.

group :: Ord a => [a] -> [[a]]
chunksOf :: Int -> [a] -> [[a]]
lines :: [Char] -> [[Char]] -- but similarly with bytestring, etc.

to mention a few obviously desirable operations. But once one grasps the iterable stream concept needed to express those functions - to wit, Stream f m r or some equivalent - then one will also see that, with it, one is already in possession of a complete elementary streaming library - since one possesses Stream ((,) a) m r or equivalently Stream (Of a) m r. This is the type of a 'generator' or 'producer' or whatever you call an effectful stream of items. The present Streaming.Prelude is thus the simplest streaming library that can replicate anything like the API of the Prelude and Data.List.

The emphasis of the library is on interoperation; for the rest its advantages are: extreme simplicity and re-use of intuitions the user has gathered from mastery of Prelude and Data.List. The two conceptual pre-requisites are some comprehension of monad transformers and some familiarity with 'rank 2 types'.

See the readme below for an explanation, including the examples linked there. Elementary usage can be divined from the ghci examples in Streaming.Prelude and perhaps from this rough beginning of a tutorial. Note also the streaming bytestring and streaming utils packages. Questions about usage can be put raised on StackOverflow with the tag [haskell-streaming], or as an issue on Github, or on the pipes list (the package understands itself as part of the pipes 'ecosystem'.)

The simplest form of interoperation with pipes is accomplished with this isomorphism:

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r

Interoperation with io-streams is thus:

Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)

With conduit one might use, e.g.:

Conduit.unfoldM Streaming.uncons                :: Stream (Of a) m ()  -> Source m a
Streaming.mapM_ Conduit.yield . hoist lift      :: Stream (Of o) m r -> ConduitM i o m r
($$ Conduit.mapM_ Streaming.yield) . hoist lift :: Source m a -> Stream (Of a) m ()

These conversions should never be more expensive than a single >-> or =$=.

Here is a simple example (conceptually it is a bit advanced, maybe) that runs a single underlying stream with several streaming-io libraries at once, superimposing their effects without any accumulation:

module Main (main) where
import Streaming
import Pipes
import Data.Conduit
import qualified Streaming.Prelude as S
import qualified Data.Conduit.List as CL
import qualified Pipes.Prelude as P
import qualified System.IO.Streams as IOS
import Data.ByteString.Char8 (pack)
import Data.Function ((&))

mkConduit  = CL.unfoldM S.uncons
mkPipe     = P.unfoldr S.next
mkIOStream = IOS.unfoldM S.uncons

main = iostreamed where
  urstream = S.take 3 S.readLn :: Stream (Of Int) IO ()
  streamed = S.copy urstream & S.map (\n -> "streaming says: " ++ show n)
                             & S.stdoutLn
  piped = runEffect $
    mkPipe (S.copy streamed) >-> P.map (\n -> "pipes says: " ++ show n)
                             >-> P.stdoutLn
  conduited =
    mkConduit (S.copy piped) $$ CL.map (\n -> "conduit says:  " ++ show n)
                             =$ CL.mapM_ (liftIO . putStrLn)
  iostreamed = do
    str0 <- mkIOStream conduited
    str1 <- IOS.map (\n -> pack $ "io-streams says: " ++ show n ++ "\n") str0
    IOS.supply str1 IOS.stdout

This program successively parses three Ints from standard input, and simulaneously passes them to (here trivial) stream-consuming processes from four different libraries, using the copy function from Streaming.Prelude. I mark my own input with <Enter> below:

>>> main
1 <Enter>
streaming says: 1
pipes says: 1
conduit says:  1
io-streams says: 1
2 <Enter>
streaming says: 2
pipes says: 2
conduit says:  2
io-streams says: 2
3 <Enter>
streaming says: 3
pipes says: 3
conduit says:  3
io-streams says: 3
>>>

Of course, I could as well have passed the stream to several independent conduits; and I might have derived the original stream from a conduit Source or pipes Producer etc., using one of the 'conversion' functions above. Further points of comparison with the going streaming-IO libraries are discussed in the readme below.

Here are the results of some microbenchmarks based on the benchmarks included in the machines package:

Because these are microbenchmarks for individual functions, they represent a sort of "worst case"; many other factors can influence the speed of a complex program.