Major optimizations: multi-worker OCR, caching, rate limiting, healthchecks, logging

This commit is contained in:
2026-01-13 13:21:00 -03:00
parent 9b15c7a480
commit d7be8d7036
4 changed files with 202 additions and 102 deletions

View File

@@ -8,16 +8,18 @@ import re
import numpy as np
from datetime import datetime
from queue import Queue
from flask import Flask, Response, jsonify
from flask import Flask, Response, request, send_from_directory
from flask_cors import CORS
from ultralytics import YOLO
# Configuration
# Configuration (puede ser sobrescrito por variables de entorno)
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = 0
PROCESS_INTERVAL = 1.5
MODEL_PATH = 'best.pt'
DATASET_DIR = '/app/dataset' # Carpeta para guardar capturas
CAMERA_ID = int(os.environ.get('CAMERA_ID', 0))
PROCESS_INTERVAL = float(os.environ.get('PROCESS_INTERVAL', 1.5))
MODEL_PATH = os.environ.get('MODEL_PATH', 'best.pt')
DATASET_DIR = os.environ.get('DATASET_DIR', '/app/dataset')
DATASET_COOLDOWN = int(os.environ.get('DATASET_COOLDOWN', 60))
OCR_WORKERS = int(os.environ.get('OCR_WORKERS', 2)) # Número de workers OCR
app = Flask(__name__)
CORS(app)
@@ -28,18 +30,43 @@ frame_lock = threading.Lock()
latest_detections = []
detection_lock = threading.Lock()
# Cola para procesamiento OCR asíncrono (ahora incluye frame completo)
ocr_queue = Queue(maxsize=5)
# Cola para procesamiento OCR asíncrono
ocr_queue = Queue(maxsize=10)
# Cooldown para evitar múltiples capturas de la misma patente
DATASET_COOLDOWN = 60 # segundos entre capturas de la misma patente
recent_captures = {} # {plate_number: timestamp}
captures_lock = threading.Lock()
# Cache para lista de dataset
dataset_cache = {'data': None, 'timestamp': 0, 'ttl': 5} # 5 segundos de cache
# Métricas para health check
metrics = {
'fps': 0,
'ocr_queue_size': 0,
'total_detections': 0,
'total_captures': 0,
'last_detection': None,
'start_time': time.time()
}
metrics_lock = threading.Lock()
# Crear carpeta de dataset si no existe
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")
def cleanup_recent_captures():
    """Periodically purge stale plate-capture timestamps (memory-leak guard).

    Intended to run forever in a daemon thread: every 5 minutes it drops
    every entry older than twice the capture cooldown, holding the lock
    only while mutating the shared dict.
    """
    while True:
        time.sleep(300)  # wake up every 5 minutes
        now = time.time()
        with captures_lock:
            stale = [plate for plate, ts in recent_captures.items()
                     if now - ts > DATASET_COOLDOWN * 2]
            for plate in stale:
                del recent_captures[plate]
            if stale:
                print(f"🧹 Cleaned {len(stale)} expired capture records")
def save_plate_capture(plate_number, full_frame):
"""Guarda la captura de la patente para el dataset con cooldown"""
current_time = time.time()
@@ -54,36 +81,35 @@ def save_plate_capture(plate_number, full_frame):
if plate_number in recent_captures:
elapsed = current_time - recent_captures[plate_number]
if elapsed < DATASET_COOLDOWN:
return False # Aún en cooldown, no guardar
# Actualizar timestamp
return False
recent_captures[plate_number] = current_time
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Hacer una copia profunda del frame para evitar race conditions
frame_to_save = np.copy(full_frame)
# Solo guardar frame completo
filename = f"{plate_number}_{timestamp}.jpg"
filepath = f"{DATASET_DIR}/{filename}"
# Guardar imagen
success = cv2.imwrite(filepath, frame_to_save, [cv2.IMWRITE_JPEG_QUALITY, 95])
# Verificar que el archivo se guardó correctamente
if not success or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
print(f"❌ Failed to save image for {plate_number}")
# Eliminar archivo vacío si existe
if os.path.exists(filepath):
os.remove(filepath)
return False
# Invalidar cache
dataset_cache['timestamp'] = 0
# Actualizar métricas
with metrics_lock:
metrics['total_captures'] += 1
# Contar total de capturas
total_count = len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')])
# Notificar al backend para WebSocket
# Notificar al backend
try:
requests.post(f"{BACKEND_URL}/api/dataset/capture", json={
'plate_number': plate_number,
@@ -91,7 +117,7 @@ def save_plate_capture(plate_number, full_frame):
'count': total_count
}, timeout=2)
except:
pass # No bloquear si falla la notificación
pass
print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})")
return True
@@ -105,16 +131,20 @@ def send_plate(plate_number):
url = f"{BACKEND_URL}/api/detect"
requests.post(url, json={'plate_number': plate_number}, timeout=3)
print(f"✓ Plate sent: {plate_number}")
with metrics_lock:
metrics['total_detections'] += 1
metrics['last_detection'] = plate_number
except Exception as e:
print(f"✗ Error sending plate: {e}")
def validate_plate(text):
    """Return True if *text* matches a Chilean license-plate format.

    Accepted formats:
      - new style: four letters followed by two digits (e.g. ``BBCD12``)
      - old style: two letters followed by four digits (e.g. ``AB1234``)
    """
    return re.fullmatch(r'[A-Z]{4}\d{2}|[A-Z]{2}\d{4}', text) is not None
def ocr_worker(reader):
"""Hilo dedicado para OCR - no bloquea el stream"""
def ocr_worker(reader, worker_id):
"""Hilo dedicado para OCR - múltiples workers para mejor rendimiento"""
print(f"🔤 OCR Worker {worker_id} started")
while True:
try:
data = ocr_queue.get(timeout=1)
@@ -123,7 +153,6 @@ def ocr_worker(reader):
plate_img, full_frame = data
# Preprocesamiento para mejor OCR
gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
ocr_results = reader.readtext(gray, detail=0, paragraph=False,
@@ -131,18 +160,17 @@ def ocr_worker(reader):
for text in ocr_results:
clean_text = ''.join(e for e in text if e.isalnum()).upper()
if len(clean_text) >= 6 and validate_plate(clean_text):
# Enviar al backend
send_plate(clean_text)
# Guardar captura para dataset (con cooldown)
save_plate_capture(clean_text, full_frame)
except:
pass
def camera_loop():
"""Hilo principal de captura - mantiene FPS alto"""
"""Hilo principal de captura"""
global outputFrame, latest_detections
print("🚀 Initializing ALPR System...")
print(f"⚙️ Config: PROCESS_INTERVAL={PROCESS_INTERVAL}s, OCR_WORKERS={OCR_WORKERS}")
print("📷 Loading camera...")
cap = cv2.VideoCapture(CAMERA_ID)
@@ -162,16 +190,22 @@ def camera_loop():
print("📝 Initializing EasyOCR...")
reader = easyocr.Reader(['en'], gpu=False)
# Iniciar worker de OCR
ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True)
ocr_thread.start()
# Iniciar múltiples workers de OCR
for i in range(OCR_WORKERS):
t = threading.Thread(target=ocr_worker, args=(reader, i+1), daemon=True)
t.start()
# Iniciar limpiador de cache
cleanup_thread = threading.Thread(target=cleanup_recent_captures, daemon=True)
cleanup_thread.start()
print("✅ System ready!")
last_process_time = 0
frame_count = 0
fps_start_time = time.time()
while True:
# Captura eficiente
cap.grab()
cap.grab()
ret, frame = cap.retrieve()
@@ -180,13 +214,21 @@ def camera_loop():
time.sleep(0.01)
continue
frame_count += 1
current_time = time.time()
# Procesar ALPR cada PROCESS_INTERVAL segundos
# Calcular FPS cada segundo
if current_time - fps_start_time >= 1.0:
with metrics_lock:
metrics['fps'] = frame_count
metrics['ocr_queue_size'] = ocr_queue.qsize()
frame_count = 0
fps_start_time = current_time
# Procesar ALPR
if current_time - last_process_time > PROCESS_INTERVAL:
last_process_time = current_time
# YOLO detection
results = model(frame, verbose=False, imgsz=320, conf=0.5)
new_detections = []
@@ -196,16 +238,13 @@ def camera_loop():
conf = float(box.conf[0])
new_detections.append((x1, y1, x2, y2, conf))
# Extraer imagen de placa
plate_img = frame[y1:y2, x1:x2].copy()
if plate_img.size > 0 and not ocr_queue.full():
# Enviar placa Y frame completo para dataset
ocr_queue.put((plate_img, frame.copy()))
with detection_lock:
latest_detections = new_detections
# Actualizar frame para streaming
display_frame = frame
with detection_lock:
for (x1, y1, x2, y2, conf) in latest_detections:
@@ -233,11 +272,24 @@ def video_feed():
@app.route("/health")
def health():
return {"status": "ok", "service": "alpr"}
"""Health check completo con métricas"""
with metrics_lock:
uptime = time.time() - metrics['start_time']
return {
"status": "ok",
"service": "alpr",
"uptime_seconds": int(uptime),
"fps": metrics['fps'],
"ocr_queue_size": metrics['ocr_queue_size'],
"ocr_workers": OCR_WORKERS,
"total_detections": metrics['total_detections'],
"total_captures": metrics['total_captures'],
"last_detection": metrics['last_detection'],
"dataset_size": len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')])
}
@app.route("/dataset/count")
def dataset_count():
"""Endpoint para ver cuántas capturas hay en el dataset"""
try:
files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]
return {"plates_captured": len(files), "total_files": len(files)}
@@ -246,24 +298,24 @@ def dataset_count():
@app.route("/dataset/list")
def dataset_list():
"""Lista las imágenes del dataset con paginación"""
from flask import request
"""Lista las imágenes del dataset con paginación y cache"""
current_time = time.time()
try:
# Usar cache si está vigente
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 50))
cache_key = f"{page}_{per_page}"
try:
# Obtener lista de archivos (con cache básico)
if dataset_cache['timestamp'] == 0 or current_time - dataset_cache['timestamp'] > dataset_cache['ttl']:
files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]
# Ordenar por fecha de modificación (más recientes primero)
files_with_time = []
for f in files:
filepath = os.path.join(DATASET_DIR, f)
mtime = os.path.getmtime(filepath)
files_with_time.append((f, mtime))
files_with_time = [(f, os.path.getmtime(os.path.join(DATASET_DIR, f))) for f in files]
files_with_time.sort(key=lambda x: x[1], reverse=True)
sorted_files = [f[0] for f in files_with_time]
dataset_cache['data'] = [f[0] for f in files_with_time]
dataset_cache['timestamp'] = current_time
sorted_files = dataset_cache['data']
# Paginación
total = len(sorted_files)
@@ -294,8 +346,6 @@ def dataset_list():
@app.route("/dataset/images/<filename>")
def dataset_image(filename):
    """Serve a single dataset image by its filename.

    NOTE(review): *filename* comes from the URL; send_from_directory is
    Flask's safe helper for this — it refuses paths that escape
    DATASET_DIR, so directory traversal is guarded.
    """
    from flask import send_from_directory
    return send_from_directory(DATASET_DIR, filename)
if __name__ == "__main__":

View File

@@ -44,4 +44,8 @@ model AccessLog {
plateNumber String
accessStatus String // GRANTED, DENIED, UNKNOWN
timestamp DateTime @default(now())
@@index([plateNumber])
@@index([timestamp])
@@index([plateNumber, timestamp])
}

View File

@@ -1,5 +1,6 @@
const express = require('express');
const cors = require('cors');
const bcrypt = require('bcryptjs'); // Movido al inicio (E)
const { PrismaClient } = require('@prisma/client');
const http = require('http');
const { Server } = require('socket.io');
@@ -10,6 +11,36 @@ const prisma = new PrismaClient();
app.use(cors());
app.use(express.json());
// In-memory fixed-window rate limiter for /api/detect (G)
const detectRateLimit = new Map();
const RATE_LIMIT_WINDOW = 1000; // window length: 1 second
const RATE_LIMIT_MAX = 10; // at most 10 requests per window

/**
 * Record one request from `ip` and report whether it stays within the limit.
 * Each IP gets a counter that resets when its window expires.
 * @param {string} ip - client address used as the bucket key
 * @returns {boolean} true when this request is allowed
 */
function checkRateLimit(ip) {
  const now = Date.now();
  let entry = detectRateLimit.get(ip);
  if (!entry) {
    entry = { count: 0, resetTime: now + RATE_LIMIT_WINDOW };
  }
  if (now > entry.resetTime) {
    // Window elapsed: start a fresh one counting this request.
    entry.count = 1;
    entry.resetTime = now + RATE_LIMIT_WINDOW;
  } else {
    entry.count += 1;
  }
  detectRateLimit.set(ip, entry);
  return entry.count <= RATE_LIMIT_MAX;
}
// Evict stale rate-limit buckets once a minute so the Map cannot grow forever
setInterval(() => {
  const cutoff = Date.now();
  detectRateLimit.forEach((record, ip) => {
    // Keep entries for a grace minute past their window before dropping them.
    if (cutoff > record.resetTime + 60000) {
      detectRateLimit.delete(ip);
    }
  });
}, 60000);
const server = http.createServer(app);
const io = new Server(server, {
cors: {
@@ -31,9 +62,7 @@ app.use('/api/auth', authRoutes);
// Plates CRUD
app.get('/api/plates', authenticateToken, async (req, res) => {
try {
// Filter based on role
const where = req.user.role === 'ADMIN' ? {} : { addedById: req.user.id };
const plates = await prisma.plate.findMany({
where,
include: { addedBy: { select: { username: true } } }
@@ -47,7 +76,6 @@ app.get('/api/plates', authenticateToken, async (req, res) => {
app.post('/api/plates', authenticateToken, async (req, res) => {
const { number, owner } = req.body;
const isAdm = req.user.role === 'ADMIN';
// Admin -> ALLOWED, User -> PENDING
const status = isAdm ? 'ALLOWED' : 'PENDING';
try {
@@ -59,10 +87,7 @@ app.post('/api/plates', authenticateToken, async (req, res) => {
addedById: req.user.id
}
});
// Notify Admin via WebSocket
io.emit('new_plate_registered', plate);
res.json(plate);
} catch (err) {
res.status(500).json({ error: err.message });
@@ -72,7 +97,7 @@ app.post('/api/plates', authenticateToken, async (req, res) => {
// Admin: Approve/Reject Plate
app.put('/api/plates/:id/approve', authenticateToken, isAdmin, async (req, res) => {
const { id } = req.params;
const { status } = req.body; // ALLOWED or DENIED
const { status } = req.body;
if (!['ALLOWED', 'DENIED'].includes(status)) {
return res.status(400).json({ error: 'Invalid status' });
@@ -83,17 +108,13 @@ app.put('/api/plates/:id/approve', authenticateToken, isAdmin, async (req, res)
where: { id: parseInt(id) },
data: { status }
});
// Notify Users via WebSocket
io.emit('plate_status_updated', plate);
res.json(plate);
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// Admin: Delete Plate (Optional but good to have)
// Delete Plate (Admin or Owner)
app.delete('/api/plates/:id', authenticateToken, async (req, res) => {
const { id } = req.params;
@@ -101,15 +122,12 @@ app.delete('/api/plates/:id', authenticateToken, async (req, res) => {
const plate = await prisma.plate.findUnique({ where: { id: parseInt(id) } });
if (!plate) return res.status(404).json({ error: 'Plate not found' });
// Check permissions
if (req.user.role !== 'ADMIN' && plate.addedById !== req.user.id) {
return res.status(403).json({ error: 'Unauthorized' });
}
await prisma.plate.delete({ where: { id: parseInt(id) } });
io.emit('plate_deleted', { id: parseInt(id) });
res.json({ message: 'Plate deleted' });
} catch (err) {
res.status(500).json({ error: err.message });
@@ -128,9 +146,7 @@ app.delete('/api/people/:id', authenticateToken, async (req, res) => {
}
await prisma.person.delete({ where: { id: parseInt(id) } });
io.emit('person_deleted', { id: parseInt(id) });
res.json({ message: 'Person deleted' });
} catch (err) {
res.status(500).json({ error: err.message });
@@ -139,7 +155,7 @@ app.delete('/api/people/:id', authenticateToken, async (req, res) => {
// History Endpoint
app.get('/api/history', async (req, res) => {
const { date } = req.query; // Format: YYYY-MM-DD
const { date } = req.query;
if (!date) {
return res.status(400).json({ error: 'Date is required' });
}
@@ -158,9 +174,7 @@ app.get('/api/history', async (req, res) => {
lte: endDate
}
},
orderBy: {
timestamp: 'desc'
}
orderBy: { timestamp: 'desc' }
});
res.json(logs);
} catch (err) {
@@ -174,13 +188,9 @@ app.get('/api/recent', async (req, res) => {
const fiveHoursAgo = new Date(Date.now() - 5 * 60 * 60 * 1000);
const logs = await prisma.accessLog.findMany({
where: {
timestamp: {
gte: fiveHoursAgo
}
timestamp: { gte: fiveHoursAgo }
},
orderBy: {
timestamp: 'desc'
}
orderBy: { timestamp: 'desc' }
});
res.json(logs);
} catch (err) {
@@ -238,10 +248,7 @@ app.post('/api/people', authenticateToken, async (req, res) => {
addedById: req.user.id
}
});
// Notify Admin via WebSocket
io.emit('new_person_registered', person);
res.json(person);
} catch (err) {
res.status(500).json({ error: err.message });
@@ -256,25 +263,28 @@ app.post('/api/people/bulk-approve', authenticateToken, isAdmin, async (req, res
where: { addedById: parseInt(userId), status: 'PENDING' },
data: { status: 'APPROVED' }
});
// Notify Users via WebSocket
io.emit('people_updated', { userId });
res.json({ message: 'Bulk approval successful' });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// Detection Endpoint (from Python)
// Detection Endpoint (from Python) with Rate Limiting (G)
app.post('/api/detect', async (req, res) => {
const clientIp = req.ip || req.connection.remoteAddress;
// Check rate limit
if (!checkRateLimit(clientIp)) {
return res.status(429).json({ error: 'Too many requests' });
}
const { plate_number } = req.body;
console.log(`Detected: ${plate_number}`);
const DUPLICATE_COOLDOWN_MS = 30000; // 30 seconds
const DUPLICATE_COOLDOWN_MS = 30000;
try {
// Check for recent duplicate
const lastLog = await prisma.accessLog.findFirst({
where: { plateNumber: plate_number },
orderBy: { timestamp: 'desc' }
@@ -288,7 +298,6 @@ app.post('/api/detect', async (req, res) => {
}
}
// Check if plate exists
let plate = await prisma.plate.findUnique({
where: { number: plate_number }
});
@@ -300,12 +309,9 @@ app.post('/api/detect', async (req, res) => {
}
if (!plate) {
// Optional: Auto-create unknown plates?
// For now, treat as UNKNOWN (Denied)
accessStatus = 'UNKNOWN';
}
// Log the access attempt
const log = await prisma.accessLog.create({
data: {
plateNumber: plate_number,
@@ -314,7 +320,6 @@ app.post('/api/detect', async (req, res) => {
}
});
// Notify Frontend via WebSocket
io.emit('new_detection', {
plate: plate_number,
status: accessStatus,
@@ -334,7 +339,6 @@ app.post('/api/dataset/capture', (req, res) => {
const { plate_number, filename, count } = req.body;
console.log(`📸 Dataset capture: ${plate_number} (Total: ${count})`);
// Notify Frontend via WebSocket
io.emit('dataset_updated', {
plate: plate_number,
filename,
@@ -345,7 +349,15 @@ app.post('/api/dataset/capture', (req, res) => {
res.json({ message: 'Notification sent' });
});
const bcrypt = require('bcryptjs');
// Health check endpoint: probes the database with a trivial query
app.get('/api/health', async (req, res) => {
  let dbOk = true;
  try {
    await prisma.$queryRaw`SELECT 1`;
  } catch (err) {
    dbOk = false;
  }
  if (dbOk) {
    res.json({ status: 'ok', database: 'connected' });
  } else {
    res.status(500).json({ status: 'error', database: 'disconnected' });
  }
});
const PORT = process.env.PORT || 3000;
server.listen(PORT, async () => {

View File

@@ -21,6 +21,11 @@ services:
interval: 5s
timeout: 5s
retries: 5
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Backend Service (Node.js)
backend:
@@ -40,28 +45,51 @@ services:
volumes:
- ./backend:/app
- /app/node_modules
healthcheck:
test: [ "CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health" ]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# ALPR Component (Python + OpenCV)
alpr-service:
build: ./alpr-service
container_name: controlpatente-alpr
ports:
- "5001:5001" # Permite acceder al stream de video desde el navegador
- "5001:5001"
environment:
- BACKEND_URL=http://backend:3000
# On Mac, you usually cannot pass /dev/video0 directly.
# We might need to use a stream or just test with a file for now if direct access fails.
# For Linux/Raspberry Pi, the device mapping below is correct.
- PROCESS_INTERVAL=1.5
- DATASET_COOLDOWN=60
- OCR_WORKERS=2
devices:
- "/dev/video0:/dev/video0"
networks:
- backend-net
depends_on:
- backend
backend:
condition: service_healthy
restart: unless-stopped
privileged: true
volumes:
- ./alpr-service/dataset:/app/dataset
healthcheck:
test: [ "CMD", "wget", "-q", "--spider", "http://localhost:5001/health" ]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
logging:
driver: "json-file"
options:
max-size: "20m"
max-file: "5"
# Frontend Service (React)
frontend:
@@ -78,6 +106,12 @@ services:
environment:
- VITE_API_URL=
- VITE_ALPR_STREAM_URL=
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
networks:
backend-net:
driver: bridge