Skip to content

Commit 3310ce7

Browse files
authored
Merge pull request #2 from empperi/use-interfaces-in-typehints
Change typehints to use Consumer and Producer
2 parents d7187e4 + a02934f commit 3310ce7

File tree

3 files changed

+27
-27
lines changed

3 files changed

+27
-27
lines changed

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(defproject net.tbt-post/clj-kafka-x "0.4.0"
1+
(defproject net.tbt-post/clj-kafka-x "0.4.1"
22
:description "A Clojure wrapper for Apache Kafka v2 client"
33
:url "https://github.com/source-c/clj-kafka-x"
44
:license {:name "Apache License 2.0"

src/clj_kafka_x/consumers/simple.clj

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ clj-kafka-x.consumers.simple
55
(:require [clj-kafka-x.data :refer :all])
66
(:import java.util.List
77
java.util.regex.Pattern
8-
[org.apache.kafka.clients.consumer ConsumerRebalanceListener KafkaConsumer OffsetAndMetadata OffsetCommitCallback]
8+
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetAndMetadata OffsetCommitCallback]
99
[org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer]
1010
org.apache.kafka.common.TopicPartition
1111
(java.util Map)))
@@ -93,7 +93,7 @@ clj-kafka-x.consumers.simple
9393
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
9494
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)
9595
"
96-
[^KafkaConsumer consumer topics & {:keys [assigned-callback revoked-callback]
96+
[^Consumer consumer topics & {:keys [assigned-callback revoked-callback]
9797
:or {assigned-callback (fn [_])
9898
revoked-callback (fn [_])}}]
9999
;;TODO needs to be cleaned up and refactored
@@ -134,7 +134,7 @@ clj-kafka-x.consumers.simple
134134
;; {:topic \"topic-b\", :partitions #{0 1 2}},
135135
;; {:topic \"topic-c\", :partitions #{}}]
136136
"
137-
[^KafkaConsumer consumer]
137+
[^Consumer consumer]
138138
;;TODO is this clear and readable enough ? refactor?
139139
(let [auto-subs (.subscription consumer)
140140
manual-subs (.assignment consumer)
@@ -153,7 +153,7 @@ clj-kafka-x.consumers.simple
153153
(defn unsubscribe
154154
"Unsubcribes the consumer from any subscribed topics and/or partitions.
155155
It works for subscriptions carried out via subscribe-to-topics or subscribe-to-partitions functions"
156-
[^KafkaConsumer consumer]
156+
[^Consumer consumer]
157157
(.unsubscribe consumer))
158158

159159
(defn seek
@@ -190,9 +190,9 @@ clj-kafka-x.consumers.simple
190190
;; => nil
191191
192192
"
193-
([^KafkaConsumer consumer topic partition offset]
193+
([^Consumer consumer topic partition offset]
194194
(seek consumer (vector {:topic topic :partition partition}) offset))
195-
([^KafkaConsumer consumer tp-seq offset]
195+
([^Consumer consumer tp-seq offset]
196196
(let [tp-class-seq (map map->topic-partition tp-seq)
197197
tp-class-array (into-array TopicPartition tp-class-seq)]
198198
(cond
@@ -229,7 +229,7 @@ clj-kafka-x.consumers.simple
229229
;; :value \"Count Zero says 3 at Fri Mar 11 14:34:32 GMT 2016\"}]
230230
231231
"
232-
[^KafkaConsumer consumer & {:keys [timeout] :or {timeout 1000}}]
232+
[^Consumer consumer & {:keys [timeout] :or {timeout 1000}}]
233233

234234
(let [consumer-records (.poll consumer timeout)]
235235
(to-clojure consumer-records)))
@@ -272,13 +272,13 @@ clj-kafka-x.consumers.simple
272272
(println \"Commits passed for \" offsets))))
273273
;; => nil
274274
"
275-
([^KafkaConsumer consumer] (.commitAsync consumer))
276-
([^KafkaConsumer consumer offset-commit-fn]
275+
([^Consumer consumer] (.commitAsync consumer))
276+
([^Consumer consumer offset-commit-fn]
277277
(let [callback (reify OffsetCommitCallback
278278
(onComplete [_ offsets exception]
279279
(offset-commit-fn (tp-om-map->map offsets) exception)))]
280280
(.commitAsync consumer callback)))
281-
([^KafkaConsumer consumer topic-partition-offsets-metadata offset-commit-fn]
281+
([^Consumer consumer topic-partition-offsets-metadata offset-commit-fn]
282282
(let [callback (reify OffsetCommitCallback
283283
(onComplete [_ offsets exception]
284284
(offset-commit-fn (tp-om-map->map offsets) exception)))
@@ -306,8 +306,8 @@ clj-kafka-x.consumers.simple
306306
(commit-sync consumer tp-om)
307307
;; => nil
308308
"
309-
([^KafkaConsumer consumer] (.commitSync consumer))
310-
([^KafkaConsumer consumer topic-partitions-offsets-metadata]
309+
([^Consumer consumer] (.commitSync consumer))
310+
([^Consumer consumer topic-partitions-offsets-metadata]
311311
(let [tp-om-map (map->tp-om-map topic-partitions-offsets-metadata)]
312312
(.commitSync consumer tp-om-map))))
313313

@@ -323,7 +323,7 @@ clj-kafka-x.consumers.simple
323323
(last-committed-offset consumer {:topic \"topic-a\" :partition 2})
324324
;; => {:offset 10, :metadata \"Metadata set during commit\"}
325325
"
326-
[^KafkaConsumer consumer tp]
326+
[^Consumer consumer tp]
327327
(->> tp
328328
map->topic-partition
329329
(.committed consumer)
@@ -362,7 +362,7 @@ clj-kafka-x.consumers.simple
362362
;; :replicas [{:id 2, :host \"172.17.0.3\", :port 9093}],
363363
;; :in-sync-replicas [{:id 2, :host \"172.17.0.3\", :port 9093}]}]}
364364
"
365-
[^KafkaConsumer consumer]
365+
[^Consumer consumer]
366366
(str-pi-map->map (.listTopics consumer)))
367367

368368
(defn list-all-partitions
@@ -390,7 +390,7 @@ clj-kafka-x.consumers.simple
390390
;; :replicas [{:id 2, :host \"172.17.0.3\", :port 9093}],
391391
;; :in-sync-replicas [{:id 2, :host \"172.17.0.3\", :port 9093}]}]
392392
"
393-
[^KafkaConsumer consumer topic]
393+
[^Consumer consumer topic]
394394
(mapv to-clojure (.partitionsFor consumer topic)))
395395

396396

@@ -404,7 +404,7 @@ clj-kafka-x.consumers.simple
404404
(pause consumer {:topic \"topic-a\" :partition 2}
405405
{:topic \"topic-b\" :partition 0})
406406
"
407-
[^KafkaConsumer consumer tp-seq]
407+
[^Consumer consumer tp-seq]
408408
(->> (map map->topic-partition tp-seq)
409409
(into-array TopicPartition)
410410
(.pause consumer)))
@@ -420,7 +420,7 @@ clj-kafka-x.consumers.simple
420420
(resume consumer {:topic \"topic-a\" :partition 2}
421421
{:topic \"topic-b\" :partition 0})
422422
"
423-
[^KafkaConsumer consumer tp-seq]
423+
[^Consumer consumer tp-seq]
424424
(->> (map map->topic-partition tp-seq)
425425
(into-array TopicPartition)
426426
(.resume consumer)))
@@ -445,5 +445,5 @@ clj-kafka-x.consumers.simple
445445
;; :tags {\"client-id\" \"consumer-3\"},
446446
;; :value 0.0}]
447447
"
448-
[^KafkaConsumer consumer]
448+
[^Consumer consumer]
449449
(metrics->map (.metrics consumer)))

src/clj_kafka_x/producer.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
(:refer-clojure :exclude [send flush])
66
(:require [clj-kafka-x.data :refer :all])
77
(:import [java.util.concurrent Future TimeUnit TimeoutException]
8-
[org.apache.kafka.clients.producer Callback KafkaProducer ProducerRecord RecordMetadata]
8+
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord RecordMetadata]
99
[org.apache.kafka.common Metric MetricName]
1010
(org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer)
1111
(java.util Map)))
@@ -95,18 +95,18 @@
9595
;; => #object[string representation of future object]
9696
;; Metadata-> {:topic topic-unknown, :partition 4, :offset 1} Exception-> nil
9797
"
98-
([^KafkaProducer producer record]
98+
([^Producer producer record]
9999
(let [fut (.send producer record)]
100100
(map-future-val fut to-clojure)))
101-
([^KafkaProducer producer record callback]
101+
([^Producer producer record callback]
102102
(let [fut (.send producer record (reify Callback
103103
(onCompletion [_ metadata exception]
104104
(callback (and metadata (to-clojure metadata)) exception))))]
105105
(map-future-val fut to-clojure))))
106106

107107
(defn flush
108108
"See: http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()"
109-
[^KafkaProducer producer]
109+
[^Producer producer]
110110
(.flush producer))
111111

112112
(defn close
@@ -116,9 +116,9 @@
116116
117117
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
118118
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close(long,%20java.util.concurrent.TimeUnit)"
119-
([^KafkaProducer producer]
119+
([^Producer producer]
120120
(.close producer))
121-
([^KafkaProducer producer timeout-ms]
121+
([^Producer producer timeout-ms]
122122
(.close producer timeout-ms TimeUnit/MILLISECONDS)))
123123

124124
(defn partitions
@@ -143,7 +143,7 @@
143143
;; :in-sync-replicas [{:id 1, :host \"172.17.0.4\", :port 9092}
144144
;; {:id 2, :host \"172.17.0.3\", :port 9093}]}]
145145
"
146-
[^KafkaProducer producer topic]
146+
[^Producer producer topic]
147147
(mapv to-clojure (.partitionsFor producer topic)))
148148

149149
(defn metrics
@@ -170,6 +170,6 @@
170170
;; :tags {\"client-id\" \"producer-2\", \"node-id\" \"node-3\"},
171171
;; :value 0.23866348448687352}]
172172
"
173-
[^KafkaProducer producer]
173+
[^Producer producer]
174174
(metrics->map (.metrics producer))
175175
)

0 commit comments

Comments
 (0)