import os
import re
import threading
import time
from datetime import datetime
from queue import Empty, Queue

import cv2
import easyocr
import numpy as np
import requests
from flask import Flask, Response, request, send_from_directory
from flask_cors import CORS
from ultralytics import YOLO

# Configuration (can be overridden through environment variables)
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = int(os.environ.get('CAMERA_ID', 0))
PROCESS_INTERVAL = float(os.environ.get('PROCESS_INTERVAL', 1.5))
MODEL_PATH = os.environ.get('MODEL_PATH', 'best.pt')
DATASET_DIR = os.environ.get('DATASET_DIR', '/app/dataset')
DATASET_COOLDOWN = int(os.environ.get('DATASET_COOLDOWN', 60))
OCR_WORKERS = int(os.environ.get('OCR_WORKERS', 2))  # Number of OCR workers
SERVICE_API_KEY = os.environ.get('SERVICE_API_KEY', '')  # For backend auth

app = Flask(__name__)
CORS(app)

# Shared state
outputFrame = None
frame_lock = threading.Lock()
latest_detections = []
detection_lock = threading.Lock()

# Queue feeding the asynchronous OCR workers
ocr_queue = Queue(maxsize=10)

# Cooldown bookkeeping to avoid capturing the same plate repeatedly
recent_captures = {}  # {plate_number: timestamp}
captures_lock = threading.Lock()

# Cache for the dataset file listing
dataset_cache = {'data': None, 'timestamp': 0, 'ttl': 5}  # 5 second cache

# Metrics exposed by the health check
metrics = {
    'fps': 0,
    'ocr_queue_size': 0,
    'total_detections': 0,
    'total_captures': 0,
    'last_detection': None,
    'start_time': time.time()
}
metrics_lock = threading.Lock()

# Accepted plate layouts, compiled once instead of on every OCR result:
#   Chile (new):        XXXX00   (4 letters, 2 digits)
#   Chile (old):        XX0000   (2 letters, 4 digits)
#   Argentina Mercosur: AA000AA  (2 letters, 3 digits, 2 letters)
#   Brazil Mercosur:    AAA0A00  (3 letters, 1 digit, 1 letter, 2 digits)
_PLATE_PATTERN = re.compile(
    r'[A-Z]{4}\d{2}'
    r'|[A-Z]{2}\d{4}'
    r'|[A-Z]{2}\d{3}[A-Z]{2}'
    r'|[A-Z]{3}\d[A-Z]\d{2}'
)

# Create the dataset folder if it does not exist
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")


def cleanup_recent_captures():
    """Drop stale cooldown records so `recent_captures` cannot grow forever.

    Runs forever (intended for a daemon thread): every 5 minutes it removes
    entries older than twice DATASET_COOLDOWN.
    """
    while True:
        time.sleep(300)  # 5 minutes
        current_time = time.time()
        with captures_lock:
            expired = [
                plate for plate, stamp in recent_captures.items()
                if current_time - stamp > DATASET_COOLDOWN * 2
            ]
            for plate in expired:
                del recent_captures[plate]
        if expired:
            print(f"🧹 Cleaned {len(expired)} expired capture records")


def _release_capture_slot(plate_number, reserved_at):
    """Undo a cooldown reservation made at `reserved_at` for `plate_number`."""
    with captures_lock:
        # Only remove our own reservation, not one made later by another worker.
        if recent_captures.get(plate_number) == reserved_at:
            del recent_captures[plate_number]


def save_plate_capture(plate_number, full_frame):
    """Save the full frame for a recognised plate into the dataset directory.

    A per-plate cooldown (DATASET_COOLDOWN seconds) prevents saving the same
    plate over and over. The cooldown slot is reserved before writing and
    released again if the image could not be written, so a failed write does
    not block the next attempt.

    Args:
        plate_number: Recognised plate text, used as the filename prefix.
        full_frame: Image array to store; `None` or empty frames are skipped.

    Returns:
        True if the image was saved and post-processing completed, False
        otherwise (empty frame, cooldown active, write failure, or error).
    """
    # Reject empty frames up front
    if full_frame is None or full_frame.size == 0:
        print(f"⚠️ Empty frame, skipping save for {plate_number}")
        return False

    current_time = time.time()

    # Check and reserve the cooldown slot atomically
    with captures_lock:
        last_capture = recent_captures.get(plate_number)
        if (last_capture is not None
                and current_time - last_capture < DATASET_COOLDOWN):
            return False
        recent_captures[plate_number] = current_time

    saved = False
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        frame_to_save = np.copy(full_frame)

        filename = f"{plate_number}_{timestamp}.jpg"
        filepath = os.path.join(DATASET_DIR, filename)

        success = cv2.imwrite(filepath, frame_to_save,
                              [cv2.IMWRITE_JPEG_QUALITY, 95])

        if (not success or not os.path.exists(filepath)
                or os.path.getsize(filepath) == 0):
            print(f"❌ Failed to save image for {plate_number}")
            if os.path.exists(filepath):
                os.remove(filepath)
            _release_capture_slot(plate_number, current_time)
            return False
        saved = True

        # Invalidate the listing cache
        dataset_cache['timestamp'] = 0

        # Update metrics
        with metrics_lock:
            metrics['total_captures'] += 1

        # Count total captures on disk
        total_count = len([f for f in os.listdir(DATASET_DIR)
                           if f.endswith('.jpg')])

        # Notify the backend; this is best-effort and must not fail the save
        try:
            requests.post(f"{BACKEND_URL}/api/dataset/capture", json={
                'plate_number': plate_number,
                'filename': filename,
                'count': total_count
            }, timeout=2)
        except requests.RequestException:
            pass

        print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})")
        return True

    except Exception as e:
        print(f"❌ Error saving capture: {e}")
        if not saved:
            # Nothing was written, so do not hold the cooldown slot.
            _release_capture_slot(plate_number, current_time)
        return False


def send_plate(plate_number):
    """Send a detected plate to the backend `/api/detect` endpoint.

    Adds the `X-Service-Key` header when SERVICE_API_KEY is configured.
    Network errors are logged and swallowed. Detection metrics are updated
    whenever the request completes, regardless of the HTTP status.
    """
    try:
        url = f"{BACKEND_URL}/api/detect"
        headers = {}
        if SERVICE_API_KEY:
            headers['X-Service-Key'] = SERVICE_API_KEY

        response = requests.post(url, json={'plate_number': plate_number},
                                 headers=headers, timeout=3)
        if response.ok:
            print(f"✓ Plate sent: {plate_number}")
        else:
            # Do not report success when the backend answered with 4xx/5xx.
            print(f"✗ Backend rejected plate {plate_number}: "
                  f"HTTP {response.status_code}")

        with metrics_lock:
            metrics['total_detections'] += 1
            metrics['last_detection'] = plate_number
    except Exception as e:
        print(f"✗ Error sending plate: {e}")


def validate_plate(text):
    """Return True if `text` matches a supported plate format.

    Supported formats: Chile new (XXXX00), Chile old (XX0000), Argentina
    Mercosur (AA000AA) and Brazil Mercosur (AAA0A00). The whole string must
    match; surrounding characters (including a trailing newline) are rejected.
    """
    return _PLATE_PATTERN.fullmatch(text) is not None


def ocr_worker(reader, worker_id):
    """Consume plate crops from `ocr_queue` and run OCR on them forever.

    Each queue item is a `(plate_img, full_frame)` tuple. Every OCR result
    that validates as a plate is sent to the backend and saved to the dataset.

    Args:
        reader: OCR reader object exposing `readtext`.
        worker_id: Number used only in log messages.
    """
    print(f"🔤 OCR Worker {worker_id} started")

    while True:
        try:
            data = ocr_queue.get(timeout=1)
        except Empty:
            # Nothing to do yet; poll again.
            continue

        if data is None:
            continue

        try:
            plate_img, full_frame = data

            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
            ocr_results = reader.readtext(
                gray, detail=0, paragraph=False,
                allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')

            for text in ocr_results:
                clean_text = ''.join(e for e in text if e.isalnum()).upper()
                if len(clean_text) >= 6 and validate_plate(clean_text):
                    send_plate(clean_text)
                    save_plate_capture(clean_text, full_frame)
        except Exception as e:
            # Keep the worker alive, but make the failure visible.
            print(f"⚠️ OCR Worker {worker_id} error: {e}")


def camera_loop():
    """Main capture loop: grab frames, detect plates, publish the preview.

    Opens the camera, loads the YOLO model and the OCR reader, starts the OCR
    worker threads and the cooldown cleaner, then loops forever. Every
    PROCESS_INTERVAL seconds a frame is run through the detector and each
    detected box is queued for OCR. Every frame is annotated with the latest
    detections and published through `outputFrame`.

    Returns early (after logging) if the camera cannot be opened or the model
    cannot be loaded.
    """
    global outputFrame, latest_detections

    print("🚀 Initializing ALPR System...")
    print(f"⚙️ Config: PROCESS_INTERVAL={PROCESS_INTERVAL}s, OCR_WORKERS={OCR_WORKERS}")

    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
    if not cap.isOpened():
        # Without this check the loop below would spin forever on failed reads.
        print(f"❌ Critical Error opening camera {CAMERA_ID}")
        return
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    print("🧠 Loading YOLO model...")
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"❌ Critical Error loading model: {e}")
        cap.release()
        return

    print("📝 Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)

    # Start the OCR workers
    for i in range(OCR_WORKERS):
        t = threading.Thread(target=ocr_worker, args=(reader, i + 1),
                             daemon=True)
        t.start()

    # Start the cooldown-record cleaner
    cleanup_thread = threading.Thread(target=cleanup_recent_captures,
                                      daemon=True)
    cleanup_thread.start()

    print("✅ System ready!")

    last_process_time = 0
    frame_count = 0
    fps_start_time = time.time()

    while True:
        # Two grabs before retrieving, as in the original loop (presumably to
        # discard a buffered frame and stay close to real time).
        cap.grab()
        cap.grab()
        ret, frame = cap.retrieve()

        if not ret:
            time.sleep(0.01)
            continue

        frame_count += 1
        current_time = time.time()

        # Refresh FPS roughly once per second, normalised by the real elapsed
        # time because a detection pass can stretch the interval past 1 s.
        elapsed = current_time - fps_start_time
        if elapsed >= 1.0:
            with metrics_lock:
                metrics['fps'] = round(frame_count / elapsed)
                metrics['ocr_queue_size'] = ocr_queue.qsize()
            frame_count = 0
            fps_start_time = current_time

        # Run plate detection at most once per PROCESS_INTERVAL
        if current_time - last_process_time > PROCESS_INTERVAL:
            last_process_time = current_time

            results = model(frame, verbose=False, imgsz=320, conf=0.5)

            new_detections = []
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    new_detections.append((x1, y1, x2, y2, conf))

                    plate_img = frame[y1:y2, x1:x2].copy()
                    # Drop the crop when the OCR queue is full rather than
                    # blocking the capture loop.
                    if plate_img.size > 0 and not ocr_queue.full():
                        ocr_queue.put((plate_img, frame.copy()))

            with detection_lock:
                latest_detections = new_detections

        # Annotate after queueing, so the queued copies stay clean.
        display_frame = frame
        with detection_lock:
            for (x1, y1, x2, y2, conf) in latest_detections:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2),
                              (0, 255, 0), 2)
                cv2.putText(display_frame, f"{conf:.0%}", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        with frame_lock:
            outputFrame = display_frame


def generate():
    """Yield the latest published frame as an endless MJPEG multipart stream.

    Emits at most one part roughly every 33 ms. Iterations where no frame has
    been published yet, or where JPEG encoding fails, are skipped.
    """
    while True:
        time.sleep(0.033)

        # Only take the reference under the lock and encode outside it, so the
        # capture thread is not blocked while a client frame is compressed.
        # camera_loop finishes drawing before it publishes a frame and rebinds
        # `frame` on every iteration (assumes cap.retrieve() returns a fresh
        # array each call - TODO confirm).
        with frame_lock:
            frame = outputFrame
        if frame is None:
            continue

        ok, encoded = cv2.imencode(".jpg", frame,
                                   [cv2.IMWRITE_JPEG_QUALITY, 75])
        if not ok:
            continue

        yield (b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
               + encoded.tobytes() + b'\r\n')


@app.route("/video_feed")
def video_feed():
    """Serve the live annotated camera stream as multipart MJPEG."""
    return Response(generate(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")
def _count_dataset_images():
    """Return the number of `.jpg` files in DATASET_DIR (0 if unreadable)."""
    try:
        return sum(1 for f in os.listdir(DATASET_DIR) if f.endswith('.jpg'))
    except OSError:
        return 0


@app.route("/health")
def health():
    """Full health check with runtime metrics.

    Returns a JSON object with service status, uptime, FPS, OCR queue size,
    detection/capture counters, the last detected plate and the dataset size.
    """
    # Snapshot under the lock; do the directory scan outside it so slow disk
    # I/O never blocks the threads that update the metrics.
    with metrics_lock:
        snapshot = dict(metrics)

    uptime = time.time() - snapshot['start_time']
    return {
        "status": "ok",
        "service": "alpr",
        "uptime_seconds": int(uptime),
        "fps": snapshot['fps'],
        "ocr_queue_size": snapshot['ocr_queue_size'],
        "ocr_workers": OCR_WORKERS,
        "total_detections": snapshot['total_detections'],
        "total_captures": snapshot['total_captures'],
        "last_detection": snapshot['last_detection'],
        "dataset_size": _count_dataset_images()
    }


@app.route("/dataset/count")
def dataset_count():
    """Return how many `.jpg` captures exist in the dataset directory."""
    count = _count_dataset_images()
    return {"plates_captured": count, "total_files": count}


@app.route("/dataset/list")
def dataset_list():
    """List dataset images, newest first, with pagination and a short cache.

    Query parameters:
        page: 1-based page number (default 1; invalid or < 1 becomes 1).
        per_page: Page size (default 50; invalid falls back to 50, values
            below 1 become 1).

    Returns a JSON object with `images`, `total`, `page`, `per_page` and
    `total_pages`; on failure, an object with empty `images` and `error`.
    """
    current_time = time.time()

    # `type=int` falls back to the default instead of raising on bad input;
    # clamping avoids division by zero and negative slice offsets.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(request.args.get('per_page', 50, type=int), 1)

    try:
        # Refresh the file listing when the cache is invalidated or expired
        if (dataset_cache['timestamp'] == 0
                or current_time - dataset_cache['timestamp'] > dataset_cache['ttl']):
            files_with_time = []
            for name in os.listdir(DATASET_DIR):
                if not name.endswith('.jpg'):
                    continue
                try:
                    mtime = os.path.getmtime(os.path.join(DATASET_DIR, name))
                except OSError:
                    # File vanished between listdir() and stat(); skip it.
                    continue
                files_with_time.append((name, mtime))
            files_with_time.sort(key=lambda x: x[1], reverse=True)
            dataset_cache['data'] = [entry[0] for entry in files_with_time]
            dataset_cache['timestamp'] = current_time

        sorted_files = dataset_cache['data']

        # Pagination
        total = len(sorted_files)
        total_pages = (total + per_page - 1) // per_page
        start = (page - 1) * per_page
        page_files = sorted_files[start:start + per_page]

        images = []
        for f in page_files:
            # Filenames are written as "<plate>_<timestamp>.jpg"
            plate = f.replace('.jpg', '').split('_')[0]
            images.append({
                'filename': f,
                'plate': plate,
                'url': f'/dataset/images/{f}'
            })

        return {
            "images": images,
            "total": total,
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages
        }
    except Exception as e:
        return {"images": [], "total": 0, "error": str(e)}


# The URL variable is required: the view takes `filename`, and without
# `<filename>` in the rule every request fails with a TypeError.
@app.route("/dataset/images/<filename>")
def dataset_image(filename):
    """Serve one dataset image by filename."""
    return send_from_directory(DATASET_DIR, filename)


# SECURITY: Auth decorator for destructive operations
import hmac
from functools import wraps


def require_auth(f):
    """Require a matching `X-Service-Key` header before calling the view.

    When SERVICE_API_KEY is empty the check is skipped (dev mode) and a
    warning is printed. Otherwise a missing or wrong key yields HTTP 401.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not SERVICE_API_KEY:
            # No key configured = dev mode, allow but warn
            print("⚠️ SERVICE_API_KEY not set - dataset DELETE unprotected!")
            return f(*args, **kwargs)

        provided_key = request.headers.get('X-Service-Key', '')
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(provided_key.encode('utf-8'),
                                   SERVICE_API_KEY.encode('utf-8')):
            return {"error": "Unauthorized"}, 401
        return f(*args, **kwargs)
    return decorated


@app.route("/dataset/images/<filename>", methods=['DELETE'])
@require_auth
def delete_dataset_image(filename):
    """Delete one image from the dataset.

    Returns 400 for a filename that is not a plain file name, 404 if the file
    does not exist, 500 on any other error, and a success object otherwise.
    """
    # Refuse anything that could resolve outside DATASET_DIR.
    if filename in ('.', '..') or os.path.basename(filename) != filename:
        return {"success": False, "message": "Invalid filename"}, 400

    try:
        os.remove(os.path.join(DATASET_DIR, filename))
    except FileNotFoundError:
        return {"success": False, "message": "File not found"}, 404
    except Exception as e:
        return {"success": False, "message": str(e)}, 500

    # Invalidate the listing cache
    dataset_cache['timestamp'] = 0
    print(f"🗑️ Deleted from dataset: {filename}")
    return {"success": True, "message": f"Deleted {filename}"}


if __name__ == "__main__":
    t = threading.Thread(target=camera_loop, daemon=True)
    t.start()

    app.run(host="0.0.0.0", port=5001, debug=False, threaded=True,
            use_reloader=False)