class MonotonicDict(MutableMapping, Callback):
    """
    A dictionary-like object whose *true* state is defined by an append-only,
    monotonically ordered log of operations.

    - Each mutating operation appends an Op to the log under a fresh UUID.
    - Reads go through ``_materialize()``, which replays the log into a
      cached plain dict. Only the ops appended since the previous read are
      replayed; the cache is rebuilt from scratch whenever the log has been
      replaced (``accept`` / ``merge``) or the cached position is no longer
      part of the log.

    The log is held in two parallel lists, ``_commit_keys`` (commit ids) and
    ``_commit_values`` (the Op recorded under each id).
    """

    def __init__(self, *args, **kwargs):
        """Create the dict, logging any initial data as one 'update' op.

        Accepts the same arguments as ``update()``.
        """
        super().__init__()
        # Monotonic ordered log: UUID -> Op
        self._commit_keys: list[str] = []
        self._commit_values: list[Op] = []
        # The cache must exist before the first logged operation, because
        # anything that reads the dict while the initial update is being
        # logged goes through _materialize().
        self._materialized_cache: Dict[Any, Any] = {}
        self._materialized_cursor = None
        if args or kwargs:
            # Use our own update, which will be logged.
            self.update(*args, **kwargs)

    def _append_op(self, op: Op) -> None:
        """Append a new operation to the log and notify callbacks.

        For a 'clear' op the keys are captured *before* the op is appended;
        afterwards the materialized state is empty and there would be
        nothing left to report.
        """
        cleared_keys = list(self._materialize()) if op.kind == "clear" else None
        op_id = uuid.uuid4().hex
        self._commit_keys.append(op_id)
        self._commit_values.append(op)
        self._notify_op(op, cleared_keys)

    def _notify_op(self, op: Op, cleared_keys=None, **callback_kwargs) -> None:
        """Fire ``_trigger_callbacks`` for every key affected by ``op``.

        ``cleared_keys`` is the list of keys to report for a 'clear' op; when
        omitted, the keys of the current materialized state are used.
        ``callback_kwargs`` are forwarded unchanged to ``_trigger_callbacks``.
        Op kinds other than set/del/update/clear are ignored here.
        """
        kind = op.kind
        if kind == "set":
            key, value = op.args
            self._trigger_callbacks(key, value, "set", **callback_kwargs)
        elif kind == "del":
            (key,) = op.args
            self._trigger_callbacks(key, None, "del", **callback_kwargs)
        elif kind == "update":
            (mapping,) = op.args
            for key, value in mapping.items():
                self._trigger_callbacks(key, value, "update", **callback_kwargs)
        elif kind == "clear":
            if cleared_keys is None:
                # Snapshot the keys: a callback that reads this dict may
                # cause the cache to change while we are iterating.
                cleared_keys = list(self._materialize())
            for key in cleared_keys:
                self._trigger_callbacks(key, None, "clear", **callback_kwargs)

    @staticmethod
    def _apply_op(state: Dict[Any, Any], op: Op) -> None:
        """Apply a single logged operation to ``state`` in place.

        Raises:
            ValueError: if ``op.kind`` is not set/del/update/clear.
        """
        kind = op.kind
        if kind == "set":
            key, value = op.args
            state[key] = value
        elif kind == "del":
            (key,) = op.args
            state.pop(key, None)
        elif kind == "update":
            (mapping,) = op.args
            state.update(mapping)
        elif kind == "clear":
            state.clear()
        else:
            raise ValueError(f"Unknown op kind: {op.kind}")

    def _reset_log(self, commit_keys, commit_values) -> None:
        """Replace the log with copies of the given lists and drop the cache."""
        self._commit_keys = list(commit_keys)
        self._commit_values = list(commit_values)
        # The cached state describes the old log; force a full replay.
        self._materialized_cache = {}
        self._materialized_cursor = None

    def _materialize(self) -> Dict[Any, Any]:
        """
        Bring the cached state up to date with the log and return it.

        The returned dict is the internal cache; callers inside the class
        must not mutate it. Replay resumes after the commit recorded in
        ``_materialized_cursor``; if that commit is not in the log, the cache
        is discarded and the whole log is replayed.

        Raises:
            ValueError: if the log contains an op of unknown kind.
        """
        commits = self._commit_keys
        cursor = self._materialized_cursor
        if commits and commits[-1] == cursor:
            return self._materialized_cache
        start = 0
        if cursor is not None:
            try:
                start = commits.index(cursor) + 1
            except ValueError:
                start = 0
        if start == 0:
            # Replaying from the beginning onto leftover entries would leak
            # keys from a previous log into the result.
            self._materialized_cache = {}
            self._materialized_cursor = None
        state = self._materialized_cache
        for commit, op in zip(commits[start:], self._commit_values[start:]):
            self._apply_op(state, op)
            self._materialized_cursor = commit
        return state

    def to_dict(self) -> Dict[Any, Any]:
        """Convenience: get the current materialized state as a regular dict.

        A shallow copy is returned, so changing the result does not affect
        this object.
        """
        return dict(self._materialize())

    def __getitem__(self, key: Any) -> Any:
        """Return the current value for ``key``; raise KeyError if absent."""
        return self._materialize()[key]

    def __setitem__(self, key: Any, value: Any) -> None:
        """Log a 'set' operation for ``key``.

        Raises:
            TypeError: if ``key`` is unhashable.
        """
        # Reject unhashable keys up front: once such an op is in the
        # append-only log, every later replay would fail on it.
        hash(key)
        self._append_op(Op("set", (key, value)))

    def __delitem__(self, key: Any) -> None:
        """Log a 'del' operation for ``key``; raise KeyError if absent."""
        # To respect normal dict semantics, we first verify key exists.
        if key not in self._materialize():
            raise KeyError(key)
        self._append_op(Op("del", (key,)))

    def __iter__(self):
        """Iterate over the keys of the current materialized state."""
        return iter(self._materialize())

    def __len__(self) -> int:
        """Return the number of keys in the current materialized state."""
        # The length of a mapping is its number of live keys, not the number
        # of operations recorded in the log.
        return len(self._materialize())

    def __contains__(self, key: object) -> bool:
        """Return True if ``key`` is present in the materialized state."""
        return key in self._materialize()

    def get(self, key: Any, default: Optional[Any] = None) -> Any:
        """Return the value for ``key``, or ``default`` if it is absent."""
        return self._materialize().get(key, default)

    def clear(self) -> None:
        """Logically clear the dictionary (but keep the commit append-only)."""
        self._append_op(Op("clear", ()))

    def update(self, *args, **kwargs) -> None:
        """
        Log an 'update' operation. We collect the data into a mapping and
        append a single Op('update'). Nothing is logged when there is no data.

        Raises:
            TypeError: if more than one positional argument is given.
        """
        mapping: Dict[Any, Any] = {}
        if args:
            if len(args) > 1:
                raise TypeError("update expected at most 1 positional argument")
            other = args[0]
            if isinstance(other, Mapping):
                mapping.update(other)
            else:
                # Assume an iterable of (key, value)
                for k, v in other:
                    mapping[k] = v
        if kwargs:
            mapping.update(kwargs)
        if mapping:
            self._append_op(Op("update", (mapping,)))

    def commit_history(self) -> OrderedDict[str, Op]:
        """Returns the commit history of the dict (commit id -> Op, in order)."""
        return OrderedDict(zip(self._commit_keys, self._commit_values))

    @property
    def last_commit(self) -> Optional[str]:
        """Id of the most recent commit, or None when the log is empty."""
        return self._commit_keys[-1] if self._commit_keys else None

    def fork(self, commit: str):
        """Return a new MonotonicDict holding the log up to ``commit`` inclusive.

        The fork gets its own copies of the log lists.

        Raises:
            KeyError: if ``commit`` is not in this dict's log.
        """
        try:
            end = self._commit_keys.index(commit) + 1
        except ValueError:
            raise KeyError(f"Unknown commit: {commit!r}") from None
        forked = MonotonicDict()
        forked._commit_keys = self._commit_keys[:end]
        forked._commit_values = self._commit_values[:end]
        return forked

    def accept(self, incoming_data, from_node=None):
        """Adopt the log produced by ``try_accept`` if ``_check`` approves it.

        On success the log is replaced, the cache is invalidated, and
        callbacks fire (with ``src_node=from_node``) for every op whose
        commit id was not previously in this log. Returns the result of
        ``_check``.

        NOTE(review): ``try_accept`` and ``_check`` are defined elsewhere;
        their merge and validation rules are not visible here.
        """
        # Try Merge
        merged_data = try_accept(self, incoming_data)
        # Validate data sanity
        is_sane = _check(merged_data, self, incoming_data)
        if is_sane:
            # Find new operations that were added
            existing_commits = set(self._commit_keys)
            new_ops = [
                op
                for commit_id, op in zip(
                    merged_data._commit_keys, merged_data._commit_values
                )
                if commit_id not in existing_commits
            ]
            self._reset_log(merged_data._commit_keys, merged_data._commit_values)
            # Trigger callbacks for the new operations
            self._trigger_callbacks_for_ops(new_ops, src_node=from_node)
        return is_sane

    def merge(self, incoming_data, from_node=None):
        """Merge ``incoming_data`` into this dict; incoming values win.

        A candidate log is built from this log, followed by the incoming
        log, followed by one 'update' op carrying the merged state. The
        candidate is built from copies, so this object is left untouched
        when ``_check`` rejects it. On success the candidate log is
        installed and callbacks fire for the final update with
        ``src_node=from_node``. Returns the result of ``_check``.
        """
        # Try Merge
        merged_data = {**self.to_dict(), **incoming_data.to_dict()}
        merged_monotonic_dict = MonotonicDict()
        # Concatenate into new lists; extending self's lists in place would
        # change this object before the merge has been validated.
        merged_monotonic_dict._commit_keys = (
            self._commit_keys + incoming_data._commit_keys
        )
        merged_monotonic_dict._commit_values = (
            self._commit_values + incoming_data._commit_values
        )
        merged_monotonic_dict._append_op(Op("update", (merged_data,)))
        # Validate data sanity
        is_sane = _check(merged_monotonic_dict, self, incoming_data)
        if is_sane:
            # Install the merged log first so callbacks that read this dict
            # observe the merged state.
            self._reset_log(
                merged_monotonic_dict._commit_keys,
                merged_monotonic_dict._commit_values,
            )
            # Only the final update is announced, not each incoming op.
            self._trigger_callbacks_for_ops(
                [Op("update", (merged_data,))], src_node=from_node
            )
        return is_sane

    def __eq__(self, other: object) -> bool:
        """
        Equality compares materialized states. Supports comparison with
        regular dicts and other mappings.
        """
        if isinstance(other, MonotonicDict):
            return self._materialize() == other._materialize()
        if isinstance(other, Mapping):
            return self._materialize() == dict(other)
        return NotImplemented

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.to_dict()!r})"

    def _trigger_callbacks_for_ops(self, ops: List[Op], src_node=None) -> None:
        """Trigger callbacks for a list of operations.

        ``src_node`` is forwarded to every callback. For a 'clear' op the
        keys reported are those of the materialized state at the time the
        op is processed.
        """
        for op in ops:
            self._notify_op(op, src_node=src_node)