import cv2 import easyocr import requests import os import time import threading import re from datetime import datetime from queue import Queue from flask import Flask, Response, jsonify from flask_cors import CORS from ultralytics import YOLO # Configuration BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000') CAMERA_ID = 0 PROCESS_INTERVAL = 1.5 MODEL_PATH = 'best.pt' DATASET_DIR = '/app/dataset' # Carpeta para guardar capturas app = Flask(__name__) CORS(app) # Shared state outputFrame = None frame_lock = threading.Lock() latest_detections = [] detection_lock = threading.Lock() # Cola para procesamiento OCR asíncrono (ahora incluye frame completo) ocr_queue = Queue(maxsize=5) # Cooldown para evitar múltiples capturas de la misma patente DATASET_COOLDOWN = 60 # segundos entre capturas de la misma patente recent_captures = {} # {plate_number: timestamp} captures_lock = threading.Lock() # Crear carpeta de dataset si no existe os.makedirs(DATASET_DIR, exist_ok=True) print(f"📁 Dataset directory: {DATASET_DIR}") def save_plate_capture(plate_number, full_frame): """Guarda la captura de la patente para el dataset con cooldown""" current_time = time.time() # Verificar cooldown with captures_lock: if plate_number in recent_captures: elapsed = current_time - recent_captures[plate_number] if elapsed < DATASET_COOLDOWN: return False # Aún en cooldown, no guardar # Actualizar timestamp recent_captures[plate_number] = current_time try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Solo guardar frame completo filename = f"{plate_number}_{timestamp}.jpg" filepath = f"{DATASET_DIR}/{filename}" cv2.imwrite(filepath, full_frame, [cv2.IMWRITE_JPEG_QUALITY, 95]) # Contar total de capturas total_count = len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]) # Notificar al backend para WebSocket try: requests.post(f"{BACKEND_URL}/api/dataset/capture", json={ 'plate_number': plate_number, 'filename': filename, 'count': total_count }, timeout=2) except: pass # No bloquear si falla la notificación print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})") return True except Exception as e: print(f"❌ Error saving capture: {e}") return False def send_plate(plate_number): """Envía la patente detectada al backend""" try: url = f"{BACKEND_URL}/api/detect" requests.post(url, json={'plate_number': plate_number}, timeout=3) print(f"✓ Plate sent: {plate_number}") except Exception as e: print(f"✗ Error sending plate: {e}") def validate_plate(text): """Valida formato chileno""" # Formato nuevo: XXXX-00 | Formato antiguo: XX-0000 return bool(re.match(r'^[A-Z]{4}\d{2}$', text) or re.match(r'^[A-Z]{2}\d{4}$', text)) def ocr_worker(reader): """Hilo dedicado para OCR - no bloquea el stream""" while True: try: data = ocr_queue.get(timeout=1) if data is None: continue plate_img, full_frame = data # Preprocesamiento para mejor OCR gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY) ocr_results = reader.readtext(gray, detail=0, paragraph=False, allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for text in ocr_results: clean_text = ''.join(e for e in text if e.isalnum()).upper() if len(clean_text) >= 6 and validate_plate(clean_text): # Enviar al backend send_plate(clean_text) # Guardar captura para dataset (con cooldown) save_plate_capture(clean_text, full_frame) except: pass def camera_loop(): """Hilo principal de captura - mantiene FPS alto""" global outputFrame, latest_detections print("🚀 Initializing ALPR System...") print("📷 Loading camera...") cap = cv2.VideoCapture(CAMERA_ID) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) cap.set(cv2.CAP_PROP_FPS, 30) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) print("🧠 Loading YOLO model...") try: model = YOLO(MODEL_PATH) except Exception as e: print(f"❌ Critical Error loading model: {e}") return print("📝 Initializing EasyOCR...") reader = easyocr.Reader(['en'], gpu=False) # Iniciar worker de OCR ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True) ocr_thread.start() print("✅ System ready!") last_process_time = 0 while True: # Captura eficiente cap.grab() cap.grab() ret, frame = cap.retrieve() if not ret: time.sleep(0.01) continue current_time = time.time() # Procesar ALPR cada PROCESS_INTERVAL segundos if current_time - last_process_time > PROCESS_INTERVAL: last_process_time = current_time # YOLO detection results = model(frame, verbose=False, imgsz=320, conf=0.5) new_detections = [] for r in results: for box in r.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf = float(box.conf[0]) new_detections.append((x1, y1, x2, y2, conf)) # Extraer imagen de placa plate_img = frame[y1:y2, x1:x2].copy() if plate_img.size > 0 and not ocr_queue.full(): # Enviar placa Y frame completo para dataset ocr_queue.put((plate_img, frame.copy())) with detection_lock: latest_detections = new_detections # Actualizar frame para streaming display_frame = frame with detection_lock: for (x1, y1, x2, y2, conf) in latest_detections: cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(display_frame, f"{conf:.0%}", (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) with frame_lock: outputFrame = display_frame def generate(): """Generador para streaming MJPEG""" global outputFrame while True: time.sleep(0.033) with frame_lock: if outputFrame is None: continue _, encoded = cv2.imencode(".jpg", outputFrame, [cv2.IMWRITE_JPEG_QUALITY, 75]) yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n' @app.route("/video_feed") def video_feed(): return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame") @app.route("/health") def health(): return {"status": "ok", "service": "alpr"} @app.route("/dataset/count") def dataset_count(): """Endpoint para ver cuántas capturas hay en el dataset""" try: files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')] return {"plates_captured": len(files), "total_files": len(files)} except: return {"plates_captured": 0, "total_files": 0} @app.route("/dataset/list") def dataset_list(): """Lista todas las imágenes del dataset""" try: files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')] # Ordenar por fecha (más recientes primero) files.sort(reverse=True) images = [] for f in files[:50]: # Limitar a últimas 50 parts = f.replace('.jpg', '').split('_') plate = parts[0] if parts else 'Unknown' images.append({ 'filename': f, 'plate': plate, 'url': f'/dataset/images/{f}' }) return {"images": images, "total": len(files)} except Exception as e: return {"images": [], "total": 0, "error": str(e)} @app.route("/dataset/images/") def dataset_image(filename): """Sirve una imagen específica del dataset""" from flask import send_from_directory return send_from_directory(DATASET_DIR, filename) if __name__ == "__main__": t = threading.Thread(target=camera_loop, daemon=True) t.start() app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)