@hackage kafka-effectful 0.2.0.0

Effectful effects for hw-kafka-client

kafka-effectful

Effectful effects and interpreters for hw-kafka-client, a Haskell binding to Apache Kafka via librdkafka.

Provides typed, composable KafkaProducer and KafkaConsumer effects for the effectful ecosystem.

Status: experimental. This package is early in its 0.x series, and the API may change in breaking ways in subsequent 0.x versions. Pin to an exact version in production until 1.0 is tagged.

Features

  • KafkaProducer -- send messages and flush the producer queue
  • KafkaConsumer -- poll messages (single or batch), manage offsets, assign/pause/resume/seek partitions, and query committed offsets, positions, assignments, and subscriptions
  • Resource-safe interpreters that acquire and release Kafka handles via bracket
  • Errors surfaced through Effectful.Error.Static (Error KafkaError)

Usage

Producer

import Kafka.Effectful

example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaProducer producerProps $ do
    produceMessage record
    flushProducer

Producer scenarios

The eight scenarios below mirror the producer-best-practices guide from hw-kafka-client. Each snippet uses only symbols exported from Kafka.Effectful; when a snippet needs produceMessage' or askProducerHandle, it also imports Kafka.Effectful.Producer.

Scenario 1 — Fire-and-forget

Register a global delivery callback via ProducerProperties and enqueue without blocking. runKafkaProducer flushes on scope exit.

fireAndForgetProps =
  brokersList ["localhost:9092"]
  <> setCallback (deliveryCallback logFailure)

runKafkaProducer fireAndForgetProps $
  forM_ events (produceMessage . toRecord)
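The logFailure handler above is not defined by the package; a minimal sketch, assuming hw-kafka-client's DeliveryReport type (re-exported from Kafka.Producer):

```haskell
import Kafka.Producer (DeliveryReport (..), prTopic)

-- Hypothetical callback: report failed deliveries, ignore successes.
logFailure :: DeliveryReport -> IO ()
logFailure (DeliveryFailure rec err) =
  putStrLn ("delivery failed: " <> show err <> " on " <> show (prTopic rec))
logFailure (NoMessageError err) =
  putStrLn ("producer error: " <> show err)
logFailure DeliverySuccess{} = pure ()
```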

Scenario 2 — Synchronous delivery confirmation

produceMessageSync allocates an MVar, flushes the producer, and returns the broker-assigned Offset — throwing KafkaError on any failure.

runKafkaProducer producerProps $ do
  offset <- produceMessageSync record
  liftIO $ putStrLn ("stored at offset " <> show offset)

Scenario 3 — Idempotent producer

Configuration only — no new call site is needed. Safe to enable by default; the broker deduplicates retries by (producer-id, sequence).

idempotentProps =
  brokersList ["localhost:9092"]
  <> extraProp "enable.idempotence" "true"
  <> extraProp "acks" "all"
  <> extraProp "max.in.flight.requests.per.connection" "5"

Scenario 4 — High-throughput batching

Combine produceMessageBatch with linger.ms, batch.size, and compression to trade a few milliseconds of latency for substantially higher throughput. The result contains only records that failed to enqueue.

batchProps =
  brokersList ["localhost:9092"]
  <> compression Snappy
  <> extraProp "linger.ms" "10"
  <> extraProp "batch.size" "65536"

runKafkaProducer batchProps $ do
  failures <- produceMessageBatch records
  unless (null failures) $
    liftIO $ putStrLn ("enqueue failures: " <> show (length failures))
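If the wrapper mirrors hw-kafka-client, each failure pairs the rejected record with its error (an assumption — check the actual return type), which makes a single best-effort retry easy to sketch:

```haskell
-- Hedged sketch: re-enqueue each locally rejected record once,
-- assuming failures :: [(ProducerRecord, KafkaError)].
retryOnce failures =
  forM_ failures $ \(rec, err) -> do
    liftIO $ putStrLn ("retrying after " <> show err)
    produceMessage rec
```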

Scenario 5 — Transactional ETL

Consume, transform, produce, and commit consumer offsets — all inside one producer transaction. commitOffsetMessageTransaction requires both the KafkaProducer and KafkaConsumer effects. Handle a returned TxError by checking its predicates in a fixed order: kafkaErrorTxnRequiresAbort first, then kafkaErrorIsRetriable, then kafkaErrorIsFatal.

txProps =
  brokersList ["localhost:9092"]
  <> extraProp "transactional.id" "etl-1"
  <> extraProp "enable.idempotence" "true"
  <> extraProp "acks" "all"

etl = runKafkaProducer txProps $ runKafkaConsumer consumerProps sub $ do
  initTransactions (Timeout 10000)
  forever $ do
    msgs <- pollMessageBatch (Timeout 500) (BatchSize 100)
    let records = rights msgs
    unless (null records) $ do
      beginTransaction
      forM_ records (produceMessage . transform)
      forM_ (lastPerPartition records) $ \r ->
        commitOffsetMessageTransaction r (Timeout 5000)
          >>= handleTxResult
      commitTransaction (Timeout 5000) >>= handleTxResult

handleTxResult Nothing  = pure ()
handleTxResult (Just e)
  | kafkaErrorTxnRequiresAbort e = void (abortTransaction (Timeout 5000))
  | kafkaErrorIsRetriable e      = liftIO $ putStrLn "retry"
  | kafkaErrorIsFatal e          = throwError (getKafkaError e)
  | otherwise                    = liftIO $ print (getKafkaError e)
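lastPerPartition is not exported by the package; a minimal sketch, assuming hw-kafka-client's ConsumerRecord fields (crTopic, crPartition, crOffset) and that Offset has an Ord instance:

```haskell
import qualified Data.Map.Strict as Map
import Kafka.Consumer (ConsumerRecord (..))

-- Keep only the newest record per (topic, partition) so each
-- partition's offset is committed exactly once per transaction.
lastPerPartition :: [ConsumerRecord k v] -> [ConsumerRecord k v]
lastPerPartition =
  Map.elems . Map.fromListWith newer . map keyed
  where
    keyed r = ((crTopic r, crPartition r), r)
    newer a b = if crOffset a >= crOffset b then a else b
```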

Scenario 6 — Keyed partitioning for ordering

Set prKey and leave prPartition = UnassignedPartition so the default hash partitioner routes every record with the same key to the same partition. Enable idempotence as well so retries cannot reorder records.

orderedByKey userId event = ProducerRecord
  { prTopic     = TopicName "user-events"
  , prPartition = UnassignedPartition
  , prKey       = Just userId
  , prValue     = Just (encode event)
  , prHeaders   = mempty
  }

Scenario 7 — Custom partitioning and headers

Target a specific partition with SpecifiedPartition and attach per-message metadata via headersFromList.

shardedRecord (Shard n) payload = ProducerRecord
  { prTopic     = TopicName "sharded-events"
  , prPartition = SpecifiedPartition n
  , prKey       = Nothing
  , prValue     = Just payload
  , prHeaders   = headersFromList
      [ ("schema-version", "v3")
      , ("source",         "billing-api")
      ]
  }

Scenario 8 — Graceful shutdown

runKafkaProducer already brackets the handle — it flushes and closes the producer on normal scope exit, so enqueued records drain before the program continues. No explicit cleanup code is needed.

main =
  runEff . runError @KafkaError $
    runKafkaProducer props $ do
      forM_ events (produceMessage . toRecord)
      -- no explicit flush needed; runKafkaProducer flushes on exit

Consumer

import Kafka.Effectful

example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaConsumer consumerProps subscription loop
  where
    loop = do
      mbMsg <- pollMessage (Timeout 1000)
      case mbMsg of
        Nothing  -> loop
        Just msg -> do
          commitOffsetMessage OffsetCommit msg
          loop

pollMessage returns Nothing when the timeout elapses without a message arriving; non-timeout failures are thrown via the Error KafkaError effect.
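Because non-timeout failures arrive through Error KafkaError, a poll loop can also recover from them locally with catchError from Effectful.Error.Static (a sketch; adapt the recovery to your needs):

```haskell
import Effectful.Error.Static (catchError)

resilientLoop = loop
  where
    loop = do
      mbMsg <- pollMessage (Timeout 1000) `catchError` \_callStack err -> do
        liftIO $ putStrLn ("poll failed: " <> show (err :: KafkaError))
        pure Nothing  -- treat the failure like a timeout and keep polling
      forM_ mbMsg (commitOffsetMessage OffsetCommit)
      loop
```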

Running it

The effect handlers runKafkaProducer and runKafkaConsumer require IOE and Error KafkaError in the effect stack. A complete program wires them with runEff and runError:

{-# LANGUAGE TypeApplications #-}

import Effectful
import Effectful.Error.Static (runError)
import Kafka.Effectful

main :: IO ()
main = do
  result <- runEff . runError @KafkaError $ runProgram
  case result of
    Left (_, err) -> putStrLn ("Kafka error: " <> show err)
    Right ()      -> pure ()
  where
    runProgram =
      runKafkaProducer producerProps $ do
        produceMessage record
        flushProducer

Replace producerProps and record with your own ProducerProperties and ProducerRecord values (see the Kafka.Effectful.Producer module for the available builders).
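A minimal pair to start from, reusing builders shown elsewhere in this README (the topic name and payload are placeholders; the ByteString literal needs OverloadedStrings):

```haskell
producerProps :: ProducerProperties
producerProps =
  brokersList ["localhost:9092"]
    <> sendTimeout (Timeout 5000)

record :: ProducerRecord
record = ProducerRecord
  { prTopic     = TopicName "demo-topic"
  , prPartition = UnassignedPartition
  , prKey       = Nothing
  , prValue     = Just "hello"
  , prHeaders   = mempty
  }
```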

OpenTelemetry tracing

Swap runKafkaProducer for runKafkaProducerTraced tracer and runKafkaConsumer for runKafkaConsumerTraced tracer to add distributed tracing without changing any effect-level code. The traced interpreters open a Producer-kind span around every record-sending operation, and a Consumer-kind span around every successful pollMessage and each per-record success of pollMessageBatch. Spans are populated with the OpenTelemetry messaging semantic conventions: messaging.system, messaging.destination.name, messaging.operation, messaging.kafka.destination.partition, messaging.kafka.message.offset, messaging.kafka.message.key, and messaging.kafka.consumer.group. The current OTel context is injected into each outgoing record's headers as W3C traceparent / tracestate, so downstream consumers can extract it and continue the trace.

{-# LANGUAGE TypeApplications #-}

import Effectful
import Effectful.Error.Static (runError)
import Kafka.Effectful
import Kafka.Effectful.OpenTelemetry (runKafkaProducerTraced)
import OpenTelemetry.Trace
  ( initializeGlobalTracerProvider
  , makeTracer
  , tracerOptions
  )

main :: IO ()
main = do
  tp <- initializeGlobalTracerProvider
  let tracer = makeTracer tp "my-app" tracerOptions
  result <- runEff . runError @KafkaError $
    runKafkaProducerTraced tracer producerProps $ do
      produceMessage record
      flushProducer
  case result of
    Left (_, err) -> putStrLn ("Kafka error: " <> show err)
    Right ()      -> pure ()

Span names are "send <topic>" and "process <topic>". The default interpreters (runKafkaProducer, runKafkaConsumer) are unchanged and zero-cost for users who do not want tracing.
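The consumer side mirrors this: pass the same tracer to runKafkaConsumerTraced in place of runKafkaConsumer (a sketch reusing the poll loop from the Consumer section above):

```haskell
import Kafka.Effectful.OpenTelemetry (runKafkaConsumerTraced)

tracedConsumer tracer =
  runKafkaConsumerTraced tracer consumerProps subscription loop
  where
    loop = do
      -- each successful poll opens a "process <topic>" span
      mbMsg <- pollMessage (Timeout 1000)
      forM_ mbMsg (commitOffsetMessage OffsetCommit)
      loop
```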

Compatibility with shibuya-kafka-adapter. The attribute keys this library emits (messaging.system, messaging.kafka.destination.partition, messaging.kafka.message.offset) agree with what shibuya-kafka-adapter's envelope-level attributes already produce. Layering the two yields a Receive→Process span split: kafka-effectful's poll span as parent, Shibuya's framework per-message span as child. If you want only one span per message instead of two, use either kafka-effectful's traced runner or Shibuya's framework span — not both.

The end-to-end demo in examples/OtelTracing.hs produces a record through the traced producer and reads it back through the traced consumer, printing both trace IDs and asserting they match. Run it against a local broker:

cabal run example-otel-tracing -f examples -- \
  --bootstrap-servers localhost:9092 \
  --topic otel-demo

Module Structure

  • Kafka.Effectful -- Convenience re-export of both effects and common types
  • Kafka.Effectful.Producer -- Producer effect, interpreter, and types
  • Kafka.Effectful.Consumer -- Consumer effect, interpreter, and types
  • Kafka.Effectful.Producer.Effect -- KafkaProducer effect definition and operations
  • Kafka.Effectful.Producer.Interpreter -- runKafkaProducer interpreter
  • Kafka.Effectful.Producer.Transaction -- Cross-effect commitOffsetMessageTransaction helper
  • Kafka.Effectful.Consumer.Effect -- KafkaConsumer effect definition and operations
  • Kafka.Effectful.Consumer.Interpreter -- runKafkaConsumer interpreter
  • Kafka.Effectful.OpenTelemetry -- Single-import facade for the traced interpreters and helpers
  • Kafka.Effectful.OpenTelemetry.Producer.Interpreter -- runKafkaProducerTraced
  • Kafka.Effectful.OpenTelemetry.Consumer.Interpreter -- runKafkaConsumerTraced
  • Kafka.Effectful.OpenTelemetry.Semantic -- Pure attribute-builder helpers
  • Kafka.Effectful.OpenTelemetry.Propagation -- W3C trace-context header bridges

Requirements

  • GHC >= 9.12
  • librdkafka (system dependency)

License

MIT