ControlPatente/alpr-service/main.py
import cv2
import easyocr
import requests
import os
import time
import threading
import numpy as np
import re
from flask import Flask, Response
from flask_cors import CORS
from ultralytics import YOLO

# Configuration
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
CAMERA_ID = 0
PROCESS_INTERVAL = 0.5  # Seconds between YOLO passes (YOLO is efficient enough for this rate)
CONFIDENCE_THRESHOLD = 0.4  # Minimum EasyOCR confidence to accept a reading
MODEL_PATH = 'best.pt'  # Trained YOLO plate-detection weights expected here
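# Note (illustrative, not part of the original file): BACKEND_URL can be overridden at
# launch, e.g.  BACKEND_URL=http://192.168.1.10:3000 python main.py  (host is an example).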

app = Flask(__name__)
CORS(app)

# Global variables
outputFrame = None
lock = threading.Lock()

# Store latest detections for visualization
latest_detections = []

def send_plate(plate_number):
    try:
        url = f"{BACKEND_URL}/api/detect"
        payload = {'plate_number': plate_number}
        print(f"Sending plate: {plate_number} to {url}")
        requests.post(url, json=payload, timeout=2)
    except Exception as e:
        print(f"Error sending plate: {e}")

def alpr_loop():
    global outputFrame, lock, latest_detections

    print("Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)
    print("EasyOCR initialized.")

    # Load YOLO model
    print(f"Loading YOLO model from {MODEL_PATH}...")
    try:
        model = YOLO(MODEL_PATH)
        print("YOLO model loaded successfully!")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        print("CRITICAL: Please place the 'best.pt' file in the alpr-service directory.")
        return

    cap = cv2.VideoCapture(CAMERA_ID)
    time.sleep(2.0)
    if not cap.isOpened():
        print("Error: Could not open video device.")
        return

    last_process_time = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            time.sleep(1)
            continue

        # Resize for performance
        frame = cv2.resize(frame, (640, 480))
        current_time = time.time()

        # Detection processing, throttled to PROCESS_INTERVAL
        if current_time - last_process_time > PROCESS_INTERVAL:
            last_process_time = current_time

            # Run YOLO inference
            results = model(frame, verbose=False)
            detections = []
            for r in results:
                boxes = r.boxes
                for box in boxes:
                    # Bounding box
                    x1, y1, x2, y2 = box.xyxy[0]
                    x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                    conf = float(box.conf[0])
                    if conf > 0.5:  # Valid plate detection
                        # Visualization data
                        detections.append((x1, y1, x2, y2, conf))

                        # Crop the plate region
                        plate_img = frame[y1:y2, x1:x2]

                        # Run OCR on the crop
                        try:
                            ocr_results = reader.readtext(plate_img)
                            for (_, text, prob) in ocr_results:
                                if prob > CONFIDENCE_THRESHOLD:
                                    clean_text = ''.join(e for e in text if e.isalnum()).upper()
                                    validate_and_send(clean_text)
                        except Exception as e:
                            print(f"OCR Error on crop: {e}")

            with lock:
                latest_detections = detections

        # Draw detections on the frame for the MJPEG stream
        display_frame = frame.copy()
        with lock:
            for (x1, y1, x2, y2, conf) in latest_detections:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, f"Plate {conf:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            outputFrame = display_frame

        time.sleep(0.01)

def validate_and_send(text):
    # Chilean plate regex patterns
    is_valid = False
    if re.match(r'^[A-Z]{4}\d{2}$', text):    # BBBB11
        is_valid = True
    elif re.match(r'^[A-Z]{2}\d{4}$', text):  # BB1111
        is_valid = True

    if is_valid:
        print(f"Detected Valid Plate: {text}")
        send_plate(text)

def generate():
    global outputFrame, lock
    while True:
        with lock:
            if outputFrame is None:
                continue
            # Encode the latest frame as JPEG while holding the lock
            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)
            if not flag:
                continue
        # Yield the frame in multipart/x-mixed-replace format (lock already released)
        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
               bytearray(encodedImage) + b'\r\n')

@app.route("/video_feed")
def video_feed():
    return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")

if __name__ == "__main__":
    t = threading.Thread(target=alpr_loop)
    t.daemon = True
    t.start()

    print("Starting Video Stream on port 5001...")
    app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)
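
# Quick checks (illustrative, assuming the defaults above and a backend on port 3000):
#   - Annotated live stream: open http://localhost:5001/video_feed in a browser.
#   - Detected plates are POSTed as JSON to f"{BACKEND_URL}/api/detect" with a body
#     like {"plate_number": "ABCD12"} (format enforced by validate_and_send).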