Optimización de ALPR para Raspberry Pi: ajuste de resolución y límites de CPU
This commit is contained in:
@@ -13,25 +13,23 @@ from ultralytics import YOLO
|
||||
# Configuration
|
||||
BACKEND_URL = os.environ.get('BACKEND_URL', 'http://localhost:3000')
|
||||
CAMERA_ID = 0
|
||||
PROCESS_INTERVAL = 0.5 # Faster processing with YOLO (it's efficient)
|
||||
# Bajamos un poco el intervalo para ser más reactivos
|
||||
PROCESS_INTERVAL = 2.0
|
||||
CONFIDENCE_THRESHOLD = 0.4
|
||||
MODEL_PATH = 'best.pt' # Expecting the model here
|
||||
MODEL_PATH = 'best.pt'
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
|
||||
# Global variables
|
||||
outputFrame = None
|
||||
lock = threading.Lock()
|
||||
# Store latest detections for visualization
|
||||
latest_detections = []
|
||||
|
||||
def send_plate(plate_number):
|
||||
try:
|
||||
url = f"{BACKEND_URL}/api/detect"
|
||||
payload = {'plate_number': plate_number}
|
||||
print(f"Sending plate: {plate_number} to {url}")
|
||||
requests.post(url, json=payload, timeout=2)
|
||||
requests.post(url, json=payload, timeout=3)
|
||||
except Exception as e:
|
||||
print(f"Error sending plate: {e}")
|
||||
|
||||
@@ -39,121 +37,92 @@ def alpr_loop():
|
||||
global outputFrame, lock, latest_detections
|
||||
|
||||
print("Initializing EasyOCR...")
|
||||
reader = easyocr.Reader(['en'], gpu=False)
|
||||
print("EasyOCR initialized.")
|
||||
reader = easyocr.Reader(['en'], gpu=False) # EasyOCR es pesado en CPU
|
||||
|
||||
# Load YOLO Model
|
||||
print(f"Loading YOLO model from {MODEL_PATH}...")
|
||||
print(f"Loading YOLO model...")
|
||||
try:
|
||||
model = YOLO(MODEL_PATH)
|
||||
print("YOLO model loaded successfully!")
|
||||
except Exception as e:
|
||||
print(f"Error loading YOLO model: {e}")
|
||||
print("CRITICAL: Please place the 'best.pt' file in the alpr-service directory.")
|
||||
print(f"Critical Error: {e}")
|
||||
return
|
||||
|
||||
cap = cv2.VideoCapture(CAMERA_ID)
|
||||
time.sleep(2.0)
|
||||
|
||||
if not cap.isOpened():
|
||||
print("Error: Could not open video device.")
|
||||
return
|
||||
# OPTIMIZACIÓN 1: Reducir resolución en hardware
|
||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 480)
|
||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)
|
||||
cap.set(cv2.CAP_PROP_FPS, 15)
|
||||
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Mantener el buffer al mínimo
|
||||
|
||||
last_process_time = 0
|
||||
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
print("Failed to grab frame")
|
||||
time.sleep(1)
|
||||
continue
|
||||
# OPTIMIZACIÓN 2: Vaciar el buffer de la cámara
|
||||
# Leemos varios cuadros pero solo nos quedamos con el último
|
||||
for _ in range(4):
|
||||
cap.grab()
|
||||
|
||||
# Resize for performance
|
||||
frame = cv2.resize(frame, (640, 480))
|
||||
ret, frame = cap.retrieve()
|
||||
if not ret:
|
||||
continue
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# Detection Processing
|
||||
# Procesamiento ALPR
|
||||
if current_time - last_process_time > PROCESS_INTERVAL:
|
||||
last_process_time = current_time
|
||||
|
||||
# Run YOLO Inference
|
||||
results = model(frame, verbose=False)
|
||||
# Ejecutar YOLO (verbose=False para no saturar la terminal)
|
||||
results = model(frame, verbose=False, imgsz=256) # imgsz=320 acelera mucho
|
||||
|
||||
detections = []
|
||||
|
||||
for r in results:
|
||||
boxes = r.boxes
|
||||
for box in boxes:
|
||||
# Bounding Box
|
||||
x1, y1, x2, y2 = box.xyxy[0]
|
||||
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
|
||||
for box in r.boxes:
|
||||
x1, y1, x2, y2 = map(int, box.xyxy[0])
|
||||
conf = float(box.conf[0])
|
||||
|
||||
if conf > 0.5: # Valid plate detection
|
||||
# Visualization data
|
||||
if conf > 0.5:
|
||||
detections.append((x1, y1, x2, y2, conf))
|
||||
|
||||
# Crop Plate
|
||||
plate_img = frame[y1:y2, x1:x2]
|
||||
|
||||
# Run OCR on Crop
|
||||
# OCR es la parte más lenta
|
||||
try:
|
||||
ocr_results = reader.readtext(plate_img)
|
||||
for (_, text, prob) in ocr_results:
|
||||
if prob > CONFIDENCE_THRESHOLD:
|
||||
clean_text = ''.join(e for e in text if e.isalnum()).upper()
|
||||
validate_and_send(clean_text)
|
||||
except Exception as e:
|
||||
print(f"OCR Error on crop: {e}")
|
||||
# OPTIMIZACIÓN 3: Solo leer el texto esencial
|
||||
ocr_results = reader.readtext(plate_img, detail=0, paragraph=False, workers=0)
|
||||
for text in ocr_results:
|
||||
clean_text = ''.join(e for e in text if e.isalnum()).upper()
|
||||
validate_and_send(clean_text)
|
||||
except:
|
||||
pass
|
||||
|
||||
with lock:
|
||||
latest_detections = detections
|
||||
|
||||
# Draw Detections on Frame for Stream
|
||||
# Dibujar resultados para el stream
|
||||
display_frame = frame.copy()
|
||||
with lock:
|
||||
for (x1, y1, x2, y2, conf) in latest_detections:
|
||||
cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
||||
cv2.putText(display_frame, f"Plate {conf:.2f}", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
||||
|
||||
outputFrame = display_frame
|
||||
|
||||
time.sleep(0.01)
|
||||
|
||||
def validate_and_send(text):
|
||||
# Chilean Plate Regex Patterns
|
||||
is_valid = False
|
||||
if re.match(r'^[A-Z]{4}\d{2}$', text): # BBBB11
|
||||
is_valid = True
|
||||
elif re.match(r'^[A-Z]{2}\d{4}$', text): # BB1111
|
||||
is_valid = True
|
||||
|
||||
if is_valid:
|
||||
print(f"Detected Valid Plate: {text}")
|
||||
if re.match(r'^[A-Z]{4}\d{2}$', text) or re.match(r'^[A-Z]{2}\d{4}$', text):
|
||||
send_plate(text)
|
||||
|
||||
def generate():
|
||||
global outputFrame, lock
|
||||
while True:
|
||||
time.sleep(0.05)
|
||||
with lock:
|
||||
if outputFrame is None:
|
||||
continue
|
||||
(flag, encodedImage) = cv2.imencode(".jpg", outputFrame)
|
||||
if not flag:
|
||||
continue
|
||||
|
||||
yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
|
||||
bytearray(encodedImage) + b'\r\n')
|
||||
if outputFrame is None: continue
|
||||
(flag, encodedImage) = cv2.imencode(".jpg", outputFrame, [cv2.IMWRITE_JPEG_QUALITY, 70])
|
||||
yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')
|
||||
|
||||
@app.route("/video_feed")
|
||||
def video_feed():
|
||||
return Response(generate(), mimetype = "multipart/x-mixed-replace; boundary=frame")
|
||||
return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
|
||||
|
||||
if __name__ == "__main__":
|
||||
t = threading.Thread(target=alpr_loop)
|
||||
t.daemon = True
|
||||
t = threading.Thread(target=alpr_loop, daemon=True)
|
||||
t.start()
|
||||
|
||||
print("Starting Video Stream on port 5001...")
|
||||
app.run(host="0.0.0.0", port=5001, debug=False, threaded=True, use_reloader=False)
|
||||
|
||||
Reference in New Issue
Block a user