# Files
# ControlPatente/alpr-service/main.py
#
# 371 lines
# 13 KiB
# Python

import cv2
import easyocr
import requests
import os
import time
import threading
import re
import numpy as np
from datetime import datetime
from queue import Queue
from flask import Flask, Response, request, send_from_directory
from flask_cors import CORS
from ultralytics import YOLO
# Configuration (each value can be overridden through an environment variable)
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = int(os.environ.get('CAMERA_ID', 0))
# Minimum number of seconds between two detection passes in the camera loop
PROCESS_INTERVAL = float(os.environ.get('PROCESS_INTERVAL', 1.5))
MODEL_PATH = os.environ.get('MODEL_PATH', 'best.pt')
DATASET_DIR = os.environ.get('DATASET_DIR', '/app/dataset')
# Minimum number of seconds between two saved captures of the same plate
DATASET_COOLDOWN = int(os.environ.get('DATASET_COOLDOWN', 60))
OCR_WORKERS = int(os.environ.get('OCR_WORKERS', 2))  # Number of OCR worker threads
app = Flask(__name__)
CORS(app)
# Shared state
# Latest annotated frame published by the camera loop; guarded by frame_lock
outputFrame = None
frame_lock = threading.Lock()
# Bounding boxes from the most recent detection pass; guarded by detection_lock
latest_detections = []
detection_lock = threading.Lock()
# Queue for asynchronous OCR processing
ocr_queue = Queue(maxsize=10)
# Cooldown bookkeeping to avoid multiple captures of the same plate
recent_captures = {}  # {plate_number: timestamp}
captures_lock = threading.Lock()
# Cache for the dataset file listing
dataset_cache = {'data': None, 'timestamp': 0, 'ttl': 5}  # cached for 5 seconds
# Metrics exposed by the health check; guarded by metrics_lock
metrics = {
    'fps': 0,
    'ocr_queue_size': 0,
    'total_detections': 0,
    'total_captures': 0,
    'last_detection': None,
    'start_time': time.time()
}
metrics_lock = threading.Lock()
# Create the dataset folder if it does not exist
os.makedirs(DATASET_DIR, exist_ok=True)
print(f"📁 Dataset directory: {DATASET_DIR}")
def cleanup_recent_captures():
    """Periodically purge stale cooldown entries so the record cannot grow forever.

    Never returns. Every 5 minutes it drops each plate in ``recent_captures``
    whose last capture is older than twice ``DATASET_COOLDOWN`` seconds and
    logs how many entries were removed.
    """
    sweep_period = 300  # seconds (5 minutes)
    while True:
        time.sleep(sweep_period)
        now = time.time()
        with captures_lock:
            stale_plates = [
                plate
                for plate, captured_at in recent_captures.items()
                if now - captured_at > DATASET_COOLDOWN * 2
            ]
            for plate in stale_plates:
                recent_captures.pop(plate)
        if stale_plates:
            print(f"🧹 Cleaned {len(stale_plates)} expired capture records")
def save_plate_capture(plate_number, full_frame):
    """Save the frame of a detected plate into the dataset, rate-limited per plate.

    Args:
        plate_number: Plate text; used as the cooldown key and filename prefix.
        full_frame: Image array to store as JPEG. Its ``.size`` attribute is
            read, so a NumPy-style array (or ``None``) is expected.

    Returns:
        True when the image was written; False when the frame is empty, the
        plate is still inside its cooldown window, or saving failed.
    """
    current_time = time.time()

    # Refuse empty frames before touching any shared state.
    if full_frame is None or full_frame.size == 0:
        print(f"⚠️ Empty frame, skipping save for {plate_number}")
        return False

    # Cooldown check and registration happen atomically under the lock so two
    # OCR workers cannot both save the same plate.
    with captures_lock:
        last_capture = recent_captures.get(plate_number)
        if last_capture is not None and current_time - last_capture < DATASET_COOLDOWN:
            return False
        recent_captures[plate_number] = current_time

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        frame_to_save = np.copy(full_frame)
        filename = f"{plate_number}_{timestamp}.jpg"
        filepath = os.path.join(DATASET_DIR, filename)

        success = cv2.imwrite(filepath, frame_to_save, [cv2.IMWRITE_JPEG_QUALITY, 95])
        if not success or not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
            print(f"❌ Failed to save image for {plate_number}")
            # Do not leave a truncated/empty file behind.
            if os.path.exists(filepath):
                os.remove(filepath)
            return False

        # Invalidate the dataset listing cache.
        dataset_cache['timestamp'] = 0

        with metrics_lock:
            metrics['total_captures'] += 1

        total_count = len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')])

        # Notifying the backend is best-effort: a failure must not undo the
        # capture, but it is logged instead of being silently discarded.
        try:
            requests.post(f"{BACKEND_URL}/api/dataset/capture", json={
                'plate_number': plate_number,
                'filename': filename,
                'count': total_count
            }, timeout=2)
        except requests.RequestException as e:
            print(f"⚠️ Could not notify backend about capture: {e}")

        print(f"📸 Saved to dataset: {plate_number} (Total: {total_count})")
        return True
    except Exception as e:
        print(f"❌ Error saving capture: {e}")
        return False
def send_plate(plate_number):
    """Post a detected plate to the backend and record it in the metrics.

    Any exception raised while sending or updating the metrics is logged and
    swallowed; the function always returns ``None``.
    """
    payload = {'plate_number': plate_number}
    try:
        endpoint = f"{BACKEND_URL}/api/detect"
        requests.post(endpoint, json=payload, timeout=3)
        print(f"✓ Plate sent: {plate_number}")
        with metrics_lock:
            metrics['total_detections'] += 1
            metrics['last_detection'] = plate_number
    except Exception as exc:
        print(f"✗ Error sending plate: {exc}")
# Accepted layouts: four letters + two digits (e.g. "ABCD12") or
# two letters + four digits (e.g. "AB1234"). ASCII only.
_PLATE_PATTERN = re.compile(r'[A-Z]{4}[0-9]{2}|[A-Z]{2}[0-9]{4}')


def validate_plate(text):
    """Return True if *text* is exactly a Chilean-format plate.

    The whole string must match: ``fullmatch`` rejects a trailing newline
    (which ``$`` would tolerate) and ``[0-9]`` rejects non-ASCII digits
    (which ``\\d`` would accept).

    Args:
        text: Candidate plate string, expected upper-case with no separators.

    Returns:
        bool: True for ``LLLLDD`` or ``LLDDDD`` layouts, False otherwise
        (including non-string input).
    """
    if not isinstance(text, str):
        return False
    return _PLATE_PATTERN.fullmatch(text) is not None
def ocr_worker(reader, worker_id):
    """Consume plate crops from ``ocr_queue`` and run OCR on them forever.

    Each queue item is a ``(plate_img, full_frame)`` pair. Every OCR result
    that validates as a plate is sent to the backend and its full frame is
    offered to the dataset.

    Args:
        reader: OCR engine exposing ``readtext`` (an ``easyocr.Reader`` in
            this service).
        worker_id: Number used only to label log messages.
    """
    # Local import: Empty is not imported at module level.
    from queue import Empty

    print(f"🔤 OCR Worker {worker_id} started")
    while True:
        # An empty queue after the timeout is the normal idle case.
        try:
            data = ocr_queue.get(timeout=1)
        except Empty:
            continue
        if data is None:
            continue

        try:
            plate_img, full_frame = data
            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
            ocr_results = reader.readtext(
                gray,
                detail=0,
                paragraph=False,
                allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
            )
            for text in ocr_results:
                clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()
                if len(clean_text) >= 6 and validate_plate(clean_text):
                    send_plate(clean_text)
                    save_plate_capture(clean_text, full_frame)
        except Exception as e:
            # Keep the worker alive, but make the failure visible.
            print(f"⚠️ OCR Worker {worker_id} error: {e}")
def camera_loop():
    """Capture frames, run plate detection periodically and publish annotated frames.

    Startup: opens the camera, loads the YOLO model and EasyOCR, then starts
    ``OCR_WORKERS`` OCR threads and the cooldown-cleanup thread. If the camera
    cannot be opened or the model cannot be loaded, the error is logged, the
    capture device is released and the function returns.

    Main loop (never returns): updates the FPS/queue metrics once per second,
    runs detection at most once every ``PROCESS_INTERVAL`` seconds, queues
    each detected plate crop (with a copy of the full frame) for OCR, draws
    the latest boxes on the frame and publishes it in ``outputFrame``.
    """
    global outputFrame, latest_detections

    print("🚀 Initializing ALPR System...")
    print(f"⚙️ Config: PROCESS_INTERVAL={PROCESS_INTERVAL}s, OCR_WORKERS={OCR_WORKERS}")
    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
    # Without this check a missing camera makes the loop below spin forever
    # on failed reads.
    if not cap.isOpened():
        print(f"❌ Critical Error opening camera {CAMERA_ID}")
        cap.release()
        return
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    print("🧠 Loading YOLO model...")
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"❌ Critical Error loading model: {e}")
        cap.release()  # do not keep the device open when giving up
        return

    print("📝 Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)

    # Start the OCR workers; they all share the same reader.
    for i in range(OCR_WORKERS):
        t = threading.Thread(target=ocr_worker, args=(reader, i + 1), daemon=True)
        t.start()

    # Start the cooldown-record cleaner.
    cleanup_thread = threading.Thread(target=cleanup_recent_captures, daemon=True)
    cleanup_thread.start()

    print("✅ System ready!")

    last_process_time = 0
    frame_count = 0
    fps_start_time = time.time()

    while True:
        # Grab twice and decode only the second frame, so one frame is
        # skipped per iteration (presumably to reduce buffered-frame latency).
        cap.grab()
        cap.grab()
        ret, frame = cap.retrieve()
        if not ret:
            time.sleep(0.01)
            continue

        frame_count += 1
        current_time = time.time()

        # Publish FPS and queue size once per second.
        if current_time - fps_start_time >= 1.0:
            with metrics_lock:
                metrics['fps'] = frame_count
                metrics['ocr_queue_size'] = ocr_queue.qsize()
            frame_count = 0
            fps_start_time = current_time

        # Run plate detection at most once per PROCESS_INTERVAL.
        if current_time - last_process_time > PROCESS_INTERVAL:
            last_process_time = current_time
            results = model(frame, verbose=False, imgsz=320, conf=0.5)
            new_detections = []
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    new_detections.append((x1, y1, x2, y2, conf))
                    plate_img = frame[y1:y2, x1:x2].copy()
                    # Drop the crop instead of blocking when OCR is backed up.
                    # The frame is copied before boxes are drawn on it below.
                    if plate_img.size > 0 and not ocr_queue.full():
                        ocr_queue.put((plate_img, frame.copy()))
            with detection_lock:
                latest_detections = new_detections

        # Boxes from the last detection pass are redrawn on every frame.
        display_frame = frame
        with detection_lock:
            for (x1, y1, x2, y2, conf) in latest_detections:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, f"{conf:.0%}", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        with frame_lock:
            outputFrame = display_frame
def generate():
    """Yield the latest published frame as multipart MJPEG chunks, forever.

    Sleeps 0.033 s per iteration, so at most roughly 30 chunks per second are
    produced. Iterations where no frame has been published yet, or where JPEG
    encoding fails, yield nothing.

    Yields:
        bytes: One ``--frame`` part containing a JPEG-encoded image.
    """
    while True:
        time.sleep(0.033)
        with frame_lock:
            if outputFrame is None:
                continue
            ok, encoded = cv2.imencode(".jpg", outputFrame, [cv2.IMWRITE_JPEG_QUALITY, 75])
        # imencode reports failure through its first return value; skip the
        # frame rather than emit a broken part.
        if not ok:
            continue
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'
@app.route("/video_feed")
def video_feed():
    """Stream the frames produced by ``generate()`` as a multipart MJPEG response."""
    frame_stream = generate()
    return Response(frame_stream, mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/health")
def health():
    """Full health check with service metrics.

    The metrics are copied under ``metrics_lock`` and the dataset directory is
    listed afterwards, so slow disk I/O never blocks the threads that update
    the metrics. If the directory cannot be listed, ``dataset_size`` is 0.

    Returns:
        dict: Status, uptime in whole seconds, the current metrics and the
        number of ``.jpg`` files in the dataset directory.
    """
    with metrics_lock:
        snapshot = dict(metrics)

    try:
        dataset_size = sum(1 for f in os.listdir(DATASET_DIR) if f.endswith('.jpg'))
    except OSError:
        dataset_size = 0

    return {
        "status": "ok",
        "service": "alpr",
        "uptime_seconds": int(time.time() - snapshot['start_time']),
        "fps": snapshot['fps'],
        "ocr_queue_size": snapshot['ocr_queue_size'],
        "ocr_workers": OCR_WORKERS,
        "total_detections": snapshot['total_detections'],
        "total_captures": snapshot['total_captures'],
        "last_detection": snapshot['last_detection'],
        "dataset_size": dataset_size
    }
@app.route("/dataset/count")
def dataset_count():
    """Return how many ``.jpg`` files the dataset directory contains.

    Returns:
        dict: ``plates_captured`` and ``total_files``, both set to the same
        count; 0 when the directory cannot be listed.
    """
    try:
        total = sum(1 for name in os.listdir(DATASET_DIR) if name.endswith('.jpg'))
    except OSError:
        # Only filesystem errors are expected here; anything else should surface.
        total = 0
    return {"plates_captured": total, "total_files": total}
@app.route("/dataset/list")
def dataset_list():
    """List dataset images, newest first, with pagination and a short-lived cache.

    Query parameters:
        page: 1-based page number (default 1). Non-numeric values fall back
            to the default; values below 1 are treated as 1.
        per_page: Page size (default 50). Non-numeric values fall back to the
            default; values below 1 are treated as 1.

    Returns:
        dict: ``images`` (each with ``filename``, ``plate`` and ``url``),
        ``total``, ``page``, ``per_page`` and ``total_pages``. On a filesystem
        error: ``images`` empty, ``total`` 0 and an ``error`` message.
    """
    current_time = time.time()

    # type=int makes the lookup return the default instead of raising on
    # malformed input; clamping avoids division by zero and negative slices.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(request.args.get('per_page', 50, type=int), 1)

    try:
        # Refresh the cached listing when it was invalidated or has expired.
        cache_stale = (
            dataset_cache['timestamp'] == 0
            or current_time - dataset_cache['timestamp'] > dataset_cache['ttl']
        )
        if cache_stale:
            files = [f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')]
            files_with_time = [(f, os.path.getmtime(os.path.join(DATASET_DIR, f))) for f in files]
            files_with_time.sort(key=lambda item: item[1], reverse=True)
            dataset_cache['data'] = [name for name, _ in files_with_time]
            dataset_cache['timestamp'] = current_time
        sorted_files = dataset_cache['data']

        # Pagination
        total = len(sorted_files)
        total_pages = (total + per_page - 1) // per_page
        start = (page - 1) * per_page
        page_files = sorted_files[start:start + per_page]

        images = []
        for f in page_files:
            # Filenames look like "<plate>_<timestamp>.jpg"; strip only the
            # extension and keep the text before the first underscore.
            plate = f[:-len('.jpg')].split('_')[0] or 'Unknown'
            images.append({
                'filename': f,
                'plate': plate,
                'url': f'/dataset/images/{f}'
            })

        return {
            "images": images,
            "total": total,
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages
        }
    except OSError as e:
        return {"images": [], "total": 0, "error": str(e)}
@app.route("/dataset/images/<filename>")
def dataset_image(filename):
    """Serve one image file from the dataset directory."""
    image_response = send_from_directory(DATASET_DIR, filename)
    return image_response
@app.route("/dataset/images/<filename>", methods=['DELETE'])
def delete_dataset_image(filename):
    """Delete one image from the dataset directory.

    The name comes straight from the URL, so it is accepted only when it is a
    plain file name (no path separators, not ``.`` or ``..``); anything else
    is answered like a missing file.

    Args:
        filename: Name of the file inside ``DATASET_DIR``.

    Returns:
        A success payload, or an error payload with status 404 (missing or
        invalid name) or 500 (filesystem error).
    """
    is_plain_name = (
        filename not in ('', '.', '..')
        and os.path.basename(filename) == filename
        and '\\' not in filename
    )
    if not is_plain_name:
        return {"success": False, "message": "File not found"}, 404

    filepath = os.path.join(DATASET_DIR, filename)
    # Remove directly and handle the error: checking existence first would
    # race with concurrent deletions.
    try:
        os.remove(filepath)
    except FileNotFoundError:
        return {"success": False, "message": "File not found"}, 404
    except OSError as e:
        return {"success": False, "message": str(e)}, 500

    # Invalidate the dataset listing cache.
    dataset_cache['timestamp'] = 0
    print(f"🗑️ Deleted from dataset: {filename}")
    return {"success": True, "message": f"Deleted {filename}"}
if __name__ == "__main__":
    # Capture and detection run in a daemon thread; Flask serves HTTP from
    # the main thread.
    camera_thread = threading.Thread(target=camera_loop, daemon=True)
    camera_thread.start()
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=False,
        threaded=True,
        use_reloader=False,
    )