diff --git a/mesa/experimental/devs/__init__.py b/mesa/experimental/devs/__init__.py index 6a486558e7c..a3f19f7f2ee 100644 --- a/mesa/experimental/devs/__init__.py +++ b/mesa/experimental/devs/__init__.py @@ -19,6 +19,6 @@ """ from .eventlist import Priority, SimulationEvent -from .simulator import ABMSimulator, DEVSimulator +from .simulator import ABMSimulator, DEVSimulator, Simulator -__all__ = ["ABMSimulator", "DEVSimulator", "Priority", "SimulationEvent"] +__all__ = ["ABMSimulator", "DEVSimulator", "Priority", "SimulationEvent", "Simulator"] diff --git a/mesa/experimental/devs/simulator.py b/mesa/experimental/devs/simulator.py index 0b9a45eeb2b..ba1f848f39c 100644 --- a/mesa/experimental/devs/simulator.py +++ b/mesa/experimental/devs/simulator.py @@ -23,6 +23,7 @@ from __future__ import annotations import numbers +import warnings from collections.abc import Callable from typing import TYPE_CHECKING, Any @@ -61,9 +62,17 @@ def __init__(self, time_unit: type, start_time: int | float): self.event_list = EventList() self.start_time = start_time self.time_unit = time_unit - - self.time = self.start_time - self.model = None + self.model: Model | None = None + + @property + def time(self) -> float: + """Simulator time (deprecated).""" + warnings.warn( + "simulator.time is deprecated, use model.time instead", + DeprecationWarning, + stacklevel=2, + ) + return self.model.time def check_time_unit(self, time: int | float) -> bool: ... # noqa: D102 @@ -78,22 +87,24 @@ def setup(self, model: Model) -> None: Exception if event list is not empty """ - if self.time != self.start_time: + if model.time != self.start_time: raise ValueError( - "trying to setup model, but current time is not equal to start_time, Has the simulator been reset or freshly initialized?" + f"Model time ({model.time}) does not match simulator start_time ({self.start_time}). " + "Has the model already been run?" ) if not self.event_list.is_empty(): - raise ValueError( - "trying to setup model, but events have already been scheduled. Call simulator.setup before any scheduling" - ) + raise ValueError("Events already scheduled. Call setup before scheduling.") self.model = model + model._simulator = self # Register simulator with model def reset(self): - """Reset the simulator by clearing the event list and removing the model to simulate.""" + """Reset the simulator.""" self.event_list.clear() - self.model = None - self.time = self.start_time + if self.model is not None: + self.model._simulator = None + self.model.time = self.start_time + self.model = None def run_until(self, end_time: int | float) -> None: """Run the simulator until the end time. @@ -106,23 +117,23 @@ def run_until(self, end_time: int | float) -> None: """ if self.model is None: - raise Exception( - "simulator has not been setup, call simulator.setup(model) first" + raise RuntimeError( + "Simulator not set up. Call simulator.setup(model) first." ) while True: try: event = self.event_list.pop_event() - except IndexError: # event list is empty - self.time = end_time + except IndexError: + self.model.time = end_time break if event.time <= end_time: - self.time = event.time + self.model.time = event.time event.execute() else: - self.time = end_time - self._schedule_event(event) # reschedule event + self.model.time = end_time + self._schedule_event(event) break def run_next_event(self): @@ -133,17 +144,17 @@ def run_next_event(self): """ if self.model is None: - raise Exception( - "simulator has not been setup, call simulator.setup(model) first" + raise RuntimeError( + "Simulator not set up. Call simulator.setup(model) first." ) try: event = self.event_list.pop_event() - except IndexError: # event list is empty + except IndexError: return - else: - self.time = event.time - event.execute() + + self.model.time = event.time + event.execute() def run_for(self, time_delta: int | float): """Run the simulator for the specified time delta. @@ -154,7 +165,7 @@ def run_for(self, time_delta: int | float): """ # fixme, raise initialization error or something like it if model.setup has not been called - end_time = self.time + time_delta + end_time = self.model.time + time_delta self.run_until(end_time) def schedule_event_now( @@ -240,7 +251,7 @@ def schedule_event_relative( """ event = SimulationEvent( - self.time + time_delta, + self.model.time + time_delta, function, priority=priority, function_args=function_args, @@ -344,21 +355,21 @@ def run_until(self, end_time: int) -> None: """ if self.model is None: - raise Exception( - "simulator has not been setup, call simulator.setup(model) first" + raise RuntimeError( + "Simulator not set up. Call simulator.setup(model) first." ) while True: try: event = self.event_list.pop_event() except IndexError: - self.time = end_time + self.model.time = float(end_time) break - # fixme: the alternative would be to wrap model.step with an annotation which - # handles this scheduling. if event.time <= end_time: - self.time = event.time + self.model.time = float(event.time) + + # Reschedule model.step for next tick if this is a step event if event.fn() == self.model.step: self.schedule_event_next_tick( self.model.step, priority=Priority.HIGH @@ -366,7 +377,7 @@ def run_until(self, end_time: int) -> None: event.execute() else: - self.time = end_time + self.model.time = float(end_time) self._schedule_event(event) break diff --git a/mesa/model.py b/mesa/model.py index f53d92b1633..43c3677fc63 100644 --- a/mesa/model.py +++ b/mesa/model.py @@ -17,6 +17,7 @@ import numpy as np from mesa.agent import Agent, AgentSet +from mesa.experimental.devs import Simulator from mesa.mesa_logging import create_module_logger, method_logger SeedLike = int | np.integer | Sequence[int] | np.random.SeedSequence @@ -52,6 +53,7 @@ def __init__( *args: Any, seed: float | None = None, rng: RNGLike | SeedLike | None = None, + step_duration: float = 1.0, **kwargs: Any, ) -> None: """Create a new model. @@ -65,6 +67,7 @@ def __init__( rng : Pseudorandom number generator state. When `rng` is None, a new `numpy.random.Generator` is created using entropy from the operating system. Types other than `numpy.random.Generator` are passed to `numpy.random.default_rng` to instantiate a `Generator`. + step_duration: How much time advances each step (default 1.0) kwargs: keyword arguments to pass onto super Notes: @@ -72,8 +75,13 @@ def __init__( """ super().__init__(*args, **kwargs) - self.running = True + self.running: bool = True self.steps: int = 0 + self.time: float = 0.0 + self._step_duration: float = step_duration + + # Track if a simulator is controlling time + self._simulator: Simulator | None = None if (seed is not None) and (rng is not None): raise ValueError("you have to pass either rng or seed, not both") @@ -117,7 +125,13 @@ def _wrapped_step(self, *args: Any, **kwargs: Any) -> None: """Automatically increments time and steps after calling the user's step method.""" # Automatically increment time and step counters self.steps += 1 - _mesa_logger.info(f"calling model.step for timestep {self.steps} ") + # Only auto-increment time if no simulator is controlling it + if self._simulator is None: + self.time += self._step_duration + + _mesa_logger.info( + f"calling model.step for step {self.steps} at time {self.time}" + ) # Call the original user-defined step method self._user_step(*args, **kwargs) diff --git a/tests/test_devs.py b/tests/test_devs.py index 883a74d1375..bf962c66ca7 100644 --- a/tests/test_devs.py +++ b/tests/test_devs.py @@ -14,12 +14,12 @@ def test_devs_simulator(): simulator = DEVSimulator() # setup - model = MagicMock(spec=Model) + model = Model() simulator.setup(model) assert len(simulator.event_list) == 0 assert simulator.model == model - assert simulator.time == 0 + assert model.time == 0.0 # schedule_event_now fn1 = MagicMock() @@ -43,30 +43,30 @@ def test_devs_simulator(): simulator.run_for(0.8) fn1.assert_called_once() fn3.assert_called_once() - assert simulator.time == 0.8 + assert model.time == 0.8 simulator.run_for(0.2) fn2.assert_called_once() - assert simulator.time == 1.0 + assert model.time == 1.0 simulator.run_for(0.2) - assert simulator.time == 1.2 + assert model.time == 1.2 with pytest.raises(ValueError): simulator.schedule_event_absolute(fn2, 0.5) # step simulator = DEVSimulator() - model = MagicMock(spec=Model) + model = Model() simulator.setup(model) fn = MagicMock() simulator.schedule_event_absolute(fn, 1.0) simulator.run_next_event() fn.assert_called_once() - assert simulator.time == 1.0 + assert model.time == 1.0 simulator.run_next_event() - assert simulator.time == 1.0 + assert model.time == 1.0 simulator = DEVSimulator() with pytest.raises(Exception): @@ -74,7 +74,7 @@ def test_devs_simulator(): # cancel_event simulator = DEVSimulator() - model = MagicMock(spec=Model) + model = Model() simulator.setup(model) fn = MagicMock() event = simulator.schedule_event_relative(fn, 0.5) @@ -85,7 +85,6 @@ def test_devs_simulator(): simulator.reset() assert len(simulator.event_list) == 0 assert simulator.model is None - assert simulator.time == 0.0 # run without setup simulator = DEVSimulator() @@ -94,15 +93,16 @@ def test_devs_simulator(): # setup with time advanced simulator = DEVSimulator() - simulator.time = simulator.start_time + 1 - model = MagicMock(spec=Model) - with pytest.raises(Exception): + model = Model() + model.time = 1.0 # Advance time before setup + with pytest.raises(ValueError): simulator.setup(model) # setup with event scheduled simulator = DEVSimulator() - simulator.schedule_event_now(Mock()) - with pytest.raises(Exception): + model = Model() + simulator.event_list.add_event(SimulationEvent(1.0, Mock(), Priority.DEFAULT)) + with pytest.raises(ValueError): simulator.setup(model) @@ -111,7 +111,7 @@ def test_abm_simulator(): simulator = ABMSimulator() # setup - model = MagicMock(spec=Model) + model = Model() simulator.setup(model) # schedule_event_next_tick @@ -120,8 +120,8 @@ def test_abm_simulator(): assert len(simulator.event_list) == 2 simulator.run_for(3) - assert model.step.call_count == 3 - assert simulator.time == 3 + assert model.steps == 3 + assert model.time == 3.0 # run without setup simulator = ABMSimulator() @@ -129,6 +129,16 @@ def test_abm_simulator(): simulator.run_until(10) +def test_simulator_time_deprecation(): + """Test that simulator.time emits deprecation warning.""" + simulator = DEVSimulator() + model = Model() + simulator.setup(model) + + with pytest.warns(DeprecationWarning, match="simulator.time is deprecated"): + _ = simulator.time + + def test_simulation_event(): """Tests for SimulationEvent class.""" some_test_function = MagicMock() diff --git a/tests/test_model.py b/tests/test_model.py index a7e054d12b3..614161cc105 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -3,6 +3,7 @@ import numpy as np from mesa.agent import Agent, AgentSet +from mesa.experimental.devs.simulator import DEVSimulator from mesa.model import Model @@ -11,24 +12,80 @@ def test_model_set_up(): model = Model() assert model.running is True assert model.steps == 0 + assert model.time == 0.0 + assert model._step_duration == 1.0 + assert model._simulator is None + + model.step() + assert model.steps == 1 + assert model.time == 1.0 + + +def test_model_time_increment(): + """Test that time increments correctly with steps.""" + model = Model() + + for i in range(5): + model.step() + assert model.steps == i + 1 + assert model.time == float(i + 1) + + +def test_model_step_duration(): + """Test custom step_duration.""" + # Default step_duration + model = Model() + model.step() + assert model.time == 1.0 + + # Custom step_duration + model = Model(step_duration=0.25) + assert model._step_duration == 0.25 + model.step() assert model.steps == 1 + assert model.time == 0.25 + + model.step() + assert model.steps == 2 + assert model.time == 0.5 + + # Larger step_duration + model = Model(step_duration=10.0) + model.step() + assert model.time == 10.0 + model.step() + assert model.time == 20.0 + + +def test_model_time_with_simulator(): + """Test that simulator controls time when attached.""" + model = Model() + simulator = DEVSimulator() + simulator.setup(model) + + # Simulator is now attached + assert model._simulator is simulator + + # Time should not auto-increment when simulator is attached + # (In practice, the simulator controls stepping, but we can test the flag) + model._user_step() # Call user step directly to avoid wrapped_step + # Time unchanged because simulator controls it def test_running(): """Test Model is running.""" class TestModel(Model): - steps = 0 - def step(self): - """Increase steps until 10.""" + """Stop at step 10.""" if self.steps == 10: self.running = False model = TestModel() model.run_model() assert model.steps == 10 + assert model.time == 10.0 def test_seed(seed=23): @@ -78,7 +135,7 @@ def test_reset_rng(newseed=42): def test_agent_types(): - """Test Mode.agent_types property.""" + """Test Model.agent_types property.""" class TestAgent(Agent): pass @@ -102,8 +159,8 @@ class Sheep(Agent): wolf = Wolf(model) sheep = Sheep(model) - assert model.agents_by_type[Wolf] == AgentSet([wolf], model) - assert model.agents_by_type[Sheep] == AgentSet([sheep], model) + assert model.agents_by_type[Wolf] == AgentSet([wolf], random=model.random) + assert model.agents_by_type[Sheep] == AgentSet([sheep], random=model.random) assert len(model.agents_by_type) == 2