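# ALPR camera service: detects license plates with YOLO, reads them with
# EasyOCR, reports valid plates to the backend and serves the annotated
# camera feed as an MJPEG stream through Flask.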
import os
import re
import threading
import time

import cv2
import easyocr
import numpy as np
import requests
from flask import Flask, Response
from flask_cors import CORS
from ultralytics import YOLO


# Configuration
# Base URL of the backend that receives detected plates; override it with the
# BACKEND_URL environment variable. Trailing slashes are stripped so joining
# it with a path such as "/api/detect" never produces a double slash.
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000').rstrip('/')
# Index of the capture device handed to cv2.VideoCapture.
CAMERA_ID = 0
# Seconds between two detection passes; lowered a little to be more reactive.
PROCESS_INTERVAL = 2.0
# Confidence threshold intended for YOLO plate detections.
CONFIDENCE_THRESHOLD = 0.4
# Path of the YOLO weights file.
MODEL_PATH = 'best.pt'


# State shared between the capture thread and the HTTP stream generator.
lock = threading.Lock()  # serialises access to the shared frame and boxes
outputFrame = None  # most recent annotated frame; None until one is produced
latest_detections = []  # (x1, y1, x2, y2, conf) tuples from the last detection pass

# Flask application that serves the video stream, with CORS enabled using
# flask_cors defaults.
app = Flask(__name__)
CORS(app)


def send_plate(plate_number):
    """Report a detected plate to the backend.

    POSTs ``{'plate_number': plate_number}`` as JSON to
    ``{BACKEND_URL}/api/detect`` with a 3-second timeout. Delivery is
    best-effort: any failure, including an HTTP error status returned by the
    backend, is printed and swallowed so the caller is never interrupted.

    Args:
        plate_number: Plate text to report.
    """
    url = f"{BACKEND_URL}/api/detect"
    payload = {'plate_number': plate_number}
    try:
        response = requests.post(url, json=payload, timeout=3)
        # A 4xx/5xx reply means the backend did not accept the plate; surface
        # it instead of treating every response as a success.
        response.raise_for_status()
    except Exception as e:
        # Deliberate catch-all at this boundary: reporting must never crash
        # the detection thread.
        print(f"Error sending plate: {e}")


def _detect_and_report(model, reader, frame):
    """Run one detection pass on ``frame`` and report every readable plate.

    YOLO is run on the frame; each box whose confidence exceeds
    ``CONFIDENCE_THRESHOLD`` is cropped and passed to EasyOCR, and every
    cleaned OCR string is handed to ``validate_and_send``.

    Args:
        model: Loaded YOLO model, called as ``model(frame, ...)``.
        reader: EasyOCR reader used to read the cropped plate.
        frame: Image the detections are computed on.

    Returns:
        List of ``(x1, y1, x2, y2, conf)`` tuples for the accepted boxes.
    """
    # verbose=False keeps the terminal quiet; a small imgsz (256) speeds up
    # inference considerably.
    results = model(frame, verbose=False, imgsz=256)

    detections = []
    for r in results:
        for box in r.boxes:
            conf = float(box.conf[0])
            # Use the configured threshold rather than a hard-coded literal.
            if conf <= CONFIDENCE_THRESHOLD:
                continue

            x1, y1, x2, y2 = map(int, box.xyxy[0])
            detections.append((x1, y1, x2, y2, conf))

            plate_img = frame[y1:y2, x1:x2]
            if plate_img.size == 0:
                # Degenerate box: nothing to read.
                continue

            # OCR is the slowest part of the pipeline.
            try:
                # OPTIMIZATION 3: only request the plain text (detail=0).
                ocr_results = reader.readtext(
                    plate_img, detail=0, paragraph=False, workers=0
                )
                for text in ocr_results:
                    clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()
                    validate_and_send(clean_text)
            except Exception as e:
                # Best-effort: a failed read must not stop the loop, but it
                # should not disappear silently either.
                print(f"OCR error: {e}")

    return detections


def alpr_loop():
    """Capture frames, detect and read plates, and publish annotated frames.

    Intended to run in a background thread. Loads EasyOCR and the YOLO model,
    opens camera ``CAMERA_ID`` and then loops forever:

    * every ``PROCESS_INTERVAL`` seconds a detection pass is run and the
      resulting boxes are stored in ``latest_detections``;
    * on every iteration the latest boxes are drawn on a copy of the current
      frame, which is published through ``outputFrame`` under ``lock``.

    Returns early (after printing a critical error) if the model cannot be
    loaded or the camera cannot be opened.
    """
    global outputFrame, latest_detections

    print("Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)  # EasyOCR is heavy on CPU

    print("Loading YOLO model...")
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"Critical Error: {e}")
        return

    cap = cv2.VideoCapture(CAMERA_ID)
    if not cap.isOpened():
        # Without an opened device every read fails and the loop would spin
        # forever without producing a frame.
        print(f"Critical Error: could not open camera {CAMERA_ID}")
        cap.release()
        return

    # OPTIMIZATION 1: ask the hardware for a low capture resolution.
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 480)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)
    cap.set(cv2.CAP_PROP_FPS, 24)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # keep the driver buffer minimal

    last_process_time = 0

    try:
        while True:
            # OPTIMIZATION 2: drain the camera buffer. Several frames are
            # grabbed but only the most recent one is decoded.
            for _ in range(4):
                cap.grab()

            ret, frame = cap.retrieve()
            if not ret:
                # Back off briefly so a stalled camera does not peg the CPU.
                time.sleep(0.1)
                continue

            current_time = time.time()

            # ALPR processing, throttled to one pass per PROCESS_INTERVAL.
            if current_time - last_process_time > PROCESS_INTERVAL:
                last_process_time = current_time
                detections = _detect_and_report(model, reader, frame)
                with lock:
                    latest_detections = detections

            # Draw the latest boxes on a copy and publish it for the stream.
            display_frame = frame.copy()
            with lock:
                for (x1, y1, x2, y2, _conf) in latest_detections:
                    cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                outputFrame = display_frame
            time.sleep(0.01)
    finally:
        # Free the device if the loop ever terminates with an exception.
        cap.release()


# Accepted plate layouts: four letters followed by two digits, or two letters
# followed by four digits. re.ASCII restricts \d to 0-9.
_PLATE_PATTERN = re.compile(r'[A-Z]{4}\d{2}|[A-Z]{2}\d{4}', re.ASCII)


def validate_and_send(text):
    """Send ``text`` to the backend if it has a recognised plate layout.

    The whole string must match one of the two layouts (``AAAA00`` or
    ``AA0000``, upper-case ASCII letters and ASCII digits only); anything
    else is ignored.

    Args:
        text: Candidate plate string, expected to be already cleaned and
            upper-cased by the caller.
    """
    # fullmatch rejects trailing characters, including the lone trailing
    # newline that `$` with re.match would tolerate.
    if _PLATE_PATTERN.fullmatch(text):
        send_plate(text)


def generate():
    """Yield the latest published frame as an endless multipart JPEG stream.

    Roughly every 50 ms the current ``outputFrame`` is JPEG-encoded (quality
    70) and yielded as one ``--frame`` part. Iterations where no frame has
    been published yet, or where encoding fails, yield nothing.

    Yields:
        bytes: One multipart chunk containing a JPEG image.
    """
    while True:
        time.sleep(0.05)

        # Hold the lock only long enough to take the reference; encoding
        # outside it keeps the writer from waiting on JPEG compression. This
        # is safe as long as the writer replaces outputFrame with a new array
        # instead of mutating the published one in place.
        with lock:
            frame = outputFrame
        if frame is None:
            continue

        ok, encoded = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
        if not ok:
            # Skip this frame rather than stream an invalid image.
            continue

        yield (
            b'--frame\r\n'
            b'Content-Type: image/jpeg\r\n\r\n' + encoded.tobytes() + b'\r\n'
        )


@app.route("/video_feed")
def video_feed():
    """Serve the frames produced by ``generate`` as a multipart HTTP response."""
    stream = generate()
    return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")


if __name__ == "__main__":
    # Run capture and detection in a daemon thread next to the web server.
    worker = threading.Thread(target=alpr_loop, daemon=True)
    worker.start()

    # Serve on all interfaces, port 5001, with threaded request handling and
    # both debug mode and the reloader switched off.
    app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)