Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions python/monarch/_src/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# pyre-unsafe

import hashlib
import logging
import os
import pickle
Expand All @@ -15,6 +16,7 @@
import sys
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Literal, NamedTuple, Optional, Sequence

from monarch._src.actor.bootstrap import attach_to_workers
Expand Down Expand Up @@ -118,7 +120,7 @@ def apply(self, client_script: Optional[str] = None):
def active(self) -> bool:
return self._running is not None

def state(self, cached_path: Optional[str] = ".monarch/job_state.pkl") -> JobState:
def state(self, cached_path: Optional[str] = "default") -> JobState:
"""
Get the current state of this job, containing the host mesh objects of its requires that were requested
host_meshes = self.state()
Expand All @@ -128,14 +130,24 @@ def state(self, cached_path: Optional[str] = ".monarch/job_state.pkl") -> JobSta
host_meshes.dataloaders
This is a dictionary so that meshes can hold different machine types.

cached_path: if cached_path is not None and the job has yet to be applied,
cached_path: Path to cache job state. Options:
- None: Don't cache (job state won't be saved)
- "default": Cache in ~/.monarch/job_cache/{cwd_hash}/job_state.pkl (unique per working directory)
- str: Explicit path to cache file
If cached_path is not None and the job has yet to be applied,
we will first check `cached_path` for an existing created job state.
If it exists and `saved_job.can_run(self)`, we will connect to the cached job.
If it exists and `saved_job.can_run(self)`, we will connect to the cached job.
Otherwise, we will apply this job and connect to it, saving the job in `cached_path` if it is not None.


Raises: JobExpiredException - when the job has finished and this connection cannot be made.
"""
if cached_path == "default":
# Create unique cache location based on current working directory to use with different codebases
cwd_hash = hashlib.sha256(os.getcwd().encode()).hexdigest()[:8]
cached_path = str(
Path.home() / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl"
)
# this is implemented uniquely for each scheduler, but it will ultimately make
# calls to attach_to_workers and return the HostMeshes
running_job = self._running
Expand Down Expand Up @@ -270,7 +282,12 @@ def _create(self, client_script: Optional[str]):
return # noop, because LocalJob always 'exists'

b = BatchJob(self)
b.dump(".monarch/job_state.pkl")
# Use default cache location (unique per working directory)
cwd_hash = hashlib.sha256(os.getcwd().encode()).hexdigest()[:8]
cache_path = str(
Path.home() / ".monarch" / "job_cache" / cwd_hash / "job_state.pkl"
)
b.dump(cache_path)

log_dir = self._setup_log_directory()
self._run_client_as_daemon(client_script, log_dir)
Expand All @@ -282,7 +299,7 @@ def _create(self, client_script: Optional[str]):

def _setup_log_directory(self) -> str:
"""Create a log directory for the batch job."""
log_base_dir = ".monarch/logs"
log_base_dir = str(Path.home() / ".monarch" / "logs")
os.makedirs(log_base_dir, exist_ok=True)
# Create a unique subdirectory for this job run
self._log_dir = tempfile.mkdtemp(prefix="job_", dir=log_base_dir)
Expand Down