Tutorial 6: Streaming I/O
Learn how to write and read STL statements as streams — useful for logging, real-time data, and large files.
What you’ll learn:
- Write statements with
STLEmitter - Stream-read with
stream_parse()andSTLReader - Use auto-timestamp and namespace features
- Filter while streaming
- Use tail mode for real-time monitoring
Prerequisites: Tutorial 5: Querying
Step 1: Write with STLEmitter
STLEmitter is a context manager for writing STL statements to files:
from stl_parser import STLEmitter
with STLEmitter("events.stl") as emitter:
emitter.emit("[Sensor_A]", "[Reading_Temperature]",
confidence=0.95, rule="empirical", domain="IoT")
emitter.emit("[Sensor_B]", "[Reading_Humidity]",
confidence=0.90, rule="empirical", domain="IoT")
This creates events.stl with one statement per line, each with an auto-generated timestamp.
Step 2: Auto-Timestamp
By default, STLEmitter injects a timestamp modifier on every statement:
with STLEmitter("log.stl") as emitter:
emitter.emit("[User_Login]", "[Session_Start]", confidence=1.0)
# Writes: [User_Login] -> [Session_Start] ::mod(confidence=1.0, timestamp="2026-02-12T10:30:00Z")
To disable:
with STLEmitter("log.stl", auto_timestamp=False) as emitter:
emitter.emit("[A]", "[B]", confidence=0.9)
# No timestamp added
Step 3: Namespace Prefix
Set a default namespace that applies to all emitted anchors:
with STLEmitter("medical.stl", namespace="Med") as emitter:
emitter.emit("[Symptom_Fever]", "[Condition_Infection]",
rule="causal", confidence=0.80)
# Writes: [Med:Symptom_Fever] -> [Med:Condition_Infection] ::mod(...)
Step 4: Write Comments and Sections
with STLEmitter("knowledge.stl") as emitter:
emitter.section("Physics")
emitter.emit("[Gravity]", "[Acceleration]", rule="causal", confidence=0.99)
emitter.emit("[Mass]", "[Weight]", rule="logical", confidence=0.98)
emitter.section("Chemistry")
emitter.comment("Basic element relationships")
emitter.emit("[Water]", "[H2O]", rule="definitional", confidence=0.99)
Output:
# Physics
[Gravity] -> [Acceleration] ::mod(confidence=0.99, rule="causal", timestamp="...")
[Mass] -> [Weight] ::mod(confidence=0.98, rule="logical", timestamp="...")
# Chemistry
# Basic element relationships
[Water] -> [H2O] ::mod(confidence=0.99, rule="definitional", timestamp="...")
Step 5: Emit Pre-Built Statements
from stl_parser import stl, STLEmitter
stmt = stl("[A]", "[B]").mod(confidence=0.9, rule="causal").build()
with STLEmitter("output.stl") as emitter:
emitter.emit_statement(stmt)
Step 6: Write to stdout
Use the stream parameter instead of log_path:
import sys
from stl_parser import STLEmitter
with STLEmitter(stream=sys.stdout, auto_timestamp=False) as emitter:
emitter.emit("[A]", "[B]", confidence=0.9)
# Prints directly to terminal
Step 7: Stream-Read with stream_parse()
stream_parse() is a generator that yields statements one at a time — memory-efficient for large files:
from stl_parser import stream_parse
for stmt in stream_parse("events.stl"):
print(f"{stmt.source.name} -> {stmt.target.name}")
Filter While Streaming
Apply query filters during streaming:
# Only yield high-confidence statements
for stmt in stream_parse("events.stl", where={"confidence__gte": 0.9}):
print(f" {stmt}")
Error Handling
# Skip unparseable lines (default)
for stmt in stream_parse("messy.stl", on_error="skip"):
print(stmt)
# Raise on first error
try:
for stmt in stream_parse("messy.stl", on_error="raise"):
print(stmt)
except Exception as e:
print(f"Error: {e}")
Step 8: STLReader Context Manager
STLReader provides more control and statistics:
from stl_parser import STLReader
with STLReader("events.stl") as reader:
for stmt in reader:
print(f"{stmt.source.name} -> {stmt.target.name}")
# Access statistics
stats = reader.stats
print(f"\nLines processed: {stats.lines_processed}")
print(f"Statements yielded: {stats.statements_yielded}")
print(f"Errors skipped: {stats.errors_skipped}")
Read All at Once
with STLReader("events.stl", where={"rule": "causal"}) as reader:
result = reader.read_all()
print(f"Loaded {len(result.statements)} causal statements")
Step 9: Tail Mode
STLReader supports tail mode for real-time monitoring — like tail -f:
from stl_parser import STLReader
# Watch for new statements (polls every 0.5 seconds)
with STLReader("live_events.stl", tail=True, tail_interval=0.5) as reader:
for stmt in reader:
print(f"NEW: {stmt.source.name} -> {stmt.target.name}")
# This loop blocks, waiting for new lines appended to the file
# Press Ctrl+C to stop
Tail mode is useful for monitoring live STL event logs written by an STLEmitter in another process.
Step 10: Stream from Various Sources
stream_parse() accepts multiple source types:
from stl_parser import stream_parse
from io import StringIO
# From file path
for stmt in stream_parse("data.stl"):
pass
# From StringIO
text = StringIO('[A] -> [B] ::mod(confidence=0.9)\n[C] -> [D] ::mod(confidence=0.8)')
for stmt in stream_parse(text):
pass
# From list of strings
lines = [
'[A] -> [B] ::mod(confidence=0.9)',
'[C] -> [D] ::mod(confidence=0.8)',
]
for stmt in stream_parse(lines):
pass
Complete Example
from stl_parser import STLEmitter, STLReader, stream_parse
import tempfile
import os
# Create a temp file
path = tempfile.mktemp(suffix=".stl")
# Write structured events
with STLEmitter(path, namespace="Sys", auto_timestamp=True) as emitter:
emitter.section("System Events")
emitter.emit("[Event_UserLogin]", "[Session_Active]",
confidence=1.0, rule="causal")
emitter.emit("[Event_APICall]", "[Response_Success]",
confidence=0.95, rule="empirical", domain="api")
emitter.emit("[Event_Error]", "[Alert_Triggered]",
confidence=0.80, rule="causal", domain="monitoring")
# Stream-read with filter
print("High-confidence events:")
for stmt in stream_parse(path, where={"confidence__gte": 0.9}):
print(f" {stmt.source.name} -> {stmt.target.name}")
# Read with statistics
print("\nFull stats:")
with STLReader(path) as reader:
stmts = list(reader)
stats = reader.stats
print(f" Statements: {stats.statements_yielded}")
print(f" Lines processed: {stats.lines_processed}")
os.unlink(path)
Next: Tutorial 7: Diff & Patch