Files
ControlPatente/alpr-service/main.py

286 lines
9.5 KiB
Python

import cv2
import easyocr
import requests
import os
import time
import threading
import re
from datetime import datetime
from queue import Queue
from flask import Flask, Response, jsonify
from flask_cors import CORS
from ultralytics import YOLO
# Configuration
# Base URL of the backend API; overridable through the BACKEND_URL environment variable.
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
# Device index handed to cv2.VideoCapture in camera_loop.
CAMERA_ID = 0
# Minimum number of seconds between two YOLO detection passes.
PROCESS_INTERVAL = 1.5
# Path of the YOLO weights file loaded in camera_loop.
MODEL_PATH = 'best.pt'
DATASET_DIR = '/app/dataset' # Folder where captures are stored
app = Flask(__name__)
CORS(app)
# Shared state
# Latest annotated frame, written by camera_loop and read by generate(); guarded by frame_lock.
outputFrame = None
frame_lock = threading.Lock()
# (x1, y1, x2, y2, conf) tuples from the most recent detection pass; guarded by detection_lock.
latest_detections = []
detection_lock = threading.Lock()
# Queue for asynchronous OCR processing (items now include the full frame)
ocr_queue = Queue(maxsize=5)
# Cooldown to avoid multiple captures of the same plate
DATASET_COOLDOWN = 60 # seconds between captures of the same plate
recent_captures = {} # {plate_number: timestamp}
captures_lock = threading.Lock()
# Create the dataset folder if it does not exist
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")
def save_plate_capture(plate_number, full_frame):
    """Save a full-frame capture of a detected plate into the dataset folder.

    A per-plate cooldown of ``DATASET_COOLDOWN`` seconds prevents the same
    plate from being stored over and over. After a successful write the
    backend is notified on a best-effort basis.

    Args:
        plate_number: Plate text; used as the file-name prefix and as the
            cooldown key.
        full_frame: Image handed to ``cv2.imwrite``.

    Returns:
        True if the image was written, False if the plate is still in
        cooldown or the capture could not be saved.
    """
    current_time = time.time()

    # Check the cooldown and claim the slot under one lock acquisition
    with captures_lock:
        last_capture = recent_captures.get(plate_number)
        if last_capture is not None and current_time - last_capture < DATASET_COOLDOWN:
            return False  # Still in cooldown, do not save
        recent_captures[plate_number] = current_time

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Only the full frame is stored
        filename = f"{plate_number}_{timestamp}.jpg"
        filepath = os.path.join(DATASET_DIR, filename)

        # cv2.imwrite signals failure through its return value, so it must be
        # checked; otherwise a failed write would be reported as saved.
        written = cv2.imwrite(filepath, full_frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
        if not written:
            raise OSError(f"cv2.imwrite could not write {filepath}")

        # Count all captures currently in the dataset
        total_count = sum(1 for name in os.listdir(DATASET_DIR) if name.endswith('.jpg'))
    except Exception as e:
        print(f"❌ Error saving capture: {e}")
        return False

    # Notify the backend (it relays the event over WebSocket); a failed
    # notification must not turn a successful save into a failure.
    try:
        requests.post(f"{BACKEND_URL}/api/dataset/capture", json={
            'plate_number': plate_number,
            'filename': filename,
            'count': total_count
        }, timeout=2)
    except requests.RequestException as e:
        print(f"⚠️ Dataset notification failed: {e}")

    print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})")
    return True
def send_plate(plate_number):
    """Send a detected plate to the backend's ``/api/detect`` endpoint.

    Network errors and HTTP error statuses are logged, never raised, so a
    backend outage does not interrupt the caller.

    Args:
        plate_number: Plate text sent as the ``plate_number`` JSON field.
    """
    url = f"{BACKEND_URL}/api/detect"
    try:
        response = requests.post(url, json={'plate_number': plate_number}, timeout=3)
        # Without this check an HTTP 4xx/5xx answer would be logged as a success
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"✗ Error sending plate: {e}")
    else:
        print(f"✓ Plate sent: {plate_number}")
# Chilean plate layouts with separators already stripped:
# new format LLLLDD (written XXXX-00), old format LLDDDD (written XX-0000).
_PLATE_PATTERN = re.compile(r'[A-Z]{4}[0-9]{2}|[A-Z]{2}[0-9]{4}')


def validate_plate(text):
    """Return True if *text* is a Chilean plate in new or old format.

    The whole string must match: exactly four uppercase ASCII letters followed
    by two ASCII digits, or two uppercase ASCII letters followed by four ASCII
    digits. ``fullmatch`` is used because ``$`` also matches just before a
    trailing newline, which would let ``"ABCD12\\n"`` through.

    Args:
        text: Candidate plate string without separators.

    Returns:
        bool: Whether *text* is a valid plate.
    """
    return _PLATE_PATTERN.fullmatch(text) is not None
def ocr_worker(reader):
    """Run OCR on queued plate crops in a dedicated thread (never returns).

    Items are ``(plate_img, full_frame)`` tuples taken from ``ocr_queue``.
    Each recognised text is reduced to uppercase alphanumerics; texts that
    pass ``validate_plate`` are sent to the backend and stored in the dataset.
    Running this off the capture thread keeps the video stream from blocking
    on OCR.

    Args:
        reader: OCR reader exposing ``readtext`` (an ``easyocr.Reader`` in
            this file).
    """
    # Local import: the file-level import only brings in Queue
    from queue import Empty

    while True:
        try:
            data = ocr_queue.get(timeout=1)
        except Empty:
            continue  # Nothing queued within the timeout; poll again
        if data is None:
            continue

        try:
            plate_img, full_frame = data
            # Preprocessing for better OCR: work on a grayscale crop
            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
            ocr_results = reader.readtext(gray, detail=0, paragraph=False,
                                          allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
            for text in ocr_results:
                clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()
                if len(clean_text) >= 6 and validate_plate(clean_text):
                    # Report to the backend
                    send_plate(clean_text)
                    # Store the capture for the dataset (subject to cooldown)
                    save_plate_capture(clean_text, full_frame)
        except Exception as e:
            # Keep the worker alive, but do not hide the failure
            print(f"❌ OCR worker error: {e}")
def camera_loop():
    """Capture camera frames, detect plates periodically and publish frames.

    Intended to run forever in its own thread. At most once every
    ``PROCESS_INTERVAL`` seconds the current frame is passed to the YOLO
    model; every detected box is cropped and queued, together with a copy of
    the full frame, for the OCR worker. Every captured frame is annotated
    with the most recent boxes and published through ``outputFrame``.

    Returns early, after logging, if the camera cannot be opened or the YOLO
    model cannot be loaded. The capture device is released on every exit path.
    """
    global outputFrame, latest_detections
    print("🚀 Initializing ALPR System...")
    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
    try:
        if not cap.isOpened():
            # Without this check the loop below would spin forever on failed reads
            print(f"❌ Critical Error: could not open camera {CAMERA_ID}")
            return

        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 30)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        print("🧠 Loading YOLO model...")
        try:
            model = YOLO(MODEL_PATH)
        except Exception as e:
            print(f"❌ Critical Error loading model: {e}")
            return

        print("📝 Initializing EasyOCR...")
        reader = easyocr.Reader(['en'], gpu=False)

        # Start the OCR worker
        ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True)
        ocr_thread.start()
        print("✅ System ready!")

        last_process_time = 0
        while True:
            # Grab twice and decode only the second frame
            # (presumably to skip a stale buffered frame — TODO confirm)
            cap.grab()
            cap.grab()
            ret, frame = cap.retrieve()
            if not ret:
                time.sleep(0.01)
                continue

            current_time = time.time()
            # Run ALPR every PROCESS_INTERVAL seconds
            if current_time - last_process_time > PROCESS_INTERVAL:
                last_process_time = current_time
                # YOLO detection
                results = model(frame, verbose=False, imgsz=320, conf=0.5)
                new_detections = []
                for r in results:
                    for box in r.boxes:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        conf = float(box.conf[0])
                        new_detections.append((x1, y1, x2, y2, conf))
                        # Crop the plate region
                        plate_img = frame[y1:y2, x1:x2].copy()
                        if plate_img.size > 0 and not ocr_queue.full():
                            # Queue the crop AND a copy of the full frame; the
                            # copy is taken before any overlay is drawn below
                            ocr_queue.put((plate_img, frame.copy()))
                with detection_lock:
                    latest_detections = new_detections

            # Annotate the frame for streaming with the latest known boxes
            display_frame = frame
            with detection_lock:
                for (x1, y1, x2, y2, conf) in latest_detections:
                    cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(display_frame, f"{conf:.0%}", (x1, y1-5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
            with frame_lock:
                outputFrame = display_frame
    finally:
        # Free the device whether we return early or the loop dies on an error
        cap.release()
def generate():
    """Yield the latest published frame as parts of an MJPEG multipart stream.

    Sleeps 0.033 s between iterations, JPEG-encodes ``outputFrame`` at
    quality 75 while holding ``frame_lock``, and yields one ``--frame`` part
    per successfully encoded image. Iterations where no frame has been
    published yet, or where encoding fails, yield nothing.

    Yields:
        bytes: One multipart chunk (boundary, content-type header, JPEG data).
    """
    while True:
        time.sleep(0.033)
        with frame_lock:
            if outputFrame is None:
                continue
            ok, encoded = cv2.imencode(".jpg", outputFrame, [cv2.IMWRITE_JPEG_QUALITY, 75])
        if not ok:
            # imencode reports failure through its first return value
            continue
        # The lock is released before yielding so a slow client cannot stall capture
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'
@app.route("/video_feed")
def video_feed():
    """Serve the camera stream as a multipart MJPEG response."""
    stream = generate()
    return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/health")
def health():
    """Health-check endpoint: return a fixed status payload."""
    payload = {"status": "ok", "service": "alpr"}
    return payload
@app.route("/dataset/count")
def dataset_count():
    """Report how many ``.jpg`` captures the dataset folder contains.

    Returns:
        dict: ``plates_captured`` and ``total_files``, both set to the number
        of ``.jpg`` files in ``DATASET_DIR``; both are 0 if the folder cannot
        be listed.
    """
    try:
        total = sum(1 for name in os.listdir(DATASET_DIR) if name.endswith('.jpg'))
    except OSError as e:
        # Keep answering with zeros, but leave a trace of the failure
        print(f"❌ Error counting dataset files: {e}")
        total = 0
    return {"plates_captured": total, "total_files": total}
@app.route("/dataset/list")
def dataset_list():
    """List dataset images, newest first, with pagination.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: Page size (default 50; values below 1 are treated as 1).

    Returns:
        dict: ``images`` (each with ``filename``, ``plate`` and ``url``),
        ``total``, ``page``, ``per_page`` and ``total_pages``. If a parameter
        is not an integer or the folder cannot be read, returns
        ``{"images": [], "total": 0, "error": <message>}`` instead.
    """
    from flask import request
    try:
        # Clamp to 1: per_page=0 would divide by zero below, and negative
        # values would produce negative slice indices and wrong pages.
        page = max(1, int(request.args.get('page', 1)))
        per_page = max(1, int(request.args.get('per_page', 50)))

        # Sort by modification time (most recent first)
        files_with_time = [
            (name, os.path.getmtime(os.path.join(DATASET_DIR, name)))
            for name in os.listdir(DATASET_DIR)
            if name.endswith('.jpg')
        ]
        files_with_time.sort(key=lambda item: item[1], reverse=True)
        sorted_files = [name for name, _ in files_with_time]

        # Pagination
        total = len(sorted_files)
        total_pages = (total + per_page - 1) // per_page
        start = (page - 1) * per_page
        page_files = sorted_files[start:start + per_page]

        images = []
        for name in page_files:
            # Strip only the trailing extension; the plate is the text before
            # the first underscore ("<plate>_<timestamp>.jpg").
            stem = name[:-len('.jpg')]
            plate = stem.split('_', 1)[0] or 'Unknown'
            images.append({
                'filename': name,
                'plate': plate,
                'url': f'/dataset/images/{name}'
            })

        return {
            "images": images,
            "total": total,
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages
        }
    except (ValueError, OSError) as e:
        return {"images": [], "total": 0, "error": str(e)}
@app.route("/dataset/images/<filename>")
def dataset_image(filename):
    """Serve a single image from the dataset folder by file name."""
    from flask import send_from_directory
    image_response = send_from_directory(DATASET_DIR, filename)
    return image_response
if __name__ == "__main__":
    # Capture and detection run in a daemon thread; the Flask server owns the main thread
    camera_thread = threading.Thread(target=camera_loop, daemon=True)
    camera_thread.start()
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=False,
        threaded=True,
        use_reloader=False,
    )