Skip to content

Commit 51799ec

Browse files
committed
Add support for queuing source blocks
Instead of sending execute requests immediately when executing multiple source blocks in one go, they are queued such that when an execute_request has received an execute_reply, the next one queued up is sent. If one of the source blocks errors during its execution, all of the queued blocks get aborted. That is they do not get sent to the kernel. closes #498 * README.org: Describe feature. * test/jupyter-test.el (org-babel-src-block-queuing): New test. * jupyter-monads.el (jupyter-message-publisher): Publish non-Jupyter messages as is without processing them. (jupyter-request): Only handle Jupyter messages in the relevant subscriber. * jupyter-org-client.el (jupyter-org-queue-requests): New customization option to determine if client-side queuing should take place. (most-recent-request, last-queued-request): New slots to a `jupyter-org-client`, (jupyter-org-abort): New function that aborts a queued execute request. (jupyter-org-maybe-queued): New function that does the queuing of requests if it's necessary. * ob-jupyter.el (org-babel-jupyter--execute): Use `jupyter-org-maybe-queued`.
1 parent e145a79 commit 51799ec

File tree

5 files changed

+220
-30
lines changed

5 files changed

+220
-30
lines changed

README.org

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,18 @@ This behavior can be suppressed by setting =jupyter-org-auto-connect=
581581
to =nil=. In this case, a connection is attempted upon executing a
582582
source block, for example.
583583

584+
*** Enable client-side queuing of requests
585+
586+
If the customizable variable =jupyter-org-queue-requests= is non-nil,
587+
then perform client side queuing of source block execute requests.
588+
This means that when multiple requests are made, for example by
589+
executing a subtree, the requests are queued locally in Emacs instead
590+
of sending all the requests immediately to the kernel as would happen
591+
when =:async yes= is specified on all the source blocks. It is only
592+
when one request finishes that the next is sent. In addition, if any
593+
request fails all the queued requests that are meant to come after it
594+
are aborted and do not get sent to the kernel.
595+
584596
** Kernel/notebook server
585597
*** Managing live kernels
586598

jupyter-monads.el

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ the callbacks."
400400
(jupyter-publisher
401401
(lambda (msg)
402402
(pcase (jupyter-message-type msg)
403+
;; Send what doesn't appear to be a message as is.
404+
((pred null) (jupyter-content msg))
403405
;; A status message after a request goes idle means there is
404406
;; a new request and there will, theoretically, be no more
405407
;; messages for the idle one.
@@ -466,8 +468,10 @@ list, represents."
466468
(jupyter-subscribe
467469
(jupyter-subscriber
468470
(lambda (msg)
469-
(let ((channel (plist-get msg :channel)))
470-
(jupyter-handle-message client channel msg)))))))
471+
;; Only handle what looks to be a Jupyter message.
472+
(when (jupyter-message-type msg)
473+
(let ((channel (plist-get msg :channel)))
474+
(jupyter-handle-message client channel msg))))))))
471475
(cons req client)))))
472476

473477
(provide 'jupyter-monads)

jupyter-org-client.el

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,26 @@ session by, for example, executing a src-block."
7272
:group 'ob-jupyter
7373
:type 'boolean)
7474

75+
(defcustom jupyter-org-queue-requests nil
76+
"Whether or not source block evaluations should be queued.
77+
When this variable is nil and, for example, multiple source
78+
blocks are executed in rapid succession the underlying
79+
\"execute_request\" messages are sent to the kernel immediately
80+
and are queued on the kernel side so that when one of the source
81+
blocks raises an error, the kernel will typically just execute
82+
the next \"execute_request\" message queued up so the effect is
83+
that source blocks that come after the failed one are executed.
84+
Some may find this behavior undesirable.
85+
86+
Instead, when this variable is non-nil, the \"execute_request\"
87+
messages of the source blocks are queued on the client side and
88+
whenever one of the source blocks raises an error, all of the
89+
queued \"execute_request\" messages are aborted and don't get
90+
sent to the kernel so the effect is that source blocks that come
91+
after the failed one are not executed."
92+
:group 'ob-jupyter
93+
:type 'boolean)
94+
7595
(defcustom jupyter-org-resource-directory "./.ob-jupyter/"
7696
"Directory used to store automatically generated image files.
7797
See `jupyter-org-image-file-name'."
@@ -111,7 +131,16 @@ See also the docstring of `org-image-actual-width' for more details."
111131
"MIME types handled by Jupyter Org.")
112132

113133
(defclass jupyter-org-client (jupyter-repl-client)
114-
())
134+
((most-recent-request
135+
:type (or jupyter-request null)
136+
:initform nil
137+
:initarg :most-recent-request
138+
:documentation "The most recently sent request.")
139+
(last-queued-request
140+
:type (or jupyter-request null)
141+
:initform nil
142+
:initarg :last-queued-request
143+
:documentation "The last queued request.")))
115144

116145
(cl-defstruct (jupyter-org-request
117146
(:include jupyter-request)
@@ -400,6 +429,96 @@ to."
400429
(org-with-point-at (jupyter-org-request-marker req)
401430
(run-hooks 'org-babel-after-execute-hook)))))
402431

432+
;;; Queueing requests
433+
434+
(defun jupyter-org-abort (req)
435+
"Abort REQ.
436+
Set the request as being idle. Remove any indication that REQ is
437+
a running execute_request from the Org buffer. Publish an abort
438+
message down the chain of subscribers to the REQ's message
439+
publisher to indicate that any subsequent, queued, requests
440+
should also be aborted."
441+
(setf (jupyter-request-idle-p req) t)
442+
(let ((client (jupyter-request-client req)))
443+
(when (eq (oref client last-queued-request) req)
444+
(oset client last-queued-request nil)))
445+
(jupyter-org--remove-overlay req)
446+
(jupyter-org--clear-async-indicator req)
447+
(let ((marker (jupyter-org-request-marker req)))
448+
(message (format "Source block execution in %s at position %s canceled"
449+
(buffer-name (marker-buffer marker))
450+
(marker-position marker))))
451+
(with-demoted-errors "Error while aborting subscribers: %S"
452+
(jupyter-run-with-io
453+
(jupyter-request-message-publisher req)
454+
;; Propagate the abort down the chain of queued requests.
455+
(jupyter-publish 'abort)))
456+
(jupyter-unsubscribe))
457+
458+
(defun jupyter-org-maybe-queued (dreq)
459+
"Return a monadic value that either sends or continues to delay DREQ.
460+
DREQ is an already delayed request, as returned by
461+
`jupyter-request' and friends. When the value is bound to a
462+
client, using e.g. `jupyter-run-with-client', send DREQ if there
463+
are no queued requests otherwise queue DREQ. The value returns
464+
the unboxed request contained in DREQ.
465+
466+
If the variable `jupyter-org-queue-requests' is nil, just send
467+
the request immediately instead of attempting to queue it."
468+
(if (not jupyter-org-queue-requests)
469+
(jupyter-sent dreq)
470+
(jupyter-mlet* ((client (jupyter-get-state))
471+
(req dreq))
472+
(let* ((send
473+
(lambda (req)
474+
(jupyter-run-with-client client
475+
(jupyter-mlet* ((req (jupyter-sent
476+
(jupyter-return req))))
477+
(oset client most-recent-request req)
478+
(jupyter-run-with-io
479+
(jupyter-request-message-publisher req)
480+
(jupyter-subscribe
481+
(jupyter-subscriber
482+
(lambda (msg)
483+
(when (or (eq msg 'abort)
484+
(equal (jupyter-message-type msg) "execute_reply"))
485+
(when (eq (oref client most-recent-request) req)
486+
(oset client most-recent-request nil))
487+
(jupyter-unsubscribe))))))
488+
(when (eq (oref client last-queued-request) req)
489+
(oset client last-queued-request nil))
490+
(jupyter-return req)))))
491+
(queue
492+
;; Subscribe REQ to the message publisher of QREQ such that
493+
;; REQ is sent or aborted when QREQ receives an
494+
;; execute_reply.
495+
(lambda (qreq req)
496+
(let ((pub (jupyter-request-message-publisher qreq)))
497+
(jupyter-run-with-io pub
498+
(jupyter-subscribe
499+
(jupyter-subscriber
500+
(lambda (msg)
501+
(if (eq msg 'abort)
502+
(jupyter-org-abort req)
503+
(pcase (jupyter-message-type msg)
504+
("execute_reply"
505+
(jupyter-with-message-content msg (status)
506+
(if (equal status "ok")
507+
(funcall send req)
508+
(jupyter-org-abort req)))
509+
(jupyter-unsubscribe))))))))))))
510+
(let ((mreq (oref client most-recent-request)) qreq)
511+
(cond
512+
((null mreq)
513+
(funcall send req))
514+
((setq qreq (oref client last-queued-request))
515+
(funcall queue qreq req)
516+
(oset client last-queued-request req))
517+
(t
518+
(funcall queue mreq req)
519+
(oset client last-queued-request req)))
520+
(jupyter-return req))))))
521+
403522
;;; Completion in code blocks
404523

405524
(defvar jupyter-org--src-block-cache nil

ob-jupyter.el

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -493,33 +493,34 @@ These parameters are handled internally."
493493

494494
(defun org-babel-jupyter--execute (code async-p)
495495
(jupyter-run-with-client jupyter-current-client
496-
(jupyter-mlet* ((req (jupyter-sent (jupyter-execute-request :code code))))
497-
(jupyter-return
498-
`(,req
499-
,(cond
500-
(async-p
501-
(when (bound-and-true-p org-export-current-backend)
502-
(jupyter-add-idle-sync-hook
503-
'org-babel-after-execute-hook req 'append))
504-
(if (jupyter-org-request-inline-block-p req)
505-
org-babel-jupyter-async-inline-results-pending-indicator
506-
;; This returns the message ID of REQ as an indicator
507-
;; for the pending results.
508-
(jupyter-org-pending-async-results req)))
509-
(t
510-
(jupyter-idle-sync req)
511-
(if (jupyter-org-request-inline-block-p req)
512-
;; When evaluating a source block synchronously, only the
513-
;; :execute-result will be in `jupyter-org-request-results' since
514-
;; stream results and any displayed data will be placed in a separate
515-
;; buffer.
516-
(let ((el (jupyter-org-result
517-
req (car (jupyter-org-request-results req)))))
518-
(if (stringp el) el
519-
(org-element-property :value el)))
520-
;; This returns an Org formatted string of the collected
521-
;; results.
522-
(jupyter-org-sync-results req)))))))))
496+
(let ((dreq (jupyter-execute-request :code code)))
497+
(jupyter-mlet* ((req (jupyter-org-maybe-queued dreq)))
498+
(jupyter-return
499+
`(,req
500+
,(cond
501+
(async-p
502+
(when (bound-and-true-p org-export-current-backend)
503+
(jupyter-add-idle-sync-hook
504+
'org-babel-after-execute-hook req 'append))
505+
(if (jupyter-org-request-inline-block-p req)
506+
org-babel-jupyter-async-inline-results-pending-indicator
507+
;; This returns the message ID of REQ as an indicator
508+
;; for the pending results.
509+
(jupyter-org-pending-async-results req)))
510+
(t
511+
(jupyter-idle-sync req)
512+
(if (jupyter-org-request-inline-block-p req)
513+
;; When evaluating a source block synchronously, only the
514+
;; :execute-result will be in `jupyter-org-request-results' since
515+
;; stream results and any displayed data will be placed in a separate
516+
;; buffer.
517+
(let ((el (jupyter-org-result
518+
req (car (jupyter-org-request-results req)))))
519+
(if (stringp el) el
520+
(org-element-property :value el)))
521+
;; This returns an Org formatted string of the collected
522+
;; results.
523+
(jupyter-org-sync-results req))))))))))
523524

524525
(defvar org-babel-jupyter-current-src-block-params nil
525526
"The block parameters of the most recently executed Jupyter source block.")

test/jupyter-test.el

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3048,6 +3048,60 @@ print(2)"
30483048
(forward-line)
30493049
(should (looking-at-p ": 6"))))))
30503050

3051+
(ert-deftest org-babel-src-block-queuing ()
3052+
:tags '(org)
3053+
(let ((jupyter-org-queue-requests t)
3054+
(src (apply #'format "\
3055+
* Tree
3056+
3057+
#+BEGIN_SRC jupyter-python :session %s :async yes
3058+
1
3059+
#+END_SRC
3060+
3061+
#+BEGIN_SRC jupyter-python :session %s :async yes
3062+
raise Exception(\"This is an error\")
3063+
#+END_SRC
3064+
3065+
#+BEGIN_SRC jupyter-python :session %s :async yes
3066+
3
3067+
#+END_SRC
3068+
3069+
#+BEGIN_SRC jupyter-python :session %s :async yes
3070+
4
3071+
#+END_SRC
3072+
3073+
#+BEGIN_SRC jupyter-python :session %s :async yes
3074+
5
3075+
#+END_SRC
3076+
3077+
#+BEGIN_SRC jupyter-python :session %s :async yes
3078+
6
3079+
#+END_SRC
3080+
"
3081+
(cl-loop repeat 6 collect jupyter-org-test-session))))
3082+
(jupyter-org-test
3083+
(insert src)
3084+
(let ((check-result
3085+
(lambda (result)
3086+
(let ((pos (org-babel-where-is-src-block-result)))
3087+
(save-excursion
3088+
(should pos)
3089+
(goto-char pos)
3090+
(forward-line)
3091+
(should (equal (buffer-substring
3092+
(line-beginning-position)
3093+
(line-end-position))
3094+
result)))))))
3095+
(org-babel-execute-subtree)
3096+
(while (jupyter-org-request-at-point)
3097+
(sleep-for 0.1))
3098+
(funcall check-result "")
3099+
(org-previous-block 1)
3100+
(org-babel-execute-src-block)
3101+
(while (jupyter-org-request-at-point)
3102+
(sleep-for 0.1))
3103+
(funcall check-result ": 6")))))
3104+
30513105
;; Local Variables:
30523106
;; byte-compile-warnings: (unresolved obsolete lexical)
30533107
;; eval: (and (functionp 'aggressive-indent-mode) (aggressive-indent-mode -1))

0 commit comments

Comments
 (0)