Wayfinder/lidar_reader_V2.py

146 lines
6.3 KiB
Python
Raw Permalink Normal View History

# lidar_reader.py (Modified Again)
import math # Import math for float('nan') if preferred
import os
import select
import signal
import subprocess
import time

from logger import get_logger
# Module-level logger, created through the project's `logger.get_logger` helper.
logger = get_logger("lidar_reader")
# Placeholder stored at a scan index whose reading was filtered out or was
# never received during the current scan attempt.
INVALID_READING_VALUE = -1.0 # Or use float('nan') if your processing logic handles it
class LidarReader:
    """Read LIDAR scans by parsing the text output of the ``ultra_simple`` helper.

    The constructor launches the helper executable as a subprocess with its
    stderr merged into stdout. ``get_scan`` consumes lines of the form
    ``angle,distance`` from that stream and assembles them into a 360-slot
    list indexed by integer angle. ``stop`` shuts the subprocess down and
    releases the pipe.

    Attributes:
        proc: The ``subprocess.Popen`` handle, or ``None`` if it never started.
        running: ``False`` once the reader has been stopped or the subprocess
            was found dead / its output stream closed.
    """

    # A reading is kept only when strictly inside this open interval; anything
    # else is stored as INVALID_READING_VALUE. Units are whatever the helper
    # prints (presumably millimetres -- TODO confirm against the helper).
    _MIN_DISTANCE = 100
    _MAX_DISTANCE = 6000
    # Number of distinct integer angles that counts as a complete scan. This is
    # deliberately 359 rather than 360 (an earlier revision of this file
    # lowered it), so one slot may legitimately stay INVALID_READING_VALUE.
    _ANGLES_FOR_FULL_SCAN = 359
    # Size of each raw read from the pipe, and a cap on buffered bytes that
    # contain no newline (protects against a stream of non-line garbage).
    _READ_CHUNK_BYTES = 4096
    _MAX_PENDING_BYTES = 65536

    def __init__(self, device='/dev/lidar', baud='256000'):
        """Start the ``ultra_simple`` subprocess and wait one second for warm-up.

        Args:
            device: Serial device path passed to the helper.
            baud: Baud rate passed to the helper; converted with ``str()`` so
                an ``int`` is accepted as well as a string.

        Raises:
            Exception: Whatever ``subprocess.Popen`` raised is logged and
                re-raised unchanged.
        """
        logger.info(f"Initializing LIDAR on device {device} with baud rate {baud}")
        self.proc = None
        self.running = True
        # Bytes read from the pipe that do not yet form a complete line.
        self._pending = bytearray()
        try:
            self.proc = subprocess.Popen(
                [
                    '/home/poebot/RPLidar/libs/rplidar_sdk/output/Linux/Release/ultra_simple',
                    '--channel', '--serial', str(device), str(baud),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # helper diagnostics arrive on the same stream
                universal_newlines=True,
                bufsize=1,
            )
            logger.info("✅ LIDAR subprocess started successfully.")
        except Exception as e:
            logger.error(f"❌ Failed to start LIDAR subprocess: {e}")
            self.running = False
            raise
        # Give the LIDAR process a moment to start up fully.
        logger.debug("🌀 Letting LIDAR warm up for 1 second...")
        time.sleep(1.0)

    def _read_line(self, deadline):
        """Return the next complete line from the subprocess, honouring *deadline*.

        The pipe is read through its file descriptor with ``select`` +
        ``os.read`` rather than ``stdout.readline()``: ``readline`` blocks
        indefinitely when the helper stalls, which would defeat the timeout.
        The buffered ``stdout`` object must therefore not be read elsewhere.

        Args:
            deadline: ``time.monotonic()`` value after which to give up.

        Returns:
            The line without its trailing newline, or ``None`` if the deadline
            passed before a complete line was available.

        Raises:
            EOFError: The subprocess closed its output stream.
        """
        fd = self.proc.stdout.fileno()
        while True:
            newline_at = self._pending.find(b'\n')
            if newline_at >= 0:
                raw = bytes(self._pending[:newline_at])
                del self._pending[:newline_at + 1]
                return raw.decode('utf-8', errors='replace')
            if len(self._pending) > self._MAX_PENDING_BYTES:
                # No newline in a large amount of data: not line-oriented output.
                self._pending.clear()
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            readable, _, _ = select.select([fd], [], [], remaining)
            if not readable:
                return None
            chunk = os.read(fd, self._READ_CHUNK_BYTES)
            if not chunk:
                raise EOFError("LIDAR subprocess closed its output stream")
            self._pending += chunk

    def get_scan(self, timeout=2.0):
        """Collect one scan from the LIDAR subprocess.

        Lines are read until readings for ``_ANGLES_FOR_FULL_SCAN`` (359)
        distinct integer angles have been seen. Each angle is rounded to the
        nearest integer and wrapped modulo 360 to obtain its index. Lines
        without a comma, or that do not parse as two floats, are skipped.

        Args:
            timeout (float): Maximum time in seconds to wait for the scan.

        Returns:
            tuple: ``(scan, angles)`` on success, where ``scan`` is a list of
            360 values indexed by angle (``INVALID_READING_VALUE`` marks a
            filtered or missing reading) and ``angles`` is
            ``[0, 1, ..., 359]``. ``(None, None)`` if the reader is not
            running, the subprocess exited or closed its output, or the
            timeout elapsed first.
        """
        if not self.running or self.proc is None or self.proc.poll() is not None:
            logger.warning("⚠️ Attempted to get scan but LIDAR process is not running.")
            return None, None

        current_scan = [INVALID_READING_VALUE] * 360
        angles_received_this_scan = set()
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            if self.proc.poll() is not None:
                logger.error("❌ LIDAR subprocess terminated unexpectedly.")
                self.running = False
                return None, None

            try:
                line = self._read_line(deadline)
            except EOFError:
                # The pipe is closed for good; no further scans are possible.
                logger.error("❌ LIDAR subprocess closed its output stream.")
                self.running = False
                return None, None
            if line is None:
                break  # deadline reached while waiting for data

            line = line.strip()
            if ',' not in line:
                continue
            try:
                angle_str, distance_str = line.split(',')
                angle = float(angle_str)
                distance = float(distance_str)
                i = int(round(angle)) % 360
            except (ValueError, OverflowError):
                # Malformed line, wrong field count, or a NaN/inf angle.
                continue

            if self._MIN_DISTANCE < distance < self._MAX_DISTANCE:
                current_scan[i] = distance
            else:
                # Overwrites any earlier valid reading for this angle.
                current_scan[i] = INVALID_READING_VALUE
            angles_received_this_scan.add(i)

            if len(angles_received_this_scan) >= self._ANGLES_FOR_FULL_SCAN:
                return current_scan, list(range(360))

        logger.warning(f"⏳ Scan timeout after {timeout:.1f}s, received {len(angles_received_this_scan)}/360 angles.")
        return None, None

    def stop(self):
        """Stop the LIDAR subprocess and release its output pipe.

        Sends SIGINT and waits up to 3 seconds; if the process has not exited
        it is killed and given 1 more second. The stdout pipe is closed in
        every case so the file descriptor is not leaked. Safe to call more
        than once.
        """
        logger.info("🛑 Stopping LIDAR...")
        self.running = False  # makes subsequent get_scan calls return immediately
        if self.proc is not None and self.proc.poll() is None:
            try:
                # Try interrupting gracefully first.
                self.proc.send_signal(signal.SIGINT)
                self.proc.wait(timeout=3.0)
                logger.info("✅ LIDAR subprocess terminated successfully.")
            except subprocess.TimeoutExpired:
                logger.warning("⚠️ LIDAR subprocess did not exit via SIGINT, killing...")
                self.proc.kill()
                try:
                    # Wait a moment after kill to ensure it's gone.
                    self.proc.wait(timeout=1.0)
                except subprocess.TimeoutExpired:
                    logger.error("❌ Failed to confirm LIDAR subprocess killed.")
                logger.warning("⚠️ LIDAR subprocess killed due to timeout.")
            except Exception as e:
                logger.error(f"Error during LIDAR stop: {e}")
        else:
            logger.info("✅ LIDAR subprocess already stopped or never started.")

        # Release the read end of the pipe; closing an already-closed stream is a no-op.
        stream = getattr(self.proc, 'stdout', None)
        if stream is not None:
            try:
                stream.close()
            except OSError as e:
                logger.warning(f"Error closing LIDAR output pipe: {e}")
        self._pending = bytearray()
        logger.info("✅ LIDAR stopped.")