Files
ControlPatente/alpr-service/main.py

170 lines
5.4 KiB
Python

import cv2
import easyocr
import requests
import os
import time
import threading
import re
from queue import Queue
from flask import Flask, Response
from flask_cors import CORS
from ultralytics import YOLO
# Configuration
# Base URL of the backend that receives detected plates; can be overridden
# through the BACKEND_URL environment variable.
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
# Device index handed to cv2.VideoCapture.
CAMERA_ID = 0
# Minimum number of seconds between two YOLO detection passes.
PROCESS_INTERVAL = 1.5 # More reactive
# Path of the YOLO weights file loaded by camera_loop.
MODEL_PATH = 'best.pt'
app = Flask(__name__)
CORS(app)
# Shared state
# Latest annotated frame published by camera_loop and read by generate();
# access is guarded by frame_lock.
outputFrame = None
frame_lock = threading.Lock()
# Most recent detections as (x1, y1, x2, y2, conf) tuples; access is guarded
# by detection_lock.
latest_detections = []
detection_lock = threading.Lock()
# Queue for asynchronous OCR processing (holds at most 5 pending plate crops)
ocr_queue = Queue(maxsize=5)
def send_plate(plate_number):
    """Send a detected plate to the backend.

    POSTs ``{"plate_number": plate_number}`` as JSON to
    ``{BACKEND_URL}/api/detect`` with a 3 second timeout. Delivery is
    best-effort: any failure is printed and never propagated to the caller.

    Args:
        plate_number: Plate text to report.
    """
    url = f"{BACKEND_URL}/api/detect"
    try:
        response = requests.post(url, json={'plate_number': plate_number}, timeout=3)
        # Without this check a 4xx/5xx reply would be reported as a success.
        response.raise_for_status()
    except Exception as e:
        print(f"✗ Error sending plate: {e}")
    else:
        print(f"✓ Plate sent: {plate_number}")
# Chilean plate layouts with the hyphen already removed: four letters followed
# by two digits (newer format) or two letters followed by four digits (older
# format). ``[0-9]`` is used instead of ``\d`` so that non-ASCII Unicode
# digits are rejected.
_PLATE_PATTERN = re.compile(r'[A-Z]{4}[0-9]{2}|[A-Z]{2}[0-9]{4}')


def validate_and_send(text):
    """Validate *text* against the Chilean plate formats and send it if valid.

    The whole string must match; ``fullmatch`` is used because ``$`` in a
    ``re.match`` pattern also accepts a trailing newline.

    Args:
        text: Candidate plate text, expected to be upper-case alphanumerics.

    Returns:
        True if the text matched a plate format and was handed to
        ``send_plate``, False otherwise.
    """
    if _PLATE_PATTERN.fullmatch(text) is None:
        return False
    send_plate(text)
    return True
def ocr_worker(reader):
    """Consume plate crops from ``ocr_queue`` and run OCR on them, forever.

    Meant to run on its own thread so OCR does not block the video stream.
    Each crop is converted to grayscale, read with *reader*, and every
    recognised string of at least 6 alphanumeric characters is passed to
    ``validate_and_send``. Errors while processing a crop are printed and the
    loop keeps running.

    Args:
        reader: OCR engine exposing ``readtext`` (an ``easyocr.Reader`` where
            this file starts the worker).
    """
    # Local import: the top of the file only imports Queue from this module.
    from queue import Empty

    while True:
        try:
            plate_img = ocr_queue.get(timeout=1)
        except Empty:
            # Nothing queued during the last second; poll again.
            continue
        if plate_img is None:
            continue
        try:
            # Preprocessing for better OCR
            gray = cv2.cvtColor(plate_img, cv2.COLOR_BGR2GRAY)
            ocr_results = reader.readtext(
                gray, detail=0, paragraph=False,
                allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
            for text in ocr_results:
                clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()
                if len(clean_text) >= 6:
                    validate_and_send(clean_text)
        except Exception as e:
            # Keep the worker alive, but report the failure instead of hiding it.
            print(f"✗ OCR error: {e}")
def _run_detection(model, frame):
    """Run one YOLO pass on *frame* and queue each detected plate crop for OCR.

    Args:
        model: Loaded YOLO model, called directly on the frame.
        frame: Image returned by the capture device.

    Returns:
        A list of ``(x1, y1, x2, y2, conf)`` tuples, one per detected box.
    """
    detections = []
    # YOLO detection - a small imgsz is used for speed
    results = model(frame, verbose=False, imgsz=320, conf=0.5)
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            conf = float(box.conf[0])
            detections.append((x1, y1, x2, y2, conf))
            # Skip the crop when the OCR queue is full rather than block capture.
            if ocr_queue.full():
                continue
            # Clamp at zero: a negative index would wrap around to the far
            # edge of the frame and yield the wrong region.
            plate_img = frame[max(y1, 0):max(y2, 0), max(x1, 0):max(x2, 0)].copy()
            if plate_img.size > 0:
                ocr_queue.put(plate_img)
    return detections


def camera_loop():
    """Capture frames, detect plates periodically, and publish frames for streaming.

    Opens camera ``CAMERA_ID``, loads the YOLO model from ``MODEL_PATH``,
    creates the EasyOCR reader and starts ``ocr_worker`` on a daemon thread.
    It then loops forever: every ``PROCESS_INTERVAL`` seconds a detection pass
    refreshes ``latest_detections``; every frame gets the current boxes drawn
    on it and is published through ``outputFrame``.

    Returns early (after printing an error) if the camera cannot be opened or
    the model cannot be loaded. The capture device is released whenever the
    function exits, including when an exception ends the loop.
    """
    global outputFrame, latest_detections
    print("🚀 Initializing ALPR System...")
    print("📷 Loading camera...")
    cap = cv2.VideoCapture(CAMERA_ID)
    try:
        if not cap.isOpened():
            # Without this check the loop below would spin forever on failed reads.
            print(f"❌ Critical Error opening camera: {CAMERA_ID}")
            return
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 30)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        print("🧠 Loading YOLO model...")
        try:
            model = YOLO(MODEL_PATH)
        except Exception as e:
            print(f"❌ Critical Error loading model: {e}")
            return
        print("📝 Initializing EasyOCR...")
        reader = easyocr.Reader(['en'], gpu=False)
        # Start the OCR worker
        ocr_thread = threading.Thread(target=ocr_worker, args=(reader,), daemon=True)
        ocr_thread.start()
        print("✅ System ready!")
        # Monotonic clock: wall-clock adjustments must not stall or burst
        # detection. -inf makes the first frame trigger a pass immediately.
        last_process_time = float('-inf')
        while True:
            # Two grabs before retrieve - presumably to discard a stale
            # buffered frame; confirm against the camera driver's behavior.
            cap.grab()
            cap.grab()
            ret, frame = cap.retrieve()
            if not ret:
                time.sleep(0.01)
                continue
            current_time = time.monotonic()
            # Run ALPR every PROCESS_INTERVAL seconds
            if current_time - last_process_time > PROCESS_INTERVAL:
                last_process_time = current_time
                new_detections = _run_detection(model, frame)
                with detection_lock:
                    latest_detections = new_detections
            # Draw the most recent boxes on the frame that will be streamed.
            with detection_lock:
                for (x1, y1, x2, y2, conf) in latest_detections:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"{conf:.0%}", (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
            with frame_lock:
                outputFrame = frame
    finally:
        # Free the device on every exit path (early return or exception).
        cap.release()
def generate():
    """Yield the latest published frame as an endless MJPEG multipart stream.

    Roughly every 33 ms the current ``outputFrame`` is JPEG-encoded (quality
    75) while holding ``frame_lock`` and yielded as one ``--frame`` part.
    Iterations where no frame has been published yet, or where encoding
    fails, are skipped.

    Yields:
        bytes: One multipart chunk containing a JPEG image.
    """
    while True:
        time.sleep(0.033)  # ~30 FPS for the stream
        with frame_lock:
            if outputFrame is None:
                continue
            ok, encoded = cv2.imencode(".jpg", outputFrame, [cv2.IMWRITE_JPEG_QUALITY, 75])
        if not ok:
            # Encoding failed; skip this frame rather than emit a corrupt part.
            continue
        # The lock is released before yielding so a slow client cannot stall capture.
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'
@app.route("/video_feed")
def video_feed():
    """Serve the live camera stream as a multipart MJPEG HTTP response."""
    stream = generate()
    return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/health")
def health():
    """Return a static status mapping identifying this service as healthy."""
    payload = dict(status="ok", service="alpr")
    return payload
if __name__ == "__main__":
    # Capture and detection run on a daemon thread; the main thread serves HTTP.
    camera_thread = threading.Thread(target=camera_loop, daemon=True)
    camera_thread.start()
    server_options = dict(
        host="0.0.0.0",
        port=5001,
        debug=False,
        threaded=True,
        use_reloader=False,
    )
    app.run(**server_options)