Skip to content

monotonic_dict

MonotonicDict

Bases: MutableMapping, Callback

A dictionary-like object whose true state is defined by an append-only, monotonically ordered log of operations.

  • Each mutating operation appends an Op to the log keyed by a UUID.
  • Any read operation materializes the dictionary by replaying the log.
Source code in src/mesh/monotonic_dict.py
class MonotonicDict(MutableMapping, Callback):
    """
    A dictionary-like object whose *true* state is defined by an append-only,
    monotonically ordered log of operations.

    - Each mutating operation appends an Op to the log keyed by a UUID.
    - Any read operation materializes the dictionary by replaying the log.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()

        # Monotonic ordered log: UUID -> Op
        self._commit_keys: list[str] = []
        self._commit_values: list[Op] = []

        if args or kwargs:
            # Use our own update, which will be logged.
            self.update(*args, **kwargs)

        self._materialized_cache: Dict[Any, Any] = {}
        self._materialized_cursor = None

    def _append_op(self, op: Op) -> None:
        """Append a new operation to the log (monotonic, append-only)."""
        op_id = uuid.uuid4().hex

        self._commit_keys.append(op_id)
        self._commit_values.append(op)

        # Trigger callbacks based on operation type  
        if op.kind == "set":
            key, value = op.args
            self._trigger_callbacks(key, value, "set")
        elif op.kind == "del":
            (key,) = op.args
            self._trigger_callbacks(key, None, "del")
        elif op.kind == "update":
            (mapping,) = op.args
            for key, value in mapping.items():
                self._trigger_callbacks(key, value, "update")
        elif op.kind == "clear":
            # Trigger callbacks for all keys being cleared
            state = self._materialize()
            for key in state.keys():
                self._trigger_callbacks(key, None, "clear")

    def _materialize(self) -> Dict[Any, Any]:
        """
        Rebuild the effective dictionary state from the log.
        This is done fresh on every read as per the requirements.
        """

        _commits = self._commit_keys
        _operations = self._commit_values

        if _commits and _commits[-1] == self._materialized_cursor:
            return self._materialized_cache

        if self._materialized_cursor in self._commit_keys:
            _idx_cursor = _commits.index(self._materialized_cursor)
        else:
            _idx_cursor = -1

        _pending_commits = _commits[_idx_cursor + 1: ]
        _pending_ops = _operations[_idx_cursor + 1: ]

        for commit, op in zip(_pending_commits, _pending_ops):
            if op.kind == "set":
                key, value = op.args
                self._materialized_cache[key] = value
            elif op.kind == "del":
                (key,) = op.args
                self._materialized_cache.pop(key, None)
            elif op.kind == "update":
                (mapping,) = op.args
                for k, v in mapping.items():
                    self._materialized_cache[k] = v
            elif op.kind == "clear":
                self._materialized_cache.clear()
            else:
                raise ValueError(f"Unknown op kind: {op.kind}")

            self._materialized_cursor = commit

        return self._materialized_cache

    def to_dict(self) -> Dict[Any, Any]:
        """Convenience: get the current materialized state as a regular dict."""
        return self._materialize()

    def __getitem__(self, key: Any) -> Any:
        state = self._materialize()
        return state[key]

    def __setitem__(self, key: Any, value: Any) -> None:
        self._append_op(Op("set", (key, value)))

    def __delitem__(self, key: Any) -> None:
        # To respect normal dict semantics, we first verify key exists.
        state = self._materialize()
        if key not in state:
            raise KeyError(key)
        self._append_op(Op("del", (key,)))

    def __iter__(self):
        state = self._materialize()
        return iter(state)

    def __len__(self) -> int:
        return len(self._commit_keys)

    def __contains__(self, key: object) -> bool:
        state = self._materialize()
        return key in state

    def get(self, key: Any, default: Optional[Any] = None) -> Any:
        state = self._materialize()
        return state.get(key, default)

    def clear(self) -> None:
        """Logically clear the dictionary (but keep the commit append-only)."""
        self._append_op(Op("clear", ()))

    def update(self, *args, **kwargs) -> None:
        """
        Log an 'update' operation. We collect the data into a mapping and
        append a single Op('update').
        """
        mapping: Dict[Any, Any] = {}

        if args:
            if len(args) > 1:
                raise TypeError("update expected at most 1 positional argument")
            other = args[0]
            if isinstance(other, Mapping):
                mapping.update(other)
            else:
                # Assume an iterable of (key, value)
                for k, v in other:
                    mapping[k] = v

        if kwargs:
            mapping.update(kwargs)

        if mapping:
            self._append_op(Op("update", (mapping,)))

    def commit_history(self) -> OrderedDict[str, Op]:
        """Returns the commit history of the dict."""
        return OrderedDict(zip(self._commit_keys, self._commit_values))

    @property
    def last_commit(self, ) -> str:
        '''Get the last commit
        '''
        return self._commit_keys[-1] if self._commit_keys else None

    def fork(self, commit: str):
        if commit not in self._commit_keys:
            raise Exception()
        _commit_idx = self._commit_keys.index(commit)

        _forked_mdict = MonotonicDict()
        _forked_mdict._commit_keys = self._commit_keys[:_commit_idx + 1]
        _forked_mdict._commit_values = self._commit_values[:_commit_idx + 1]

        return _forked_mdict

    def accept(self, incoming_data, from_node = None):
        # Try Merge
        merged_data = try_accept(self, incoming_data)

        # Validate data sanity
        is_sane = _check(merged_data, self, incoming_data)

        if is_sane:
            # Find new operations that were added  
            existing_commits = set(self._commit_keys)  
            new_ops = [  
                op for commit_id, op in zip(merged_data._commit_keys, merged_data._commit_values)  
                if commit_id not in existing_commits  
            ]

            self._commit_keys = merged_data._commit_keys
            self._commit_values = merged_data._commit_values

            # Trigger callbacks for the new operations  
            self._trigger_callbacks_for_ops(new_ops, src_node=from_node)
        return is_sane

    def merge(self, incoming_data, from_node = None):

        # Track incoming operations
        existing_commits = set(self._commit_keys)  
        incoming_ops = [  
            op for commit_id, op in zip(incoming_data._commit_keys, incoming_data._commit_values)  
            if commit_id not in existing_commits  
        ]  

        # Try Merge
        merged_data = {**self.to_dict(), **incoming_data.to_dict()}

        merged_monotonic_dict = MonotonicDict()
        merged_monotonic_dict._commit_keys = self._commit_keys
        merged_monotonic_dict._commit_values = self._commit_values
        merged_monotonic_dict._commit_keys += incoming_data._commit_keys
        merged_monotonic_dict._commit_values += incoming_data._commit_values
        merged_monotonic_dict._append_op(Op("update", (merged_data, )))

        # Validate data sanity
        is_sane = _check(merged_monotonic_dict, self, incoming_data)
        if is_sane:

            # Trigger callbacks for incoming operations and the final update  
            # self._trigger_callbacks_for_ops(incoming_ops, src_node=from_node)
            self._trigger_callbacks_for_ops([Op("update", (merged_data, ))], src_node=from_node)

            self._commit_keys = merged_monotonic_dict._commit_keys
            self._commit_values = merged_monotonic_dict._commit_values

        return is_sane

    def __eq__(self, other: object) -> bool:
        """
        Equality compares materialized states. Supports comparison with
        regular dicts and other mappings.
        """
        if isinstance(other, MonotonicDict):
            return self.to_dict() == other.to_dict()
        if isinstance(other, Mapping):
            return self.to_dict() == dict(other)
        return NotImplemented

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.to_dict()!r})"

    def _trigger_callbacks_for_ops(self, ops: List[Op], src_node = None) -> None:  
        """Trigger callbacks for a list of operations."""  
        for op in ops:
            if op.kind == "set":
                key, value = op.args
                self._trigger_callbacks(key, value, "set", src_node=src_node)
            elif op.kind == "del":
                (key,) = op.args
                self._trigger_callbacks(key, None, "del", src_node=src_node)
            elif op.kind == "update":
                (mapping,) = op.args
                for key, value in mapping.items():
                    self._trigger_callbacks(key, value, "update", src_node=src_node)
            elif op.kind == "clear":
                state = self._materialize()
                for key in state.keys():
                    self._trigger_callbacks(key, None, "clear", src_node=src_node)

last_commit property

Get the last commit

__eq__(other)

Equality compares materialized states. Supports comparison with regular dicts and other mappings.

Source code in src/mesh/monotonic_dict.py
def __eq__(self, other: object) -> bool:
    """
    Equality compares materialized states. Supports comparison with
    regular dicts and other mappings.
    """
    if isinstance(other, MonotonicDict):
        return self.to_dict() == other.to_dict()
    if isinstance(other, Mapping):
        return self.to_dict() == dict(other)
    return NotImplemented

clear()

Logically clear the dictionary (but keep the commit append-only).

Source code in src/mesh/monotonic_dict.py
def clear(self) -> None:
    """Logically clear the dictionary (but keep the commit append-only)."""
    self._append_op(Op("clear", ()))

commit_history()

Returns the commit history of the dict.

Source code in src/mesh/monotonic_dict.py
def commit_history(self) -> OrderedDict[str, Op]:
    """Returns the commit history of the dict."""
    return OrderedDict(zip(self._commit_keys, self._commit_values))

to_dict()

Convenience: get the current materialized state as a regular dict.

Source code in src/mesh/monotonic_dict.py
def to_dict(self) -> Dict[Any, Any]:
    """Convenience: get the current materialized state as a regular dict."""
    return self._materialize()

update(*args, **kwargs)

Log an 'update' operation. We collect the data into a mapping and append a single Op('update').

Source code in src/mesh/monotonic_dict.py
def update(self, *args, **kwargs) -> None:
    """
    Log an 'update' operation. We collect the data into a mapping and
    append a single Op('update').
    """
    mapping: Dict[Any, Any] = {}

    if args:
        if len(args) > 1:
            raise TypeError("update expected at most 1 positional argument")
        other = args[0]
        if isinstance(other, Mapping):
            mapping.update(other)
        else:
            # Assume an iterable of (key, value)
            for k, v in other:
                mapping[k] = v

    if kwargs:
        mapping.update(kwargs)

    if mapping:
        self._append_op(Op("update", (mapping,)))

Op dataclass

Represents a single log operation.

Source code in src/mesh/monotonic_dict.py
@dataclass(frozen=True)
class Op:
    """Represents a single log operation."""
    kind: str            # 'set', 'del', 'update', 'clear'
    args: Tuple[Any, ...]

try_accept(existing_data, incoming_data)

Merge incoming commit history into existing history.

Assumptions: - Commit IDs are globally unique. - Histories are append-only. - Forks preserve commit ordering.

Source code in src/mesh/monotonic_dict.py
def try_accept(existing_data: MonotonicDict,
              incoming_data: MonotonicDict) -> MonotonicDict:
    """
    Merge incoming commit history into existing history.

    Assumptions:
    - Commit IDs are globally unique.
    - Histories are append-only.
    - Forks preserve commit ordering.
    """

    dummy_dict = MonotonicDict()
    dummy_dict._commit_keys = existing_data._commit_keys.copy()
    dummy_dict._commit_values = existing_data._commit_values.copy()

    existing_commits = set(existing_data._commit_keys)

    for commit_id, op in zip(
        incoming_data._commit_keys,
        incoming_data._commit_values,
    ):
        if commit_id not in existing_commits:
            dummy_dict._commit_keys.append(commit_id)
            dummy_dict._commit_values.append(op)

    return dummy_dict