Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend-data-elaborator/api/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ services:
environment:
- DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
- REDIS_URL=redis://redis:6379/0
- PYTHONUNBUFFERED=1
depends_on:
- redis
- postgres
Expand Down
3 changes: 2 additions & 1 deletion backend-data-elaborator/api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ shapely>=2.0.0
ecdsa>=0.18.0
redis>=5.0.0
hiredis>=2.2.0
websockets>=11.0.3
websockets>=11.0.3
aiohttp>=3.9.0
28 changes: 26 additions & 2 deletions backend-data-elaborator/api/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
# ⚠️ SECURITY: API Key for IoT Endpoints
# NOTE(review): the fallback default is a hardcoded secret committed to the
# repo — fine for local dev only; production must set IOT_API_KEY in the env.
IOT_API_KEY = os.getenv("IOT_API_KEY", "SuperSecretIoTKey2024")

# ⚠️ SECURITY: Token for Mobile App WebSocket connections
# NOTE(review): same concern — this default token also appears verbatim in the
# test suite (WS_URI), so it is effectively public; override via MOBILE_WS_TOKEN.
MOBILE_WS_TOKEN = os.getenv("MOBILE_WS_TOKEN", "SecretMobileAppToken2024")

# ==========================================
# INFRASTRUCTURE INITIALIZATION
# ==========================================
Expand Down Expand Up @@ -172,10 +175,18 @@ async def rate_limiter(request: Request):
@app.websocket("/ws/alerts")
async def websocket_endpoint(websocket: WebSocket):
    """Clients connect here to receive real-time updates.

    Authentication: the caller must supply ?token=<MOBILE_WS_TOKEN> in the
    query string; unauthenticated sockets are closed with policy-violation 1008
    before being registered with the connection manager.
    """

    # Extract the token from the connection query parameters
    token = websocket.query_params.get("token")

    # Reject unauthorized WebSocket connections instantly
    if token != MOBILE_WS_TOKEN:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket)
    try:
        while True:
            # Keep the connection open. We only send data out, we don't expect data in.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client hang-up; nothing to report.
        pass
    finally:
        # FIX: the old `except (WebSocketDisconnect, Exception)` swallowed every
        # error silently (Exception subsumes WebSocketDisconnect). Cleanup now
        # always runs — even on cancellation — and unexpected exceptions
        # propagate to the framework so they get logged.
        manager.disconnect(websocket)
Expand Down Expand Up @@ -282,4 +293,17 @@ async def create_misuration_async(misuration: schemas.MisurationCreate, db: Sess

# Offload the rest to the Redis queue for async processing
await redis_client.lpush("seismic_events", json.dumps(payload))
return {"status": "accepted"}
return {"status": "accepted"}

@app.get("/sensors/{id}/statistics", tags=["Data Retrieval"])
def get_sensor_statistics(id: int, db: Session = Depends(get_db)):
    """
    Returns the total number of readings for a specific sensor.
    """
    # Count rows matching this sensor id; the query only hits COUNT(*), no rows
    # are materialized.
    readings = db.query(models.Misuration).filter(
        models.Misuration.misurator_id == id
    )

    # We can expand this later with Avg/Min/Max
    return {"sensor_id": id, "total_readings": readings.count()}
112 changes: 43 additions & 69 deletions backend-data-elaborator/api/src/worker.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,52 @@
"""
QuakeGuard Background Worker
-----------------------------
Consumes seismic events from the Redis queue, persists data to PostgreSQL,
and detects critical seismic thresholds to generate persistent Alerts.
"""

import os
import json
import redis
import time
from datetime import datetime
from typing import Dict, Any

from src.database import SessionLocal
from src.models import Misuration, Alert

# --- CONFIGURATION ---
REDIS_HOST = 'redis'
REDIS_PORT = 6379
ALERT_THRESHOLD = 50
ALERT_WINDOW_SECONDS = 10
ALERT_COOLDOWN = 60

redis_sync = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)

def process_event(event: Dict[str, Any]) -> None:
    """Processes a single seismic event from the queue.

    Steps: persist the raw measurement, bump the per-zone sliding counter in
    Redis, and — if the counter crosses ALERT_THRESHOLD and the zone is not in
    cooldown — persist an Alert row. Raises KeyError if the event payload is
    missing 'zone_id', 'value' or 'misurator_id'.
    """
    zone_id = event['zone_id']

    with SessionLocal() as db:
        # 1. Persist raw measurement
        new_misuration = Misuration(
            value=event['value'],
            misurator_id=event['misurator_id']
        )
        db.add(new_misuration)

        # 2. Update real-time alert counter
        # NOTE: EXPIRE is re-issued on every event, so the window slides — the
        # counter only dies after ALERT_WINDOW_SECONDS of zone inactivity.
        zone_counter_key = f"zone:{zone_id}:alerts"
        pipe = redis_sync.pipeline()
        pipe.incr(zone_counter_key)
        pipe.expire(zone_counter_key, ALERT_WINDOW_SECONDS)
        # execute() returns one result per queued command; [0] is the INCR value.
        current_count = pipe.execute()[0]

        # 3. Check Threshold & Generate Alert
        if current_count >= ALERT_THRESHOLD:
            cooldown_key = f"zone:{zone_id}:alarm_cooldown"

            # Cooldown key suppresses duplicate alerts for ALERT_COOLDOWN seconds.
            if not redis_sync.exists(cooldown_key):
                print(f"🚨 CRITICAL ALARM! Zone {zone_id} has {current_count} events!")

                new_alert = Alert(
                    zone_id=zone_id,
                    # Severity scales linearly with the event count (50 events -> 5.0).
                    severity=float(current_count) / 10.0,
                    message=f"Seismic Swarm Detected: {current_count} sensors triggered.",
                    timestamp=datetime.utcnow()
                )
                db.add(new_alert)
                redis_sync.setex(cooldown_key, ALERT_COOLDOWN, "active")

        # Single commit covers both the measurement and any alert created above.
        db.commit()

def run_worker() -> None:
"""Continuous loop consuming messages."""
print(f"👷 Worker started. Threshold: {ALERT_THRESHOLD} events / {ALERT_WINDOW_SECONDS}s")
import redis
from sqlalchemy.orm import Session
from src.database import SessionLocal, engine
from src.models import Misuration

# Redis Config
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
redis_sync = redis.from_url(REDIS_URL, decode_responses=True)

def process_event(event: dict, db: Session):
    """Inserts a single sensor measurement into PostGIS."""
    # Only the ORM-backed fields are written: extra payload keys such as
    # 'device_timestamp' and 'signature_hex' have no column on
    # models.Misuration and are deliberately dropped, while 'created_at'
    # is populated by the database itself.
    measurement = Misuration(
        value=event.get("value"),
        misurator_id=event.get("misurator_id"),
    )
    db.add(measurement)
    db.commit()

def run_worker():
    """Consume 'seismic_events' from Redis forever, persisting each event.

    Failed DB writes are rolled back and the raw payload is parked on the
    'seismic_events_dlq' dead-letter list for later recovery.
    """
    print("👷 Worker started. Listening for 'seismic_events'...")
    # One long-lived session reused across iterations; rollback() resets it
    # after a failed write.
    db = SessionLocal()

    while True:
        try:
            # NOTE(review): the next three lines are the pre-rewrite body that the
            # diff viewer rendered inline; the deployed file should contain only
            # the brpop(..., timeout=0) form below — confirm against the merged file.
            _, data = redis_sync.brpop("seismic_events")
            event = json.loads(data)
            process_event(event)
            # Block until data is available in the queue
            result = redis_sync.brpop("seismic_events", timeout=0)
            if result:
                _, data = result
                event = json.loads(data)

                try:
                    process_event(event, db)
                    print(f"✅ Processed sensor {event.get('misurator_id')} -> {event.get('value')}")
                except Exception as e:
                    print(f"❌ DB Error: {e}. Moving to DLQ.")
                    db.rollback()
                    # Save to Dead Letter Queue for later recovery
                    redis_sync.lpush("seismic_events_dlq", data)

        except Exception as e:
            # NOTE(review): the first print/sleep pair here is also a stale
            # pre-rewrite line shown by the diff; only one handler body should
            # survive in the merged file.
            print(f"❌ Error processing event: {e}")
            time.sleep(1)
            print(f"❌ Redis Connection Error: {e}")
            time.sleep(2)

if __name__ == "__main__":
    # NOTE(review): the two sleep(5) calls below are the old and new diff lines
    # rendered together; the merged file should contain a single warm-up delay.
    time.sleep(5) # Warm-up delay to let Postgres initialize
    # Wait for DB to be ready
    time.sleep(5)
    run_worker()
4 changes: 2 additions & 2 deletions backend-data-elaborator/api/tests/stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ async def main():
load_stats = await run_load_test(session, sensors, sem)

# FIX: Sleep to let the 1-second Redis Rate Limiter reset so Phase 2 doesn't get blocked by 429
print("\n⏳ Letting Redis Rate Limiter cool down for 2 seconds...")
await asyncio.sleep(2)
print("\n⏳ Letting Redis Rate Limiter cool down for 10 seconds...")
await asyncio.sleep(10)

sec_stats = await run_security_test(session, zone_id, sem)
e2e_passed = await verify_persistence_with_polling(session, sensors, sem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import redis.asyncio as aioredis

# Assumes the backend is running locally on port 8000
WS_URI = "ws://127.0.0.1:8000/ws/alerts"
WS_URI = "ws://127.0.0.1:8000/ws/alerts?token=SecretMobileAppToken2024"
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
NUM_CLIENTS = 100

Expand Down
Loading