Message Bus Live Coding Exercise: Difference between revisions

From EDURange
Jump to navigationJump to search
Line 72: Line 72:
** Build the capture history helper.
** Build the capture history helper.


Estimated time: 1.5-3 hours (1-2 hours work, 0.5-1 hours setup)
Estimated time: 1-3 hours (1-2 hours work, 0.5-1 hours setup)


; Session 2 - Shell commands and milestones
; Session 2 - Shell commands and milestones
Line 655: Line 655:


The dictionary contains the information, but it does not document the shape of the event. A typo is discovered only when the field is used. A named event class makes the processor code easier to read and easier to explain.
The dictionary contains the information, but it does not document the shape of the event. A typo is discovered only when the field is used. A named event class makes the processor code easier to read and easier to explain.
Positional encoding like tuples and CSV-like records also suffer from this innate limitation. They are inherently detached from the structural contract of the record type; they describe the data they carry, not any particular schema or type contract to which it belongs.


== Phase 4: Persist the event record classes ==
== Phase 4: Persist the event record classes ==

Revision as of 16:48, 16 June 2026

Introduction

Audience and scope

This exercise is intended for EDURange developers learning how to write event processors with the message bus.

The packaged mbus and aostore modules are treated as read-only for this exercise. All new code belongs under a demonstration directory such as:

workspace/
    mbus/
    aostore/
    demos/
        __init__.py
        event_processors/
            __init__.py

Pre-release copies of mbus and aostore are available on the Discord fileshare channel. Modules may be released under other package names; see also the EDURange GitHub.

The exercise uses two problem families.

  • 1. Shell and TTY event processing
    • Send and receive messages through the bus.
    • Capture and inspect message history.
    • Represent high-level shell command events.
    • Recreate the regex-based milestone role in a simplified form.
    • Mock up TTY-derived observations such as keystroke timing.
  • 2. Graph event processing
    • Represent graph edge relationships as source events.
    • Derive direct path facts from edge relation events.
    • Add feedback by deriving longer paths from known paths.
    • Simulate nondeterministic processor scheduling while keeping source input fixed.

The shell examples draw from EDURange's institutional knowledge. The graph examples provide a common model problem family for meaningful use of feedback loops.

Running the examples

Adjust paths to match the local checkout layout if necessary. Start an interpreter from the workspace root with project packages available.

PYTHONPATH="$PWD/mbus:$PWD/aostore:$PWD" python3.14

When module files are changed, restart the interpreter unless the group is deliberately practicing importlib.reload().

Create the demonstration package

Before the first durable file edit, create the demonstration package directories:

mkdir -p demos/event_processors
touch demos/__init__.py
touch demos/event_processors/__init__.py

Important note about capture records

The current demonstration works close to the low-level capture layer. As a result, it exposes MessageSymbolRegistration records while building the temporary history helper.

That is not the intended ordinary user experience. In a more complete release, higher-level capture and history infrastructure should handle symbol registration bookkeeping on behalf of processor authors. In this exercise, symbol registration records are visible because the demonstration history helper reconstructs readable messages from compact captured records directly.

Suggested group session flow

Session 1 - Bus and capture basics
  • Phase 1:
    • Signal over the bus by hand.
    • Inspect received messages.
    • Inspect low-level captured records.
  • Phase 2:
    • Discuss why symbol registration appears in this temporary demo.
    • Build the capture history helper.

Estimated time: 1-3 hours (1-2 hours work, 0.5-1 hours setup)

Session 2 - Shell commands and milestones
  • Phase 3:
    • Represent a shell command as a dictionary.
  • Phase 4:
    • Replace it with ShellCommandEvent.
  • Phase 5:
    • Manually evaluate milestone expressions.
  • Phase 6:
    • Implement RegexMilestoneProcessor.
    • Inspect source and analysis events in history.

Estimated time: 1.5-3 hours (1.5-2.5 hours work, 0-0.5 hours setup)

Session 3 - TTY-derived observations
  • Phase 7:
    • Create raw TTY read, write, and line events.
    • Show why milestones need reconstructed shell command events.
  • Phase 8:
    • Implement keystroke timing.
    • Discuss shell command reconstruction as the next derived-event problem (to be continued in Session 6 onward).

Estimated time: 1-2.5 hours (1-2 hours work, 0-0.5 hours setup)

Session 4 - Graph-derived facts
  • Phase 9:
    • Define graph edges and paths.
  • Phase 10:
    • Add the demo-only nonblocking receive helper.
  • Phase 11:
    • Implement direct path derivation.
  • Phase 12:
    • Explore the extension rule by hand.
  • Phase 13:
    • Add path extension feedback.
    • Discuss duplicate prevention.

Estimated time: 1.5-3.5 hours (1.5-3 hours work, 0-0.5 hours setup)

Session 5 - Nondeterministic processor scheduling
  • Phase 14:
    • Add the randomized processor runner.
  • Phase 15:
    • Keep source events fixed.
    • Vary processor scheduling by seed.
    • Compare final path facts across runs.

Estimated time: 1.5-3.5 hours (1.5-3 hours work, 0-0.5 hours setup)

Session 6 - Follow-up refinements
  • Phase 16:
    • Rebuild a graph index from history.
  • Phase 17:
    • Implement shell command reconstruction.
    • Replace local Python payload assumptions with explicit payload projection formats when the bus format layer is ready for the exercise.

Estimated time: 1.5-4.5 hours (1.5-4 hours work, 0-0.5 hours setup)


Event Processor Demonstration Plan

Phase 1: Signal over the bus by hand

Goal

Send one message through the direct bus and inspect what the receiver sees.

Discussion points
  • A bus is useful only when something emits and something receives.
  • Public message setup uses readable strings: topic, producer, and message type.
  • The direct broker delivers local Python payloads in this demonstration.
  • Capture must be attached before delivery in the current prototype.
Interpreter exploration

Import the bus and capture interfaces:

from dataclasses import dataclass, field

from mbus.broker.direct import DirectMessageBus
from mbus.capture.sink import CaptureSink
from mbus.message.records import CapturedMessage
from mbus.message.symbols import MessageSymbolRegistration

Create a temporary capture sink in the interpreter. This sink stores whatever the bus sends to the capture layer:

CapturedRecord = MessageSymbolRegistration | CapturedMessage


@dataclass
class ListCaptureSink(CaptureSink):
    records: list[CapturedRecord] = field(default_factory=list)

    def capture_symbol_registration(
        self,
        registration: MessageSymbolRegistration,
    ) -> None:
        self.records.append(registration)

    def capture(self, message: CapturedMessage) -> None:
        self.records.append(message)

Create the bus and attach capture:

bus = DirectMessageBus()
sink = ListCaptureSink()
bus.set_capture_sink(sink)

Inspect the empty capture sink:

sink
sink.records
len(sink.records)

Expected output shape:

ListCaptureSink(records=[])
[]
0

Create a receiver. This describes the messages the receiver wants to hear:

receiver = bus.subscribe(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    msg_type="ping",
)

Inspect the receiver and confirm that capture is still empty:

type(receiver)
vars(receiver)
sink.records

Expected output shape:

<class 'mbus.broker.direct._BrokerReceiver'>
{'_queue': ...}
[]

Create an emitter. Registering the emitter gives the bus enough information to bind readable names to compact internal IDs:

emitter = bus.register_emitter(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    default_msg_type="ping",
)

Inspect the captured registration records:

sink.records
[type(record).__name__ for record in sink.records]

for record in sink.records:
    print(record)

Expected output shape:

['MessageSymbolRegistration', 'MessageSymbolRegistration', 'MessageSymbolRegistration']
MessageSymbolRegistration(... symbol='demo.signals' ...)
MessageSymbolRegistration(... symbol='ping' ...)
MessageSymbolRegistration(... symbol='demo-source' ...)

Emit one message:

emitter.emit({"text": "hello bus"})

Inspect capture before receiving:

len(sink.records)
[type(record).__name__ for record in sink.records]
sink.records[-1]

Expected output shape:

4
['MessageSymbolRegistration', 'MessageSymbolRegistration', 'MessageSymbolRegistration', 'CapturedMessage']
CapturedMessage(... payload={'text': 'hello bus'} ...)

Receive the message:

message = receiver.receive()

message
message.msg_topic
message.msg_producer
message.msg_type
message.payload

Expected output shape:

ReceivedMessage(...)
'demo.signals'
'demo-source'
'ping'
{'text': 'hello bus'}

Compare the received message with the captured record:

captured = sink.records[-1]

captured
captured.msg_topic_id
captured.msg_type_id
captured.msg_producer_id
captured.payload

Expected output shape:

CapturedMessage(...)
TopicID(...)
MessageTypeID(...)
ProducerID(...)
{'text': 'hello bus'}
What this phase shows

The receiver gets the readable message view. The capture sink sees compact IDs and local payload data. The readable names are still available through symbol registration records, but manually reconstructing them is awkward.

That awkwardness motivates the next phase.

Phase 2: Build a temporary capture history helper

Goal

Create a small demonstration helper that turns captured records back into readable history messages.

Discussion points
  • Capture records are useful, but raw captured messages use compact IDs.
  • A history helper can rebuild readable message history from registration records.
  • This is demonstration infrastructure, not the final persistence/query API.
  • The helper writes captured records into an in-memory aostore sequential unit.
Interpreter exploration before the file edit

List only the captured messages:

captured_messages = []

for record in sink.records:
    if isinstance(record, CapturedMessage):
        captured_messages.append(record)

captured_messages

Expected output shape:

[CapturedMessage(... payload={'text': 'hello bus'} ...)]

Inspect the first captured message:

captured_message = captured_messages[0]

captured_message.msg_topic_id
captured_message.msg_type_id
captured_message.msg_producer_id

Expected output shape:

TopicID(...)
MessageTypeID(...)
ProducerID(...)

Manually reconstruct a topic name:

topic_names = {}

for record in sink.records:
    if isinstance(record, MessageSymbolRegistration):
        if record.symbol_kind.name == "TOPIC":
            topic_names[record.symbol_id] = record.symbol

topic_names
topic_names[captured_message.msg_topic_id]

Expected output shape:

{TopicID(...): 'demo.signals'}
'demo.signals'

This works, but repeating it by hand would distract from writing event processors.

File edit

Create:

demos/event_processors/capture_history.py
#!/usr/bin/env python3
# demos/event_processors/capture_history.py

"""Capture and history helpers for event processor demonstrations."""

from collections.abc import Iterable
from dataclasses import dataclass, field

from aostore.sequentialunit.listsequentialunit import ListSequentialUnit
from mbus.capture.sink import CaptureSink
from mbus.message.records import CapturedMessage
from mbus.message.symbols import (
    MessageSymbolKind,
    MessageSymbolRegistration,
    MessageTypeID,
    ProducerID,
    TopicID,
)

CapturedRecord = MessageSymbolRegistration | CapturedMessage


@dataclass
class AOStoreCaptureSink(CaptureSink):
    records: ListSequentialUnit[CapturedRecord] = field(
        default_factory=ListSequentialUnit
    )

    def capture_symbol_registration(
        self,
        registration: MessageSymbolRegistration,
    ) -> None:
        self.records.append(registration)

    def capture(self, message: CapturedMessage) -> None:
        self.records.append(message)

    def all_records(self) -> tuple[CapturedRecord, ...]:
        records = self.records.sequential_read(0, len(self.records))
        result = tuple(records)
        return result


@dataclass(frozen=True, kw_only=True)
class HistoryMessage:
    msg_topic: str
    msg_type: str
    msg_producer: str
    payload: object
    bus_sequence: int
    topic_sequence: int


class CaptureHistory:
    _records: tuple[CapturedRecord, ...]
    _topics: dict[TopicID, str]
    _types: dict[MessageTypeID, str]
    _producers: dict[ProducerID, str]

    def __init__(self, records: Iterable[CapturedRecord]) -> None:
        self._records = tuple(records)
        self._topics = {}
        self._types = {}
        self._producers = {}
        self._index_symbols()

    def _index_symbols(self) -> None:
        for record in self._records:
            if isinstance(record, MessageSymbolRegistration):
                self._record_symbol(record)

    def _record_symbol(self, record: MessageSymbolRegistration) -> None:
        if record.symbol_kind is MessageSymbolKind.TOPIC:
            self._topics[record.symbol_id] = record.symbol
        elif record.symbol_kind is MessageSymbolKind.MSG_TYPE:
            self._types[record.symbol_id] = record.symbol
        elif record.symbol_kind is MessageSymbolKind.PRODUCER:
            self._producers[record.symbol_id] = record.symbol

    def messages(self) -> list[HistoryMessage]:
        messages = []
        for record in self._records:
            if isinstance(record, CapturedMessage):
                message = self._decode_message(record)
                messages.append(message)
        return messages

    def by_topic(self, msg_topic: str) -> list[HistoryMessage]:
        messages = []
        for message in self.messages():
            if message.msg_topic == msg_topic:
                messages.append(message)
        return messages

    def since(self, bus_sequence: int) -> list[HistoryMessage]:
        messages = []
        for message in self.messages():
            if message.bus_sequence >= bus_sequence:
                messages.append(message)
        return messages

    def _decode_message(self, message: CapturedMessage) -> HistoryMessage:
        history_message = HistoryMessage(
            msg_topic=self._topics[message.msg_topic_id],
            msg_type=self._types[message.msg_type_id],
            msg_producer=self._producers[message.msg_producer_id],
            payload=message.payload,
            bus_sequence=message.bus_sequence,
            topic_sequence=message.topic_sequence,
        )
        return history_message
Interpreter check

Restart the interpreter, then run:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)

bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

receiver = bus.subscribe(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    msg_type="ping",
)

emitter = bus.register_emitter(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    default_msg_type="ping",
)

emitter.emit({"text": "history helper"})
message = receiver.receive()

history = CaptureHistory(sink.all_records())
history.messages()

Expected output shape:

[HistoryMessage(msg_topic='demo.signals', msg_type='ping', msg_producer='demo-source', payload={'text': 'history helper'}, ...)]

Inspect the decoded history:

history_message = history.messages()[0]

history_message.msg_topic
history_message.msg_type
history_message.msg_producer
history_message.payload

Expected output:

'demo.signals'
'ping'
'demo-source'
{'text': 'history helper'}

Phase 3: Explore shell command data by hand

Goal

Identify the high-level event shape needed by a milestone processor.

Discussion points
  • The old milestone strategy operated on command-level records.
  • A command-level record includes context such as host and working directory.
  • It also includes shell input and shell output.
  • Raw TTY events do not directly have this shape; reconstruction comes later.
Interpreter exploration

Represent a shell command as a dictionary:

command = {
    "session_id": "session-1",
    "command_index": 0,
    "host": "alpha",
    "cwd": "/home/student",
    "input_text": "ls -la\n",
    "output_text": "total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    "started_at_ns": 1000,
    "ended_at_ns": 2000,
}

Inspect the fields used by milestone checks:

command["host"]
command["cwd"]
command["input_text"]
command["output_text"]

Expected output:

'alpha'
'/home/student'
'ls -la\n'
'total 12\n-rw-r--r-- 1 student student 0 notes.txt\n'

Try an accidental wrong key:

command["working_directory"]

Expected output:

KeyError: 'working_directory'
What this phase shows

The dictionary contains the information, but it does not document the shape of the event. A typo is discovered only when the field is used. A named event class makes the processor code easier to read and easier to explain.

Positional encoding like tuples and CSV-like records also suffer from this innate limitation. They are inherently detached from the structural contract of the record type; they describe the data they carry, not any particular schema or type contract to which it belongs.

Phase 4: Persist the event record classes

Goal

Create reusable payload classes for the TTY, shell command, and milestone examples.

File edit

Create:

demos/event_processors/events.py
#!/usr/bin/env python3
# demos/event_processors/events.py

"""Payload records for event processor demonstrations."""

from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True, kw_only=True)
class TTYReadEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    data: bytes


@dataclass(frozen=True, kw_only=True)
class TTYWriteEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    data: bytes


@dataclass(frozen=True, kw_only=True)
class LineDisciplineEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    line: str


@dataclass(frozen=True, kw_only=True)
class KeystrokeTimingEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    previous_timestamp_ns: int | None
    delta_ns: int | None
    text: str


@dataclass(frozen=True, kw_only=True)
class ShellCommandEvent:
    session_id: str
    command_index: int
    host: str
    cwd: str
    input_text: str
    output_text: str
    started_at_ns: int
    ended_at_ns: int


RegexMilestoneField = Literal[
    "host",
    "cwd",
    "input_text",
    "output_text",
]


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneExpression:
    name: str
    field: RegexMilestoneField
    pattern: str


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneResult:
    expression_name: str
    field: RegexMilestoneField
    matched: bool
    match_text: str | None = None


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneAnalysisEvent:
    session_id: str
    command_index: int
    results: tuple[RegexMilestoneResult, ...]
    matched_count: int
    expression_count: int
Interpreter check

Restart the interpreter, then run:

from demos.event_processors.events import ShellCommandEvent

command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)

command
command.host
command.cwd
command.input_text
command.output_text

Expected output shape:

ShellCommandEvent(session_id='session-1', command_index=0, ...)
'alpha'
'/home/student'
'ls -la\n'
'total 12\n-rw-r--r-- 1 student student 0 notes.txt\n'
What this phase shows

The event shape is now explicit. Other code can depend on a named ShellCommandEvent instead of assuming that a loose dictionary has the expected fields.

Phase 5: Evaluate milestones manually

Goal

Recreate the core milestone check in a small, transparent form.

Discussion points
  • A milestone expression names one check against one shell command field.
  • The field may be command input, command output, working directory, or host context.
  • The result records whether the expression matched and, when available, the matched text.
  • Manual evaluation is useful for learning the shape of the problem before writing a processor.
Interpreter exploration

Import regular expressions and the payload classes:

import re

from demos.event_processors.events import (
    RegexMilestoneExpression,
    ShellCommandEvent,
)

Create one command event:

command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)

Create three milestone expressions:

expressions = (
    RegexMilestoneExpression(
        name="used-ls",
        field="input_text",
        pattern=r"\bls\b",
    ),
    RegexMilestoneExpression(
        name="saw-notes",
        field="output_text",
        pattern=r"notes\.txt",
    ),
    RegexMilestoneExpression(
        name="home-directory",
        field="cwd",
        pattern=r"^/home/student$",
    ),
)

Evaluate one expression:

expression = expressions[0]
text = getattr(command, expression.field)
match = re.search(expression.pattern, text)

expression.name
expression.field
text
match.group(0)

Expected output:

'used-ls'
'input_text'
'ls -la\n'
'ls'

Evaluate all expressions:

manual_results = []

for expression in expressions:
    text = getattr(command, expression.field)
    match = re.search(expression.pattern, text)
    manual_results.append((expression.name, match is not None))

manual_results

Expected output:

[('used-ls', True), ('saw-notes', True), ('home-directory', True)]
What this phase shows

Manual evaluation is simple for one command, but the loop is already event processor logic. The next phase preserves the loop in a reusable processor.

Phase 6: Persist the RegexMilestone processor

Goal

Create a processor that receives shell command events and emits regex milestone analysis events.

File edit

Create:

demos/event_processors/regex_milestones.py
#!/usr/bin/env python3
# demos/event_processors/regex_milestones.py

"""Regex milestone processor for shell command events."""

import re

from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver

from demos.event_processors.events import (
    RegexMilestoneAnalysisEvent,
    RegexMilestoneExpression,
    RegexMilestoneResult,
    ShellCommandEvent,
)


class RegexMilestoneProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _expressions: tuple[RegexMilestoneExpression, ...]

    def __init__(
        self,
        bus: DirectMessageBus,
        expressions: tuple[RegexMilestoneExpression, ...],
    ) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.shell.command",
            msg_producer="shell-reconstructor",
            msg_type="shell-command",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.shell.regex_milestone",
            msg_producer="regex-milestone-processor",
            default_msg_type="regex-milestone-analysis",
        )
        self._expressions = expressions

    def process_one(self) -> RegexMilestoneAnalysisEvent:
        message = self._receiver.receive()
        command = message.payload

        if not isinstance(command, ShellCommandEvent):
            raise TypeError(
                f"expected ShellCommandEvent, got {type(command).__name__}"
            )

        analysis = analyze_regex_milestones(command, self._expressions)
        self._emitter.emit(analysis)
        return analysis


def analyze_regex_milestones(
    command: ShellCommandEvent,
    expressions: tuple[RegexMilestoneExpression, ...],
) -> RegexMilestoneAnalysisEvent:
    results = []
    for expression in expressions:
        result = evaluate_regex_milestone(command, expression)
        results.append(result)

    matched_count = sum(result.matched for result in results)
    analysis = RegexMilestoneAnalysisEvent(
        session_id=command.session_id,
        command_index=command.command_index,
        results=tuple(results),
        matched_count=matched_count,
        expression_count=len(expressions),
    )
    return analysis


def evaluate_regex_milestone(
    command: ShellCommandEvent,
    expression: RegexMilestoneExpression,
) -> RegexMilestoneResult:
    text = getattr(command, expression.field)
    match = re.search(expression.pattern, text)
    match_text = None

    if match is not None:
        match_text = match.group(0)

    result = RegexMilestoneResult(
        expression_name=expression.name,
        field=expression.field,
        matched=match is not None,
        match_text=match_text,
    )
    return result
Interpreter check

Restart the interpreter, then set up the bus:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)
from demos.event_processors.events import (
    RegexMilestoneExpression,
    ShellCommandEvent,
)
from demos.event_processors.regex_milestones import RegexMilestoneProcessor

bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

Subscribe to the analysis topic before running the processor:

analysis_receiver = bus.subscribe(
    msg_topic="demo.shell.regex_milestone",
    msg_producer="regex-milestone-processor",
    msg_type="regex-milestone-analysis",
)

Create the shell command emitter and processor:

command_emitter = bus.register_emitter(
    msg_topic="demo.shell.command",
    msg_producer="shell-reconstructor",
    default_msg_type="shell-command",
)

expressions = (
    RegexMilestoneExpression(
        name="used-ls",
        field="input_text",
        pattern=r"\bls\b",
    ),
    RegexMilestoneExpression(
        name="saw-notes",
        field="output_text",
        pattern=r"notes\.txt",
    ),
    RegexMilestoneExpression(
        name="home-directory",
        field="cwd",
        pattern=r"^/home/student$",
    ),
)

processor = RegexMilestoneProcessor(bus, expressions)

Emit a shell command event:

command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)

command_emitter.emit(command)

Process the command:

analysis = processor.process_one()
analysis
analysis.results
analysis.matched_count
analysis.expression_count

Expected output shape:

RegexMilestoneAnalysisEvent(... matched_count=3, expression_count=3)
(RegexMilestoneResult(...), RegexMilestoneResult(...), RegexMilestoneResult(...))
3
3

Receive the derived analysis event:

analysis_message = analysis_receiver.receive()

analysis_message.msg_topic
analysis_message.msg_type
analysis_message.msg_producer
analysis_message.payload

Expected output shape:

'demo.shell.regex_milestone'
'regex-milestone-analysis'
'regex-milestone-processor'
RegexMilestoneAnalysisEvent(... matched_count=3, expression_count=3)

Inspect history:

history = CaptureHistory(sink.all_records())
messages = history.messages()

for message in messages:
    print(message.bus_sequence, message.msg_topic, message.msg_type)

Expected output shape:

0 demo.shell.command shell-command
1 demo.shell.regex_milestone regex-milestone-analysis
What this phase shows

The regex milestone processor consumes one event and emits another. The derived analysis event is available to any later subscriber. History contains both the source command event and the derived analysis event.

Phase 7: Explore why shell reconstruction is separate

Goal

Show that raw TTY observations are not the same thing as high-level shell command events.

Discussion points
  • Regex milestones should inspect high-level command events.
  • TTY reads, TTY writes, and line-discipline lines are lower-level observations.
  • A shell command event is reconstructed by correlating those observations.
  • Keeping reconstruction separate prevents the milestone processor from becoming a large TTY parser.
Interpreter exploration

Create a few low-level observations:

from demos.event_processors.events import (
    LineDisciplineEvent,
    TTYReadEvent,
    TTYWriteEvent,
)

read_1 = TTYReadEvent(
    session_id="session-1",
    source_index=0,
    timestamp_ns=1000,
    data=b"l",
)

read_2 = TTYReadEvent(
    session_id="session-1",
    source_index=1,
    timestamp_ns=1100,
    data=b"s",
)

line = LineDisciplineEvent(
    session_id="session-1",
    source_index=2,
    timestamp_ns=1200,
    line="ls\n",
)

write = TTYWriteEvent(
    session_id="session-1",
    source_index=3,
    timestamp_ns=1600,
    data=b"notes.txt\n",
)

Inspect the raw pieces:

read_1.data
read_2.data
line.line
write.data

Expected output:

b'l'
b's'
'ls\n'
b'notes.txt\n'

Assemble command fields manually:

input_text = line.line
output_text = write.data.decode("utf-8", errors="replace")

input_text
output_text

Expected output:

'ls\n'
'notes.txt\n'
What this phase shows

The useful command event is derived from several lower-level observations. A later shell reconstructor can subscribe to line-discipline events, query recent reads/writes/timing observations, and emit ShellCommandEvent records.

Phase 8: Add keystroke timing as a smaller TTY-derived processor

Goal

Create a simple stateful processor before attempting full shell reconstruction.

Discussion points
  • Timing requires comparing a TTY read with the previous read in the same session.
  • A derived timing event makes the result explicit.
  • Other processors can consume timing events instead of recalculating deltas.
Interpreter exploration before the file edit

Calculate timing manually from the two read events:

read_events = [read_1, read_2]
previous_timestamp = None
timing_rows = []

for event in read_events:
    delta = None
    if previous_timestamp is not None:
        delta = event.timestamp_ns - previous_timestamp
    timing_rows.append((event.source_index, previous_timestamp, delta))
    previous_timestamp = event.timestamp_ns

timing_rows

Expected output:

[(0, None, None), (1, 1000, 100)]
File edit

Create:

demos/event_processors/keystroke_timing.py
#!/usr/bin/env python3
# demos/event_processors/keystroke_timing.py

"""Keystroke timing processor for TTY read events."""

from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver

from demos.event_processors.events import (
    KeystrokeTimingEvent,
    TTYReadEvent,
)


class KeystrokeTimingProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _last_timestamp_by_session: dict[str, int]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.tty.read",
            msg_producer="tty-source",
            msg_type="tty-read",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.keystroke.timing",
            msg_producer="keystroke-timing",
            default_msg_type="keystroke-timing",
        )
        self._last_timestamp_by_session = {}

    def process_one(self) -> KeystrokeTimingEvent:
        message = self._receiver.receive()
        event = message.payload

        if not isinstance(event, TTYReadEvent):
            raise TypeError(f"expected TTYReadEvent, got {type(event).__name__}")

        previous_timestamp = self._last_timestamp_by_session.get(
            event.session_id
        )
        delta = None

        if previous_timestamp is not None:
            delta = event.timestamp_ns - previous_timestamp

        self._last_timestamp_by_session[event.session_id] = event.timestamp_ns

        text = event.data.decode("utf-8", errors="replace")
        timing = KeystrokeTimingEvent(
            session_id=event.session_id,
            source_index=event.source_index,
            timestamp_ns=event.timestamp_ns,
            previous_timestamp_ns=previous_timestamp,
            delta_ns=delta,
            text=text,
        )
        self._emitter.emit(timing)
        return timing
Interpreter check

Restart the interpreter, then run:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import AOStoreCaptureSink
from demos.event_processors.events import TTYReadEvent
from demos.event_processors.keystroke_timing import KeystrokeTimingProcessor

bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

timing_receiver = bus.subscribe(
    msg_topic="demo.keystroke.timing",
    msg_producer="keystroke-timing",
    msg_type="keystroke-timing",
)

read_emitter = bus.register_emitter(
    msg_topic="demo.tty.read",
    msg_producer="tty-source",
    default_msg_type="tty-read",
)

processor = KeystrokeTimingProcessor(bus)

Emit reads and process them:

read_emitter.emit(
    TTYReadEvent(
        session_id="session-1",
        source_index=0,
        timestamp_ns=1000,
        data=b"l",
    )
)

read_emitter.emit(
    TTYReadEvent(
        session_id="session-1",
        source_index=1,
        timestamp_ns=1100,
        data=b"s",
    )
)

processor.process_one()
processor.process_one()

Expected output shape:

KeystrokeTimingEvent(... previous_timestamp_ns=None, delta_ns=None, text='l')
KeystrokeTimingEvent(... previous_timestamp_ns=1000, delta_ns=100, text='s')

Receive timing events:

timing_receiver.receive().payload
timing_receiver.receive().payload

Expected output shape:

KeystrokeTimingEvent(... source_index=0 ...)
KeystrokeTimingEvent(... source_index=1 ...)

Phase 9: Introduce graph events

Goal

Switch to a small graph problem that can demonstrate safe feedback loops in an illustratable form.

Discussion points
  • An edge is a source fact.
  • A path is a derived fact.
  • A direct edge implies a direct path.
  • Longer paths can be derived later from known paths and edges.
File edit

Create:

demos/event_processors/graph_events.py
#!/usr/bin/env python3
# demos/event_processors/graph_events.py

"""Graph event payloads for message bus demonstrations."""

from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True)
class EdgeDeclaredEvent:
    graph_id: str
    source: str
    target: str


@dataclass(frozen=True, kw_only=True)
class PathKnownEvent:
    graph_id: str
    source: str
    target: str
    hop_count: int

    def fact_key(self) -> tuple[str, str, str]:
        result = (self.graph_id, self.source, self.target)
        return result
Interpreter exploration
from demos.event_processors.graph_events import (
    EdgeDeclaredEvent,
    PathKnownEvent,
)

edge = EdgeDeclaredEvent(
    graph_id="demo-graph",
    source="A",
    target="B",
)

path = PathKnownEvent(
    graph_id=edge.graph_id,
    source=edge.source,
    target=edge.target,
    hop_count=1,
)

edge
path
path.fact_key()

Expected output shape:

EdgeDeclaredEvent(graph_id='demo-graph', source='A', target='B')
PathKnownEvent(graph_id='demo-graph', source='A', target='B', hop_count=1)
('demo-graph', 'A', 'B')

Phase 10: Create a demo-only nonblocking receive helper

Goal

Prepare for processor scheduling without editing mbus.

Discussion points
  • Receiver.receive() blocks until a message is available.
  • A scheduler needs to try a processor and move on if no message is waiting.
  • The current package does not expose a public nonblocking receiver method.
  • This helper is local demonstration code. It can be replaced by a real public API later.
File edit

Create:

demos/event_processors/demo_receive.py
#!/usr/bin/env python3
# demos/event_processors/demo_receive.py

"""Demo-only helpers for receiver inspection and scheduling."""

from queue import Empty

from mbus.broker.endpoints import Receiver
from mbus.message.records import ReceivedMessage


class DemoReceiverError(RuntimeError):
    pass


def try_receive(receiver: Receiver) -> ReceivedMessage | None:
    queue = getattr(receiver, "_queue", None)

    if queue is None:
        raise DemoReceiverError(
            "demo try_receive requires the current direct broker receiver"
        )

    try:
        message = queue.get_nowait()
    except Empty:
        message = None

    return message
Interpreter check

Restart the interpreter, then run:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.demo_receive import try_receive

bus = DirectMessageBus()

receiver = bus.subscribe(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    msg_type="ping",
)

try_receive(receiver)

Expected output:

None

Now emit a message:

from demos.event_processors.capture_history import AOStoreCaptureSink

sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

emitter = bus.register_emitter(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    default_msg_type="ping",
)

emitter.emit({"text": "demo nonblocking receive"})
message = try_receive(receiver)

message
message.payload

Expected output shape:

ReceivedMessage(...)
{'text': 'demo nonblocking receive'}

Phase 11: Create the direct path processor

Goal

Implement the graph rule:

edge(A, B) -> path(A, B)
Discussion points
  • The source emits edge events.
  • The direct path processor emits derived path events.
  • Other processors can subscribe to derived path events.
  • The processor can be run by hand or by a scheduler.
File edit

Create:

demos/event_processors/graph_reachability.py
#!/usr/bin/env python3
# demos/event_processors/graph_reachability.py

"""Graph reachability processors for message bus demonstrations."""

from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver

from demos.event_processors.demo_receive import try_receive
from demos.event_processors.graph_events import (
    EdgeDeclaredEvent,
    PathKnownEvent,
)


class DirectPathProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _known_paths: set[tuple[str, str, str]]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.graph.edge",
            msg_producer="graph-source",
            msg_type="edge-declared",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.graph.path",
            msg_producer="direct-path",
            default_msg_type="path-known",
        )
        self._known_paths = set()

    def process_one(self) -> PathKnownEvent | None:
        message = self._receiver.receive()
        result = self._process_payload(message.payload)
        return result

    def process_available(self) -> int:
        message = try_receive(self._receiver)

        if message is None:
            work_count = 0
        else:
            self._process_payload(message.payload)
            work_count = 1

        return work_count

    def _process_payload(self, payload: object) -> PathKnownEvent | None:
        if not isinstance(payload, EdgeDeclaredEvent):
            raise TypeError(
                f"expected EdgeDeclaredEvent, got {type(payload).__name__}"
            )

        path = PathKnownEvent(
            graph_id=payload.graph_id,
            source=payload.source,
            target=payload.target,
            hop_count=1,
        )

        if path.fact_key() not in self._known_paths:
            self._known_paths.add(path.fact_key())
            self._emitter.emit(path)
            result = path
        else:
            result = None

        return result
Interpreter check

Restart the interpreter, then run:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import AOStoreCaptureSink
from demos.event_processors.graph_events import EdgeDeclaredEvent
from demos.event_processors.graph_reachability import DirectPathProcessor

bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

path_receiver = bus.subscribe(
    msg_topic="demo.graph.path",
    msg_producer="direct-path",
    msg_type="path-known",
)

edge_emitter = bus.register_emitter(
    msg_topic="demo.graph.edge",
    msg_producer="graph-source",
    default_msg_type="edge-declared",
)

processor = DirectPathProcessor(bus)

Call the processor before any edge exists:

processor.process_available()

Expected output:

0

Emit one edge:

edge_emitter.emit(
    EdgeDeclaredEvent(
        graph_id="demo-graph",
        source="A",
        target="B",
    )
)

Process the edge:

processor.process_available()

Expected output:

1

Receive the derived path:

path_message = path_receiver.receive()
path_message.payload

Expected output:

PathKnownEvent(graph_id='demo-graph', source='A', target='B', hop_count=1)

Phase 12: Explore the path extension rule by hand

Goal

Understand the feedback rule before implementing it.

Discussion points
  • If A reaches B, and there is an edge from B to C, then A reaches C.
  • Derived path facts can lead to more derived path facts.
  • Duplicate prevention keeps the feedback loop from repeating the same fact forever.
Interpreter exploration
from demos.event_processors.graph_events import (
    EdgeDeclaredEvent,
    PathKnownEvent,
)

edge_bc = EdgeDeclaredEvent(
    graph_id="demo-graph",
    source="B",
    target="C",
)

path_ab = PathKnownEvent(
    graph_id="demo-graph",
    source="A",
    target="B",
    hop_count=1,
)

Check whether the path and edge connect:

path_ab.target
edge_bc.source
path_ab.target == edge_bc.source

Expected output:

'B'
'B'
True

Derive a new path:

path_ac = PathKnownEvent(
    graph_id="demo-graph",
    source=path_ab.source,
    target=edge_bc.target,
    hop_count=path_ab.hop_count + 1,
)

path_ac
path_ac.fact_key()

Expected output:

PathKnownEvent(graph_id='demo-graph', source='A', target='C', hop_count=2)
('demo-graph', 'A', 'C')

Manually prevent duplicate path facts:

known_paths = set()
known_paths.add(path_ab.fact_key())

path_ac.fact_key() in known_paths

if path_ac.fact_key() not in known_paths:
    known_paths.add(path_ac.fact_key())

known_paths

Expected output shape:

False
{('demo-graph', 'A', 'B'), ('demo-graph', 'A', 'C')}

Phase 13: Add the path extension processor

Goal

Implement the feedback rule:

path(A, B) + edge(B, C) -> path(A, C)
Discussion points
  • The extension processor listens to edge events and path events.
  • It emits new path events when a known path can be extended.
  • It also listens to its own output because an extended path may be extendable again.
  • This is a controlled feedback loop.
File edit

Append this class to:

demos/event_processors/graph_reachability.py

class PathExtensionProcessor:
    _edge_receiver: Receiver
    _direct_path_receiver: Receiver
    _extension_path_receiver: Receiver
    _emitter: Emitter
    _edges: set[tuple[str, str, str]]
    _paths: dict[tuple[str, str, str], PathKnownEvent]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._edge_receiver = bus.subscribe(
            msg_topic="demo.graph.edge",
            msg_producer="graph-source",
            msg_type="edge-declared",
        )
        self._direct_path_receiver = bus.subscribe(
            msg_topic="demo.graph.path",
            msg_producer="direct-path",
            msg_type="path-known",
        )
        self._extension_path_receiver = bus.subscribe(
            msg_topic="demo.graph.path",
            msg_producer="path-extension",
            msg_type="path-known",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.graph.path",
            msg_producer="path-extension",
            default_msg_type="path-known",
        )
        self._edges = set()
        self._paths = {}

    def process_edge_available(self) -> int:
        message = try_receive(self._edge_receiver)

        if message is None:
            work_count = 0
        else:
            paths = self._process_edge_payload(message.payload)
            work_count = 1 + len(paths)

        return work_count

    def process_direct_path_available(self) -> int:
        message = try_receive(self._direct_path_receiver)

        if message is None:
            work_count = 0
        else:
            paths = self._process_path_payload(message.payload)
            work_count = 1 + len(paths)

        return work_count

    def process_extension_path_available(self) -> int:
        message = try_receive(self._extension_path_receiver)

        if message is None:
            work_count = 0
        else:
            paths = self._process_path_payload(message.payload)
            work_count = 1 + len(paths)

        return work_count

    def _process_edge_payload(
        self,
        payload: object,
    ) -> list[PathKnownEvent]:
        if not isinstance(payload, EdgeDeclaredEvent):
            raise TypeError(
                f"expected EdgeDeclaredEvent, got {type(payload).__name__}"
            )

        edge = payload
        self._edges.add((edge.graph_id, edge.source, edge.target))
        paths = self._derive_from_new_edge(edge)
        return paths

    def _process_path_payload(
        self,
        payload: object,
    ) -> list[PathKnownEvent]:
        if not isinstance(payload, PathKnownEvent):
            raise TypeError(
                f"expected PathKnownEvent, got {type(payload).__name__}"
            )

        path = payload
        self._paths[path.fact_key()] = path
        paths = self._derive_from_new_path(path)
        return paths

    def _derive_from_new_edge(
        self,
        edge: EdgeDeclaredEvent,
    ) -> list[PathKnownEvent]:
        paths = []

        for known_path in self._paths.values():
            if known_path.graph_id != edge.graph_id:
                continue
            if known_path.target != edge.source:
                continue

            next_path = PathKnownEvent(
                graph_id=edge.graph_id,
                source=known_path.source,
                target=edge.target,
                hop_count=known_path.hop_count + 1,
            )

            if self._emit_if_new(next_path):
                paths.append(next_path)

        return paths

    def _derive_from_new_path(
        self,
        path: PathKnownEvent,
    ) -> list[PathKnownEvent]:
        paths = []

        for graph_id, edge_source, edge_target in self._edges:
            if graph_id != path.graph_id:
                continue
            if edge_source != path.target:
                continue

            next_path = PathKnownEvent(
                graph_id=path.graph_id,
                source=path.source,
                target=edge_target,
                hop_count=path.hop_count + 1,
            )

            if self._emit_if_new(next_path):
                paths.append(next_path)

        return paths

    def _emit_if_new(self, path: PathKnownEvent) -> bool:
        if path.fact_key() in self._paths:
            result = False
        else:
            self._paths[path.fact_key()] = path
            self._emitter.emit(path)
            result = True

        return result
Interpreter check

Restart the interpreter, then run:

from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)
from demos.event_processors.graph_events import EdgeDeclaredEvent
from demos.event_processors.graph_reachability import (
    DirectPathProcessor,
    PathExtensionProcessor,
)

bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)

extension_receiver = bus.subscribe(
    msg_topic="demo.graph.path",
    msg_producer="path-extension",
    msg_type="path-known",
)

edge_emitter = bus.register_emitter(
    msg_topic="demo.graph.edge",
    msg_producer="graph-source",
    default_msg_type="edge-declared",
)

direct_processor = DirectPathProcessor(bus)
extension_processor = PathExtensionProcessor(bus)

Emit two edges in fixed order:

edge_emitter.emit(
    EdgeDeclaredEvent(
        graph_id="demo-graph",
        source="A",
        target="B",
    )
)

edge_emitter.emit(
    EdgeDeclaredEvent(
        graph_id="demo-graph",
        source="B",
        target="C",
    )
)

Process the direct paths:

direct_processor.process_available()
direct_processor.process_available()

Expected output:

1
1

Let the extension processor learn the edges:

extension_processor.process_edge_available()
extension_processor.process_edge_available()

Expected output:

1
1

Let the extension processor learn the direct paths:

extension_processor.process_direct_path_available()
extension_processor.process_direct_path_available()

Expected output shape:

1
2

The second call may emit the extended A -> C path after the processor has seen both direct paths and both edges.

Receive the extended path:

message = extension_receiver.receive()
message.payload

Expected output:

PathKnownEvent(graph_id='demo-graph', source='A', target='C', hop_count=2)

Inspect captured graph path history:

history = CaptureHistory(sink.all_records())
path_messages = history.by_topic("demo.graph.path")

for message in path_messages:
    path = message.payload
    print(
        message.bus_sequence,
        message.msg_producer,
        path.source,
        path.target,
        path.hop_count,
    )

Expected output shape:

... direct-path A B 1
... direct-path B C 1
... path-extension A C 2

Phase 14: Add a randomized processor scheduler

Goal

Simulate concurrency effects without changing source input order.

Discussion points
  • The source will emit the same edge events in the same order every run.
  • The scheduler varies which processor gets a chance to run first.
  • This simulates uncertainty from processing time, scheduling priority, startup timing, or transmission delay.
  • The graph example should converge to the same final path facts even when processor scheduling - and the order of intermediate derived processor events - differs.
File edit

Create:

demos/event_processors/runners.py
#!/usr/bin/env python3
# demos/event_processors/runners.py

"""Small processor runners for event processor demonstrations."""

from collections.abc import Callable
from dataclasses import dataclass, field
from random import Random


@dataclass(frozen=True, kw_only=True)
class ProcessorStep:
    name: str
    run: Callable[[], int]


@dataclass
class RandomProcessorRunner:
    steps: tuple[ProcessorStep, ...]
    seed: int
    max_rounds: int = 30
    trace: list[tuple[int, str, int]] = field(default_factory=list)

    def run_until_quiet(self) -> list[tuple[int, str, int]]:
        rng = Random(self.seed)

        for round_index in range(self.max_rounds):
            work_count = self._run_round(rng, round_index)

            if work_count == 0:
                break

        result = list(self.trace)
        return result

    def _run_round(self, rng: Random, round_index: int) -> int:
        steps = list(self.steps)
        rng.shuffle(steps)
        round_work_count = 0

        for step in steps:
            step_work_count = step.run()
            self.trace.append((round_index, step.name, step_work_count))
            round_work_count += step_work_count

        return round_work_count
Interpreter check

Restart the interpreter, then inspect how a runner can shuffle placeholder steps:

from demos.event_processors.runners import (
    ProcessorStep,
    RandomProcessorRunner,
)

calls = []


def first_step() -> int:
    calls.append("first")
    return 0


def second_step() -> int:
    calls.append("second")
    return 0


runner = RandomProcessorRunner(
    steps=(
        ProcessorStep(name="first", run=first_step),
        ProcessorStep(name="second", run=second_step),
    ),
    seed=1,
)

runner.run_until_quiet()
calls

Expected output shape:

[(0, '...', 0), (0, '...', 0)]
['...', '...']

Phase 15: Run the graph example with randomized scheduling

Goal

Run the same graph input under different processor schedules and compare the final path facts.

Interpreter setup
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)
from demos.event_processors.graph_events import (
    EdgeDeclaredEvent,
    PathKnownEvent,
)
from demos.event_processors.graph_reachability import (
    DirectPathProcessor,
    PathExtensionProcessor,
)
from demos.event_processors.runners import (
    ProcessorStep,
    RandomProcessorRunner,
)

Define a helper function in the interpreter:

def run_graph_demo(seed: int) -> tuple[
    list[tuple[int, str, int]],
    list[tuple[int, str, str, int]],
]:
    bus = DirectMessageBus()
    sink = AOStoreCaptureSink()
    bus.set_capture_sink(sink)

    edge_emitter = bus.register_emitter(
        msg_topic="demo.graph.edge",
        msg_producer="graph-source",
        default_msg_type="edge-declared",
    )

    direct_processor = DirectPathProcessor(bus)
    extension_processor = PathExtensionProcessor(bus)

    edges = (
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="A",
            target="B",
        ),
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="B",
            target="C",
        ),
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="C",
            target="D",
        ),
    )

    for edge in edges:
        edge_emitter.emit(edge)

    steps = (
        ProcessorStep(
            name="direct path from edge",
            run=direct_processor.process_available,
        ),
        ProcessorStep(
            name="extension learns edge",
            run=extension_processor.process_edge_available,
        ),
        ProcessorStep(
            name="extension learns direct path",
            run=extension_processor.process_direct_path_available,
        ),
        ProcessorStep(
            name="extension learns extension path",
            run=extension_processor.process_extension_path_available,
        ),
    )

    runner = RandomProcessorRunner(
        steps=steps,
        seed=seed,
    )
    trace = runner.run_until_quiet()

    history = CaptureHistory(sink.all_records())
    path_facts = []

    for message in history.by_topic("demo.graph.path"):
        path = message.payload
        if isinstance(path, PathKnownEvent):
            fact = (
                message.bus_sequence,
                path.source,
                path.target,
                path.hop_count,
            )
            path_facts.append(fact)

    result = (trace, path_facts)
    return result

Run one schedule:

trace_1, paths_1 = run_graph_demo(1)

trace_1
paths_1

Expected output shape:

[(0, '...', ...), (0, '...', ...), ...]
[(..., 'A', 'B', 1), (..., 'B', 'C', 1), (..., 'C', 'D', 1), ...]

Run another schedule:

trace_2, paths_2 = run_graph_demo(2)

trace_2
paths_2

Compare the schedules:

trace_1 == trace_2

Expected output:

False

Compare the final path facts without bus sequence numbers:

final_paths_1 = {
    (source, target, hop_count)
    for _, source, target, hop_count in paths_1
}

final_paths_2 = {
    (source, target, hop_count)
    for _, source, target, hop_count in paths_2
}

final_paths_1
final_paths_2
final_paths_1 == final_paths_2

Expected output:

{('A', 'B', 1), ('B', 'C', 1), ('C', 'D', 1), ('A', 'C', 2), ('B', 'D', 2), ('A', 'D', 3)}
{('A', 'B', 1), ('B', 'C', 1), ('C', 'D', 1), ('A', 'C', 2), ('B', 'D', 2), ('A', 'D', 3)}
True

Inspect the first schedule:

for round_index, step_name, work_count in trace_1:
    print(round_index, step_name, work_count)

Inspect the first captured path order:

for bus_sequence, source, target, hop_count in paths_1:
    print(bus_sequence, source, target, hop_count)

Inspect the second schedule and path order:

for round_index, step_name, work_count in trace_2:
    print(round_index, step_name, work_count)

for bus_sequence, source, target, hop_count in paths_2:
    print(bus_sequence, source, target, hop_count)
What this phase shows

The source input order did not change. The processor scheduling changed. The captured event order may differ. The final reachability facts should match.

This models an important distributed-systems lesson: event history order and final derived state are related, but they are not the same thing.

Phase 16: Later refinement: graph index from history

Goal

Introduce restart/recovery thinking only after the basic graph feedback example is familiar.

Discussion points
  • The first graph processors keep local working state.
  • Local working state is enough for a short demonstration.
  • Longer-lived processors need recovery behavior after restart.
  • Since edge and path facts are captured as events, a graph index can later be rebuilt from history.
  • This points toward projection-style services without changing the basic processor model.
Suggested later file

Create later:

demos/event_processors/graph_index.py

Suggested responsibilities:

read captured history
collect EdgeDeclaredEvent payloads
collect PathKnownEvent payloads
answer whether a path fact exists
find edges starting at a node
find paths ending at a node

This refinement belongs after the team has already run the graph reachability demo successfully.

Phase 17: Later refinement: shell command reconstruction

Goal

Connect the TTY examples back to the shell command event shape used by milestones.

Discussion points
  • ShellCommandEvent is the high-level event consumed by the milestone processor.
  • TTYReadEvent, TTYWriteEvent, and LineDisciplineEvent are lower-level observations.
  • A reconstructor can subscribe to line-discipline events and query recent related observations.
  • The reconstructor should emit shell command events; it should not perform regex analysis itself.
Suggested responsibilities
listen for completed line-discipline events
query history since the previous command boundary for the same session
collect matching reads, writes, and timing events
construct ShellCommandEvent
emit ShellCommandEvent on demo.shell.command

This is an appropriate follow-up exercise after the team understands both milestone processing and history queries.

Related

Top-level topic page

Message Bus Service