Skip to content

node

BaseMeshNode

Base abstraction for a distributed mesh node.

Responsibilities: - Manage peer connections - Maintain replicated MonotonicDict state - Perform state synchronization

Source code in src/mesh/node.py
class BaseMeshNode:
    """
    Base abstraction for a distributed mesh node.

    Responsibilities:
    - Manage peer connections
    - Maintain replicated MonotonicDict state
    - Perform state synchronization
    """

    def __init__(self, name: Optional[str] = None, action_on_conflict = "warn"):
        self.name = name

        # Active websocket connections:
        # key   -> peer identifier (URL or client address string)
        # value -> WebSocketProtocol wrapper
        self.connections: Dict[str, WebSocketProtocol] = {}

        # Replicated state
        self.data = MonotonicDict()

        self.action_on_conflict = action_on_conflict

    def __repr__(self):
        return f"<Node {self.name} connections({list(self.connections.keys())})>"

    # ------------------------------------------------------------------
    # Synchronization Layer
    # ------------------------------------------------------------------

    async def sync_up(self, nodes_list: Optional[List[str]] = None):
        """
        Push local state to selected peers.

        If no list is provided, sync with all connected peers.
        """
        if not nodes_list:
            nodes_list = list(self.connections.keys())

        for node in nodes_list:
            await self.send(node, self.data)

    async def sync_up_recv(self, sender: str, incoming_data: MonotonicDict, action_on_conflict="warn"):
        """
        Handle incoming state and resolve commit differences.
        """
        analysis = analyze_commit_diff(incoming_data, self.data)

        if analysis.status == "same":
            return

        # List of peers to propagate changes to
        peers = list(self.connections.keys())

        if sender in peers:
            peers.remove(sender)

        if analysis.status == "ahead":
            # Remote state is ahead → adopt it
            self.data.accept(incoming_data, from_node=sender)

        elif analysis.status == "behind":
            # Remote state is behind → push ours back to sender
            peers.append(sender)

        else:
            # Divergent Commits
            match action_on_conflict:
                case "accept":
                    warnings.warn(analysis.message)
                    # logger.warning(analysis.message, source=self.name)
                    self.data.accept(incoming_data, from_node=sender)
                    peers.append(sender)
                case "merge":
                    warnings.warn(analysis.message)
                    # logger.warning(analysis.message, source=self.name)
                    self.data.merge(incoming_data, from_node=sender)
                    peers.append(sender)
                case "warn":
                    warnings.warn(analysis.message)
                case "exception":
                    raise Exception(analysis.message)
                case "ignore":
                    pass

        await self.sync_up(peers)

    # ------------------------------------------------------------------
    # Transport Layer
    # ------------------------------------------------------------------

    async def send(self, peer: str, data: MonotonicDict):
        """
        Send serialized state to a peer.
        """
        if peer not in self.connections:
            return

        ws = self.connections[peer]
        payload = serialize_monotonic_dict(data)

        try:
            await ws.send_text(payload)
        except Exception as ex:
            # Silent failure (unchanged behavior)
            pass

    async def recv(self, peer: str, raw_payload: str):
        """
        Receive serialized state from peer and trigger sync logic.
        """
        incoming_data = deserialize_monotonic_dict(raw_payload)
        await self.sync_up_recv(peer, incoming_data, action_on_conflict=self.action_on_conflict)

    # ------------------------------------------------------------------
    # Public Data API
    # ------------------------------------------------------------------

    async def put_data(self, key_value_pairs: Dict):
        """
        Insert/update key-value pairs and propagate changes.
        """
        for key, value in key_value_pairs.items():
            self.data[key] = value

        await self.sync_up()

    async def get_data(self, key, default=None):
        """
        Retrieve value after ensuring sync.
        """
        await self.sync_up()
        return self.data.get(key, default)

    async def pop_data(self, key, default=None):
        """
        Remove key if present and propagate change.
        """
        if key in self.data:
            value = self.data.pop(key)
            await self.sync_up()
            return value

        return default

get_data(key, default=None) async

Retrieve value after ensuring sync.

Source code in src/mesh/node.py
async def get_data(self, key, default=None):
    """
    Retrieve value after ensuring sync.
    """
    await self.sync_up()
    return self.data.get(key, default)

pop_data(key, default=None) async

Remove key if present and propagate change.

Source code in src/mesh/node.py
async def pop_data(self, key, default=None):
    """
    Remove key if present and propagate change.
    """
    if key in self.data:
        value = self.data.pop(key)
        await self.sync_up()
        return value

    return default

put_data(key_value_pairs) async

Insert/update key-value pairs and propagate changes.

Source code in src/mesh/node.py
async def put_data(self, key_value_pairs: Dict):
    """
    Insert/update key-value pairs and propagate changes.
    """
    for key, value in key_value_pairs.items():
        self.data[key] = value

    await self.sync_up()

recv(peer, raw_payload) async

Receive serialized state from peer and trigger sync logic.

Source code in src/mesh/node.py
async def recv(self, peer: str, raw_payload: str):
    """
    Receive serialized state from peer and trigger sync logic.
    """
    incoming_data = deserialize_monotonic_dict(raw_payload)
    await self.sync_up_recv(peer, incoming_data, action_on_conflict=self.action_on_conflict)

send(peer, data) async

Send serialized state to a peer.

Source code in src/mesh/node.py
async def send(self, peer: str, data: MonotonicDict):
    """
    Send serialized state to a peer.
    """
    if peer not in self.connections:
        return

    ws = self.connections[peer]
    payload = serialize_monotonic_dict(data)

    try:
        await ws.send_text(payload)
    except Exception as ex:
        # Silent failure (unchanged behavior)
        pass

sync_up(nodes_list=None) async

Push local state to selected peers.

If no list is provided, sync with all connected peers.

Source code in src/mesh/node.py
async def sync_up(self, nodes_list: Optional[List[str]] = None):
    """
    Push local state to selected peers.

    If no list is provided, sync with all connected peers.
    """
    if not nodes_list:
        nodes_list = list(self.connections.keys())

    for node in nodes_list:
        await self.send(node, self.data)

sync_up_recv(sender, incoming_data, action_on_conflict='warn') async

Handle incoming state and resolve commit differences.

Source code in src/mesh/node.py
async def sync_up_recv(self, sender: str, incoming_data: MonotonicDict, action_on_conflict="warn"):
    """
    Handle incoming state and resolve commit differences.
    """
    analysis = analyze_commit_diff(incoming_data, self.data)

    if analysis.status == "same":
        return

    # List of peers to propagate changes to
    peers = list(self.connections.keys())

    if sender in peers:
        peers.remove(sender)

    if analysis.status == "ahead":
        # Remote state is ahead → adopt it
        self.data.accept(incoming_data, from_node=sender)

    elif analysis.status == "behind":
        # Remote state is behind → push ours back to sender
        peers.append(sender)

    else:
        # Divergent Commits
        match action_on_conflict:
            case "accept":
                warnings.warn(analysis.message)
                # logger.warning(analysis.message, source=self.name)
                self.data.accept(incoming_data, from_node=sender)
                peers.append(sender)
            case "merge":
                warnings.warn(analysis.message)
                # logger.warning(analysis.message, source=self.name)
                self.data.merge(incoming_data, from_node=sender)
                peers.append(sender)
            case "warn":
                warnings.warn(analysis.message)
            case "exception":
                raise Exception(analysis.message)
            case "ignore":
                pass

    await self.sync_up(peers)

Node

Bases: BaseMeshNode

FastAPI-enabled mesh node.

Extends BaseMeshNode by: - Registering WebSocket route - Accepting inbound connections

Source code in src/mesh/node.py
class Node(BaseMeshNode):
    """
    FastAPI-enabled mesh node.

    Extends BaseMeshNode by:
    - Registering WebSocket route
    - Accepting inbound connections
    """

    def __init__(self, name=None, app=None, client=None, action_on_conflict="warn", endpoint="/mesh"):
        super().__init__(name=name, action_on_conflict=action_on_conflict)

        self.app = app
        self.client = client
        self.endpoint = endpoint

        # Register websocket endpoint if FastAPI app provided
        if self.app:
            self._register_routes()

    def _register_routes(self):
        """
        Register `/mesh` websocket endpoint for inbound connections.
        """

        @self.app.websocket(self.endpoint)
        async def websocket_endpoint(websocket: WebSocket):
            await websocket.accept()

            peer = str(websocket.client)
            self.connections[peer] = WebSocketProtocol(websocket)

            try:
                while True:
                    data = await websocket.receive_text()
                    await self.recv(peer, data)
            except Exception as ex:
                # Silent disconnect handling (unchanged behavior)
                pass

        @self.app.get("/mesh-info", tags=["mesh"])
        async def get_domain(request: Request):
            # Returns something like "https://example.com" or "http://localhost:8000/"
            base_url = str(request.base_url) 

            return {
                "name": self.name,
                "servers": self.app.servers,
                "version": version("mesh"),
                "action_on_conflict": self.action_on_conflict,
                "domain": request.base_url.hostname,
                "docs_url": urljoin(base_url, self.app.docs_url) if self.app.docs_url else None,
                "join_url": urljoin(base_url, self.endpoint),
            }

    # ------------------------------------------------------------------
    # Connection Management
    # ------------------------------------------------------------------

    async def join(self, urls: List[str], token: str = ""):
        """
        Connect to remote mesh nodes via WebSocket.

        Args:
            urls: List of websocket URLs
            token: (unused for now – reserved for auth)
        """
        for url in urls:
            ws = await websockets.connect(url)
            peer = Address(*ws.remote_address)
            # Store connection wrapper
            self.connections[str(peer)] = WebSocketProtocol(ws)

            # Spawn background listener task
            asyncio.create_task(self._listen(ws, str(peer)))

        await asyncio.sleep(1)

        await self.sync_up()

    async def _listen(self, ws, peer_id: str):
        """
        Background listener for an outgoing websocket connection.
        """
        try:
            async for message in ws:
                await self.recv(peer_id, message)
        except Exception as ex:
            # Silent failure (same logic as original)
            pass

join(urls, token='') async

Connect to remote mesh nodes via WebSocket.

Parameters:

Name Type Description Default
urls List[str]

List of websocket URLs

required
token str

(unused for now – reserved for auth)

''
Source code in src/mesh/node.py
async def join(self, urls: List[str], token: str = ""):
    """
    Connect to remote mesh nodes via WebSocket.

    Args:
        urls: List of websocket URLs
        token: (unused for now – reserved for auth)
    """
    for url in urls:
        ws = await websockets.connect(url)
        peer = Address(*ws.remote_address)
        # Store connection wrapper
        self.connections[str(peer)] = WebSocketProtocol(ws)

        # Spawn background listener task
        asyncio.create_task(self._listen(ws, str(peer)))

    await asyncio.sleep(1)

    await self.sync_up()