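"""ALPR service.

Detects licence plates in camera frames with a YOLO model, reads them with
EasyOCR, reports valid plates to the backend, and serves the annotated
video as an HTTP stream.
"""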
import cv2
|
|
import easyocr
|
|
import requests
|
|
import os
|
|
import time
|
|
import threading
|
|
import numpy as np
|
|
import re
|
|
from flask import Flask, Response
|
|
from flask_cors import CORS
|
|
from ultralytics import YOLO
|
|
|
|
# Service settings
# Base URL of the backend that receives detections; can be overridden with
# the BACKEND_URL environment variable.
BACKEND_URL = os.getenv('BACKEND_URL', 'http://localhost:3000')
# Device index handed to cv2.VideoCapture.
CAMERA_ID = 0
# Minimum number of seconds between two YOLO inference passes.
PROCESS_INTERVAL = 0.5
# An OCR reading must score above this probability to be considered.
CONFIDENCE_THRESHOLD = 0.4
# Location of the YOLO weights file loaded at start-up.
MODEL_PATH = 'best.pt'
|
|
|
|
# Flask application that exposes the video stream; flask-cors is applied to
# the whole app so the stream can be requested from other origins.
app = Flask(__name__)
CORS(app)
|
|
|
|
# State shared between the capture thread and the HTTP stream generator.
# `lock` is held while the shared values below are read or replaced.
lock = threading.Lock()
# Most recent annotated frame; stays None until the first frame is ready.
outputFrame = None
# Boxes from the last inference pass, as (x1, y1, x2, y2, conf) tuples,
# kept so they can be drawn on frames between inference passes.
latest_detections = []
|
|
|
|
def send_plate(plate_number):
    """POST a recognised plate to the backend's ``/api/detect`` endpoint.

    Delivery is best-effort: any failure, including an HTTP error status
    returned by the backend, is printed and swallowed so the caller keeps
    running.

    Args:
        plate_number: Plate text to report; sent as the ``plate_number``
            field of a JSON body.
    """
    url = f"{BACKEND_URL}/api/detect"
    payload = {'plate_number': plate_number}
    print(f"Sending plate: {plate_number} to {url}")
    try:
        response = requests.post(url, json=payload, timeout=2)
        # requests does not raise on 4xx/5xx by itself; without this check a
        # rejected detection would be indistinguishable from a delivered one.
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: this runs inside the capture thread, where an
        # escaping exception would stop plate detection altogether.
        print(f"Error sending plate: {e}")
|
|
|
|
def _clamp_box(x1, y1, x2, y2, width, height):
    """Clip box corners to a ``width`` x ``height`` frame."""
    return (
        max(0, min(x1, width)),
        max(0, min(y1, height)),
        max(0, min(x2, width)),
        max(0, min(y2, height)),
    )


def _ocr_and_report(reader, plate_img):
    """OCR one plate crop and pass each confident reading to validate_and_send."""
    try:
        for (_, text, prob) in reader.readtext(plate_img):
            if prob > CONFIDENCE_THRESHOLD:
                # Keep only letters and digits, upper-cased, before validation.
                clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()
                validate_and_send(clean_text)
    except Exception as e:
        # Best-effort: one bad crop must not stop the capture loop.
        print(f"OCR Error on crop: {e}")


def _detect_plates(model, reader, frame, min_conf=0.5):
    """Run YOLO on ``frame``, OCR every confident box, and return the boxes kept.

    Returns:
        A list of ``(x1, y1, x2, y2, conf)`` tuples, clipped to the frame.
    """
    height, width = frame.shape[:2]
    detections = []
    for result in model(frame, verbose=False):
        for box in result.boxes:
            conf = float(box.conf[0])
            if conf <= min_conf:
                continue

            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
            # A negative coordinate would wrap around when used as a slice
            # index, so clip to the frame before cropping.
            x1, y1, x2, y2 = _clamp_box(x1, y1, x2, y2, width, height)
            if x2 <= x1 or y2 <= y1:
                # Degenerate box: nothing to crop or draw.
                continue

            detections.append((x1, y1, x2, y2, conf))
            _ocr_and_report(reader, frame[y1:y2, x1:x2])
    return detections


def alpr_loop():
    """Capture frames, detect and read plates, and publish annotated frames.

    Loads EasyOCR and the YOLO model, opens camera ``CAMERA_ID``, then loops
    forever. At most once per ``PROCESS_INTERVAL`` seconds a frame goes
    through YOLO and OCR; every frame is annotated with the latest boxes and
    stored in the global ``outputFrame``.

    Returns early, after printing an error, when the model cannot be loaded
    or the camera cannot be opened. The camera is released whenever the
    function exits.

    Note: plates are reported synchronously from this loop, so a slow
    backend delays frame capture while the request is in flight.
    """
    global outputFrame, latest_detections

    print("Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)
    print("EasyOCR initialized.")

    # Load YOLO Model
    print(f"Loading YOLO model from {MODEL_PATH}...")
    try:
        model = YOLO(MODEL_PATH)
        print("YOLO model loaded successfully!")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        print("CRITICAL: Please place the 'best.pt' file in the alpr-service directory.")
        return

    cap = cv2.VideoCapture(CAMERA_ID)
    try:
        # NOTE(review): presumably camera warm-up time; confirm it is needed.
        time.sleep(2.0)

        if not cap.isOpened():
            print("Error: Could not open video device.")
            return

        # -inf makes the very first frame eligible for inference.
        last_process_time = float('-inf')

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                time.sleep(1)
                continue

            # Resize for performance
            frame = cv2.resize(frame, (640, 480))

            # Monotonic clock: the interval is unaffected by wall-clock changes.
            now = time.monotonic()
            if now - last_process_time > PROCESS_INTERVAL:
                last_process_time = now
                detections = _detect_plates(model, reader, frame)
                with lock:
                    latest_detections = detections

            with lock:
                boxes_to_draw = list(latest_detections)

            # Draw on a copy so the frame used for detection stays untouched.
            display_frame = frame.copy()
            for (x1, y1, x2, y2, conf) in boxes_to_draw:
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(display_frame, f"Plate {conf:.2f}", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # Publish under the same lock the stream generator reads with.
            with lock:
                outputFrame = display_frame

            time.sleep(0.01)
    finally:
        cap.release()
|
|
|
|
def validate_and_send(text):
    """Report ``text`` to the backend when it is a Chilean plate.

    Two layouts are accepted: four letters followed by two digits (BBBB11)
    and two letters followed by four digits (BB1111). Letters must be
    upper-case A-Z and digits ASCII 0-9; anything else is ignored.

    Args:
        text: Candidate plate string, already stripped of separators.
    """
    # fullmatch instead of match + `$`: `$` also matches just before a
    # trailing newline. `[0-9]` instead of `\d`: `\d` accepts any Unicode
    # decimal digit in a str pattern.
    if not re.fullmatch(r'[A-Z]{4}[0-9]{2}|[A-Z]{2}[0-9]{4}', text):
        return

    print(f"Detected Valid Plate: {text}")
    send_plate(text)
|
|
|
|
def generate():
    """Yield the latest annotated frame as parts of a multipart JPEG stream.

    Each yielded chunk is one ``--frame`` part holding a JPEG image. The
    generator never terminates by itself.

    Yields:
        bytes: boundary line, ``Content-Type`` header and JPEG payload.
    """
    while True:
        # Hold the lock only long enough to grab the current frame reference.
        with lock:
            frame = outputFrame

        if frame is None:
            # No frame published yet: wait instead of spinning on the lock.
            time.sleep(0.05)
            continue

        # Encode outside the lock so the capture thread is not blocked.
        (flag, encodedImage) = cv2.imencode(".jpg", frame)
        if not flag:
            time.sleep(0.01)
            continue

        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
               encodedImage.tobytes() + b'\r\n')

        # Pace the stream (about 30 frames/s) rather than re-encoding the
        # same frame as fast as the CPU allows.
        time.sleep(0.03)
|
|
|
|
@app.route("/video_feed")
def video_feed():
    """Serve the frames produced by generate() as a multipart stream."""
    stream = generate()
    return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")
|
|
|
|
if __name__ == "__main__":
    # Run plate detection in the background; as a daemon thread it does not
    # keep the process alive once the web server stops.
    worker = threading.Thread(target=alpr_loop, daemon=True)
    worker.start()

    print("Starting Video Stream on port 5001...")
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=False,
        threaded=True,
        use_reloader=False,
    )
|