"""ALPR service: camera capture, YOLO plate detection, OCR and MJPEG streaming.

A background thread reads frames from the camera, periodically runs a YOLO
model to locate plates and hands the crops to a dedicated OCR thread. Valid
plate strings are posted to the backend and the full frame is stored in a
dataset directory. A small Flask app exposes the live stream and the dataset.
"""

import cv2
import easyocr
import requests
import os
import time
import threading
import re
import numpy as np
from datetime import datetime
from queue import Empty, Queue
from flask import Flask, Response, jsonify, request, send_from_directory
from flask_cors import CORS
from ultralytics import YOLO

# Configuration
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = 0
PROCESS_INTERVAL = 1.5  # seconds between YOLO passes
MODEL_PATH = 'best.pt'
DATASET_DIR = '/app/dataset'  # Folder where captures are stored

app = Flask(__name__)
CORS(app)

# Shared state
outputFrame = None
frame_lock = threading.Lock()
latest_detections = []
detection_lock = threading.Lock()

# Queue for asynchronous OCR processing; items are (plate_crop, full_frame)
ocr_queue = Queue(maxsize=5)

# Cooldown to avoid multiple captures of the same plate
DATASET_COOLDOWN = 60  # seconds between captures of the same plate
recent_captures = {}  # {plate_number: timestamp}
captures_lock = threading.Lock()

# Accepted plate layouts: four letters + two digits, or two letters + four digits
_PLATE_PATTERN = re.compile(r'(?:[A-Z]{4}\d{2}|[A-Z]{2}\d{4})')

# Create the dataset folder if it does not exist
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")


def save_plate_capture(plate_number, full_frame):
    """Save the full frame for a detected plate, subject to a per-plate cooldown.

    Args:
        plate_number: Cleaned plate string; used as the file-name prefix.
        full_frame: Image array of the whole camera frame.

    Returns:
        True if the image was written, False if the frame was empty, the plate
        is still in cooldown, or writing failed.
    """
    current_time = time.time()

    # Reject empty frames
    if full_frame is None or full_frame.size == 0:
        print(f"⚠️ Empty frame, skipping save for {plate_number}")
        return False

    # Check the cooldown
    with captures_lock:
        last_saved = recent_captures.get(plate_number)
        if last_saved is not None and current_time - last_saved < DATASET_COOLDOWN:
            return False  # Still in cooldown, do not save

        # Drop expired entries so the dict does not grow without bound
        # (every OCR misread would otherwise leave a permanent key).
        expired = [
            plate for plate, saved_at in recent_captures.items()
            if current_time - saved_at >= DATASET_COOLDOWN
        ]
        for plate in expired:
            del recent_captures[plate]

        # Record the timestamp for this plate
        recent_captures[plate_number] = current_time

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Copy the frame so later changes by other threads cannot affect the save
        frame_to_save = np.copy(full_frame)

        # Only the full frame is stored
        filename = f"{plate_number}_{timestamp}.jpg"
        filepath = os.path.join(DATASET_DIR, filename)

        success = cv2.imwrite(filepath, frame_to_save, [cv2.IMWRITE_JPEG_QUALITY, 95])

        # Verify that the file was actually written
        if not success or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
            print(f"❌ Failed to save image for {plate_number}")
            # Remove the empty file if one was left behind
            if os.path.exists(filepath):
                os.remove(filepath)
            return False

        # Count total captures
        total_count = len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')])

        # Notify the backend; a failed notification must not block the capture
        try:
            requests.post(f"{BACKEND_URL}/api/dataset/capture", json={
                'plate_number': plate_number,
                'filename': filename,
                'count': total_count
            }, timeout=2)
        except requests.RequestException:
            pass

        print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})")
        return True
    except Exception as e:
        print(f"❌ Error saving capture: {e}")
        return False


def send_plate(plate_number):
    """Post the detected plate to the backend; errors are logged, not raised."""
    try:
        url = f"{BACKEND_URL}/api/detect"
        requests.post(url, json={'plate_number': plate_number}, timeout=3)
        print(f"✓ Plate sent: {plate_number}")
    except Exception as e:
        print(f"✗ Error sending plate: {e}")


def validate_plate(text):
    """Return True if *text* is four letters + two digits or two letters + four digits.

    The whole string must match, so trailing characters (including a newline)
    are rejected.
    """
    return _PLATE_PATTERN.fullmatch(text) is not None


def ocr_worker(reader):
    """Consume plate crops from ``ocr_queue`` and run OCR on them, forever.

    Runs in its own thread so OCR never blocks frame capture.

    Args:
        reader: Object exposing ``readtext``, as created by ``easyocr.Reader``.
    """
    while True:
        try:
            data = ocr_queue.get(timeout=1)
        except Empty:
            # Nothing queued within the timeout; keep polling.
            continue

        if data is None:
            continue

        try:
            plate_img, full_frame = data

            # Grayscale conversion before OCR
            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)

            ocr_results = reader.readtext(gray, detail=0, paragraph=False,
                                          allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')

            for text in ocr_results:
                clean_text = ''.join(e for e in text if e.isalnum()).upper()

                if len(clean_text) >= 6 and validate_plate(clean_text):
                    # Send to the backend
                    send_plate(clean_text)
                    # Save the capture for the dataset (with cooldown)
                    save_plate_capture(clean_text, full_frame)
        except Exception as e:
            # Keep the worker alive, but surface the failure instead of hiding it.
            print(f"✗ OCR worker error: {e}")


def camera_loop():
    """Capture frames, run periodic plate detection and publish the stream frame.

    Returns early (after logging) if the camera cannot be opened or the YOLO
    model fails to load; otherwise loops forever.
    """
    global outputFrame, latest_detections

    print("🚀 Initializing ALPR System...")

    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
    if not cap.isOpened():
        print(f"❌ Critical Error opening camera {CAMERA_ID}")
        return
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    print("🧠 Loading YOLO model...")
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"❌ Critical Error loading model: {e}")
        return

    print("📝 Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)

    # Start the OCR worker
    ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True)
    ocr_thread.start()

    print("✅ System ready!")

    last_process_time = 0

    while True:
        # Grab twice and decode only the second frame
        cap.grab()
        cap.grab()
        ret, frame = cap.retrieve()

        if not ret:
            time.sleep(0.01)
            continue

        current_time = time.time()

        # Run ALPR every PROCESS_INTERVAL seconds
        if current_time - last_process_time > PROCESS_INTERVAL:
            last_process_time = current_time

            # YOLO detection
            results = model(frame, verbose=False, imgsz=320, conf=0.5)

            new_detections = []
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    new_detections.append((x1, y1, x2, y2, conf))

                    # Crop the plate region (before any overlay is drawn)
                    plate_img = frame[y1:y2, x1:x2].copy()

                    if plate_img.size > 0 and not ocr_queue.full():
                        # Queue the crop AND the full frame for the dataset
                        ocr_queue.put((plate_img, frame.copy()))

            with detection_lock:
                latest_detections = new_detections

        # Draw the latest detections onto the frame used for streaming
        display_frame = frame
        with detection_lock:
            for (x1, y1, x2, y2, conf) in latest_detections:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, f"{conf:.0%}", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        with frame_lock:
            outputFrame = display_frame


def generate():
    """Yield multipart MJPEG chunks built from the most recently published frame."""
    while True:
        time.sleep(0.033)

        # Only take the reference under the lock; camera_loop publishes a new
        # array each iteration and finishes drawing before publishing, so the
        # frame can be encoded outside the lock.
        with frame_lock:
            frame = outputFrame

        if frame is None:
            continue

        ok, encoded = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 75])
        if not ok:
            continue

        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'


@app.route("/video_feed")
def video_feed():
    """Stream the annotated camera feed as multipart MJPEG."""
    return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")


@app.route("/health")
def health():
    """Liveness endpoint."""
    return {"status": "ok", "service": "alpr"}


@app.route("/dataset/count")
def dataset_count():
    """Return how many .jpg captures the dataset directory holds (0 on error)."""
    try:
        files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]
        return {"plates_captured": len(files), "total_files": len(files)}
    except OSError:
        return {"plates_captured": 0, "total_files": 0}


@app.route("/dataset/list")
def dataset_list():
    """List dataset images, newest first, with pagination.

    Query parameters ``page`` (default 1) and ``per_page`` (default 50) are
    clamped to a minimum of 1. On any error an empty listing with an
    ``error`` message is returned.
    """
    try:
        page = max(1, int(request.args.get('page', 1)))
        per_page = max(1, int(request.args.get('per_page', 50)))

        files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]

        # Sort by modification time (most recent first)
        sorted_files = sorted(
            files,
            key=lambda name: os.path.getmtime(os.path.join(DATASET_DIR, name)),
            reverse=True,
        )

        # Pagination
        total = len(sorted_files)
        total_pages = (total + per_page - 1) // per_page
        start = (page - 1) * per_page
        page_files = sorted_files[start:start + per_page]

        images = []
        for f in page_files:
            # File names are "<plate>_<date>_<time>.jpg"
            plate = f.replace('.jpg', '').split('_')[0] or 'Unknown'
            images.append({
                'filename': f,
                'plate': plate,
                'url': f'/dataset/images/{f}'
            })

        return {
            "images": images,
            "total": total,
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages
        }
    except Exception as e:
        return {"images": [], "total": 0, "error": str(e)}


@app.route("/dataset/images/<filename>")
def dataset_image(filename):
    """Serve one image from the dataset directory."""
    return send_from_directory(DATASET_DIR, filename)


if __name__ == "__main__":
    t = threading.Thread(target=camera_loop, daemon=True)
    t.start()
    app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)