0.4.0 docs





    May 4, 2016

    Sam Pegler
    Infectious Media
    London, England.


    Clojure wrapper for Kafka's Java API

    Clojure interface for Kafka Consumer API. For
      complete JavaDocs
    Clojure interface for Kafka Producer API. For
    complete JavaDocs, see:


    WIP fork/update for Kafka 0.9, for the love of god do not use.

    Clojure library for Kafka.

    Development is against the 0.8 release of Kafka.


    Add the following to your Leiningen project.clj:

    latest clj-kafka version



    Discovery of Kafka brokers from Zookeeper:

    (brokers {"zookeeper.connect" ""})
    ;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})

    (use 'clj-kafka.producer)
    (def p (producer {"" "localhost:9999"
                      "serializer.class" "kafka.serializer.DefaultEncoder"
                      "partitioner.class" "kafka.producer.DefaultPartitioner"}))
    (send-message p (message "test" (.getBytes "this is my message")))

    See: clj-kafka.producer

    New Producer

    As of 0.3.1 we also support the “new” pure-Java producer. The interface is superficially similar, but we’ve chosen to keep names close to their Java equivalents.

    (use '
    (with-open [p (producer {"bootstrap.servers" ""} (byte-array-serializer) (byte-array-serializer))]
      (send p (record "test-topic" (.getBytes "hello world!"))))

    One key difference is that sending is asynchronous by default. send returns a Future immediately. If you want synchronous behaviour you can deref it right away:

    (with-open [p (producer {"bootstrap.servers" ""} (byte-array-serializer) (byte-array-serializer))]
      @(send p (record "test-topic" (.getBytes "hello world!"))))
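
    The same dereferencing pattern can be bounded with a timeout so that a slow broker does not block the caller indefinitely. The sketch below does not touch Kafka at all: fake-send is a hypothetical stand-in that returns a Clojure future, used only to show how deref behaves, and the delays and the :timed-out sentinel are arbitrary choices.

```clojure
;; Hypothetical stand-in for an asynchronous send (not part of clj-kafka):
;; returns a future immediately and completes it after delay-ms milliseconds.
(defn fake-send [delay-ms result]
  (future
    (Thread/sleep delay-ms)
    result))

;; Blocking deref: waits until the future completes.
(def sync-result @(fake-send 10 {:topic "test-topic" :offset 0}))

;; Bounded deref: gives up after 50 ms and returns the sentinel instead.
(def slow (fake-send 2000 :done))
(def bounded-result (deref slow 50 :timed-out))

;; Stop the abandoned work rather than letting it run to completion.
(future-cancel slow)

(prn sync-result)
(prn bounded-result)
```

    When this runs as a standalone script, calling (shutdown-agents) at the end lets the JVM exit promptly, because futures run on a non-daemon thread pool.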


    Zookeeper Consumer

    The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they’re not retrieved again.

    (use 'clj-kafka.consumer.zk)
    (use 'clj-kafka.core)
    (def config {"zookeeper.connect" "localhost:2182"
                 "" "clj-kafka.consumer"
                 "auto.offset.reset" "smallest"
                 "auto.commit.enable" "false"})
    (with-resource [c (consumer config)]
      ;; messages is lazy: realize it before the consumer is closed
      (doall (take 2 (messages c "test"))))

    The messages function covers the simple case of consuming a single topic on a single thread. This is a stricter form of the same API that was in earlier releases. messages is built on two other key functions: create-message-streams, which creates the underlying streams, and stream-seq, which turns a stream into a lazy sequence. This split makes it easier to consume across multiple partitions and threads.
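
    Because these sequences are lazy, nothing is read until they are realized, so realization has to happen while the underlying resource is still open. The sketch below illustrates this general Clojure behaviour with a BufferedReader standing in for a consumer. It does not use clj-kafka, and open-reader and the sample text are made up for illustration.

```clojure
(import '(java.io BufferedReader StringReader))

;; Hypothetical resource: a reader over three lines of text.
(defn open-reader []
  (BufferedReader. (StringReader. "first\nsecond\nthird")))

;; doall forces both elements while the reader is still open.
(def realized
  (with-open [r (open-reader)]
    (doall (take 2 (line-seq r)))))

;; Without doall the lazy seq leaves the scope unrealized;
;; reading the rest of it afterwards hits an already-closed reader.
(def escaped
  (with-open [r (open-reader)]
    (take 2 (line-seq r))))

(def outcome
  (try
    (doall escaped)
    (catch Exception _ :stream-closed)))

(prn realized)
(prn outcome)
```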

    See: clj-kafka.consumer.zk

    Usage with transducers

    An alternate way of consuming is using create-message-stream or create-message-streams to obtain KafkaStream instances. These are Iterable which means, amongst other things, that they work nicely with transducers.

    Continuing the previous example:

    ;; hypothetical transformation
    (def xform (comp (map deserialize-message)
                     (filter production-traffic)
                     (map parse-user-agent-string)))
    (with-resource [c (consumer config)]
      (let [stream (create-message-stream c "test-topic")]
        (run! write-to-database! (eduction xform stream))))
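
    Since transducers only need something reducible, a pipeline like this can be tried out without a broker. The following sketch substitutes a plain java.util.ArrayList (also an Iterable) for the KafkaStream. The message maps, production-traffic?, agent-name and the sink atom are assumptions made for illustration and are not part of clj-kafka.

```clojure
(require '[clojure.string :as str])

;; Stand-in for a KafkaStream: any java.lang.Iterable will do.
(def fake-stream
  (java.util.ArrayList.
   [{:env "prod" :agent "Mozilla/5.0"}
    {:env "test" :agent "curl/7.47"}
    {:env "prod" :agent "curl/7.47"}]))

;; Hypothetical predicate and transformation steps.
(defn production-traffic? [msg] (= "prod" (:env msg)))
(defn agent-name [msg] (first (str/split (:agent msg) #"/")))

(def xform (comp (filter production-traffic?)
                 (map agent-name)))

;; Collect results in an atom instead of writing to a database.
(def sink (atom []))
(run! #(swap! sink conj %) (eduction xform fake-stream))

(prn @sink)
```

    The eduction applies the transformation one element at a time as run! consumes it, so no intermediate collection is built between the steps.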

    Administration Operations

    There is support for the following simple administration operations:

    • checking if a topic exists
    • creating a topic
    • deleting a topic (requires that the Kafka cluster supports deletion and has delete.topic.enable set to true)
    • retrieving topic configuration
    • changing topic configuration

    (require '[clj-kafka.admin :as admin])
    (with-open [zk (admin/zk-client "")]
      ;; create the topic only if it does not exist yet
      (when-not (admin/topic-exists? zk "test-topic")
        (admin/create-topic zk "test-topic"
                            {:partitions 3
                             :replication-factor 1
                             :config {"cleanup.policy" "compact"}})))

    See: clj-kafka.admin

    Kafka Offset Manager Operations

    There is support for the following simple Kafka offset management operations:

    • fetch the current offsets of a consumer group
    • reset the current offsets of a consumer group

    (require '[clj-kafka.offset :as offset])
    (offset/fetch-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer")
    (offset/reset-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :earliest)
    (offset/reset-consumer-offsets "broker1:9092,broker1:9092" {"zookeeper.connect" "zkhost:2182"} "my-topic" "my-consumer" :latest)

    See: clj-kafka.offset


    Copyright © 2013 Paul Ingles

    Distributed under the Eclipse Public License, the same as Clojure.


    YourKit is kindly supporting this open source project with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit’s leading software products: YourKit Java Profiler and YourKit .NET Profiler.