|
1 | 1 | (ns durable-queue |
2 | 2 | (:require |
3 | | - [clojure.java.io :as io] |
4 | | - [byte-streams :as bs] |
5 | | - [clojure.string :as str] |
6 | | - [primitive-math :as p] |
7 | | - [taoensso.nippy :as nippy]) |
| 3 | + [clj-commons.byte-streams :as bs] |
| 4 | + [clj-commons.primitive-math :as p] |
| 5 | + [clojure.java.io :as io] |
| 6 | + [taoensso.nippy :as nippy]) |
8 | 7 | (:import |
9 | | - [java.lang.reflect |
10 | | - Method |
11 | | - Field] |
12 | | - [java.util.concurrent |
13 | | - LinkedBlockingQueue |
14 | | - TimeoutException |
15 | | - TimeUnit] |
16 | | - [java.util.concurrent.atomic |
| 8 | + [java.io |
| 9 | + File |
| 10 | + IOException |
| 11 | + RandomAccessFile |
| 12 | + Writer] |
| 13 | + [java.lang.ref |
| 14 | + WeakReference] |
| 15 | + [java.lang.reflect |
| 16 | + Method] |
| 17 | + [java.nio ByteBuffer MappedByteBuffer] |
| 18 | + [java.nio.channels |
| 19 | + FileChannel$MapMode] |
| 20 | + [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] |
| 21 | + [java.util.concurrent.atomic |
17 | 22 | AtomicLong] |
18 | | - [java.util.zip |
19 | | - CRC32] |
20 | | - [java.util.concurrent.locks |
| 23 | + [java.util.concurrent.locks |
21 | 24 | ReentrantReadWriteLock] |
22 | | - [java.io |
23 | | - Writer |
24 | | - File |
25 | | - RandomAccessFile |
26 | | - IOException] |
27 | | - [java.nio.channels |
28 | | - FileChannel |
29 | | - FileChannel$MapMode] |
30 | | - [java.nio |
31 | | - ByteBuffer |
32 | | - MappedByteBuffer] |
33 | | - [java.lang.ref |
34 | | - WeakReference])) |
| 25 | + [java.util.zip |
| 26 | + CRC32])) |
35 | 27 |
|
36 | 28 | ;;; |
37 | 29 |
|
|
78 | 70 | (^:private sync! [_]) |
79 | 71 | (^:private invalidate [_ offset len]) |
80 | 72 | (^:private ^ByteBuffer buffer [_]) |
81 | | - (^:private append-to-slab! [_ descriptor]) |
| 73 | + (^:private append-to-slab! [_ task-descriptor]) |
82 | 74 | (^:private read-write-lock [_])) |
83 | 75 |
|
84 | 76 | (defmacro ^:private with-buffer [[buf slab] & body] |
|
139 | 131 | (.invoke clean |
140 | 132 | (.invoke cleaner buf nil) |
141 | 133 | nil)) |
142 | | - (catch Throwable e |
| 134 | + (catch Throwable _ |
143 | 135 | ;; not much we can do here, sadly |
144 | 136 | ))))) |
145 | 137 |
|
146 | 138 | (defn- force-buffer |
147 | | - [^MappedByteBuffer buf offset length] |
| 139 | + [^MappedByteBuffer buf _offset _length] |
148 | 140 | (.force buf)) |
149 | 141 |
|
150 | 142 | ;;; |
|
232 | 224 | (lazy-seq |
233 | 225 | (with-buffer [buf slab] |
234 | 226 | (let [^ByteBuffer buf' (.position buf (p/inc pos)) |
235 | | - status (.get buf') |
236 | | - checksum (.getLong buf') |
| 227 | + _status (.get buf') |
| 228 | + _checksum (.getLong buf') |
237 | 229 | size (.getInt buf')] |
238 | 230 |
|
239 | 231 | ;; this shouldn't be necessary, but let's not gratuitously |
|
249 | 241 | (slab->task-seq |
250 | 242 | slab |
251 | 243 | (+ pos header-size size))))))))) |
252 | | - (catch Throwable e |
| 244 | + (catch Throwable _ |
253 | 245 | ;; this implies unrecoverable corruption |
254 | 246 | nil |
255 | 247 | ))))) |
|
268 | 260 | (read-write-lock [_] |
269 | 261 | lock) |
270 | 262 |
|
271 | | - (buffer [this] |
| 263 | + (buffer [_] |
272 | 264 | (let [buf (or @buf |
273 | 265 | (swap! buf |
274 | 266 | (fn [buf] |
|
299 | 291 | (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) |
300 | 292 | nil))))) |
301 | 293 |
|
302 | | - (append-to-slab! [this descriptor] |
| 294 | + (append-to-slab! [this task-descriptor] |
303 | 295 | (with-buffer [buf this] |
304 | | - (let [ary (nippy/freeze descriptor) |
| 296 | + (let [ary (nippy/freeze task-descriptor) |
305 | 297 | cnt (count ary) |
306 | 298 | pos @position |
307 | 299 | ^ByteBuffer buf (.position buf ^Long pos)] |
|
515 | 507 | queue-name->current-slab (atom {}) |
516 | 508 |
|
517 | 509 | ;; initialize |
518 | | - slabs (->> @queue-name->slabs vals (apply concat)) |
519 | | - slab->count (zipmap |
520 | | - slabs |
521 | | - (map #(atom (count (seq %))) slabs)) |
522 | 510 | create-new-slab (fn [q-name] |
523 | 511 | (let [slab (create-slab directory q-name (queue q-name) slab-size) |
524 | 512 | empty-slabs (->> (@queue-name->slabs q-name) |
|
566 | 554 | (fsync q) |
567 | 555 | (let [end (System/currentTimeMillis)] |
568 | 556 | (Thread/sleep (long (max 0 (- fsync-interval (- end start))))))) |
569 | | - (catch Throwable e |
570 | | - ))))))) |
| 557 | + (catch Throwable _))))))) |
571 | 558 |
|
572 | 559 | ;; populate queues with pre-existing tasks |
573 | 560 | (let [empty-slabs (atom #{})] |
|
620 | 607 |
|
621 | 608 | IQueues |
622 | 609 |
|
623 | | - (delete! [this] |
| 610 | + (delete! [_] |
624 | 611 | (doseq [s (->> @queue-name->slabs vals (apply concat))] |
625 | 612 | (unmap s) |
626 | 613 | (delete-slab s))) |
|
654 | 641 | (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) |
655 | 642 | ks)))) |
656 | 643 |
|
657 | | - (take! [this q-name timeout timeout-val] |
| 644 | + (take! [_ q-name timeout timeout-val] |
658 | 645 | (let [q-name (munge (name q-name)) |
659 | 646 | ^LinkedBlockingQueue q (queue q-name)] |
660 | 647 | (try |
|
704 | 691 |
|
705 | 692 | (when-not task |
706 | 693 | (throw |
707 | | - (IllegalArgumentException. |
708 | | - (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) |
| 694 | + (IllegalArgumentException. |
| 695 | + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) |
709 | 696 |
|
710 | 697 | (when fsync-put? |
711 | 698 | (sync! slab)) |
|
715 | 702 | (if (zero? timeout) |
716 | 703 | (.offer q task) |
717 | 704 | (.offer q task timeout TimeUnit/MILLISECONDS)))] |
718 | | - (if-let [val (locking q |
719 | | - (queue! |
720 | | - (vary-meta (slab!) assoc |
721 | | - ::this this-ref |
722 | | - ::queue-name q-name |
723 | | - ::queue q |
724 | | - ::fsync? fsync-take?)))] |
| 705 | + (if (locking q |
| 706 | + (queue! |
| 707 | + (vary-meta (slab!) assoc |
| 708 | + ::this this-ref |
| 709 | + ::queue-name q-name |
| 710 | + ::queue q |
| 711 | + ::fsync? fsync-take?))) |
725 | 712 | (do |
726 | 713 | (populate-stats! q-name) |
727 | 714 | (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] |
|
0 commit comments