140 lines
4.2 KiB
Python
140 lines
4.2 KiB
Python
import cv2
|
|
import easyocr
|
|
import requests
|
|
import os
|
|
import time
|
|
import threading
|
|
import numpy as np
|
|
from flask import Flask, Response
|
|
from flask_cors import CORS
|
|
|
|
# --- Runtime configuration ---
# Base URL of the backend that receives detections; can be overridden with
# the BACKEND_URL environment variable.
BACKEND_URL = os.getenv('BACKEND_URL', 'http://localhost:3000')
# Device index handed to cv2.VideoCapture.
CAMERA_ID = 0
# Minimum number of seconds between two OCR passes.
PROCESS_INTERVAL = 2.0
# OCR results whose confidence is not above this value are ignored.
CONFIDENCE_THRESHOLD = 0.4

# Flask application that serves the video stream, wrapped with flask_cors.
app = Flask(__name__)
CORS(app)

# Shared state (plain globals, good enough for a proof of concept):
# `outputFrame` holds the most recent frame, `lock` guards access to it.
lock = threading.Lock()
outputFrame = None
|
|
|
|
def send_plate(plate_number):
    """POST a detected plate to the backend's ``/api/detect`` endpoint.

    Delivery is best-effort: request failures (connection errors, timeouts,
    HTTP error statuses) are printed and swallowed, so the caller is never
    interrupted by a backend problem.

    Args:
        plate_number: Plate text to report; sent as the ``plate_number``
            field of the JSON request body.
    """
    url = f"{BACKEND_URL}/api/detect"
    payload = {'plate_number': plate_number}
    print(f"Sending plate: {plate_number} to {url}")
    try:
        response = requests.post(url, json=payload, timeout=2)
        # Without this check a 4xx/5xx reply would silently count as success.
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error sending plate: {e}")
|
|
|
|
def alpr_loop():
    """Capture camera frames forever, publish them, and schedule OCR passes.

    Each frame is resized to 640x480 and stored in the global ``outputFrame``
    (under ``lock``) for the streaming generator. Roughly every
    ``PROCESS_INTERVAL`` seconds a copy of the current frame is handed to
    ``perform_ocr`` on a background thread.

    The function returns early if the capture device cannot be opened;
    otherwise it only ends when an exception propagates. The capture device
    is released in both cases.
    """
    global outputFrame

    print("Initializing EasyOCR...")
    reader = easyocr.Reader(['en'], gpu=False)
    print("EasyOCR initialized.")

    cap = cv2.VideoCapture(CAMERA_ID)
    try:
        # Check the device before warming up so a missing camera fails fast.
        if not cap.isOpened():
            print("Error: Could not open video device.")
            return

        time.sleep(2.0)  # Warmup

        last_process_time = 0
        ocr_thread = None

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                time.sleep(1)
                continue

            # Resize for performance and streaming bandwidth
            frame = cv2.resize(frame, (640, 480))

            current_time = time.time()

            # Start a new OCR pass only when the interval has elapsed AND the
            # previous pass has finished; otherwise slow OCR would pile up
            # threads that all use the same reader at once.
            ocr_idle = ocr_thread is None or not ocr_thread.is_alive()
            if ocr_idle and current_time - last_process_time > PROCESS_INTERVAL:
                last_process_time = current_time
                ocr_thread = threading.Thread(
                    target=perform_ocr,
                    args=(reader, frame.copy()),
                    daemon=True,  # never keep the process alive for an OCR pass
                )
                ocr_thread.start()

            # Publish the newest frame for the streaming generator.
            with lock:
                outputFrame = frame.copy()

            time.sleep(0.01)
    finally:
        # Always hand the camera back, on early return and on errors alike.
        cap.release()
|
|
|
|
import re
|
|
|
|
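# NOTE: `re` (imported just above) is used by perform_ocr below; it belongs with the imports at the top of the file.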
|
|
|
|
# Chilean plate layouts accepted once separators have been stripped:
#   - 4 letters + 2 digits (e.g. BB-BB-10 -> BBBB10)
#   - 2 letters + 4 digits (e.g. AA-1000  -> AA1000)
# Motorcycle layouts (3 letters + 2 or 3 digits) are not matched.
# Digits are written as [0-9] because \d also matches non-ASCII Unicode
# digits, which str.isalnum() does not filter out.
_PLATE_PATTERN = re.compile(r'(?:[A-Z]{4}[0-9]{2}|[A-Z]{2}[0-9]{4})')


def perform_ocr(reader, img):
    """Run OCR on one frame and report every valid-looking plate.

    Each result whose confidence is above ``CONFIDENCE_THRESHOLD`` is reduced
    to its alphanumeric characters and upper-cased. If the cleaned text
    matches one of the accepted plate layouts, it is printed and forwarded
    with ``send_plate``.

    Args:
        reader: Object exposing ``readtext(img)`` that yields
            ``(bbox, text, prob)`` tuples (an ``easyocr.Reader`` in this file).
        img: Image passed unchanged to ``reader.readtext``.

    Any exception is printed and swallowed: this function runs as a thread
    target, so there is no caller to handle it.
    """
    try:
        for _bbox, text, prob in reader.readtext(img):
            if prob > CONFIDENCE_THRESHOLD:
                # Clean text: keep alphanumerics only (drops dashes, spaces, dots).
                clean_text = ''.join(ch for ch in text if ch.isalnum()).upper()

                if _PLATE_PATTERN.fullmatch(clean_text):
                    print(f"Detected Valid Plate: {clean_text}")
                    send_plate(clean_text)
    except Exception as e:
        print(f"OCR Error: {e}")
|
|
|
|
def generate():
    """Yield the latest camera frame, endlessly, as multipart JPEG chunks.

    Each yielded value is one ``--frame`` part containing a JPEG encoding of
    the global ``outputFrame``. While no frame is available, or when encoding
    fails, the generator waits briefly and retries.

    Yields:
        bytes: A multipart part (boundary line, ``Content-Type`` header,
        JPEG payload, trailing CRLF).
    """
    while True:
        # Hold the lock only long enough to grab the current frame reference.
        # The capture loop rebinds `outputFrame` to a new array instead of
        # mutating the published one, so encoding outside the lock is safe.
        with lock:
            frame = outputFrame

        if frame is None:
            # Nothing captured yet: sleep instead of spinning on the lock.
            time.sleep(0.01)
            continue

        flag, encodedImage = cv2.imencode(".jpg", frame)
        if not flag:
            time.sleep(0.01)
            continue

        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
               encodedImage.tobytes() + b'\r\n')

        # Pace the stream so the same frame is not re-encoded in a tight loop.
        time.sleep(0.01)
|
|
|
|
@app.route("/video_feed")
def video_feed():
    """Return a streaming response whose parts are produced by ``generate()``.

    The response is declared as ``multipart/x-mixed-replace`` with the
    boundary ``frame``, matching the ``--frame`` marker emitted per part.
    """
    stream_mimetype = "multipart/x-mixed-replace; boundary=frame"
    return Response(generate(), mimetype=stream_mimetype)
|
|
|
|
if __name__ == "__main__":
    # Run the capture/OCR loop in the background; as a daemon thread it does
    # not keep the process alive once the Flask server stops.
    capture_thread = threading.Thread(target=alpr_loop, daemon=True)
    capture_thread.start()

    # Serve the video stream from the main thread.
    print("Starting Video Stream on port 5001...")
    # Bound to 0.0.0.0 to allow access from other hosts if needed, though
    # mostly used locally.
    app.run(
        host="0.0.0.0",
        port=5001,
        debug=False,
        threaded=True,
        use_reloader=False,
    )
|