66
77# pyre-unsafe
88
9+ import hashlib
910import logging
1011import os
1112import pickle
1516import sys
1617import tempfile
1718from abc import ABC , abstractmethod
19+ from pathlib import Path
1820from typing import Dict , List , Literal , NamedTuple , Optional , Sequence
1921
2022from monarch ._src .actor .bootstrap import attach_to_workers
@@ -118,7 +120,7 @@ def apply(self, client_script: Optional[str] = None):
118120 def active (self ) -> bool :
119121 return self ._running is not None
120122
121- def state (self , cached_path : Optional [str ] = ".monarch/job_state.pkl " ) -> JobState :
123+ def state (self , cached_path : Optional [str ] = "default " ) -> JobState :
122124 """
123125 Get the current state of this job, containing the host mesh objects of its requires that were requested
124126 host_meshes = self.state()
@@ -128,14 +130,24 @@ def state(self, cached_path: Optional[str] = ".monarch/job_state.pkl") -> JobSta
128130 host_meshes.dataloaders
129131 This is a dictionary so that meshes can hold different machine types.
130132
131- cached_path: if cached_path is not None and the job has yet to be applied,
133+ cached_path: Path to cache job state. Options:
134+ - None: Don't cache (job state won't be saved)
135+ - "default": Cache in ~/.monarch/job_cache/{cwd_hash}/job_state.pkl (unique per working directory)
136+ - str: Explicit path to cache file
137+ If cached_path is not None and the job has yet to be applied,
132138 we will first check `cached_path` for an existing created job state.
133- If it exists and `saved_job.can_run(self)`, we will connect to the cached job.
139+ If it exists and `saved_job.can_run(self)`, we will connect to the cached job.
134140 Otherwise, we will apply this job and connect to it, saving the job in `cached_path` if it is not None.
135141
136142
137143 Raises: JobExpiredException - when the job has finished and this connection cannot be made.
138144 """
145+ if cached_path == "default" :
146+ # Create unique cache location based on current working directory to use with different codebases
147+ cwd_hash = hashlib .sha256 (os .getcwd ().encode ()).hexdigest ()[:8 ]
148+ cached_path = str (
149+ Path .home () / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl"
150+ )
139151 # this is implemented uniquely for each scheduler, but it will ultimately make
140152 # calls to attach_to_workers and return the HostMeshes
141153 running_job = self ._running
@@ -270,7 +282,12 @@ def _create(self, client_script: Optional[str]):
270282 return # noop, because LocalJob always 'exists'
271283
272284 b = BatchJob (self )
273- b .dump (".monarch/job_state.pkl" )
285+ # Use default cache location (unique per working directory)
286+ cwd_hash = hashlib .sha256 (os .getcwd ().encode ()).hexdigest ()[:8 ]
287+ cache_path = str (
288+ Path .home () / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl"
289+ )
290+ b .dump (cache_path )
274291
275292 log_dir = self ._setup_log_directory ()
276293 self ._run_client_as_daemon (client_script , log_dir )
@@ -282,7 +299,7 @@ def _create(self, client_script: Optional[str]):
282299
283300 def _setup_log_directory (self ) -> str :
284301 """Create a log directory for the batch job."""
285- log_base_dir = ".monarch/ logs"
302+ log_base_dir = str ( Path . home () / ".monarch" / " logs")
286303 os .makedirs (log_base_dir , exist_ok = True )
287304 # Create a unique subdirectory for this job run
288305 self ._log_dir = tempfile .mkdtemp (prefix = "job_" , dir = log_base_dir )
0 commit comments