diff --git a/python/monarch/_src/job/job.py b/python/monarch/_src/job/job.py index 31d34ea9d..9f35668ba 100644 --- a/python/monarch/_src/job/job.py +++ b/python/monarch/_src/job/job.py @@ -6,6 +6,7 @@ # pyre-unsafe +import hashlib import logging import os import pickle @@ -15,6 +16,7 @@ import sys import tempfile from abc import ABC, abstractmethod +from pathlib import Path from typing import Dict, List, Literal, NamedTuple, Optional, Sequence from monarch._src.actor.bootstrap import attach_to_workers @@ -118,7 +120,7 @@ def apply(self, client_script: Optional[str] = None): def active(self) -> bool: return self._running is not None - def state(self, cached_path: Optional[str] = ".monarch/job_state.pkl") -> JobState: + def state(self, cached_path: Optional[str] = "default") -> JobState: """ Get the current state of this job, containing the host mesh objects of its requires that were requested host_meshes = self.state() @@ -128,14 +130,24 @@ def state(self, cached_path: Optional[str] = ".monarch/job_state.pkl") -> JobSta host_meshes.dataloaders This is a dictionary so that meshes can hold different machine types. - cached_path: if cached_path is not None and the job has yet to be applied, + cached_path: Path to cache job state. Options: + - None: Don't cache (job state won't be saved) + - "default": Cache in ~/.monarch/job_cache/{cwd_hash}/job_state.pkl (unique per working directory) + - str: Explicit path to cache file + If cached_path is not None and the job has yet to be applied, we will first check `cached_path` for an existing created job state. - If it exists and `saved_job.can_run(self)`, we will connect to the cached job. + If it exists and `saved_job.can_run(self)`, we will connect to the cached job. Otherwise, we will apply this job and connect to it, saving the job in `cached_path` if it is not None. Raises: JobExpiredException - when the job has finished and this connection cannot be made. """ + if cached_path == "default": + # Create unique cache location based on current working directory to use with different codebases + cwd_hash = hashlib.sha256(os.getcwd().encode()).hexdigest()[:8] + cached_path = str( + Path.home() / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl" + ) # this is implemented uniquely for each scheduler, but it will ultimately make # calls to attach_to_workers and return the HostMeshes running_job = self._running @@ -270,7 +282,12 @@ def _create(self, client_script: Optional[str]): return # noop, because LocalJob always 'exists' b = BatchJob(self) - b.dump(".monarch/job_state.pkl") + # Use default cache location (unique per working directory) + cwd_hash = hashlib.sha256(os.getcwd().encode()).hexdigest()[:8] + cache_path = str( + Path.home() / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl" + ) + b.dump(cache_path) log_dir = self._setup_log_directory() self._run_client_as_daemon(client_script, log_dir) @@ -282,7 +299,7 @@ def _create(self, client_script: Optional[str]): def _setup_log_directory(self) -> str: """Create a log directory for the batch job.""" - log_base_dir = ".monarch/logs" + log_base_dir = str(Path.home() / ".monarch" / "logs") os.makedirs(log_base_dir, exist_ok=True) # Create a unique subdirectory for this job run self._log_dir = tempfile.mkdtemp(prefix="job_", dir=log_base_dir)