Message Bus Live Coding Exercise
Jwgranville (talk | contribs)
Latest revision as of 16:50, 16 June 2026
Introduction
Audience and scope
This exercise is intended for EDURange developers learning how to write event processors with the message bus.
The packaged mbus and aostore modules are treated as read-only for this exercise. All new code belongs under a demonstration directory such as:
workspace/
mbus/
aostore/
demos/
__init__.py
event_processors/
__init__.py
Pre-release copies of mbus and aostore are available on the Discord fileshare channel. Modules may be released under other package names; see also the EDURange GitHub.
The exercise uses two problem families.
- 1. Shell and TTY event processing
- Send and receive messages through the bus.
- Capture and inspect message history.
- Represent high-level shell command events.
- Recreate the regex-based milestone role in a simplified form.
- Mock up TTY-derived observations such as keystroke timing.
- 2. Graph event processing
- Represent graph edge relationships as source events.
- Derive direct path facts from edge relation events.
- Add feedback by deriving longer paths from known paths.
- Simulate nondeterministic processor scheduling while keeping source input fixed.
The shell examples draw on EDURange's institutional knowledge. The graph examples provide a familiar model problem in which feedback loops are genuinely useful.
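The feedback idea in the graph family can be previewed without the bus. The following is a minimal sketch, assuming edges are plain (source, target) tuples and using a hypothetical helper named derive_paths; it is not the processor built later in the exercise. It derives direct paths from edges, extends known paths until nothing new appears, and varies the processing order by seed to show that the final path facts stay the same.

```python
import random


def derive_paths(edges, seed):
    """Derive all (start, end) path facts, visiting pending work in a seeded random order."""
    rng = random.Random(seed)
    paths = set()
    pending = list(edges)  # direct paths come straight from the edges
    while pending:
        rng.shuffle(pending)  # stand-in for nondeterministic scheduling
        start, end = pending.pop()
        if (start, end) in paths:
            continue  # duplicate prevention keeps the feedback loop finite
        paths.add((start, end))
        # Feedback: extend the new path with every edge leaving its end node.
        for edge_start, edge_end in edges:
            if edge_start == end:
                pending.append((start, edge_end))
    return paths


EDGES = [("a", "b"), ("b", "c"), ("c", "d")]
paths_seed_1 = derive_paths(EDGES, seed=1)
paths_seed_2 = derive_paths(EDGES, seed=2)
same_result = paths_seed_1 == paths_seed_2
```

The intermediate order in which paths are discovered differs between seeds, but the resulting set does not, which is the property the later scheduling sessions examine.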
Running the examples
Adjust paths to match the local checkout layout if necessary. Start an interpreter from the workspace root with project packages available.
PYTHONPATH="$PWD/mbus:$PWD/aostore:$PWD" python3.14
When module files are changed, restart the interpreter unless the group is deliberately practicing importlib.reload().
Create the demonstration package
Before the first durable file edit, create the demonstration package directories:
mkdir -p demos/event_processors
touch demos/__init__.py
touch demos/event_processors/__init__.py
Important note about capture records
The current demonstration works close to the low-level capture layer. As a result, it exposes MessageSymbolRegistration records while building the temporary history helper.
That is not the intended ordinary user experience. In a more complete release, higher-level capture and history infrastructure should handle symbol registration bookkeeping on behalf of processor authors. In this exercise, symbol registration records are visible because the demonstration history helper reconstructs readable messages from compact captured records directly.
Suggested group session flow
- Session 1 - Bus and capture basics
- Phase 1:
- Signal over the bus by hand.
- Inspect received messages.
- Inspect low-level captured records.
- Phase 2:
- Discuss why symbol registration appears in this temporary demo.
- Build the capture history helper.
Estimated time: 1-3 hours (1-2 hours work, 0.5-1 hours setup)
- Session 2 - Shell commands and milestones
- Phase 3:
- Represent a shell command as a dictionary.
- Phase 4:
- Replace it with ShellCommandEvent.
- Phase 5:
- Manually evaluate milestone expressions.
- Phase 6:
- Implement RegexMilestoneProcessor.
- Inspect source and analysis events in history.
Estimated time: 1.5-3 hours (1.5-2.5 hours work, 0-0.5 hours setup)
- Session 3 - TTY-derived observations
- Phase 7:
- Create raw TTY read, write, and line events.
- Show why milestones need reconstructed shell command events.
- Phase 8:
- Implement keystroke timing.
- Discuss shell command reconstruction as the next derived-event problem (to be continued in Session 6 onward).
Estimated time: 1-2.5 hours (1-2 hours work, 0-0.5 hours setup)
- Session 4 - Graph-derived facts
- Phase 9:
- Define graph edges and paths.
- Phase 10:
- Add the demo-only nonblocking receive helper.
- Phase 11:
- Implement direct path derivation.
- Phase 12:
- Explore the extension rule by hand.
- Phase 13:
- Add path extension feedback.
- Discuss duplicate prevention.
Estimated time: 1.5-3.5 hours (1.5-3 hours work, 0-0.5 hours setup)
- Session 5 - Nondeterministic processor scheduling
- Phase 14:
- Add the randomized processor runner.
- Phase 15:
- Keep source events fixed.
- Vary processor scheduling by seed.
- Compare final path facts across runs.
Estimated time: 1.5-3.5 hours (1.5-3 hours work, 0-0.5 hours setup)
- Session 6 - Follow-up refinements
- Phase 16:
- Rebuild a graph index from history.
- Phase 17:
- Implement shell command reconstruction.
- Replace local Python payload assumptions with explicit payload projection formats when the bus format layer is ready for the exercise.
Estimated time: 1.5-4.5 hours (1.5-4 hours work, 0-0.5 hours setup)
Event Processor Demonstration Plan
Phase 1: Signal over the bus by hand
- Goal
Send one message through the direct bus and inspect what the receiver sees.
- Discussion points
- A bus is useful only when something emits and something receives.
- Public message setup uses readable strings: topic, producer, and message type.
- The direct broker delivers local Python payloads in this demonstration.
- Capture must be attached before delivery in the current prototype.
- Interpreter exploration
Import the bus and capture interfaces:
from dataclasses import dataclass, field
from mbus.broker.direct import DirectMessageBus
from mbus.capture.sink import CaptureSink
from mbus.message.records import CapturedMessage
from mbus.message.symbols import MessageSymbolRegistration
Create a temporary capture sink in the interpreter. This sink stores whatever the bus sends to the capture layer:
CapturedRecord = MessageSymbolRegistration | CapturedMessage
@dataclass
class ListCaptureSink(CaptureSink):
    records: list[CapturedRecord] = field(default_factory=list)
    def capture_symbol_registration(
        self,
        registration: MessageSymbolRegistration,
    ) -> None:
        self.records.append(registration)
    def capture(self, message: CapturedMessage) -> None:
        self.records.append(message)
Create the bus and attach capture:
bus = DirectMessageBus()
sink = ListCaptureSink()
bus.set_capture_sink(sink)
Inspect the empty capture sink:
sink
sink.records
len(sink.records)
Expected output shape:
ListCaptureSink(records=[])
[]
0
Create a receiver. This describes the messages the receiver wants to hear:
receiver = bus.subscribe(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    msg_type="ping",
)
Inspect the receiver and confirm that capture is still empty:
type(receiver)
vars(receiver)
sink.records
Expected output shape:
<class 'mbus.broker.direct._BrokerReceiver'>
{'_queue': ...}
[]
Create an emitter. Registering the emitter gives the bus enough information to bind readable names to compact internal IDs:
emitter = bus.register_emitter(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    default_msg_type="ping",
)
Inspect the captured registration records:
sink.records
[type(record).__name__ for record in sink.records]
for record in sink.records:
    print(record)
Expected output shape:
['MessageSymbolRegistration', 'MessageSymbolRegistration', 'MessageSymbolRegistration']
MessageSymbolRegistration(... symbol='demo.signals' ...)
MessageSymbolRegistration(... symbol='ping' ...)
MessageSymbolRegistration(... symbol='demo-source' ...)
Emit one message:
emitter.emit({"text": "hello bus"})
Inspect capture before receiving:
len(sink.records)
[type(record).__name__ for record in sink.records]
sink.records[-1]
Expected output shape:
4
['MessageSymbolRegistration', 'MessageSymbolRegistration', 'MessageSymbolRegistration', 'CapturedMessage']
CapturedMessage(... payload={'text': 'hello bus'} ...)
Receive the message:
message = receiver.receive()
message
message.msg_topic
message.msg_producer
message.msg_type
message.payload
Expected output shape:
ReceivedMessage(...)
'demo.signals'
'demo-source'
'ping'
{'text': 'hello bus'}
Compare the received message with the captured record:
captured = sink.records[-1]
captured
captured.msg_topic_id
captured.msg_type_id
captured.msg_producer_id
captured.payload
Expected output shape:
CapturedMessage(...)
TopicID(...)
MessageTypeID(...)
ProducerID(...)
{'text': 'hello bus'}
- What this phase shows
The receiver gets the readable message view. The capture sink sees compact IDs and local payload data. The readable names are still available through symbol registration records, but manually reconstructing them is awkward.
That awkwardness motivates the next phase.
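The relationship between readable names, compact IDs, and registration records can be illustrated in isolation. This is a minimal sketch using a hypothetical ToySymbolTable class; it is an assumption for illustration only and does not reflect how mbus assigns or stores IDs.

```python
from dataclasses import dataclass, field


@dataclass
class ToySymbolTable:
    """Hypothetical stand-in: interns readable names as compact integer IDs."""

    ids_by_name: dict[str, int] = field(default_factory=dict)
    registrations: list[tuple[int, str]] = field(default_factory=list)

    def intern(self, name: str) -> int:
        # Log a registration only the first time a name is seen.
        if name not in self.ids_by_name:
            symbol_id = len(self.ids_by_name)
            self.ids_by_name[name] = symbol_id
            self.registrations.append((symbol_id, name))
        return self.ids_by_name[name]


table = ToySymbolTable()
topic_id = table.intern("demo.signals")
same_id = table.intern("demo.signals")  # reuses the ID, no second registration

# A captured record carries only the compact ID...
captured = {"topic_id": topic_id, "payload": {"text": "hello bus"}}

# ...so a reader needs the registration log to recover the readable name.
names_by_id = dict(table.registrations)
readable_topic = names_by_id[captured["topic_id"]]
```

The lookup at the end is the step that becomes tedious when repeated by hand for topics, message types, and producers.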
Phase 2: Build a temporary capture history helper
- Goal
Create a small demonstration helper that turns captured records back into readable history messages.
- Discussion points
- Capture records are useful, but raw captured messages use compact IDs.
- A history helper can rebuild readable message history from registration records.
- This is demonstration infrastructure, not the final persistence/query API.
- The helper writes captured records into an in-memory aostore sequential unit.
- Interpreter exploration before the file edit
List only the captured messages:
captured_messages = []
for record in sink.records:
    if isinstance(record, CapturedMessage):
        captured_messages.append(record)

captured_messages
Expected output shape:
[CapturedMessage(... payload={'text': 'hello bus'} ...)]
Inspect the first captured message:
captured_message = captured_messages[0]
captured_message.msg_topic_id
captured_message.msg_type_id
captured_message.msg_producer_id
Expected output shape:
TopicID(...)
MessageTypeID(...)
ProducerID(...)
Manually reconstruct a topic name:
topic_names = {}
for record in sink.records:
    if isinstance(record, MessageSymbolRegistration):
        if record.symbol_kind.name == "TOPIC":
            topic_names[record.symbol_id] = record.symbol

topic_names
topic_names[captured_message.msg_topic_id]
Expected output shape:
{TopicID(...): 'demo.signals'}
'demo.signals'
This works, but repeating it by hand would distract from writing event processors.
- File edit
Create:
demos/event_processors/capture_history.py
#!/usr/bin/env python3
# demos/event_processors/capture_history.py
"""Capture and history helpers for event processor demonstrations."""

from collections.abc import Iterable
from dataclasses import dataclass, field

from aostore.sequentialunit.listsequentialunit import ListSequentialUnit
from mbus.capture.sink import CaptureSink
from mbus.message.records import CapturedMessage
from mbus.message.symbols import (
    MessageSymbolKind,
    MessageSymbolRegistration,
    MessageTypeID,
    ProducerID,
    TopicID,
)

CapturedRecord = MessageSymbolRegistration | CapturedMessage


@dataclass
class AOStoreCaptureSink(CaptureSink):
    records: ListSequentialUnit[CapturedRecord] = field(
        default_factory=ListSequentialUnit
    )

    def capture_symbol_registration(
        self,
        registration: MessageSymbolRegistration,
    ) -> None:
        self.records.append(registration)

    def capture(self, message: CapturedMessage) -> None:
        self.records.append(message)

    def all_records(self) -> tuple[CapturedRecord, ...]:
        records = self.records.sequential_read(0, len(self.records))
        result = tuple(records)
        return result


@dataclass(frozen=True, kw_only=True)
class HistoryMessage:
    msg_topic: str
    msg_type: str
    msg_producer: str
    payload: object
    bus_sequence: int
    topic_sequence: int


class CaptureHistory:
    _records: tuple[CapturedRecord, ...]
    _topics: dict[TopicID, str]
    _types: dict[MessageTypeID, str]
    _producers: dict[ProducerID, str]

    def __init__(self, records: Iterable[CapturedRecord]) -> None:
        self._records = tuple(records)
        self._topics = {}
        self._types = {}
        self._producers = {}
        self._index_symbols()

    def _index_symbols(self) -> None:
        for record in self._records:
            if isinstance(record, MessageSymbolRegistration):
                self._record_symbol(record)

    def _record_symbol(self, record: MessageSymbolRegistration) -> None:
        if record.symbol_kind is MessageSymbolKind.TOPIC:
            self._topics[record.symbol_id] = record.symbol
        elif record.symbol_kind is MessageSymbolKind.MSG_TYPE:
            self._types[record.symbol_id] = record.symbol
        elif record.symbol_kind is MessageSymbolKind.PRODUCER:
            self._producers[record.symbol_id] = record.symbol

    def messages(self) -> list[HistoryMessage]:
        messages = []
        for record in self._records:
            if isinstance(record, CapturedMessage):
                message = self._decode_message(record)
                messages.append(message)
        return messages

    def by_topic(self, msg_topic: str) -> list[HistoryMessage]:
        messages = []
        for message in self.messages():
            if message.msg_topic == msg_topic:
                messages.append(message)
        return messages

    def since(self, bus_sequence: int) -> list[HistoryMessage]:
        messages = []
        for message in self.messages():
            if message.bus_sequence >= bus_sequence:
                messages.append(message)
        return messages

    def _decode_message(self, message: CapturedMessage) -> HistoryMessage:
        history_message = HistoryMessage(
            msg_topic=self._topics[message.msg_topic_id],
            msg_type=self._types[message.msg_type_id],
            msg_producer=self._producers[message.msg_producer_id],
            payload=message.payload,
            bus_sequence=message.bus_sequence,
            topic_sequence=message.topic_sequence,
        )
        return history_message
- Interpreter check
Restart the interpreter, then run:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)
bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
receiver = bus.subscribe(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    msg_type="ping",
)
emitter = bus.register_emitter(
    msg_topic="demo.signals",
    msg_producer="demo-source",
    default_msg_type="ping",
)
emitter.emit({"text": "history helper"})
message = receiver.receive()
history = CaptureHistory(sink.all_records())
history.messages()
Expected output shape:
[HistoryMessage(msg_topic='demo.signals', msg_type='ping', msg_producer='demo-source', payload={'text': 'history helper'}, ...)]
Inspect the decoded history:
history_message = history.messages()[0]
history_message.msg_topic
history_message.msg_type
history_message.msg_producer
history_message.payload
Expected output:
'demo.signals'
'ping'
'demo-source'
{'text': 'history helper'}
Phase 3: Explore shell command data by hand
- Goal
Identify the high-level event shape needed by a milestone processor.
- Discussion points
- The old milestone strategy operated on command-level records.
- A command-level record includes context such as host and working directory.
- It also includes shell input and shell output.
- Raw TTY events do not directly have this shape; reconstruction comes later.
- Interpreter exploration
Represent a shell command as a dictionary:
command = {
    "session_id": "session-1",
    "command_index": 0,
    "host": "alpha",
    "cwd": "/home/student",
    "input_text": "ls -la\n",
    "output_text": "total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    "started_at_ns": 1000,
    "ended_at_ns": 2000,
}
Inspect the fields used by milestone checks:
command["host"]
command["cwd"]
command["input_text"]
command["output_text"]
Expected output:
'alpha'
'/home/student'
'ls -la\n'
'total 12\n-rw-r--r-- 1 student student 0 notes.txt\n'
Try an accidental wrong key:
command["working_directory"]
Expected output:
KeyError: 'working_directory'
- What this phase shows
The dictionary contains the information, but it does not document the shape of the event. A typo is discovered only when the field is used. A named event class makes the processor code easier to read and easier to explain.
Positional encodings such as tuples and CSV-like records share this limitation. They carry data but are detached from any structural contract: nothing in the record states which schema or type it belongs to, so a swapped or missing field goes unnoticed until it is used.
Phase 4: Persist the event record classes
- Goal
Create reusable payload classes for the TTY, shell command, and milestone examples.
- File edit
Create:
demos/event_processors/events.py
#!/usr/bin/env python3
# demos/event_processors/events.py
"""Payload records for event processor demonstrations."""

from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True, kw_only=True)
class TTYReadEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    data: bytes


@dataclass(frozen=True, kw_only=True)
class TTYWriteEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    data: bytes


@dataclass(frozen=True, kw_only=True)
class LineDisciplineEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    line: str


@dataclass(frozen=True, kw_only=True)
class KeystrokeTimingEvent:
    session_id: str
    source_index: int
    timestamp_ns: int
    previous_timestamp_ns: int | None
    delta_ns: int | None
    text: str


@dataclass(frozen=True, kw_only=True)
class ShellCommandEvent:
    session_id: str
    command_index: int
    host: str
    cwd: str
    input_text: str
    output_text: str
    started_at_ns: int
    ended_at_ns: int


RegexMilestoneField = Literal[
    "host",
    "cwd",
    "input_text",
    "output_text",
]


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneExpression:
    name: str
    field: RegexMilestoneField
    pattern: str


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneResult:
    expression_name: str
    field: RegexMilestoneField
    matched: bool
    match_text: str | None = None


@dataclass(frozen=True, kw_only=True)
class RegexMilestoneAnalysisEvent:
    session_id: str
    command_index: int
    results: tuple[RegexMilestoneResult, ...]
    matched_count: int
    expression_count: int
- Interpreter check
Restart the interpreter, then run:
from demos.event_processors.events import ShellCommandEvent
command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)
command
command.host
command.cwd
command.input_text
command.output_text
Expected output shape:
ShellCommandEvent(session_id='session-1', command_index=0, ...)
'alpha'
'/home/student'
'ls -la\n'
'total 12\n-rw-r--r-- 1 student student 0 notes.txt\n'
- What this phase shows
The event shape is now explicit. Other code can depend on a named ShellCommandEvent instead of assuming that a loose dictionary has the expected fields.
Phase 5: Evaluate milestones manually
- Goal
Recreate the core milestone check in a small, transparent form.
- Discussion points
- A milestone expression names one check against one shell command field.
- The field may be command input, command output, working directory, or host context.
- The result records whether the expression matched and, when available, the matched text.
- Manual evaluation is useful for learning the shape of the problem before writing a processor.
- Interpreter exploration
Import regular expressions and the payload classes:
import re
from demos.event_processors.events import (
    RegexMilestoneExpression,
    ShellCommandEvent,
)
Create one command event:
command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)
Create three milestone expressions:
expressions = (
    RegexMilestoneExpression(
        name="used-ls",
        field="input_text",
        pattern=r"\bls\b",
    ),
    RegexMilestoneExpression(
        name="saw-notes",
        field="output_text",
        pattern=r"notes\.txt",
    ),
    RegexMilestoneExpression(
        name="home-directory",
        field="cwd",
        pattern=r"^/home/student$",
    ),
)
Evaluate one expression:
expression = expressions[0]
text = getattr(command, expression.field)
match = re.search(expression.pattern, text)
expression.name
expression.field
text
match.group(0)
Expected output:
'used-ls'
'input_text'
'ls -la\n'
'ls'
Evaluate all expressions:
manual_results = []
for expression in expressions:
    text = getattr(command, expression.field)
    match = re.search(expression.pattern, text)
    manual_results.append((expression.name, match is not None))

manual_results
Expected output:
[('used-ls', True), ('saw-notes', True), ('home-directory', True)]
- What this phase shows
Manual evaluation is simple for one command, but the loop is already event processor logic. The next phase preserves the loop in a reusable processor.
Phase 6: Persist the RegexMilestone processor
- Goal
Create a processor that receives shell command events and emits regex milestone analysis events.
- File edit
Create:
demos/event_processors/regex_milestones.py
#!/usr/bin/env python3
# demos/event_processors/regex_milestones.py
"""Regex milestone processor for shell command events."""

import re

from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver

from demos.event_processors.events import (
    RegexMilestoneAnalysisEvent,
    RegexMilestoneExpression,
    RegexMilestoneResult,
    ShellCommandEvent,
)


class RegexMilestoneProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _expressions: tuple[RegexMilestoneExpression, ...]

    def __init__(
        self,
        bus: DirectMessageBus,
        expressions: tuple[RegexMilestoneExpression, ...],
    ) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.shell.command",
            msg_producer="shell-reconstructor",
            msg_type="shell-command",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.shell.regex_milestone",
            msg_producer="regex-milestone-processor",
            default_msg_type="regex-milestone-analysis",
        )
        self._expressions = expressions

    def process_one(self) -> RegexMilestoneAnalysisEvent:
        message = self._receiver.receive()
        command = message.payload
        if not isinstance(command, ShellCommandEvent):
            raise TypeError(
                f"expected ShellCommandEvent, got {type(command).__name__}"
            )
        analysis = analyze_regex_milestones(command, self._expressions)
        self._emitter.emit(analysis)
        return analysis


def analyze_regex_milestones(
    command: ShellCommandEvent,
    expressions: tuple[RegexMilestoneExpression, ...],
) -> RegexMilestoneAnalysisEvent:
    results = []
    for expression in expressions:
        result = evaluate_regex_milestone(command, expression)
        results.append(result)
    matched_count = sum(result.matched for result in results)
    analysis = RegexMilestoneAnalysisEvent(
        session_id=command.session_id,
        command_index=command.command_index,
        results=tuple(results),
        matched_count=matched_count,
        expression_count=len(expressions),
    )
    return analysis


def evaluate_regex_milestone(
    command: ShellCommandEvent,
    expression: RegexMilestoneExpression,
) -> RegexMilestoneResult:
    text = getattr(command, expression.field)
    match = re.search(expression.pattern, text)
    match_text = None
    if match is not None:
        match_text = match.group(0)
    result = RegexMilestoneResult(
        expression_name=expression.name,
        field=expression.field,
        matched=match is not None,
        match_text=match_text,
    )
    return result
- Interpreter check
Restart the interpreter, then set up the bus:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
    AOStoreCaptureSink,
    CaptureHistory,
)
from demos.event_processors.events import (
    RegexMilestoneExpression,
    ShellCommandEvent,
)
from demos.event_processors.regex_milestones import RegexMilestoneProcessor
bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
Subscribe to the analysis topic before running the processor:
analysis_receiver = bus.subscribe(
    msg_topic="demo.shell.regex_milestone",
    msg_producer="regex-milestone-processor",
    msg_type="regex-milestone-analysis",
)
Create the shell command emitter and processor:
command_emitter = bus.register_emitter(
    msg_topic="demo.shell.command",
    msg_producer="shell-reconstructor",
    default_msg_type="shell-command",
)
expressions = (
    RegexMilestoneExpression(
        name="used-ls",
        field="input_text",
        pattern=r"\bls\b",
    ),
    RegexMilestoneExpression(
        name="saw-notes",
        field="output_text",
        pattern=r"notes\.txt",
    ),
    RegexMilestoneExpression(
        name="home-directory",
        field="cwd",
        pattern=r"^/home/student$",
    ),
)
processor = RegexMilestoneProcessor(bus, expressions)
Emit a shell command event:
command = ShellCommandEvent(
    session_id="session-1",
    command_index=0,
    host="alpha",
    cwd="/home/student",
    input_text="ls -la\n",
    output_text="total 12\n-rw-r--r-- 1 student student 0 notes.txt\n",
    started_at_ns=1000,
    ended_at_ns=2000,
)
command_emitter.emit(command)
Process the command:
analysis = processor.process_one()
analysis
analysis.results
analysis.matched_count
analysis.expression_count
Expected output shape:
RegexMilestoneAnalysisEvent(... matched_count=3, expression_count=3)
(RegexMilestoneResult(...), RegexMilestoneResult(...), RegexMilestoneResult(...))
3
3
Receive the derived analysis event:
analysis_message = analysis_receiver.receive()
analysis_message.msg_topic
analysis_message.msg_type
analysis_message.msg_producer
analysis_message.payload
Expected output shape:
'demo.shell.regex_milestone'
'regex-milestone-analysis'
'regex-milestone-processor'
RegexMilestoneAnalysisEvent(... matched_count=3, expression_count=3)
Inspect history:
history = CaptureHistory(sink.all_records())
messages = history.messages()
for message in messages:
    print(message.bus_sequence, message.msg_topic, message.msg_type)
Expected output shape:
0 demo.shell.command shell-command
1 demo.shell.regex_milestone regex-milestone-analysis
- What this phase shows
The regex milestone processor consumes one event and emits another. The derived analysis event is available to any later subscriber. History contains both the source command event and the derived analysis event.
Phase 7: Explore why shell reconstruction is separate
- Goal
Show that raw TTY observations are not the same thing as high-level shell command events.
- Discussion points
- Regex milestones should inspect high-level command events.
- TTY reads, TTY writes, and line-discipline lines are lower-level observations.
- A shell command event is reconstructed by correlating those observations.
- Keeping reconstruction separate prevents the milestone processor from becoming a large TTY parser.
- Interpreter exploration
Create a few low-level observations:
from demos.event_processors.events import (
LineDisciplineEvent,
TTYReadEvent,
TTYWriteEvent,
)
read_1 = TTYReadEvent(
session_id="session-1",
source_index=0,
timestamp_ns=1000,
data=b"l",
)
read_2 = TTYReadEvent(
session_id="session-1",
source_index=1,
timestamp_ns=1100,
data=b"s",
)
line = LineDisciplineEvent(
session_id="session-1",
source_index=2,
timestamp_ns=1200,
line="ls\n",
)
write = TTYWriteEvent(
session_id="session-1",
source_index=3,
timestamp_ns=1600,
data=b"notes.txt\n",
)
Inspect the raw pieces:
read_1.data
read_2.data
line.line
write.data
Expected output:
b'l'
b's'
'ls\n'
b'notes.txt\n'
Assemble command fields manually:
input_text = line.line
output_text = write.data.decode("utf-8", errors="replace")
input_text
output_text
Expected output:
'ls\n'
'notes.txt\n'
- What this phase shows
The useful command event is derived from several lower-level observations. A later shell reconstructor can subscribe to line-discipline events, query recent reads/writes/timing observations, and emit ShellCommandEvent records.
Phase 8: Add keystroke timing as a smaller TTY-derived processor
- Goal
Create a simple stateful processor before attempting full shell reconstruction.
- Discussion points
- Timing requires comparing a TTY read with the previous read in the same session.
- A derived timing event makes the result explicit.
- Other processors can consume timing events instead of recalculating deltas.
- Interpreter exploration before the file edit
Calculate timing manually from the two read events:
read_events = [read_1, read_2]
previous_timestamp = None
timing_rows = []
for event in read_events:
    delta = None
    if previous_timestamp is not None:
        delta = event.timestamp_ns - previous_timestamp
    timing_rows.append((event.source_index, previous_timestamp, delta))
    previous_timestamp = event.timestamp_ns
timing_rows
Expected output:
[(0, None, None), (1, 1000, 100)]
- File edit
Create:
demos/event_processors/keystroke_timing.py
#!/usr/bin/env python3
# demos/event_processors/keystroke_timing.py
"""Keystroke timing processor for TTY read events."""
from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver
from demos.event_processors.events import (
    KeystrokeTimingEvent,
    TTYReadEvent,
)

class KeystrokeTimingProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _last_timestamp_by_session: dict[str, int]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.tty.read",
            msg_producer="tty-source",
            msg_type="tty-read",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.keystroke.timing",
            msg_producer="keystroke-timing",
            default_msg_type="keystroke-timing",
        )
        self._last_timestamp_by_session = {}

    def process_one(self) -> KeystrokeTimingEvent:
        message = self._receiver.receive()
        event = message.payload
        if not isinstance(event, TTYReadEvent):
            raise TypeError(f"expected TTYReadEvent, got {type(event).__name__}")
        # Look up the previous read for this session only.
        previous_timestamp = self._last_timestamp_by_session.get(
            event.session_id
        )
        delta = None
        if previous_timestamp is not None:
            delta = event.timestamp_ns - previous_timestamp
        self._last_timestamp_by_session[event.session_id] = event.timestamp_ns
        text = event.data.decode("utf-8", errors="replace")
        timing = KeystrokeTimingEvent(
            session_id=event.session_id,
            source_index=event.source_index,
            timestamp_ns=event.timestamp_ns,
            previous_timestamp_ns=previous_timestamp,
            delta_ns=delta,
            text=text,
        )
        self._emitter.emit(timing)
        return timing
- Interpreter check
Restart the interpreter, then run:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import AOStoreCaptureSink
from demos.event_processors.events import TTYReadEvent
from demos.event_processors.keystroke_timing import KeystrokeTimingProcessor
bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
timing_receiver = bus.subscribe(
msg_topic="demo.keystroke.timing",
msg_producer="keystroke-timing",
msg_type="keystroke-timing",
)
read_emitter = bus.register_emitter(
msg_topic="demo.tty.read",
msg_producer="tty-source",
default_msg_type="tty-read",
)
processor = KeystrokeTimingProcessor(bus)
Emit reads and process them:
read_emitter.emit(
TTYReadEvent(
session_id="session-1",
source_index=0,
timestamp_ns=1000,
data=b"l",
)
)
read_emitter.emit(
TTYReadEvent(
session_id="session-1",
source_index=1,
timestamp_ns=1100,
data=b"s",
)
)
processor.process_one()
processor.process_one()
Expected output shape:
KeystrokeTimingEvent(... previous_timestamp_ns=None, delta_ns=None, text='l')
KeystrokeTimingEvent(... previous_timestamp_ns=1000, delta_ns=100, text='s')
Receive timing events:
timing_receiver.receive().payload
timing_receiver.receive().payload
Expected output shape:
KeystrokeTimingEvent(... source_index=0 ...)
KeystrokeTimingEvent(... source_index=1 ...)
Phase 9: Introduce graph events
- Goal
Switch to a small graph problem that demonstrates safe feedback loops and is easy to illustrate.
- Discussion points
- An edge is a source fact.
- A path is a derived fact.
- A direct edge implies a direct path.
- Longer paths can be derived later from known paths and edges.
- File edit
Create:
demos/event_processors/graph_events.py
#!/usr/bin/env python3
# demos/event_processors/graph_events.py
"""Graph event payloads for message bus demonstrations."""
from dataclasses import dataclass

@dataclass(frozen=True, kw_only=True)
class EdgeDeclaredEvent:
    graph_id: str
    source: str
    target: str

@dataclass(frozen=True, kw_only=True)
class PathKnownEvent:
    graph_id: str
    source: str
    target: str
    hop_count: int

    def fact_key(self) -> tuple[str, str, str]:
        # hop_count is deliberately excluded: it describes how the fact was
        # derived, not which fact it is.
        result = (self.graph_id, self.source, self.target)
        return result
- Interpreter exploration
from demos.event_processors.graph_events import (
EdgeDeclaredEvent,
PathKnownEvent,
)
edge = EdgeDeclaredEvent(
graph_id="demo-graph",
source="A",
target="B",
)
path = PathKnownEvent(
graph_id=edge.graph_id,
source=edge.source,
target=edge.target,
hop_count=1,
)
edge
path
path.fact_key()
Expected output shape:
EdgeDeclaredEvent(graph_id='demo-graph', source='A', target='B')
PathKnownEvent(graph_id='demo-graph', source='A', target='B', hop_count=1)
('demo-graph', 'A', 'B')
Phase 10: Create a demo-only nonblocking receive helper
- Goal
Prepare for processor scheduling without editing mbus.
- Discussion points
- Receiver.receive() blocks until a message is available.
- A scheduler needs to try a processor and move on if no message is waiting.
- The current package does not expose a public nonblocking receiver method.
- This helper is local demonstration code. It can be replaced by a real public API later.
- File edit
Create:
demos/event_processors/demo_receive.py
#!/usr/bin/env python3
# demos/event_processors/demo_receive.py
"""Demo-only helpers for receiver inspection and scheduling."""
from queue import Empty
from mbus.broker.endpoints import Receiver
from mbus.message.records import ReceivedMessage

class DemoReceiverError(RuntimeError):
    pass

def try_receive(receiver: Receiver) -> ReceivedMessage | None:
    # Demo-only: reaches into a private attribute of the current receiver.
    queue = getattr(receiver, "_queue", None)
    if queue is None:
        raise DemoReceiverError(
            "demo try_receive requires the current direct broker receiver"
        )
    try:
        message = queue.get_nowait()
    except Empty:
        message = None
    return message
- Interpreter check
Restart the interpreter, then run:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.demo_receive import try_receive
bus = DirectMessageBus()
receiver = bus.subscribe(
msg_topic="demo.signals",
msg_producer="demo-source",
msg_type="ping",
)
try_receive(receiver)
Expected output:
None
Now emit a message:
from demos.event_processors.capture_history import AOStoreCaptureSink
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
emitter = bus.register_emitter(
msg_topic="demo.signals",
msg_producer="demo-source",
default_msg_type="ping",
)
emitter.emit({"text": "demo nonblocking receive"})
message = try_receive(receiver)
message
message.payload
Expected output shape:
ReceivedMessage(...)
{'text': 'demo nonblocking receive'}
Phase 11: Create the direct path processor
- Goal
Implement the graph rule:
edge(A, B) -> path(A, B)
- Discussion points
- The source emits edge events.
- The direct path processor emits derived path events.
- Other processors can subscribe to derived path events.
- The processor can be run by hand or by a scheduler.
- File edit
Create:
demos/event_processors/graph_reachability.py
#!/usr/bin/env python3
# demos/event_processors/graph_reachability.py
"""Graph reachability processors for message bus demonstrations."""
from mbus.broker.direct import DirectMessageBus
from mbus.broker.endpoints import Emitter, Receiver
from demos.event_processors.demo_receive import try_receive
from demos.event_processors.graph_events import (
    EdgeDeclaredEvent,
    PathKnownEvent,
)

class DirectPathProcessor:
    _receiver: Receiver
    _emitter: Emitter
    _known_paths: set[tuple[str, str, str]]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._receiver = bus.subscribe(
            msg_topic="demo.graph.edge",
            msg_producer="graph-source",
            msg_type="edge-declared",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.graph.path",
            msg_producer="direct-path",
            default_msg_type="path-known",
        )
        self._known_paths = set()

    def process_one(self) -> PathKnownEvent | None:
        # Blocking variant for running the processor by hand.
        message = self._receiver.receive()
        result = self._process_payload(message.payload)
        return result

    def process_available(self) -> int:
        # Nonblocking variant for a scheduler; returns the work done (0 or 1).
        message = try_receive(self._receiver)
        if message is None:
            work_count = 0
        else:
            self._process_payload(message.payload)
            work_count = 1
        return work_count

    def _process_payload(self, payload: object) -> PathKnownEvent | None:
        if not isinstance(payload, EdgeDeclaredEvent):
            raise TypeError(
                f"expected EdgeDeclaredEvent, got {type(payload).__name__}"
            )
        path = PathKnownEvent(
            graph_id=payload.graph_id,
            source=payload.source,
            target=payload.target,
            hop_count=1,
        )
        # Emit each direct path fact only once.
        if path.fact_key() not in self._known_paths:
            self._known_paths.add(path.fact_key())
            self._emitter.emit(path)
            result = path
        else:
            result = None
        return result
- Interpreter check
Restart the interpreter, then run:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import AOStoreCaptureSink
from demos.event_processors.graph_events import EdgeDeclaredEvent
from demos.event_processors.graph_reachability import DirectPathProcessor
bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
path_receiver = bus.subscribe(
msg_topic="demo.graph.path",
msg_producer="direct-path",
msg_type="path-known",
)
edge_emitter = bus.register_emitter(
msg_topic="demo.graph.edge",
msg_producer="graph-source",
default_msg_type="edge-declared",
)
processor = DirectPathProcessor(bus)
Call the processor before any edge exists:
processor.process_available()
Expected output:
0
Emit one edge:
edge_emitter.emit(
EdgeDeclaredEvent(
graph_id="demo-graph",
source="A",
target="B",
)
)
Process the edge:
processor.process_available()
Expected output:
1
Receive the derived path:
path_message = path_receiver.receive()
path_message.payload
Expected output:
PathKnownEvent(graph_id='demo-graph', source='A', target='B', hop_count=1)
Phase 12: Explore the path extension rule by hand
- Goal
Understand the feedback rule before implementing it.
- Discussion points
- If A reaches B, and there is an edge from B to C, then A reaches C.
- Derived path facts can lead to more derived path facts.
- Duplicate prevention keeps the feedback loop from repeating the same fact forever.
- Interpreter exploration
from demos.event_processors.graph_events import (
EdgeDeclaredEvent,
PathKnownEvent,
)
edge_bc = EdgeDeclaredEvent(
graph_id="demo-graph",
source="B",
target="C",
)
path_ab = PathKnownEvent(
graph_id="demo-graph",
source="A",
target="B",
hop_count=1,
)
Check whether the path and edge connect:
path_ab.target
edge_bc.source
path_ab.target == edge_bc.source
Expected output:
'B'
'B'
True
Derive a new path:
path_ac = PathKnownEvent(
graph_id="demo-graph",
source=path_ab.source,
target=edge_bc.target,
hop_count=path_ab.hop_count + 1,
)
path_ac
path_ac.fact_key()
Expected output:
PathKnownEvent(graph_id='demo-graph', source='A', target='C', hop_count=2)
('demo-graph', 'A', 'C')
Manually prevent duplicate path facts:
known_paths = set()
known_paths.add(path_ab.fact_key())
path_ac.fact_key() in known_paths
if path_ac.fact_key() not in known_paths:
    known_paths.add(path_ac.fact_key())
known_paths
Expected output shape:
False
{('demo-graph', 'A', 'B'), ('demo-graph', 'A', 'C')}
Phase 13: Add the path extension processor
- Goal
Implement the feedback rule:
path(A, B) + edge(B, C) -> path(A, C)
- Discussion points
- The extension processor listens to edge events and path events.
- It emits new path events when a known path can be extended.
- It also listens to its own output because an extended path may be extendable again.
- This is a controlled feedback loop.
- File edit
Append this class to:
demos/event_processors/graph_reachability.py
class PathExtensionProcessor:
    _edge_receiver: Receiver
    _direct_path_receiver: Receiver
    _extension_path_receiver: Receiver
    _emitter: Emitter
    _edges: set[tuple[str, str, str]]
    _paths: dict[tuple[str, str, str], PathKnownEvent]

    def __init__(self, bus: DirectMessageBus) -> None:
        self._edge_receiver = bus.subscribe(
            msg_topic="demo.graph.edge",
            msg_producer="graph-source",
            msg_type="edge-declared",
        )
        self._direct_path_receiver = bus.subscribe(
            msg_topic="demo.graph.path",
            msg_producer="direct-path",
            msg_type="path-known",
        )
        # Feedback subscription: this processor also hears its own output.
        self._extension_path_receiver = bus.subscribe(
            msg_topic="demo.graph.path",
            msg_producer="path-extension",
            msg_type="path-known",
        )
        self._emitter = bus.register_emitter(
            msg_topic="demo.graph.path",
            msg_producer="path-extension",
            default_msg_type="path-known",
        )
        self._edges = set()
        self._paths = {}

    def process_edge_available(self) -> int:
        message = try_receive(self._edge_receiver)
        if message is None:
            work_count = 0
        else:
            paths = self._process_edge_payload(message.payload)
            # One unit for the received message plus one per emitted path.
            work_count = 1 + len(paths)
        return work_count

    def process_direct_path_available(self) -> int:
        message = try_receive(self._direct_path_receiver)
        if message is None:
            work_count = 0
        else:
            paths = self._process_path_payload(message.payload)
            work_count = 1 + len(paths)
        return work_count

    def process_extension_path_available(self) -> int:
        message = try_receive(self._extension_path_receiver)
        if message is None:
            work_count = 0
        else:
            paths = self._process_path_payload(message.payload)
            work_count = 1 + len(paths)
        return work_count

    def _process_edge_payload(
        self,
        payload: object,
    ) -> list[PathKnownEvent]:
        if not isinstance(payload, EdgeDeclaredEvent):
            raise TypeError(
                f"expected EdgeDeclaredEvent, got {type(payload).__name__}"
            )
        edge = payload
        self._edges.add((edge.graph_id, edge.source, edge.target))
        paths = self._derive_from_new_edge(edge)
        return paths

    def _process_path_payload(
        self,
        payload: object,
    ) -> list[PathKnownEvent]:
        if not isinstance(payload, PathKnownEvent):
            raise TypeError(
                f"expected PathKnownEvent, got {type(payload).__name__}"
            )
        path = payload
        self._paths[path.fact_key()] = path
        paths = self._derive_from_new_path(path)
        return paths

    def _derive_from_new_edge(
        self,
        edge: EdgeDeclaredEvent,
    ) -> list[PathKnownEvent]:
        paths = []
        # Iterate over a snapshot: _emit_if_new adds to self._paths, and
        # inserting into a dict while iterating over it raises RuntimeError.
        for known_path in list(self._paths.values()):
            if known_path.graph_id != edge.graph_id:
                continue
            if known_path.target != edge.source:
                continue
            next_path = PathKnownEvent(
                graph_id=edge.graph_id,
                source=known_path.source,
                target=edge.target,
                hop_count=known_path.hop_count + 1,
            )
            if self._emit_if_new(next_path):
                paths.append(next_path)
        return paths

    def _derive_from_new_path(
        self,
        path: PathKnownEvent,
    ) -> list[PathKnownEvent]:
        paths = []
        for graph_id, edge_source, edge_target in self._edges:
            if graph_id != path.graph_id:
                continue
            if edge_source != path.target:
                continue
            next_path = PathKnownEvent(
                graph_id=path.graph_id,
                source=path.source,
                target=edge_target,
                hop_count=path.hop_count + 1,
            )
            if self._emit_if_new(next_path):
                paths.append(next_path)
        return paths

    def _emit_if_new(self, path: PathKnownEvent) -> bool:
        # Duplicate prevention keeps the feedback loop from repeating facts.
        if path.fact_key() in self._paths:
            result = False
        else:
            self._paths[path.fact_key()] = path
            self._emitter.emit(path)
            result = True
        return result
- Interpreter check
Restart the interpreter, then run:
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
AOStoreCaptureSink,
CaptureHistory,
)
from demos.event_processors.graph_events import EdgeDeclaredEvent
from demos.event_processors.graph_reachability import (
DirectPathProcessor,
PathExtensionProcessor,
)
bus = DirectMessageBus()
sink = AOStoreCaptureSink()
bus.set_capture_sink(sink)
extension_receiver = bus.subscribe(
msg_topic="demo.graph.path",
msg_producer="path-extension",
msg_type="path-known",
)
edge_emitter = bus.register_emitter(
msg_topic="demo.graph.edge",
msg_producer="graph-source",
default_msg_type="edge-declared",
)
direct_processor = DirectPathProcessor(bus)
extension_processor = PathExtensionProcessor(bus)
Emit two edges in fixed order:
edge_emitter.emit(
EdgeDeclaredEvent(
graph_id="demo-graph",
source="A",
target="B",
)
)
edge_emitter.emit(
EdgeDeclaredEvent(
graph_id="demo-graph",
source="B",
target="C",
)
)
Process the direct paths:
direct_processor.process_available()
direct_processor.process_available()
Expected output:
1
1
Let the extension processor learn the edges:
extension_processor.process_edge_available()
extension_processor.process_edge_available()
Expected output:
1
1
Let the extension processor learn the direct paths:
extension_processor.process_direct_path_available()
extension_processor.process_direct_path_available()
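Expected output:
2
1
The first call receives the A -> B path. Because the processor already knows the B -> C edge, it emits the extended A -> C path and reports two units of work (one message received plus one path emitted). The second call receives the B -> C path, which no known edge extends, so it reports one.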
Receive the extended path:
message = extension_receiver.receive()
message.payload
Expected output:
PathKnownEvent(graph_id='demo-graph', source='A', target='C', hop_count=2)
Inspect captured graph path history:
history = CaptureHistory(sink.all_records())
path_messages = history.by_topic("demo.graph.path")
for message in path_messages:
    path = message.payload
    print(
        message.bus_sequence,
        message.msg_producer,
        path.source,
        path.target,
        path.hop_count,
    )
Expected output shape:
... direct-path A B 1
... direct-path B C 1
... path-extension A C 2
Phase 14: Add a randomized processor scheduler
- Goal
Simulate concurrency effects without changing source input order.
- Discussion points
- The source will emit the same edge events in the same order every run.
- The scheduler varies which processor gets a chance to run first.
- This simulates uncertainty from processing time, scheduling priority, startup timing, or transmission delay.
- The graph example should converge to the same final path facts even when processor scheduling, and therefore the order of intermediate derived events, differs.
- File edit
Create:
demos/event_processors/runners.py
#!/usr/bin/env python3
# demos/event_processors/runners.py
"""Small processor runners for event processor demonstrations."""
from collections.abc import Callable
from dataclasses import dataclass, field
from random import Random

@dataclass(frozen=True, kw_only=True)
class ProcessorStep:
    name: str
    run: Callable[[], int]

@dataclass
class RandomProcessorRunner:
    steps: tuple[ProcessorStep, ...]
    seed: int
    max_rounds: int = 30
    trace: list[tuple[int, str, int]] = field(default_factory=list)

    def run_until_quiet(self) -> list[tuple[int, str, int]]:
        rng = Random(self.seed)
        for round_index in range(self.max_rounds):
            work_count = self._run_round(rng, round_index)
            # A round in which no step did any work means the system is quiet.
            if work_count == 0:
                break
        result = list(self.trace)
        return result

    def _run_round(self, rng: Random, round_index: int) -> int:
        steps = list(self.steps)
        rng.shuffle(steps)
        round_work_count = 0
        for step in steps:
            step_work_count = step.run()
            self.trace.append((round_index, step.name, step_work_count))
            round_work_count += step_work_count
        return round_work_count
- Interpreter check
Restart the interpreter, then inspect how a runner can shuffle placeholder steps:
from demos.event_processors.runners import (
ProcessorStep,
RandomProcessorRunner,
)
calls = []
def first_step() -> int:
    calls.append("first")
    return 0

def second_step() -> int:
    calls.append("second")
    return 0
runner = RandomProcessorRunner(
steps=(
ProcessorStep(name="first", run=first_step),
ProcessorStep(name="second", run=second_step),
),
seed=1,
)
runner.run_until_quiet()
calls
Expected output shape:
[(0, '...', 0), (0, '...', 0)]
['...', '...']
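The runner is reproducible because each run builds its own `Random` from the seed. A small sketch of that property, using only the standard library, follows. `shuffled_names` and the step names are hypothetical and chosen only for illustration.

```python
from random import Random


def shuffled_names(seed: int, names: tuple[str, ...]) -> list[str]:
    """Shuffle a copy of names with a private, seeded generator."""
    rng = Random(seed)
    order = list(names)
    rng.shuffle(order)
    return order


names = ("direct", "edge", "direct-path", "extension-path")
run_a = shuffled_names(7, names)
run_b = shuffled_names(7, names)

# The same seed reproduces the same order, so a surprising schedule
# can be replayed exactly.
print(run_a == run_b)
# True
```

Using a private `Random` instance rather than the module-level functions keeps the schedule independent of any other code that draws random numbers.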
Phase 15: Run the graph example with randomized scheduling
- Goal
Run the same graph input under different processor schedules and compare the final path facts.
- Interpreter setup
from mbus.broker.direct import DirectMessageBus
from demos.event_processors.capture_history import (
AOStoreCaptureSink,
CaptureHistory,
)
from demos.event_processors.graph_events import (
EdgeDeclaredEvent,
PathKnownEvent,
)
from demos.event_processors.graph_reachability import (
DirectPathProcessor,
PathExtensionProcessor,
)
from demos.event_processors.runners import (
ProcessorStep,
RandomProcessorRunner,
)
Define a helper function in the interpreter:
def run_graph_demo(seed: int) -> tuple[
    list[tuple[int, str, int]],
    list[tuple[int, str, str, int]],
]:
    bus = DirectMessageBus()
    sink = AOStoreCaptureSink()
    bus.set_capture_sink(sink)
    edge_emitter = bus.register_emitter(
        msg_topic="demo.graph.edge",
        msg_producer="graph-source",
        default_msg_type="edge-declared",
    )
    direct_processor = DirectPathProcessor(bus)
    extension_processor = PathExtensionProcessor(bus)
    edges = (
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="A",
            target="B",
        ),
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="B",
            target="C",
        ),
        EdgeDeclaredEvent(
            graph_id="demo-graph",
            source="C",
            target="D",
        ),
    )
    # The source order is fixed; only processor scheduling varies by seed.
    for edge in edges:
        edge_emitter.emit(edge)
    steps = (
        ProcessorStep(
            name="direct path from edge",
            run=direct_processor.process_available,
        ),
        ProcessorStep(
            name="extension learns edge",
            run=extension_processor.process_edge_available,
        ),
        ProcessorStep(
            name="extension learns direct path",
            run=extension_processor.process_direct_path_available,
        ),
        ProcessorStep(
            name="extension learns extension path",
            run=extension_processor.process_extension_path_available,
        ),
    )
    runner = RandomProcessorRunner(
        steps=steps,
        seed=seed,
    )
    trace = runner.run_until_quiet()
    history = CaptureHistory(sink.all_records())
    path_facts = []
    for message in history.by_topic("demo.graph.path"):
        path = message.payload
        if isinstance(path, PathKnownEvent):
            fact = (
                message.bus_sequence,
                path.source,
                path.target,
                path.hop_count,
            )
            path_facts.append(fact)
    result = (trace, path_facts)
    return result
Run one schedule:
trace_1, paths_1 = run_graph_demo(1)
trace_1
paths_1
Expected output shape:
[(0, '...', ...), (0, '...', ...), ...]
[(..., 'A', 'B', 1), (..., 'B', 'C', 1), (..., 'C', 'D', 1), ...]
Run another schedule:
trace_2, paths_2 = run_graph_demo(2)
trace_2
paths_2
Compare the schedules:
trace_1 == trace_2
Expected output:
False
Compare the final path facts without bus sequence numbers:
final_paths_1 = {
(source, target, hop_count)
for _, source, target, hop_count in paths_1
}
final_paths_2 = {
(source, target, hop_count)
for _, source, target, hop_count in paths_2
}
final_paths_1
final_paths_2
final_paths_1 == final_paths_2
Expected output:
{('A', 'B', 1), ('B', 'C', 1), ('C', 'D', 1), ('A', 'C', 2), ('B', 'D', 2), ('A', 'D', 3)}
{('A', 'B', 1), ('B', 'C', 1), ('C', 'D', 1), ('A', 'C', 2), ('B', 'D', 2), ('A', 'D', 3)}
True
Inspect the first schedule:
for round_index, step_name, work_count in trace_1:
    print(round_index, step_name, work_count)
Inspect the first captured path order:
for bus_sequence, source, target, hop_count in paths_1:
    print(bus_sequence, source, target, hop_count)
Inspect the second schedule and path order:
for round_index, step_name, work_count in trace_2:
    print(round_index, step_name, work_count)
for bus_sequence, source, target, hop_count in paths_2:
    print(bus_sequence, source, target, hop_count)
- What this phase shows
The source input order did not change. The processor scheduling changed. The captured event order may differ. The final reachability facts should match.
This models an important distributed-systems lesson: event history order and final derived state are related, but they are not the same thing.
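The expected six path facts can be cross-checked without the bus. The sketch below is a plain-Python work-list that applies the same two rules in a shuffled order. It is not the processors' implementation, and `closure` is a hypothetical helper name. It assumes the chain graph from this phase, where each reachable pair has exactly one route. In a graph with several routes between two nodes, the recorded `hop_count` would depend on which derivation arrived first.

```python
from random import Random

Edge = tuple[str, str]
Path = tuple[str, str, int]


def closure(edges: list[Edge], seed: int) -> set[Path]:
    """Derive (source, target, hop_count) facts, processing candidates in shuffled order."""
    rng = Random(seed)
    known: dict[tuple[str, str], int] = {}
    # Rule 1: every edge is a candidate one-hop path.
    pending: list[Path] = [(source, target, 1) for source, target in edges]
    while pending:
        rng.shuffle(pending)
        source, target, hops = pending.pop()
        if (source, target) in known:
            continue  # duplicate prevention, as in _emit_if_new
        known[(source, target)] = hops
        # Rule 2: extend the new path with every edge that starts at its target.
        for edge_source, edge_target in edges:
            if edge_source == target:
                pending.append((source, edge_target, hops + 1))
    return {(source, target, hops) for (source, target), hops in known.items()}


edges = [("A", "B"), ("B", "C"), ("C", "D")]
results = [closure(edges, seed) for seed in (1, 2, 3)]
print(results[0] == results[1] == results[2])
# True
```

The processing order differs between seeds, yet the final set is the same, which is the property the bus demo is meant to show.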
Phase 16: Later refinement: graph index from history
- Goal
Introduce restart/recovery thinking only after the basic graph feedback example is familiar.
- Discussion points
- The first graph processors keep local working state.
- Local working state is enough for a short demonstration.
- Longer-lived processors need recovery behavior after restart.
- Since edge and path facts are captured as events, a graph index can later be rebuilt from history.
- This points toward projection-style services without changing the basic processor model.
- Suggested later file
Create later:
demos/event_processors/graph_index.py
Suggested responsibilities:
read captured history
collect EdgeDeclaredEvent payloads
collect PathKnownEvent payloads
answer whether a path fact exists
find edges starting at a node
find paths ending at a node
This refinement belongs after the team has already run the graph reachability demo successfully.
Phase 17: Later refinement: shell command reconstruction
- Goal
Connect the TTY examples back to the shell command event shape used by milestones.
- Discussion points
- ShellCommandEvent is the high-level event consumed by the milestone processor.
- TTYReadEvent, TTYWriteEvent, and LineDisciplineEvent are lower-level observations.
- A reconstructor can subscribe to line-discipline events and query recent related observations.
- The reconstructor should emit shell command events; it should not perform regex analysis itself.
- Suggested responsibilities
listen for completed line-discipline events
query history since the previous command boundary for the same session
collect matching reads, writes, and timing events
construct ShellCommandEvent
emit ShellCommandEvent on demo.shell.command
This is an appropriate follow-up exercise after the team understands both milestone processing and history queries.