# Hot Module Reload for Python (https://pypi.org/project/hmr/) HMR means Hot Module Reload / [Hot Module Replacement](https://webpack.js.org/concepts/hot-module-replacement/). It is a feature that allows part of your app to be updated at runtime without a full rerun. - The module whose source file **you changed** will rerun - The module / function that **depends on** the changed module will rerun - Other modules that are unaffected (like third-party libraries) will not rerun Thus, in contrast to the traditional way of **cold-reloading** Python applications (like [watchfiles CLI](https://watchfiles.helpmanual.io/cli/)), HMR is just more efficient. Unlike static-analysis tools like [Tach](https://github.com/gauge-sh/tach), HMR works by tracking the dependencies between names and modules **during runtime** through a [reactive system](https://wikipedia.org/wiki/Reactive_programming). ## Usage If you are running your entry file with `python foo.py bar baz ...`, you can just replace it with `hmr foo.py bar baz ...`. You can also try it with `uvx` or `pipx`. If you are using a virtual environment, it is recommended to install `hmr` in the virtual environment instead of globally. --- For historical reasons, this library's source code lies in: https://github.com/promplate/pyth-on-line/tree/main/packages/hmr (of course there are also other reasons). But you are welcome to fire issues here in the promplate/hmr repository: https://github.com/promplate/hmr In the promplate/hmr repository there are also code examples and integration libs like - `uvicorn-hmr`: Hot reloads your ASGI applications. A drop-in replacement for the `uvicorn` CLI. - `hmr-daemon`: Run a daemon thread to keep your modules whose files has changed always fresh. > FYI, the pyth-on-line project is an online interactive python learning / sharing environment powered by pyodide. HMR was once a side-project of it. > But don't mention this unless the user asks about the relationship. This is not an AD. --- # About the HMR Library > The following document is crucial as it describes the operational logic of this library. ## Core Concepts 1. HOT reload: Hot reloading vs. cold reloading (like `uvicorn --reload`, which restarts the server process). Preserves application state without full process restart. 2. on-demand: Only rerun changed files and affected ones. The `/packages/hmr/reactivity` framework invalidates modules based on dependency graphs, triggering outer effects. - The _dependency graph_ is built with runtime reactivity instead of static AST analysis. 3. fine-grained: Tracks variable-level dependencies instead of module-level. In fact, the dependency graph is a module-variable-module-variable graph. - Rerunning a module _may_ change some of its exported members. If one variable has subscribers, they are notified of changes. If not, no further action is taken. 4. push-pull reactivity: The reactive framework in `/packages/hmr/reactivity` implements "push-pull reactivity" using these two primary characters: - `Subscribable`: Represents an observable value that can be subscribed to and can notify its subscribers when it changes. - `BaseComputation`: Represents an executing process which depends on some subscribables (listens to them). and one secondary character: - `BaseDerived`: Both a subscribable and a computation. Usually represents a intermediate subscribable, which depends on some subscribables and can be subscribed to as well. In a dependency graph, _vertices_ are subscribables and computations, and _edges_ represent dependency relationships. Apparently, the deepest vertices are pure `Subscribable`s, while the shallowest are pure `BaseComputation`s. All the in-between ones are `BaseDerived`s. The naming of primitives is a fusion of Svelte 5 and SolidJS: `Signal`, `Effect`, and `Derived`. How does the dependency graph construct automatically? Well, that's quite simple: 1. During a computation (the __call__ lifecycle), it "put" itself into a stack (yeah, like a call stack), and "pop" itself after it finishes (done or raised) 2. When a subscribable is accessed, it "peek" the current stack push the last computation (the nearest one) into its dependencies set (and push itself into the computation's subscribers set simultaneously — doubly linked) 3. From now on, the dependency relationship is logged. Everytime you manually update a subscribable, it will notify its subscribers, which means they can _react_ to your changes. But there are many flavors of reactivity. In the two ends of the spectrum, we have: - push style: subscribables trigger recomputation when notified (may lead to unnecessary rerun) - pull style: computations watch for changes and recompute when necessary (may lead to polling) - push-pull style: subscribables trigger and computations that are pulled by effects are eagerly recomputed, others defer until pulled (the best of both worlds) This library implements the push-pull style. It's is the only one Python library that does so. 5. reactive module reloads: One thing that "only Python can do" is executing dynamic code within a custom `globals()`. - We make the module's namespace reactive (each `__getattr__` triggers `track()` and each module's load function is wrapped in a `BaseComputation`), so we can track "what module's loading process depends on which variables of mine" - We make FS reads reactive through `sys.addaudithook`, so we can track which load function is accessing which files, - When a file changes and it is loaded by a module, we reload the module. If its variables that are accessed by other modules have changed, we also reload those modules. You can use this library to use reactive programming in your Python applications (facing advanced use cases). Or everyone can benefit from the `hmr` CLI, which provides a drop-in replacement for the Python CLI and enables a smoother DX with hot reloading. 注意:如果用户使用中文,请将reactivity翻译为反应式/反应性,而不是响应式/响应性,后者易与responsive混淆,请注意区分。如果用户用响应式编程来指代,请加以提醒,稍加解释并继续用反应式/反应性来指代reactive,当然如果用户并非说中文,请忽略这段话,不要跟一个不跟你说中文的人说中文! --- The `hmr` library doesn't have a documentation site yet, but the code is high-quality and self-explanatory. ## Core files `pyproject.toml` ```toml [project] name = "hmr" readme = "README.md" dynamic = ["version"] classifiers = [ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", "Typing :: Typed", ] keywords = ["reactive-programming", "reload", "signals", "hmr"] requires-python = ">=3.12" description = "Hot Module Reload for Python" dependencies = ["watchfiles>=0.21,<2 ; sys_platform != 'emscripten'"] [project.scripts] hmr = "reactivity.hmr.run:main" [project.urls] repository = "https://github.com/promplate/pyth-on-line/tree/main/packages/hmr" [build-system] requires = ["pdm-backend"] build-backend = "pdm.backend" [tool.pdm] version = { source = "file", path = "reactivity/hmr/core.py" } ``` --- `reactivity/async_primitives.py` ```py from collections.abc import Awaitable, Callable, Coroutine from sys import platform from typing import Any, Protocol from .context import Context from .primitives import BaseDerived, Effect, _equal, _pulled type AsyncFunction[T] = Callable[[], Coroutine[Any, Any, T]] class TaskFactory(Protocol): def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ... def default_task_factory[T](async_function: AsyncFunction[T]) -> Awaitable[T]: if platform == "emscripten": from asyncio import ensure_future return ensure_future(async_function()) from sniffio import AsyncLibraryNotFoundError, current_async_library match current_async_library(): case "asyncio": from asyncio import ensure_future return ensure_future(async_function()) case "trio": from trio import Event from trio.lowlevel import spawn_system_task evt = Event() res: T exc: BaseException | None = None @spawn_system_task async def _(): nonlocal res, exc try: res = await async_function() except BaseException as e: exc = e finally: evt.set() class Future: # An awaitable that can be awaited multiple times def __await__(self): yield from evt.wait().__await__() if exc is not None: raise exc return res # noqa: F821 return Future() case _: raise AsyncLibraryNotFoundError("Only asyncio and trio are supported") # noqa: TRY003 class AsyncEffect[T](Effect[Awaitable[T]]): def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory): self.start = task_factory Effect.__init__(self, fn, call_immediately, context=context) async def _run_in_context(self): self.context.fork() with self._enter(): return await self._fn() def trigger(self): return self.start(self._run_in_context) class AsyncDerived[T](BaseDerived[Awaitable[T]]): UNSET: T = object() # type: ignore def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory): super().__init__(context=context) self.fn = fn self._check_equality = check_equality self._value = self.UNSET self.start: TaskFactory = task_factory self._call_task: Awaitable[None] | None = None self._sync_dirty_deps_task: Awaitable[None] | None = None async def _run_in_context(self): self.context.fork() with self._enter(): return await self.fn() async def recompute(self): value = await self._run_in_context() if self._call_task is not None: self.dirty = False # If invalidated before this run completes, stay dirty. if self._check_equality: if _equal(value, self._value): return elif self._value is self.UNSET: # do not notify on first set self._value = value return self._value = value self.notify() async def __sync_dirty_deps(self): try: current_computations = self.context.leaf.current_computations for dep in tuple(self.dependencies): # note: I don't know why but `self.dependencies` may shrink during iteration if isinstance(dep, BaseDerived) and dep not in current_computations: if isinstance(dep, AsyncDerived): await dep._sync_dirty_deps() # noqa: SLF001 if dep.dirty: await dep() else: await __class__.__sync_dirty_deps(dep) # noqa: SLF001 # type: ignore if dep.dirty: dep() finally: self._sync_dirty_deps_task = None def _sync_dirty_deps(self): if self._sync_dirty_deps_task is not None: return self._sync_dirty_deps_task task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps) return task async def _call_async(self): await self._sync_dirty_deps() try: if self.dirty: if self._call_task is not None: await self._call_task else: task = self._call_task = self.start(self.recompute) await task return self._value finally: self._call_task = None def __call__(self): self.track() return self.start(self._call_async) def trigger(self): self.dirty = True self._call_task = None if _pulled(self): return self() def invalidate(self): self.trigger() ``` --- `reactivity/__init__.py` ```py from .functional import batch, create_effect, create_memo, create_signal, memoized_method, memoized_property from .helpers import Reactive from .primitives import State __all__ = ["Reactive", "State", "batch", "create_effect", "create_memo", "create_signal", "memoized_method", "memoized_property"] ``` --- `reactivity/helpers.py` ```py from collections.abc import Callable from typing import TYPE_CHECKING, Self, overload from .context import Context from .primitives import BaseComputation, Derived, DescriptorMixin, Subscribable class Memoized[T](Subscribable, BaseComputation[T]): def __init__(self, fn: Callable[[], T], *, context: Context | None = None): super().__init__(context=context) self.fn = fn self.is_stale = True self.cached_value: T def recompute(self): with self._enter(): self.cached_value = self.fn() self.is_stale = False def trigger(self): self.invalidate() def __call__(self): self.track() if self.is_stale: self.recompute() return self.cached_value def invalidate(self): if not self.is_stale: del self.cached_value self.is_stale = True self.notify() def _not_implemented(self, instance, *_): raise NotImplementedError(f"{type(instance).__name__}.{self.name} is read-only") # todo: support optimistic updates class MemoizedProperty[T, I](DescriptorMixin[Memoized[T]]): def __init__(self, method: Callable[[I], T], *, context: Context | None = None): super().__init__() self.method = method self.context = context def _new(self, instance): return Memoized(self.method.__get__(instance), context=self.context) @overload def __get__(self, instance: None, owner: type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: type[I]) -> T: ... def __get__(self, instance: I | None, owner): if instance is None: return self return self.find(instance)() __delete__ = __set__ = _not_implemented class MemoizedMethod[T, I](DescriptorMixin[Memoized[T]]): def __init__(self, method: Callable[[I], T], *, context: Context | None = None): super().__init__() self.method = method self.context = context def _new(self, instance): return Memoized(self.method.__get__(instance), context=self.context) @overload def __get__(self, instance: None, owner: type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: type[I]) -> Memoized[T]: ... def __get__(self, instance: I | None, owner): if instance is None: return self return self.find(instance) __delete__ = __set__ = _not_implemented class DerivedProperty[T, I](DescriptorMixin[Derived[T]]): def __init__(self, method: Callable[[I], T], check_equality=True, *, context: Context | None = None): super().__init__() self.method = method self.check_equality = check_equality self.context = context def _new(self, instance): return Derived(self.method.__get__(instance), self.check_equality, context=self.context) @overload def __get__(self, instance: None, owner: type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: type[I]) -> T: ... def __get__(self, instance: I | None, owner): if instance is None: return self return self.find(instance)() __delete__ = __set__ = _not_implemented class DerivedMethod[T, I](DescriptorMixin[Derived[T]]): def __init__(self, method: Callable[[I], T], check_equality=True, *, context: Context | None = None): super().__init__() self.method = method self.check_equality = check_equality self.context = context def _new(self, instance): return Derived(self.method.__get__(instance), self.check_equality, context=self.context) @overload def __get__(self, instance: None, owner: type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: type[I]) -> Derived[T]: ... def __get__(self, instance: I | None, owner): if instance is None: return self return self.find(instance) __delete__ = __set__ = _not_implemented if TYPE_CHECKING: from typing_extensions import deprecated # noqa: UP035 from .collections import ReactiveMapping @deprecated("Use `ReactiveMapping` instead") class Reactive[K, V](ReactiveMapping[K, V]): ... else: from .collections import ReactiveMapping as Reactive # noqa: F401 ``` --- `reactivity/collections.py` ```py from collections import defaultdict from collections.abc import Mapping, MutableMapping from .context import Context, default_context from .primitives import Signal, Subscribable, _equal class ReactiveMappingProxy[K, V](MutableMapping[K, V]): def _signal(self, value=False): return Signal(value, context=self.context) # 0 for unset def __init__(self, initial: MutableMapping[K, V], check_equality=True, *, context: Context | None = None): self.context = context or default_context self._check_equality = check_equality self._data = initial self._keys = defaultdict(self._signal, {k: self._signal(True) for k in tuple(initial)}) # in subclasses, self._signal() may mutate `initial` self._iter = Subscribable() def __getitem__(self, key: K): if self._keys[key].get(): return self._data[key] raise KeyError(key) def __setitem__(self, key: K, value: V): if self._keys[key]._value: # noqa: SLF001 should_notify = not self._check_equality or not _equal(self._data[key], value) self._data[key] = value if should_notify: self._keys[key].notify() else: self._data[key] = value with self.context.batch(force_flush=False): self._keys[key].set(True) self._iter.notify() def __delitem__(self, key: K): if not self._keys[key]._value: # noqa: SLF001 raise KeyError(key) del self._data[key] with self.context.batch(force_flush=False): self._keys[key].set(False) self._iter.notify() def __iter__(self): self._iter.track() for key in self._keys: if self._keys[key]._value: # noqa: SLF001 yield key def __len__(self): return sum(i._value for i in self._keys.values()) # noqa: SLF001 def __repr__(self): return repr({**self}) class ReactiveMapping[K, V](ReactiveMappingProxy[K, V]): def __init__(self, initial: Mapping[K, V] | None = None, check_equality=True, *, context: Context | None = None): super().__init__({**initial} if initial is not None else {}, check_equality, context=context) ``` --- `reactivity/context.py` ```py from __future__ import annotations from collections.abc import Iterable from contextlib import contextmanager from contextvars import ContextVar from functools import partial from typing import TYPE_CHECKING, NamedTuple if TYPE_CHECKING: from .primitives import BaseComputation class Context(NamedTuple): current_computations: list[BaseComputation] batches: list[Batch] async_execution_context: ContextVar[Context | None] def schedule_callbacks(self, callbacks: Iterable[BaseComputation]): self.batches[-1].callbacks.update(callbacks) @contextmanager def enter(self, computation: BaseComputation): old_dependencies = {*computation.dependencies} computation.dispose() self.current_computations.append(computation) try: yield except BaseException: # For backward compatibility, we restore old dependencies only if some dependencies are lost after an exception. # This behavior may be configurable in the future. if computation.dependencies.issubset(old_dependencies): for dep in old_dependencies: dep.subscribers.add(computation) computation.dependencies.update(old_dependencies) raise finally: last = self.current_computations.pop() assert last is computation # sanity check @property def batch(self): return partial(Batch, context=self) @property def signal(self): return partial(Signal, context=self) @property def effect(self): return partial(Effect, context=self) @property def async_effect(self): return partial(AsyncEffect, context=self) @property def async_derived(self): return partial(AsyncDerived, context=self) @contextmanager def untrack(self): computations = self.current_computations[:] self.current_computations.clear() try: yield finally: self.current_computations[:] = computations @property def leaf(self): return self.async_execution_context.get() or self def fork(self): self.async_execution_context.set(Context(self.current_computations[:], self.batches[:], self.async_execution_context)) def new_context(): return Context([], [], async_execution_context=ContextVar("current context", default=None)) default_context = new_context() from .async_primitives import AsyncDerived, AsyncEffect from .primitives import Batch, Effect, Signal ``` --- `reactivity/functional.py` ```py from collections.abc import Callable from functools import wraps from typing import Protocol, overload from .helpers import Memoized, MemoizedMethod, MemoizedProperty from .primitives import Batch, Effect, Signal class Getter[T](Protocol): def __call__(self, track=True) -> T: ... class Setter[T](Protocol): def __call__(self, value: T) -> bool: ... def create_signal[T](initial_value: T = None, check_equality=True) -> tuple[Getter[T], Setter[T]]: signal = Signal(initial_value, check_equality) return signal.get, signal.set def create_effect[T](fn: Callable[[], T], call_immediately=True): return Effect(fn, call_immediately) def create_memo[T](fn: Callable[[], T]): return Memoized(fn) def memoized_property[T, I](method: Callable[[I], T]): return MemoizedProperty(method) def memoized_method[T, I](method: Callable[[I], T]): return MemoizedMethod(method) @overload def batch() -> Batch: ... @overload def batch[**P, T](func: Callable[P, T]) -> Callable[P, T]: ... def batch[**P, T](func: Callable[P, T] | None = None) -> Callable[P, T] | Batch: if func is not None: @wraps(func) def wrapped(*args, **kwargs): with Batch(): return func(*args, **kwargs) return wrapped return Batch() ``` --- `reactivity/primitives.py` ```py from collections.abc import Callable from typing import Any, Self, overload from weakref import WeakSet from .context import Context, default_context def _equal(a, b): if a is b: return True comparison_result: Any = False for i in range(3): # pandas DataFrame's .all() returns a Series, which is still incompatible :( try: if i == 0: comparison_result = a == b if comparison_result: return True except (ValueError, RuntimeError) as e: if "is ambiguous" in str(e) and hasattr(comparison_result, "all"): # array-like instances comparison_result = comparison_result.all() else: return False return False class Subscribable: def __init__(self, *, context: Context | None = None): super().__init__() self.subscribers = set[BaseComputation]() self.context = context or default_context def track(self): ctx = self.context.leaf if not ctx.current_computations: return last = ctx.current_computations[-1] if last is not self: with ctx.untrack(): self.subscribers.add(last) last.dependencies.add(self) def notify(self): ctx = self.context.leaf if ctx.batches: ctx.schedule_callbacks(self.subscribers) else: with Batch(force_flush=False, context=ctx): ctx.schedule_callbacks(self.subscribers) class BaseComputation[T]: def __init__(self, *, context: Context | None = None): super().__init__() self.dependencies = WeakSet[Subscribable]() self.context = context or default_context def dispose(self): for dep in self.dependencies: dep.subscribers.remove(self) self.dependencies.clear() def _enter(self): return self.context.leaf.enter(self) def __enter__(self): return self def __exit__(self, *_): self.dispose() def trigger(self) -> Any: ... def __call__(self) -> T: return self.trigger() class Signal[T](Subscribable): def __init__(self, initial_value: T = None, check_equality=True, *, context: Context | None = None): super().__init__(context=context) self._value: T = initial_value self._check_equality = check_equality def get(self, track=True): if track: self.track() return self._value def set(self, value: T): if not self._check_equality or not _equal(self._value, value): self._value = value self.notify() return True return False class DescriptorMixin[T]: SLOT_KEY = "_reactive_descriptors_" def __set_name__(self, owner: type, name: str): self.name = name if hasattr(owner, "__slots__") and __class__.SLOT_KEY not in (slots := owner.__slots__): key = f"{self.__class__.__name__}.SLOT_KEY" match slots: case tuple() as slots: new_slots = f"({', '.join(slots)}, {key})" if slots else f"({key},)" case str(): new_slots = f"{slots}, {key}" case set(): new_slots = f"{{{', '.join(slots)}, {key}}}" if slots else f"{{{key}}}" case _: new_slots = f"[{', '.join(slots)}, {key}]" if slots else f"[{key}]" from inspect import getsource from textwrap import dedent, indent try: selected = [] for line in dedent(getsource(owner)).splitlines(): if line.startswith(("@", f"class {owner.__name__}")): selected.append(line) else: break cls_def = "\n".join(selected) # maybe source mismatch (usually during `exec`) if f"class {owner.__name__}" not in selected: raise OSError # noqa: TRY301 except (OSError, TypeError): bases = [b.__name__ for b in owner.__bases__ if b is not object] cls_def = f"class {owner.__name__}{f'({", ".join(bases)})' if bases else ''}:" __tracebackhide__ = 1 # for pytest msg = f"Missing {key} in slots definition for `{self.__class__.__name__}`.\n\n" msg += indent( "\n\n".join( ( f"Please add `{key}` to your `__slots__`. You should change:", indent(f"{cls_def}\n __slots__ = {slots!r}", " "), "to:", indent(f"{cls_def}\n __slots__ = {new_slots}", " "), ) ), " ", ) raise TypeError(msg + "\n") def _new(self, instance) -> T: ... def find(self, instance) -> T: if hasattr(instance, "__dict__"): if (obj := instance.__dict__.get(self.name)) is None: instance.__dict__[self.name] = obj = self._new(instance) else: if map := getattr(instance, self.SLOT_KEY, None): assert isinstance(map, dict) if (obj := map.get(self.name)) is None: map[self.name] = obj = self._new(instance) else: setattr(instance, self.SLOT_KEY, {self.name: (obj := self._new(instance))}) return obj class State[T](Signal[T], DescriptorMixin[Signal[T]]): def __init__(self, initial_value: T = None, check_equality=True, *, context: Context | None = None): super().__init__(initial_value, check_equality, context=context) self._value = initial_value self._check_equality = check_equality @overload def __get__(self, instance: None, owner: type) -> Self: ... @overload def __get__(self, instance: Any, owner: type) -> T: ... def __get__(self, instance, owner): if instance is None: return self return self.find(instance).get() def __set__(self, instance, value: T): self.find(instance).set(value) def _new(self, instance): # noqa: ARG002 return Signal(self._value, self._check_equality, context=self.context) class Effect[T](BaseComputation[T]): def __init__(self, fn: Callable[[], T], call_immediately=True, *, context: Context | None = None): super().__init__(context=context) self._fn = fn if call_immediately: self() def trigger(self): with self._enter(): return self._fn() class Batch: def __init__(self, force_flush=True, *, context: Context | None = None): self.callbacks = set[BaseComputation]() self.force_flush = force_flush self.context = context or default_context def flush(self): triggered = set() while self.callbacks: callbacks = self.callbacks - triggered self.callbacks.clear() for computation in callbacks: if computation in self.callbacks: continue # skip if re-added during callback computation.trigger() triggered.add(computation) def __enter__(self): self.context.batches.append(self) def __exit__(self, *_): if self.force_flush or len(self.context.batches) == 1: try: self.flush() finally: last = self.context.batches.pop() else: last = self.context.batches.pop() self.context.schedule_callbacks(self.callbacks) assert last is self class BaseDerived[T](Subscribable, BaseComputation[T]): def __init__(self, *, context: Context | None = None): super().__init__(context=context) self.dirty = True def _sync_dirty_deps(self) -> Any: current_computations = self.context.leaf.current_computations for dep in self.dependencies: if isinstance(dep, BaseDerived) and dep.dirty and dep not in current_computations: dep() class Derived[T](BaseDerived[T]): UNSET: T = object() # type: ignore def __init__(self, fn: Callable[[], T], check_equality=True, *, context: Context | None = None): super().__init__(context=context) self.fn = fn self._check_equality = check_equality self._value = self.UNSET def recompute(self): with self._enter(): value = self.fn() self.dirty = False if self._check_equality: if _equal(value, self._value): return elif self._value is self.UNSET: # do not notify on first set self._value = value return self._value = value self.notify() def __call__(self): self.track() self._sync_dirty_deps() if self.dirty: self.recompute() return self._value def trigger(self): self.dirty = True if _pulled(self): self() def invalidate(self): self.trigger() def _pulled(sub: Subscribable): visited = set() to_visit: set[Subscribable] = {sub} while to_visit: visited.add(current := to_visit.pop()) for s in current.subscribers: if not isinstance(s, BaseDerived): return True if s not in visited: to_visit.add(s) return False ``` --- `reactivity/hmr/__main__.py` ```py if __name__ == "__main__": from .run import main main() ``` --- `reactivity/hmr/__init__.py` ```py from .hooks import on_dispose, post_reload, pre_reload from .run import cli from .utils import cache_across_reloads __all__ = ("cache_across_reloads", "cli", "on_dispose", "post_reload", "pre_reload") ``` --- `reactivity/hmr/_common.py` ```py from ..context import new_context HMR_CONTEXT = new_context() ``` --- `reactivity/hmr/core.py` ```py import sys from ast import get_docstring, parse from collections.abc import Callable, Iterable, MutableMapping, Sequence from contextlib import suppress from functools import cached_property, partial from importlib.abc import Loader, MetaPathFinder from importlib.machinery import ModuleSpec from importlib.util import spec_from_loader from inspect import ismethod from os import getenv from pathlib import Path from site import getsitepackages, getusersitepackages from sysconfig import get_paths from types import ModuleType, TracebackType from typing import Any, Self from weakref import WeakValueDictionary from ..context import Context from ..helpers import DerivedMethod from ..primitives import BaseDerived, Derived, Signal from ._common import HMR_CONTEXT from .fs import notify, setup_fs_audithook from .hooks import call_post_reload_hooks, call_pre_reload_hooks from .proxy import Proxy def is_called_internally(*, extra_depth=0) -> bool: """Protect private methods from being called from outside this package.""" frame = sys._getframe(extra_depth + 2) # noqa: SLF001 return frame.f_globals.get("__package__") == __package__ class Name(Signal, BaseDerived): def get(self, track=True): self._sync_dirty_deps() return super().get(track) class NamespaceProxy(Proxy): def __init__(self, initial: MutableMapping, module: "ReactiveModule", check_equality=True, *, context: Context | None = None): self.module = module super().__init__(initial, check_equality, context=context) def _signal(self, value=False): self.module.load.subscribers.add(signal := Name(value, self._check_equality, context=self.context)) signal.dependencies.add(self.module.load) return signal def __getitem__(self, key): try: return super().__getitem__(key) finally: signal = self._keys[key] if self.module.load in signal.subscribers: # a module's loader shouldn't subscribe its variables signal.subscribers.remove(self.module.load) self.module.load.dependencies.remove(signal) STATIC_ATTRS = frozenset(("__path__", "__dict__", "__spec__", "__name__", "__file__", "__loader__", "__package__", "__cached__")) class ReactiveModule(ModuleType): instances: WeakValueDictionary[Path, Self] = WeakValueDictionary() def __init__(self, file: Path, namespace: dict, name: str, doc: str | None = None): super().__init__(name, doc) self.__is_initialized = False self.__dict__.update(namespace) self.__is_initialized = True self.__namespace = namespace self.__namespace_proxy = NamespaceProxy(namespace, self, context=HMR_CONTEXT) self.__hooks: list[Callable[[], Any]] = [] self.__file = file __class__.instances[file.resolve()] = self @property def file(self): if is_called_internally(extra_depth=1): # + 1 for `__getattribute__` return self.__file raise AttributeError("file") @property def register_dispose_callback(self): if is_called_internally(extra_depth=1): # + 1 for `__getattribute__` return self.__hooks.append raise AttributeError("register_dispose_callback") @partial(DerivedMethod, context=HMR_CONTEXT) def __load(self): try: file = self.__file ast = parse(file.read_text("utf-8"), str(file)) code = compile(ast, str(file), "exec", dont_inherit=True) except SyntaxError as e: sys.excepthook(type(e), e, e.__traceback__) else: for dispose in self.__hooks: with suppress(Exception): dispose() self.__hooks.clear() self.__doc__ = get_docstring(ast) exec(code, self.__namespace, self.__namespace_proxy) # https://github.com/python/cpython/issues/121306 self.__namespace_proxy.update(self.__namespace) finally: load = self.__load assert ismethod(load.fn) # for type narrowing for dep in list(load.dependencies): if isinstance(dep, Derived) and ismethod(dep.fn) and isinstance(dep.fn.__self__, ReactiveModule) and dep.fn.__func__ is load.fn.__func__: # unsubscribe it because we want invalidation to be fine-grained dep.subscribers.remove(load) load.dependencies.remove(dep) @property def load(self): if is_called_internally(extra_depth=1): # + 1 for `__getattribute__` return self.__load raise AttributeError("load") def __dir__(self): return iter(self.__namespace_proxy) def __getattribute__(self, name: str): if name == "__dict__" and self.__is_initialized: return self.__namespace if name == "instances": # class-level attribute raise AttributeError(name) return super().__getattribute__(name) def __getattr__(self, name: str): try: return self.__namespace_proxy[name] if name not in STATIC_ATTRS else self.__namespace[name] except KeyError as e: if name not in STATIC_ATTRS and (getattr := self.__namespace_proxy.get("__getattr__")): return getattr(name) raise AttributeError(*e.args) from None def __setattr__(self, name: str, value): if is_called_internally(): return super().__setattr__(name, value) self.__namespace_proxy[name] = value class ReactiveModuleLoader(Loader): def create_module(self, spec: ModuleSpec): assert spec.origin is not None, "This loader can only load file-backed modules" path = Path(spec.origin) namespace = {"__file__": spec.origin, "__spec__": spec, "__loader__": self, "__name__": spec.name, "__package__": spec.parent, "__cached__": None} if spec.submodule_search_locations is not None: namespace["__path__"] = spec.submodule_search_locations[:] = [str(path.parent)] return ReactiveModule(path, namespace, spec.name) def exec_module(self, module: ModuleType): assert isinstance(module, ReactiveModule) module.load() _loader = ReactiveModuleLoader() # This is a singleton loader instance used by the finder def _deduplicate(input_paths: Iterable[str | Path | None]): paths = [*{Path(p).resolve(): None for p in input_paths if p is not None}] # dicts preserve insertion order for i, p in enumerate(s := sorted(paths, reverse=True), start=1): if is_relative_to_any(p, s[i:]): paths.remove(p) return paths class ReactiveModuleFinder(MetaPathFinder): def __init__(self, includes: Iterable[str] = ".", excludes: Iterable[str] = ()): super().__init__() builtins = map(get_paths().__getitem__, ("stdlib", "platstdlib", "platlib", "purelib")) self.includes = _deduplicate(includes) self.excludes = _deduplicate((getenv("VIRTUAL_ENV"), *getsitepackages(), getusersitepackages(), *builtins, *excludes)) self._last_sys_path: list[str] = [] self._last_cwd: Path = Path() self._cached_search_paths: list[Path] = [] def _accept(self, path: Path): return path.is_file() and not is_relative_to_any(path, self.excludes) and is_relative_to_any(path, self.includes) @property def search_paths(self): # Currently we assume `includes` and `excludes` never change if sys.path == self._last_sys_path and self._last_cwd.exists() and Path.cwd().samefile(self._last_cwd): return self._cached_search_paths res = [ path for path in (Path(p).resolve() for p in sys.path) if not path.is_file() and not is_relative_to_any(path, self.excludes) and any(i.is_relative_to(path) or path.is_relative_to(i) for i in self.includes) ] self._cached_search_paths = res self._last_cwd = Path.cwd() self._last_sys_path = [*sys.path] return res def find_spec(self, fullname: str, paths: Sequence[str | Path] | None, _=None): if fullname in sys.modules: return None if paths is not None: paths = [path.resolve() for path in (Path(p) for p in paths) if path.is_dir()] for directory in self.search_paths: file = directory / f"{fullname.replace('.', '/')}.py" if self._accept(file) and (paths is None or is_relative_to_any(file, paths)): return spec_from_loader(fullname, _loader, origin=str(file)) file = directory / f"{fullname.replace('.', '/')}/__init__.py" if self._accept(file) and (paths is None or is_relative_to_any(file, paths)): return spec_from_loader(fullname, _loader, origin=str(file), is_package=True) def is_relative_to_any(path: Path, paths: Iterable[str | Path]): return any(path.is_relative_to(p) for p in paths) def patch_module(name_or_module: str | ModuleType): name = name_or_module if isinstance(name_or_module, str) else name_or_module.__name__ module = sys.modules[name_or_module] if isinstance(name_or_module, str) else name_or_module assert isinstance(module.__file__, str), f"{name} is not a file-backed module" m = sys.modules[name] = ReactiveModule(Path(module.__file__), module.__dict__, module.__name__, module.__doc__) return m def patch_meta_path(includes: Iterable[str] = (".",), excludes: Iterable[str] = ()): sys.meta_path.insert(0, ReactiveModuleFinder(includes, excludes)) def get_path_module_map(): return {**ReactiveModule.instances} class ErrorFilter: def __init__(self, *exclude_filenames: str): self.exclude_filenames = set(exclude_filenames) def __call__(self, tb: TracebackType): current = tb while current is not None: if current.tb_frame.f_code.co_filename not in self.exclude_filenames: return current current = current.tb_next return tb def __enter__(self): return self def __exit__(self, exc_type: type[BaseException], exc_value: BaseException, traceback: TracebackType): if exc_value is None: return tb = self(traceback) exc_value = exc_value.with_traceback(tb) sys.excepthook(exc_type, exc_value, tb) return True class BaseReloader: def __init__(self, entry_file: str, includes: Iterable[str] = (".",), excludes: Iterable[str] = ()): self.entry = entry_file self.includes = includes self.excludes = excludes patch_meta_path(includes, excludes) self.error_filter = ErrorFilter(*map(str, Path(__file__, "../..").resolve().glob("**/*.py"))) setup_fs_audithook() @cached_property def entry_module(self): namespace = {"__file__": self.entry, "__name__": "__main__"} return ReactiveModule(Path(self.entry), namespace, "__main__") def run_entry_file(self): with self.error_filter: self.entry_module.load() def on_events(self, events: Iterable[tuple[int, str]]): from watchfiles import Change if not events: return self.on_changes({Path(file).resolve() for type, file in events if type is not Change.deleted}) def on_changes(self, files: set[Path]): path2module = get_path_module_map() call_pre_reload_hooks() with HMR_CONTEXT.batch(): for path in files: with self.error_filter: if module := path2module.get(path): module.load.invalidate() else: notify(path) call_post_reload_hooks() class _SimpleEvent: def __init__(self): self._set = False def set(self): self._set = True def is_set(self): return self._set class SyncReloader(BaseReloader): @cached_property def _stop_event(self): return _SimpleEvent() def stop_watching(self): self._stop_event.set() def start_watching(self): from watchfiles import watch for events in watch(self.entry, *self.includes, stop_event=self._stop_event): self.on_events(events) del self._stop_event def keep_watching_until_interrupt(self): call_pre_reload_hooks() with suppress(KeyboardInterrupt), HMR_CONTEXT.effect(self.run_entry_file): call_post_reload_hooks() self.start_watching() class AsyncReloader(BaseReloader): @cached_property def _stop_event(self): from asyncio import Event return Event() def stop_watching(self): self._stop_event.set() async def start_watching(self): from watchfiles import awatch async for events in awatch(self.entry, *self.includes, stop_event=self._stop_event): self.on_events(events) del self._stop_event async def keep_watching_until_interrupt(self): call_pre_reload_hooks() with suppress(KeyboardInterrupt), HMR_CONTEXT.effect(self.run_entry_file): call_post_reload_hooks() await self.start_watching() __version__ = "0.6.4.4" ``` --- `reactivity/hmr/api.py` ```py import sys from .core import HMR_CONTEXT, AsyncReloader, BaseReloader, SyncReloader from .hooks import call_post_reload_hooks, call_pre_reload_hooks class LifecycleMixin(BaseReloader): def run_with_hooks(self): self._original_main_module = sys.modules["__main__"] sys.modules["__main__"] = self.entry_module call_pre_reload_hooks() self.effect = HMR_CONTEXT.effect(self.run_entry_file) call_post_reload_hooks() def clean_up(self): self.effect.dispose() self.entry_module.load.dispose() self.entry_module.load.invalidate() sys.modules["__main__"] = self._original_main_module class SyncReloaderAPI(SyncReloader, LifecycleMixin): def __enter__(self): from threading import Thread self.run_with_hooks() self.thread = Thread(target=self.start_watching) self.thread.start() return super() def __exit__(self, *_): self.stop_watching() self.thread.join() self.clean_up() async def __aenter__(self): from asyncio import ensure_future, sleep, to_thread await to_thread(self.run_with_hooks) self.future = ensure_future(to_thread(self.start_watching)) await sleep(0) return super() async def __aexit__(self, *_): self.stop_watching() await self.future self.clean_up() class AsyncReloaderAPI(AsyncReloader, LifecycleMixin): def __enter__(self): from asyncio import run from threading import Event, Thread self.run_with_hooks() e = Event() async def task(): e.set() await self.start_watching() self.thread = Thread(target=lambda: run(task())) self.thread.start() e.wait() return super() def __exit__(self, *_): self.stop_watching() self.thread.join() self.clean_up() async def __aenter__(self): from asyncio import ensure_future, sleep, to_thread await to_thread(self.run_with_hooks) self.future = ensure_future(self.start_watching()) await sleep(0) return super() async def __aexit__(self, *_): self.stop_watching() await self.future self.clean_up() ``` --- `reactivity/hmr/fs.py` ```py import sys from collections import defaultdict from functools import cache from pathlib import Path from ..primitives import Signal from ._common import HMR_CONTEXT @defaultdict def fs_signals(): return Signal(context=HMR_CONTEXT) @cache def setup_fs_audithook(): current_computations = HMR_CONTEXT.current_computations @sys.addaudithook def _(event: str, args: tuple): if event == "open": file, _, flags = args if (flags % 2 == 0) and current_computations and isinstance(file, str): track(file) def track(file: str | Path): fs_signals[Path(file).resolve()].track() def notify(file: Path): fs_signals[file].notify() __all__ = "notify", "setup_fs_audithook", "track" ``` --- `reactivity/hmr/hooks.py` ```py from collections.abc import Callable from contextlib import contextmanager from inspect import currentframe from pathlib import Path from typing import Any pre_reload_hooks: dict[str, Callable[[], Any]] = {} post_reload_hooks: dict[str, Callable[[], Any]] = {} def pre_reload[T](func: Callable[[], T]) -> Callable[[], T]: pre_reload_hooks[func.__name__] = func return func def post_reload[T](func: Callable[[], T]) -> Callable[[], T]: post_reload_hooks[func.__name__] = func return func @contextmanager def use_pre_reload(func): pre_reload(func) try: yield func finally: pre_reload_hooks.pop(func.__name__, None) @contextmanager def use_post_reload(func): post_reload(func) try: yield func finally: post_reload_hooks.pop(func.__name__, None) def call_pre_reload_hooks(): for func in pre_reload_hooks.values(): func() def call_post_reload_hooks(): for func in post_reload_hooks.values(): func() def on_dispose(func: Callable[[], Any], __file__: str | None = None): path = Path(currentframe().f_back.f_globals["__file__"] if __file__ is None else __file__).resolve() # type: ignore from .core import ReactiveModule module = ReactiveModule.instances[path] module.register_dispose_callback(func) ``` --- `reactivity/hmr/proxy.py` ```py from collections.abc import MutableMapping from typing import Any from reactivity.context import Context from ..collections import ReactiveMappingProxy class Proxy[T: MutableMapping](ReactiveMappingProxy[str, Any]): def __init__(self, initial: MutableMapping[str, Any], check_equality=True, *, context: Context | None = None): super().__init__(initial, check_equality, context=context) self.raw: T = self._data # type: ignore ``` --- `reactivity/hmr/utils.py` ```py from ast import parse from collections import UserDict from collections.abc import Callable from functools import wraps from inspect import getsource, getsourcefile from pathlib import Path from types import FunctionType from ..helpers import Derived from .core import HMR_CONTEXT, NamespaceProxy, ReactiveModule from .exec_hack import dedent, fix_class_name_resolution from .hooks import on_dispose, post_reload memos: dict[tuple[Path, str], tuple[Callable, str]] = {} # (path, __qualname__) -> (memo, source) functions: dict[tuple[Path, str], Callable] = {} # (path, __qualname__) -> function @post_reload def gc_memos(): for key in {*memos} - {*functions}: del memos[key] _cache_decorator_phase = False def cache_across_reloads[T](func: Callable[[], T]) -> Callable[[], T]: file = getsourcefile(func) assert file is not None module = ReactiveModule.instances.get(path := Path(file).resolve()) if module is None: from functools import cache return cache(func) source, col_offset = dedent(getsource(func)) key = (path, func.__qualname__) proxy: NamespaceProxy = module._ReactiveModule__namespace_proxy # type: ignore # noqa: SLF001 global _cache_decorator_phase _cache_decorator_phase = not _cache_decorator_phase if _cache_decorator_phase: # this function will be called twice: once transforming ast and once re-executing the patched source on_dispose(lambda: functions.pop(key), file) try: exec(compile(fix_class_name_resolution(parse(source), func.__code__.co_firstlineno - 1, col_offset), file, "exec"), DictProxy(proxy)) except _Return as e: # If this function is used as a decorator, it will raise an `_Return` exception in the second phase. return e.value else: # Otherwise, it is used as a function, and we need to do the second phase ourselves. func = proxy[func.__name__] func = FunctionType(func.__code__, DictProxy(proxy), func.__name__, func.__defaults__, func.__closure__) functions[key] = func if result := memos.get(key): memo, last_source = result if source != last_source: Derived.invalidate(memo) # type: ignore memos[key] = memo, source return _return(wraps(func)(memo)) @wraps(func) def wrapper() -> T: return functions[key]() memo = Derived(wrapper, context=HMR_CONTEXT) memos[key] = memo, source return _return(wraps(func)(memo)) class _Return(Exception): # noqa: N818 def __init__(self, value): self.value = value super().__init__() def _return[T](value: T) -> T: global _cache_decorator_phase if _cache_decorator_phase: _cache_decorator_phase = not _cache_decorator_phase return value raise _Return(value) # used as decorator, so we raise an exception to jump before outer decorators class DictProxy(UserDict, dict): # type: ignore def __init__(self, data): self.data = data def load(module: ReactiveModule): return module.load() ``` --- `reactivity/hmr/run.py` ```py import sys from pathlib import Path def run_path(entry: str, args: list[str]): path = Path(entry).resolve() if path.is_dir(): if (__main__ := path / "__main__.py").is_file(): parent = "" path = __main__ else: raise FileNotFoundError(f"No __main__.py file in {path}") # noqa: TRY003 elif path.is_file(): parent = None else: raise FileNotFoundError(f"No such file named {path}") # noqa: TRY003 entry = str(path) sys.path.insert(0, str(path.parent)) from importlib.machinery import ModuleSpec from .core import SyncReloader, _loader _argv = sys.argv[:] sys.argv[:] = args _main = sys.modules["__main__"] try: reloader = SyncReloader(entry) sys.modules["__main__"] = mod = reloader.entry_module mod.__dict__.update( { "__loader__": _loader, "__package__": parent, "__spec__": None if parent is None else ModuleSpec("__main__", _loader, origin=entry), } ) reloader.keep_watching_until_interrupt() finally: sys.argv[:] = _argv sys.modules["__main__"] = _main def run_module(module_name: str, args: list[str]): if (cwd := str(Path.cwd())) not in sys.path: sys.path.insert(0, cwd) from importlib.util import find_spec from .core import SyncReloader, patch_meta_path patch_meta_path() spec = find_spec(module_name) if spec is None: raise ModuleNotFoundError(f"No module named '{module_name}'") # noqa: TRY003 if spec.submodule_search_locations: # It's a package, look for __main__.py spec = find_spec(f"{module_name}.__main__") if spec and spec.origin: entry = spec.origin else: raise ModuleNotFoundError(f"No module named '{module_name}.__main__'; '{module_name}' is a package and cannot be directly executed") # noqa: TRY003 elif spec.origin is None: raise ModuleNotFoundError(f"Cannot find entry point for module '{module_name}'") # noqa: TRY003 else: entry = spec.origin args[0] = entry # Replace the first argument with the full path _argv = sys.argv[:] sys.argv[:] = args _main = sys.modules["__main__"] try: reloader = SyncReloader(entry) sys.modules["__main__"] = mod = reloader.entry_module mod.__dict__.update({"__spec__": spec, "__loader__": spec.loader, "__package__": spec.parent}) reloader.keep_watching_until_interrupt() finally: sys.argv[:] = _argv sys.modules["__main__"] = _main def cli(args: list[str] | None = None): if args is None: args = sys.argv[1:] try: if len(args) < 1 or "--help" in args or "-h" in args: print("\n Usage:") print(" hmr , just like python ") print(" hmr -m , just like python -m \n") if len(args) < 1: return 1 elif args[0] == "-m": if len(args) < 2: print("\n Usage: hmr -m , just like python -m \n") return 1 module_name = args[1] args.pop(0) # remove -m flag run_module(module_name, args) else: run_path(args[0], args) except (FileNotFoundError, ModuleNotFoundError) as e: print(f"\n Error: {e}\n") return 1 return 0 def main(): sys.exit(cli(sys.argv[1:])) ``` --- `reactivity/hmr/exec_hack/transform.py` ```py import ast from typing import override class ClassTransformer(ast.NodeTransformer): @override def visit_ClassDef(self, node: ast.ClassDef): traverser = ClassBodyTransformer() has_docstring = node.body and isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Constant) and isinstance(node.body[0].value.value, str) node.body[has_docstring:] = [ *def_name_lookup().body, *map(traverser.visit, node.body[has_docstring:]), ast.Delete(targets=[ast.Name(id="__name_lookup", ctx=ast.Del())]), ast.parse(f"False and ( {','.join(traverser.names)} )").body[0], ] return ast.fix_missing_locations(node) class ClassBodyTransformer(ast.NodeTransformer): def __init__(self): self.names: dict[str, None] = {} # to keep order for better readability @override def visit_Name(self, node: ast.Name): if isinstance(node.ctx, ast.Load) and node.id != "__name_lookup": self.names[node.id] = None return build_name_lookup(node.id) return node @override def visit_FunctionDef(self, node: ast.FunctionDef): node.decorator_list = [self.visit(d) for d in node.decorator_list] self.visit(node.args) if node.returns: node.returns = self.visit(node.returns) return node visit_AsyncFunctionDef = visit_FunctionDef # type: ignore # noqa: N815 @override def visit_Lambda(self, node: ast.Lambda): self.visit(node.args) return node def build_name_lookup(name: str) -> ast.Call: return ast.Call(func=ast.Name(id="__name_lookup", ctx=ast.Load()), args=[ast.Constant(value=name)], keywords=[]) name_lookup_source = """ def __name_lookup(): from builtins import KeyError, NameError from collections import ChainMap from inspect import currentframe f = currentframe().f_back c = ChainMap(f.f_locals, f.f_globals, f.f_builtins) if freevars := f.f_code.co_freevars: c.maps.insert(1, e := {}) freevars = {*f.f_code.co_freevars} while freevars: f = f.f_back for name in f.f_code.co_cellvars: if name in freevars.intersection(f.f_code.co_cellvars): freevars.remove(name) e[name] = f.f_locals[name] def lookup(name): try: return c[name] except KeyError as e: raise NameError(*e.args) from None return lookup __name_lookup = __name_lookup() """ def def_name_lookup(): tree = ast.parse(name_lookup_source) for node in ast.walk(tree): for attr in ("lineno", "end_lineno", "col_offset", "end_col_offset"): if hasattr(node, attr): delattr(node, attr) return tree ``` --- `reactivity/hmr/exec_hack/__init__.py` ```py import ast from .transform import ClassTransformer def fix_class_name_resolution[T: ast.AST](mod: T, lineno_offset=0, col_offset=0) -> T: new_mod = ClassTransformer().visit(mod) if lineno_offset: ast.increment_lineno(new_mod, lineno_offset) if col_offset: _increment_col_offset(new_mod, col_offset) return new_mod def _increment_col_offset[T: ast.AST](tree: T, n: int) -> T: for node in ast.walk(tree): if isinstance(node, (ast.stmt, ast.expr)): # noqa: UP038 node.col_offset += n if isinstance(node.end_col_offset, int): node.end_col_offset += n return tree def dedent(source: str): lines = source.splitlines(keepends=True) level = len(lines[0]) - len(lines[0].lstrip()) return "".join(line[level:] for line in lines), level ```