oMMP Framework QuickStart Guide
Observer-Prefixed Progressive Refinement Architecture
Building substrate-agnostic anomaly classification systems with cryptographic observer protocols
1. Overview
The oMMP implementation focuses on creating systems where each observation is uniquely prefixed with observer data (o), accepts observations from any substrate type (human, AI, sensor), implements cryptographic verification with Byzantine fault tolerance, and maintains absolute record integrity with physics constraints.
2. Progressive Refinement Architecture
2.1 Living Record System with Observer Prefix
Core Rules
Progressive Refinement
Records evolve through refinement without losing history:
Ω(t+1) = Ω(t) ⊕ Δ(source_anonymous)
With constraint: I(Ω(t+1)) ≥ I(Ω(t))
Implementation
Key Requirements
- Immutable observation history
- Cryptographic signatures required
- Entropy must never decrease
- Full audit trail maintained
import hashlib
import time
import numpy as np
from typing import Dict, Tuple, Any
from datetime import datetime
class oMMPRecord:
"""Observer-prefixed MMP records with uncertainty handling"""
def __init__(self, observer_id: str, initial_observation: Dict[str, Any],
spacetime: Tuple[float, float, float, float],
uncertainty: Tuple[float, float, float, float],
reference_frame: str = "Earth_Frame",
substrate_type: str = "unknown"):
# Observer prefix (o)
self.observer_header = {
'id': observer_id,
'spacetime': spacetime, # (x, y, z, t)
'uncertainty': uncertainty, # (δx, δy, δz, δt)
'reference_frame': reference_frame,
'substrate': substrate_type
}
# MMP data
self.mmp_data = initial_observation
self.record_id = self._hash(self.observer_header, self.mmp_data)
self.history = [(self._timestamp(), self.record_id)]
self.entropy = self.calculate_shannon_entropy()
def _hash(self, *args) -> str:
"""Generate SHA256 hash of arguments"""
hasher = hashlib.sha256()
for arg in args:
hasher.update(str(arg).encode())
return hasher.hexdigest()
def _timestamp(self) -> float:
"""Get current timestamp"""
return time.time()
def calculate_shannon_entropy(self) -> float:
"""Calculate Shannon entropy of MMP data"""
# Simplified entropy calculation
data_str = str(self.mmp_data)
prob_dict = {}
for char in data_str:
prob_dict[char] = prob_dict.get(char, 0) + 1
total = len(data_str)
entropy = 0
for count in prob_dict.values():
if count > 0:
p = count / total
entropy -= p * np.log2(p)
return entropy
def verify_signature(self, signature: str) -> bool:
"""Verify cryptographic signature (simplified)"""
# In production, use proper cryptographic verification
return len(signature) == 64 # Basic check
def validate_consistency(self, new_data: Dict[str, Any]) -> bool:
"""Validate data consistency"""
# Check for required fields
required_fields = ['observation_type', 'data']
return all(field in new_data for field in required_fields)
def merge_observations(self, existing: Dict[str, Any],
new: Dict[str, Any]) -> Dict[str, Any]:
"""Merge two observations"""
merged = existing.copy()
merged.update(new)
return merged
def update(self, new_data: Dict[str, Any], signature: str):
"""Progressive refinement with cryptographic verification"""
if self.verify_signature(signature) and \
self.validate_consistency(new_data):
# Store previous entropy
prev_entropy = self.entropy
self.mmp_data = self.merge_observations(
self.mmp_data,
new_data
)
self.history.append({
'timestamp': self._timestamp(),
'signature': signature,
'delta': new_data
})
# Update entropy
self.entropy = self.calculate_shannon_entropy()
# Ensure entropy doesn't decrease (2nd law)
assert self.entropy >= prev_entropy, \
"Entropy cannot decrease"
2.2 Cryptographic Observer Protocol
Observer Identity Rules
Required Elements
- Ed25519 public key
- Substrate type declaration
- Calibration data hash
- Initial reputation: 0.5
Signature Protocol
Every Observation
Must be cryptographically signed:
Ω_signed = Sign(Ω_data, private_key)
Enables trust without central authority
import hashlib
import json
import time
from typing import Dict, Any
# Simulated blockchain for demo purposes
class SimpleBlockchain:
def __init__(self):
self.observers = {}
self.block_height = 0
def add_observer(self, observer_id: Dict[str, Any]):
self.observers[observer_id['public_key']] = observer_id
self.block_height += 1
def get_current_block_height(self) -> int:
return self.block_height
blockchain = SimpleBlockchain()
def Hash(data: Any) -> str:
"""Generate hash of data"""
return hashlib.sha256(str(data).encode()).hexdigest()
def get_current_block_height() -> int:
"""Get current blockchain height"""
return blockchain.get_current_block_height()
def register_observer(public_key: str, substrate_type: str,
calibration_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Register observer with blockchain-based identity
"""
observer_id = {
'public_key': public_key,
'substrate_type': substrate_type,
'calibration_hash': Hash(calibration_data),
'reputation_score': 0.5, # Initial neutral score
'registration_block': get_current_block_height()
}
# Store in distributed ledger
blockchain.add_observer(observer_id)
return observer_id
def create_observer_header(public_key: str) -> Dict[str, Any]:
"""Create observer header from public key"""
# In production, derive from actual key
return {
'id': public_key[:8], # Shortened for display
'timestamp': time.time()
}
def sign(data: Dict[str, Any], private_key: str) -> str:
"""Sign data with private key (simplified)"""
# In production, use proper cryptographic signing
return Hash(str(data) + private_key)
def submit_to_ledger(record: Dict[str, Any], signature: str) -> Dict[str, Any]:
"""Submit record to ledger"""
return {
'status': 'submitted',
'block': get_current_block_height(),
'record_hash': Hash(record)
}
def submit_observation(observation_data: Dict[str, Any],
private_key: str) -> Dict[str, Any]:
"""Submit signed oMMP observation"""
# Create observer header
observer_header = create_observer_header(private_key[:64])
# Sign the complete oMMP record
ommp_record = {
'o': observer_header,
'MMP': observation_data
}
signature = sign(ommp_record, private_key)
return submit_to_ledger(ommp_record, signature)
3. Multi-Substrate Data Structures with Physics Constraints
Substrate Definition Rules
Each substrate S_i has inherent observational constraints:
Where:
- Λ_i = Spectral range accessible
- Τ_i = Temporal resolution limits
- Σ_i = Spatial resolution limits
- Ε_i = Inherent uncertainty function
3.1 Substrate-Aware Observations with Uncertainty
from dataclasses import dataclass
from typing import Tuple, Callable
import numpy as np
def human_uncertainty(measurement: float) -> float:
"""Human substrate uncertainty function"""
# Weber-Fechner law approximation
return 0.1 * measurement
def ai_uncertainty(measurement: float) -> float:
"""AI substrate uncertainty function"""
# Sensor-dependent, typically much lower
return 0.001 * measurement
def heisenberg_limit(position: float, momentum: float) -> Tuple[float, float]:
"""Heisenberg uncertainty principle"""
hbar = 1.054571817e-34 # Reduced Planck constant
# δx·δp ≥ ℏ/2
min_uncertainty = hbar / 2
return (min_uncertainty / momentum, min_uncertainty / position)
@dataclass
class SubstrateCapabilities:
spectral_range: Tuple[float, float] # min/max wavelength
temporal_resolution: float # seconds
spatial_resolution: float # meters
uncertainty_function: Callable # substrate-specific
heisenberg_limit: bool # quantum constraint
reference_frame: str = "Earth_Frame" # default reference frame
# Pre-defined substrate types with physics constraints
HUMAN_SUBSTRATE = SubstrateCapabilities(
spectral_range=(380e-9, 750e-9), # visible light
temporal_resolution=0.1, # 100ms
spatial_resolution=0.1, # 10cm at 10m
uncertainty_function=human_uncertainty,
heisenberg_limit=False
)
AI_SUBSTRATE = SubstrateCapabilities(
spectral_range=(0, float('inf')), # sensor dependent
temporal_resolution=1e-9, # nanosecond
spatial_resolution=0.001, # millimeter
uncertainty_function=ai_uncertainty,
heisenberg_limit=False
)
QUANTUM_SUBSTRATE = SubstrateCapabilities(
spectral_range=(0, float('inf')),
temporal_resolution=1e-15, # femtosecond
spatial_resolution=1e-10, # angstrom
uncertainty_function=heisenberg_limit,
heisenberg_limit=True # δx·δp ≥ ℏ/2
)
3.2 Gateway Interface with Reference Frames
Gateway Transform Rules
Inter-substrate communication via gateway functions:
Where information preservation requires:
import numpy as np
from typing import Dict, Any
class GatewayInterface:
"""Enable communication between different substrate types"""
def __init__(self):
self.transform_cache = {}
def shannon_entropy(self, data: Any) -> float:
"""Calculate Shannon entropy of data"""
data_str = str(data)
prob_dict = {}
for char in data_str:
prob_dict[char] = prob_dict.get(char, 0) + 1
total = len(data_str)
entropy = 0
for count in prob_dict.values():
if count > 0:
p = count / total
entropy -= p * np.log2(p)
return entropy
def transform_reference_frame(self, from_rf: str, to_rf: str) -> Dict[str, Any]:
"""Transform between reference frames"""
# Simplified transformation matrix
if from_rf == to_rf:
return {'matrix': np.eye(4), 'type': 'identity'}
# In production, implement proper Lorentz transformations
return {
'matrix': np.eye(4), # Identity for now
'type': f"{from_rf}_to_{to_rf}"
}
def calculate_substrate_overlap(self, from_sub: SubstrateCapabilities,
to_sub: SubstrateCapabilities) -> Dict[str, Any]:
"""Calculate capability overlap between substrates"""
# Spectral overlap
spectral_overlap = (
max(from_sub.spectral_range[0], to_sub.spectral_range[0]),
min(from_sub.spectral_range[1], to_sub.spectral_range[1])
)
# Resolution overlap (use coarser resolution)
temporal_res = max(from_sub.temporal_resolution, to_sub.temporal_resolution)
spatial_res = max(from_sub.spatial_resolution, to_sub.spatial_resolution)
return {
'spectral': spectral_overlap,
'temporal_resolution': temporal_res,
'spatial_resolution': spatial_res
}
def project_to_substrate(self, ommp_record: Dict[str, Any],
to_substrate: SubstrateCapabilities,
overlap: Dict[str, Any],
transformed_rf: Dict[str, Any]) -> Dict[str, Any]:
"""Project observation to target substrate capabilities"""
# Apply reference frame transformation
transformed = ommp_record.copy()
# Apply capability constraints
if 'spectral_data' in transformed.get('MMP', {}):
# Filter spectral data to overlap range
transformed['MMP']['spectral_data'] = {
'range': overlap['spectral'],
'filtered': True
}
# Update uncertainty based on target substrate
transformed['o']['projected_substrate'] = to_substrate.__class__.__name__
return transformed
def transform(self, ommp_record: Dict[str, Any], to_substrate: SubstrateCapabilities):
"""Transform oMMP record between substrates"""
from_substrate = ommp_record['o']['substrate']
# Handle reference frame transformation
transformed_rf = self.transform_reference_frame(
ommp_record['o'].get('reference_frame', 'Earth_Frame'),
to_substrate.reference_frame
)
# Calculate information overlap
# Note: from_substrate is a string, need to map to capabilities
# In production, maintain a registry of substrate capabilities
overlap = {
'spectral': (380e-9, 750e-9),
'temporal_resolution': 0.1,
'spatial_resolution': 0.1
}
# Transform within overlapping capabilities
transformed = self.project_to_substrate(
ommp_record,
to_substrate,
overlap,
transformed_rf
)
# Ensure information preservation constraint
original_entropy = self.shannon_entropy(ommp_record)
transformed_entropy = self.shannon_entropy(transformed)
overlap_entropy = self.shannon_entropy(overlap)
# Allow for small numerical errors
assert transformed_entropy >= original_entropy - overlap_entropy - 0.01, \
"Information preservation violation"
return transformed
4. Byzantine Fault Tolerance Implementation
4.1 Consensus Mechanism
Byzantine Rule
2f+1 Consensus
Accept observation Ω only if:
- At least 2f+1 observers report consistent data
- Where f = maximum number of faulty observers
- Consistency measured by: C(Ω_i, Ω_j) > 0.7
Outlier Detection
3σ Rule
- If |Ω_i - Ω_median| > 3σ: flag as potential Byzantine observer
- Reduce reputation_score
- Require additional validation
import numpy as np
from typing import List, Dict, Any
class ByzantineConsensus:
"""Handle potentially faulty or malicious observers"""
def __init__(self):
self.reputation_scores = {}
self.suspicious_observers = set()
def find_similar_observations(self, new_obs: Dict[str, Any],
existing: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Find observations of similar phenomena"""
similar = []
# Check spacetime proximity and observation type
new_spacetime = new_obs['o']['spacetime']
new_type = new_obs['MMP'].get('observation_type', 'unknown')
for record in existing:
record_spacetime = record['o']['spacetime']
record_type = record['MMP'].get('observation_type', 'unknown')
# Calculate spacetime distance
distance = np.sqrt(sum((a - b)**2 for a, b in
zip(new_spacetime, record_spacetime)))
# Similar if nearby and same type
if distance < 10.0 and new_type == record_type:
similar.append(record)
return similar
def calculate_consistency(self, obs1: Dict[str, Any],
obs2: Dict[str, Any]) -> float:
"""Calculate consistency between two observations"""
# Simplified consistency metric
consistency = 1.0
# Check key fields match
for key in ['observation_type', 'phenomenon_class']:
if key in obs1['MMP'] and key in obs2['MMP']:
if obs1['MMP'][key] != obs2['MMP'][key]:
consistency *= 0.5
# Check numerical values are within uncertainty
if 'value' in obs1['MMP'] and 'value' in obs2['MMP']:
val1 = obs1['MMP']['value']
val2 = obs2['MMP']['value']
uncertainty = max(obs1['o']['uncertainty'][0],
obs2['o']['uncertainty'][0])
if abs(val1 - val2) > 3 * uncertainty:
consistency *= 0.3
return consistency
def update_reputation(self, observer_id: str, consistency: float):
"""Update observer reputation based on consistency"""
if observer_id not in self.reputation_scores:
self.reputation_scores[observer_id] = 0.5
# Exponential moving average
alpha = 0.1
self.reputation_scores[observer_id] = (
alpha * consistency +
(1 - alpha) * self.reputation_scores[observer_id]
)
def flag_suspicious_observer(self, observer_id: str):
"""Flag observer as potentially malicious"""
self.suspicious_observers.add(observer_id)
# Reduce reputation
if observer_id in self.reputation_scores:
self.reputation_scores[observer_id] *= 0.5
def validate_observation(self, new_observation: Dict[str, Any],
existing_records: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Apply Byzantine fault tolerance rules"""
# Find similar observations (same event, different observers)
similar = self.find_similar_observations(
new_observation,
existing_records
)
if len(similar) < 3:
# Not enough data for consensus
return {'status': 'pending', 'weight': 0.1}
# Calculate consistency with existing records
consistencies = []
for record in similar:
c = self.calculate_consistency(new_observation, record)
consistencies.append(c)
median_consistency = np.median(consistencies)
# Byzantine rule: need 2f+1 consistent observations
f = int(len(similar) / 3) # Max faulty observers
consistent_count = sum(1 for c in consistencies if c > 0.7)
observer_id = new_observation['o']['id']
if consistent_count >= 2 * f + 1:
# Update observer reputation
self.update_reputation(observer_id, median_consistency)
return {'status': 'accepted', 'weight': median_consistency}
else:
# Potential Byzantine observer
self.flag_suspicious_observer(observer_id)
return {'status': 'rejected', 'reason': 'byzantine'}
5. Zero-Knowledge Proof Implementation
5.1 Anonymous Observations with ZK Proofs
Anonymous Rules
ZK Proof Requirements
Support zero-knowledge proofs for sensitive observations:
ZK_Proof = Prove{"I observed Ω with properties P" WITHOUT revealing observer identity}
Weight Adjustment
Anonymous Weighting
Anonymous observations receive lower initial weight:
w_anonymous = 0.1 × w_identified
Can gain trust through consistency
import hashlib
import time
import random
from typing import Dict, Any, Tuple
class ZeroKnowledgeObserver:
"""Submit observations without revealing identity"""
def __init__(self):
self.commitment_nonces = {}
def pedersen_commitment(self, data: Any) -> Tuple[str, str]:
"""Create Pedersen commitment to data"""
# Simplified commitment scheme
nonce = str(random.random())
commitment = hashlib.sha256(
(str(data) + nonce).encode()
).hexdigest()
# Store nonce for later reveal
self.commitment_nonces[commitment] = nonce
return commitment, nonce
def extract_properties(self, observation_data: Dict[str, Any]) -> Dict[str, Any]:
"""Extract observable properties without revealing details"""
properties = {
'has_spectral_data': 'spectral_data' in observation_data,
'observation_type': observation_data.get('observation_type', 'unknown'),
'data_dimensions': len(observation_data.get('data', [])),
'timestamp_decade': int(time.time() // (10 * 365 * 24 * 3600))
}
return properties
def blur_timestamp(self, timestamp: float) -> Tuple[float, float]:
"""Blur timestamp to protect privacy"""
# Add random noise within 1 hour
noise = random.uniform(-3600, 3600)
return (timestamp + noise - 3600, timestamp + noise + 3600)
def blur_location(self, x: float, y: float, z: float,
t: float) -> Tuple[float, float, float, float]:
"""Blur spacetime location for privacy"""
# Add random noise to each coordinate
blur_radius = 1000 # 1km
return (
x + random.uniform(-blur_radius, blur_radius),
y + random.uniform(-blur_radius, blur_radius),
z + random.uniform(-blur_radius, blur_radius),
t + random.uniform(-3600, 3600) # 1 hour
)
def inflate_uncertainty(self, dx: float, dy: float, dz: float,
dt: float) -> Tuple[float, float, float, float]:
"""Inflate uncertainty to account for blurring"""
inflation_factor = 10
return (
dx * inflation_factor,
dy * inflation_factor,
dz * inflation_factor,
dt * inflation_factor
)
def create_zk_proof(self, proof_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create zero-knowledge proof (simplified)"""
# In production, use proper ZK proof systems
proof = {
'commitment': proof_data['commitment'],
'statement_hash': hashlib.sha256(
proof_data['statement'].encode()
).hexdigest(),
'properties_hash': hashlib.sha256(
str(proof_data['properties']).encode()
).hexdigest(),
'timestamp_range': proof_data['timestamp_range'],
'proof_type': 'simplified_zkp'
}
return proof
def create_anonymous_observation(self, observation_data: Dict[str, Any],
spacetime: Tuple[float, float, float, float] = (0, 0, 0, 0),
uncertainty: Tuple[float, float, float, float] = (1, 1, 1, 1)):
"""Create ZK proof of observation"""
# Generate commitment to observation
commitment, _ = self.pedersen_commitment(observation_data)
# Create proof of knowledge
proof = self.create_zk_proof({
'statement': 'I observed phenomenon with properties P',
'commitment': commitment,
'properties': self.extract_properties(observation_data),
'timestamp_range': self.blur_timestamp(time.time())
})
# Create anonymous oMMP record
anon_header = {
'id': 'anonymous',
'spacetime': self.blur_location(*spacetime),
'uncertainty': self.inflate_uncertainty(*uncertainty),
'substrate': 'unknown'
}
return {
'o': anon_header,
'MMP': observation_data,
'zkp': proof,
'weight': 0.1 # Lower initial weight
}
6. Adaptive Resolution with Shannon Entropy
Resolution Optimization Rules
For any continuous parameter p, find optimal discretization:
Where:
- H(p,r) = -Σ n_i/N log₂(n_i/N) [Shannon entropy]
- S(p,r) = 1 - Var(∇²p)/⟨∇²p⟩ [Smoothness metric]
- R(p,r) = P(pattern reproduces) [Reproducibility score]
import numpy as np
from typing import List, Dict, Any
from math import log2
def get_distribution(values: List[float], resolution: float) -> List[float]:
"""Get probability distribution at given resolution"""
if not values:
return []
# Create bins
min_val = min(values)
max_val = max(values)
n_bins = int((max_val - min_val) / resolution) + 1
# Count values in each bin
bins = [0] * n_bins
for val in values:
bin_idx = int((val - min_val) / resolution)
bins[bin_idx] += 1
# Convert to probabilities
total = sum(bins)
return [count / total for count in bins if count > 0]
def second_derivative(values: List[float], resolution: float) -> List[float]:
"""Calculate second derivative at given resolution"""
if len(values) < 3:
return []
# Sort values
sorted_vals = sorted(values)
# Calculate discrete second derivative
second_deriv = []
for i in range(1, len(sorted_vals) - 1):
d2 = sorted_vals[i+1] - 2*sorted_vals[i] + sorted_vals[i-1]
second_deriv.append(d2 / (resolution ** 2))
return second_deriv
def variance(values: List[float]) -> float:
"""Calculate variance of values"""
if not values:
return 0
mean_val = sum(values) / len(values)
return sum((x - mean_val) ** 2 for x in values) / len(values)
def mean(values: List[float]) -> float:
"""Calculate mean of values"""
return sum(values) / len(values) if values else 0
def pattern_reproducibility_score(values: List[float], resolution: float) -> float:
"""Calculate pattern reproducibility at given resolution"""
# Simplified: check how many values fall into same bins
distribution = get_distribution(values, resolution)
if not distribution:
return 0
# Higher score for more concentrated distributions
max_prob = max(distribution)
return max_prob
def find_optimal_resolution(ommp_records: List[Dict[str, Any]],
parameter: str) -> float:
"""
Find resolution that maximizes information while
maintaining smoothness and reproducibility
"""
# Extract parameter values from all oMMP records
values = []
for r in ommp_records:
if parameter in r.get('MMP', {}):
values.append(r['MMP'][parameter])
if not values:
return 1.0 # Default resolution
# Generate candidate resolutions
value_range = max(values) - min(values)
candidate_resolutions = [
value_range / n for n in range(5, min(50, len(values)))
]
best_quality = -float('inf')
optimal_resolution = candidate_resolutions[0] if candidate_resolutions else 1.0
for resolution in candidate_resolutions:
# Calculate Shannon entropy
distribution = get_distribution(values, resolution)
H = 0
for p in distribution:
if p > 0:
H -= p * log2(p)
# Calculate smoothness (low variance of second derivative)
second_deriv = second_derivative(values, resolution)
if second_deriv:
var = variance(second_deriv)
mean_val = mean([abs(d) for d in second_deriv])
S = 1 - (var / (mean_val + 1e-10)) # Avoid division by zero
else:
S = 0.5
# Calculate reproducibility
R = pattern_reproducibility_score(values, resolution)
# Combined quality metric (Equation 6)
quality = H * max(0, S) * R
if quality > best_quality:
best_quality = quality
optimal_resolution = resolution
return optimal_resolution
Implementation Summary
The oMMP Framework enables substrate-agnostic anomaly classification through observer-prefixed records, cryptographic verification, Byzantine fault tolerance, and physics-compliant progressive refinement.