diff --git a/alpr-service/main.py b/alpr-service/main.py index b08c22d..9df64b4 100644 --- a/alpr-service/main.py +++ b/alpr-service/main.py @@ -5,21 +5,26 @@ import os import time import threading import numpy as np +import re from flask import Flask, Response from flask_cors import CORS +from ultralytics import YOLO # Configuration BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000') CAMERA_ID = 0 -PROCESS_INTERVAL = 2.0 # OCR every 2 seconds +PROCESS_INTERVAL = 0.5 # Faster processing with YOLO (it's efficient) CONFIDENCE_THRESHOLD = 0.4 +MODEL_PATH = 'best.pt' # Expecting the model here app = Flask(__name__) CORS(app) -# Global variables (using simple globals for PoC) +# Global variables outputFrame = None lock = threading.Lock() +# Store latest detections for visualization +latest_detections = [] def send_plate(plate_number): try: @@ -31,14 +36,24 @@ def send_plate(plate_number): print(f"Error sending plate: {e}") def alpr_loop(): - global outputFrame, lock + global outputFrame, lock, latest_detections print("Initializing EasyOCR...") reader = easyocr.Reader(['en'], gpu=False) print("EasyOCR initialized.") + + # Load YOLO Model + print(f"Loading YOLO model from {MODEL_PATH}...") + try: + model = YOLO(MODEL_PATH) + print("YOLO model loaded successfully!") + except Exception as e: + print(f"Error loading YOLO model: {e}") + print("CRITICAL: Please place the 'best.pt' file in the alpr-service directory.") + return cap = cv2.VideoCapture(CAMERA_ID) - time.sleep(2.0) # Warmup + time.sleep(2.0) if not cap.isOpened(): print("Error: Could not open video device.") @@ -53,62 +68,70 @@ def alpr_loop(): time.sleep(1) continue - # Resize for performance and streaming bandwidth + # Resize for performance frame = cv2.resize(frame, (640, 480)) current_time = time.time() - # OCR Processing + # Detection Processing if current_time - last_process_time > PROCESS_INTERVAL: last_process_time = current_time - threading.Thread(target=perform_ocr, args=(reader, frame.copy())).start() + + # Run YOLO Inference + results = model(frame, verbose=False) + + detections = [] + + for r in results: + boxes = r.boxes + for box in boxes: + # Bounding Box + x1, y1, x2, y2 = box.xyxy[0] + x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) + conf = float(box.conf[0]) + + if conf > 0.5: # Valid plate detection + # Visualization data + detections.append((x1, y1, x2, y2, conf)) + + # Crop Plate + plate_img = frame[y1:y2, x1:x2] + + # Run OCR on Crop + try: + ocr_results = reader.readtext(plate_img) + for (_, text, prob) in ocr_results: + if prob > CONFIDENCE_THRESHOLD: + clean_text = ''.join(e for e in text if e.isalnum()).upper() + validate_and_send(clean_text) + except Exception as e: + print(f"OCR Error on crop: {e}") - # Update output frame + with lock: + latest_detections = detections + + # Draw Detections on Frame for Stream + display_frame = frame.copy() with lock: - outputFrame = frame.copy() + for (x1, y1, x2, y2, conf) in latest_detections: + cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2) + cv2.putText(display_frame, f"Plate {conf:.2f}", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) + + outputFrame = display_frame time.sleep(0.01) -import re - -# ... (imports) - -def perform_ocr(reader, img): - try: - results = reader.readtext(img) - for (bbox, text, prob) in results: - if prob > CONFIDENCE_THRESHOLD: - # Clean text: keep alphanumerics - clean_text = ''.join(e for e in text if e.isalnum()).upper() - - # Chilean Plate Regex Patterns - # 1. New format: 4 letters + 2 numbers (e.g., BB-BB-10) -> ^[A-Z]{4}\d{2}$ - # 2. Old format: 2 letters + 4 numbers (e.g., AA-1000) -> ^[A-Z]{2}\d{4}$ - # 3. Moto format (common): 3 letters + 2 numbers or 3 numbers - - # General robust filter for Chile: - # - Length 6 (standard) - # - Must match specific patterns - - is_valid = False - - # Check Pattern 1: BBBB11 (4 Letters, 2 Digits) - if re.match(r'^[A-Z]{4}\d{2}$', clean_text): - is_valid = True - # Check Pattern 2: BB1111 (2 Letters, 4 Digits) - elif re.match(r'^[A-Z]{2}\d{4}$', clean_text): - is_valid = True - - if is_valid: - print(f"Detected Valid Plate: {clean_text}") - send_plate(clean_text) - else: - # Optional logging for debugging ignored text - # print(f"Ignored: {clean_text} (Format mismatch)") - pass - - except Exception as e: - print(f"OCR Error: {e}") +def validate_and_send(text): + # Chilean Plate Regex Patterns + is_valid = False + if re.match(r'^[A-Z]{4}\d{2}$', text): # BBBB11 + is_valid = True + elif re.match(r'^[A-Z]{2}\d{4}$', text): # BB1111 + is_valid = True + + if is_valid: + print(f"Detected Valid Plate: {text}") + send_plate(text) def generate(): global outputFrame, lock @@ -128,12 +151,9 @@ def video_feed(): return Response(generate(), mimetype = "multipart/x-mixed-replace; boundary=frame") if __name__ == "__main__": - # Start capturing thread t = threading.Thread(target=alpr_loop) t.daemon = True t.start() - # Start Flask Server print("Starting Video Stream on port 5001...") - # Using 0.0.0.0 to allow access if needed, though mostly used locally app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False) diff --git a/alpr-service/requirements.txt b/alpr-service/requirements.txt index 0543f2d..427d711 100644 --- a/alpr-service/requirements.txt +++ b/alpr-service/requirements.txt @@ -3,3 +3,5 @@ easyocr requests numpy flask +flask-cors +ultralytics