# ControlPatente/alpr-service/main.py

import cv2
import easyocr
import requests
import os
import time
import threading
import re
from datetime import datetime
from queue import Queue, Empty
from flask import Flask, Response, jsonify
from flask_cors import CORS
from ultralytics import YOLO

# Configuration
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = 0
PROCESS_INTERVAL = 1.5
MODEL_PATH = 'best.pt'
DATASET_DIR = '/app/dataset'  # Folder where plate captures are stored
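# PROCESS_INTERVAL throttles how often YOLO + OCR run so the MJPEG stream stays smooth;
# MODEL_PATH is presumably a custom-trained plate-detector checkpoint, and DATASET_DIR is
# typically a mounted volume so that captures survive container restarts.
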
app = Flask(__name__)
CORS(app)
# Shared state
outputFrame = None
frame_lock = threading.Lock()
latest_detections = []
detection_lock = threading.Lock()
# Queue for asynchronous OCR processing (now carries the full frame as well)
ocr_queue = Queue(maxsize=5)
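# The queue is bounded: when it is full, the capture loop simply skips queueing
# (see the ocr_queue.full() check in camera_loop) instead of blocking, so a slow
# OCR pass can never stall the video stream.
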
# Create the dataset folder if it does not exist
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")

def save_plate_capture(plate_number, plate_img, full_frame):
    """Save the plate capture to the dataset folder."""
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Save the cropped plate image
        plate_filename = f"{DATASET_DIR}/{plate_number}_{timestamp}_plate.jpg"
        cv2.imwrite(plate_filename, plate_img, [cv2.IMWRITE_JPEG_QUALITY, 95])
        # Save the full frame for context
        frame_filename = f"{DATASET_DIR}/{plate_number}_{timestamp}_full.jpg"
        cv2.imwrite(frame_filename, full_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
        print(f"📸 Saved to dataset: {plate_number}")
        return True
    except Exception as e:
        print(f"❌ Error saving capture: {e}")
        return False

def send_plate(plate_number):
    """Send the detected plate to the backend."""
    try:
        url = f"{BACKEND_URL}/api/detect"
        requests.post(url, json={'plate_number': plate_number}, timeout=3)
        print(f"✓ Plate sent: {plate_number}")
    except Exception as e:
        print(f"✗ Error sending plate: {e}")

def validate_plate(text):
    """Validate the Chilean plate format."""
    # New format: XXXX-00 | Old format: XX-0000
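    # e.g. 'BCDF12' matches the new format and 'AB1234' the old one; hyphens are
    # already gone because the caller keeps only alphanumeric characters.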
    return bool(re.match(r'^[A-Z]{4}\d{2}$', text) or re.match(r'^[A-Z]{2}\d{4}$', text))

def ocr_worker(reader):
    """Dedicated OCR thread - keeps recognition from blocking the stream."""
    while True:
        try:
            data = ocr_queue.get(timeout=1)
            if data is None:
                continue
            plate_img, full_frame = data
            # Preprocessing for better OCR
            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
            ocr_results = reader.readtext(gray, detail=0, paragraph=False,
                                          allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
            for text in ocr_results:
                clean_text = ''.join(e for e in text if e.isalnum()).upper()
                if len(clean_text) >= 6 and validate_plate(clean_text):
                    # Send to the backend
                    send_plate(clean_text)
                    # Save the capture for the dataset
                    save_plate_capture(clean_text, plate_img, full_frame)
        except Empty:
            continue
        except Exception as e:
            print(f"✗ OCR worker error: {e}")

def camera_loop():
    """Main capture thread - keeps the FPS high."""
    global outputFrame, latest_detections

    print("🚀 Initializing ALPR System...")
    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
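    # MJPG FOURCC usually lets a USB webcam deliver 640x480 at ~30 FPS (raw YUYV often
    # cannot), and a 1-frame driver buffer keeps capture latency low.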
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
print("🧠 Loading YOLO model...")
try:
model = YOLO(MODEL_PATH)
except Exception as e:
print(f"❌ Critical Error loading model: {e}")
return
print("📝 Initializing EasyOCR...")
reader = easyocr.Reader(['en'], gpu=False)
# Iniciar worker de OCR
ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True)
ocr_thread.start()
print("✅ System ready!")
last_process_time = 0

    while True:
        # Efficient capture: grab twice to drop a stale buffered frame, then decode once
        cap.grab()
        cap.grab()
        ret, frame = cap.retrieve()
        if not ret:
            time.sleep(0.01)
            continue

        current_time = time.time()
        # Run ALPR every PROCESS_INTERVAL seconds
        if current_time - last_process_time > PROCESS_INTERVAL:
            last_process_time = current_time
            # YOLO detection
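            # imgsz=320 and conf=0.5 trade a little accuracy for speed; both are standard
            # Ultralytics predict arguments, so imgsz can be raised if the CPU budget allows.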
            results = model(frame, verbose=False, imgsz=320, conf=0.5)
            new_detections = []
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    new_detections.append((x1, y1, x2, y2, conf))
                    # Extract the plate crop
                    plate_img = frame[y1:y2, x1:x2].copy()
                    if plate_img.size > 0 and not ocr_queue.full():
                        # Queue the plate crop AND the full frame for the dataset
                        ocr_queue.put((plate_img, frame.copy()))
            with detection_lock:
                latest_detections = new_detections

        # Update the frame used for streaming
        display_frame = frame
        with detection_lock:
            for (x1, y1, x2, y2, conf) in latest_detections:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, f"{conf:.0%}", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        with frame_lock:
            outputFrame = display_frame

def generate():
    """Generator for MJPEG streaming."""
    global outputFrame
    while True:
        time.sleep(0.033)
        with frame_lock:
            if outputFrame is None:
                continue
            frame_to_send = outputFrame
        # Encode outside the lock so a slow client never stalls the capture thread
        _, encoded = cv2.imencode(".jpg", frame_to_send, [cv2.IMWRITE_JPEG_QUALITY, 75])
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'
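
# The route below is what a browser or the frontend embeds directly, e.g.
#   <img src="http://<host>:5001/video_feed">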
@app.route("/video_feed")
def video_feed():
    return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
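
# Simple liveness probe; e.g. `curl http://<host>:5001/health` should return
#   {"status": "ok", "service": "alpr"}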
@app.route("/health")
def health():
    return {"status": "ok", "service": "alpr"}
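
# Quick way to check how much training data has been collected; e.g.
# `curl http://<host>:5001/dataset/count` returns something like
#   {"plates_captured": 12, "total_files": 24}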
@app.route("/dataset/count")
def dataset_count():
    """Endpoint to check how many captures the dataset holds."""
    try:
        files = os.listdir(DATASET_DIR)
        plates = len([f for f in files if f.endswith('_plate.jpg')])
        return {"plates_captured": plates, "total_files": len(files)}
    except OSError:
        return {"plates_captured": 0, "total_files": 0}

if __name__ == "__main__":
    t = threading.Thread(target=camera_loop, daemon=True)
    t.start()
    app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)