Wayfinder/main_V2.py
milo 8adbe8a491 Inital commit
Last bit of the data from the bot
2025-05-06 21:39:41 -04:00

303 lines
10 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# main.py (Complete Structure)
import sys
import os
import time
RUNNING_IN_GUI = os.getenv("RUNNING_IN_GUI") == "0"
import numpy as np
import matplotlib.pyplot as plt
#if os.getenv("RUNNING_IN_GUI") == "1":
# plt.switch_backend('Agg')
from matplotlib.colors import LinearSegmentedColormap
import subprocess # Ensure subprocess is imported if LidarReader might need it indirectly (though it's self-contained)
from lidar_reader import LidarReader
from imu_reader import IMUReader
# Import necessary components from BreezySLAM
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import Laser # <--- Import Laser base class
# Add the parent of TurboPi to the path
sys.path.append('/home/poebot/RPLidar/libs/rplidar_sdk/app/ultra_simple/TurboPi/HiwonderSDK/')
from mecanum import MecanumChassis
from power_monitor import PowerMonitor
power = PowerMonitor()
# Import logger if you have a separate logger setup
# from logger import get_logger
# logger = get_logger()
# If not using a separate logger.py, configure basic logging:
from logger import get_logger
logger = get_logger()
# Set higher level for noisy BreezySLAM logs if needed
# logging.getLogger('breezyslam').setLevel(logging.WARNING)
chassis = MecanumChassis()
from wall_follower import WallFollower
# --- Constants ---
MAP_SIZE_PIXELS = 800
MAP_SIZE_METERS = 8
# Calculate MM_TO_PIXELS correctly
MM_TO_PIXELS = MAP_SIZE_PIXELS / (MAP_SIZE_METERS * 1000.0)
CENTER = MAP_SIZE_PIXELS // 2
LIDAR_UPDATE_INTERVAL = 0.1 # Optional: control loop frequency
# --- Sensor model ---
# Describes the LIDAR characteristics to BreezySLAM
class RPLidarSensor(Laser):
def __init__(self):
super().__init__(
scan_size=360, # Number of measurements per scan
scan_rate_hz=5, # Scan rate (adjust if known, affects some SLAM aspects)
detection_angle_degrees=360, # Angular range of the scan
distance_no_detection_mm=6000, # Max range in mm or value for no detection, prev 6k
offset_mm=0 # Offset from robot center (if any)
)
# Should be a list of floats/ints from 0 to 359
self.scan_angles_degrees = list(range(360))
# --- IMU Initialization ---
try:
imu = IMUReader()
except Exception as e:
logger.error(f"❌ Failed to initialize IMU: {e}")
imu = None # Allow running without for testing
# --- LIDAR Initialization ---
try:
# Use the correct device path for your system
lidar = LidarReader(device="/dev/lidar")
logger.debug("🌀 Letting LIDAR warm up for 1 second...")
time.sleep(1.0)
logger.debug("⏳ Waiting for first valid LIDAR scan...")
while True:
scan, _ = lidar.get_scan()
logger.debug(f"👀 Current scan length: {len(scan) if scan else 0}")
if scan and len(scan) == 360:
logger.debug("✅ First full LIDAR scan received. Proceeding with SLAM.")
break
else:
logger.debug(f"🕰️ Incomplete scan, retrying...")
time.sleep(0.1)
except Exception as e:
logger.error(f"❌ Failed to initialize LIDAR: {e}")
exit() # Exit if LIDAR fails
# --- SLAM Initialization ---
try:
# Create an instance of sensor description class
sensor_description = RPLidarSensor()
# Initialize SLAM with the sensor description, map size in pixels, and map size in meters
slam = RMHC_SLAM(sensor_description, MAP_SIZE_PIXELS, MAP_SIZE_METERS)
except Exception as e:
logger.error(f"❌ Failed to initialize SLAM: {e}")
exit()
# --- Visualization setup ---
try:
cmap = LinearSegmentedColormap.from_list("redmap", ["#FFD700", "#4C566A"]) # Black to red
plt.ion()
fig, ax = plt.subplots(figsize=(8, 8))
fig.patch.set_facecolor('#0a0a0a')
ax.set_facecolor('#0a0a0a')
map_im = ax.imshow(np.zeros((MAP_SIZE_PIXELS, MAP_SIZE_PIXELS), dtype=np.uint8),
cmap=cmap, origin='lower', vmin=0, vmax=255)
robot_dot, = ax.plot([], [], 'co', markersize=5)
path_line, = ax.plot([], [], 'c-', linewidth=1, alpha=0.7)
ax.set_title("SLAM Map (Press Ctrl+C to save & exit)", color="white")
ax.set_aspect('equal', adjustable='box')
# 🚫 Remove this if running from the GUI
if not RUNNING_IN_GUI:
plt.show() # Only show in standalone
# Show the plot window
logger.info("✅ Visualization initialized.")
except Exception as e:
logger.error(f"❌ Failed to initialize visualization: {e}")
visualization_enabled = False
logger.warning("⚠️ Running without visualization due to error.")
else:
visualization_enabled = True
wall_follower = WallFollower(chassis)
# --- Main Loop ---
mapbytes = bytearray(MAP_SIZE_PIXELS * MAP_SIZE_PIXELS)
pose = [0.0, 0.0, 0.0] # [x_mm, y_mm, theta_degrees]
path = []
viz_center_x = CENTER
viz_center_y = CENTER
try:
print("🗺️ Mapping started. Move the bot around to map.")
logger.info("🗺️ Mapping started.")
last_map_update_time = time.time()
while True:
loop_start_time = time.time()
voltage = power.get_voltage()
if voltage is not None:
logger.debug(f"🔋 Voltage = {voltage:.2f} V")
if not power.voltage_ok():
logger.warning("🛑 Voltage too low! Stopping motors.")
chassis.reset_motors()
time.sleep(5)
continue
try:
scan_data, scan_angles = lidar.get_scan()
if scan_data is None or scan_angles is None:
logger.warning("⚠️ LIDAR scan failed or incomplete, skipping iteration.")
time.sleep(0.1)
continue
except Exception as e:
logger.error(f"❌ Error getting LIDAR scan: {e}")
time.sleep(0.5)
continue
if not scan_data or len(scan_data) != sensor_description.scan_size:
logger.warning(f"Received invalid scan data (length {len(scan_data)}), skipping SLAM update.")
time.sleep(0.1)
continue
heading = None
if imu:
try:
heading = imu.get_heading()
except Exception as e:
logger.warning(f"⚠️ Could not get IMU heading: {e}")
try:
slam.update(scan_data, scan_angles_degrees=scan_angles)
except Exception as e:
logger.error(f"❌ Error updating SLAM: {e}")
time.sleep(0.5)
continue
try:
pose = slam.getpos()
except Exception as e:
logger.error(f"❌ Error getting SLAM pose: {e}")
#Let the wall follower take control
try:
logger.debug("📡 About to call wall_follower.step()")
wall_follower.step(scan_data)
except Exception as e:
logger.error(f"⚠️ Wall follower error: {e}")
current_time = time.time()
if current_time - last_map_update_time > 0.1:
try:
slam.getmap(mapbytes)
map_np = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS))
last_map_update_time = current_time
if visualization_enabled:
x_pix = int(pose[0] * MM_TO_PIXELS + viz_center_x)
y_pix = int(pose[1] * MM_TO_PIXELS + viz_center_y)
path.append((x_pix, y_pix))
max_path_points = 2000
if len(path) > max_path_points:
path = path[-max_path_points:]
path_x, path_y = zip(*path) if path else ([], [])
map_im.set_data(map_np)
robot_dot.set_data([x_pix], [y_pix])
path_line.set_data(path_x, path_y)
plt.draw()
plt.pause(0.001)
except Exception as e:
logger.error(f"❌ Error updating map/visualization: {e}")
visualization_enabled = False
loop_duration = time.time() - loop_start_time
sleep_time = LIDAR_UPDATE_INTERVAL - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
except KeyboardInterrupt:
print("\n🛑 Mapping stopped by user.")
logger.info("🛑 KeyboardInterrupt received, stopping mapping.")
# Stop motors FIRST
try:
if 'wall_follower' in locals():
wall_follower.stop()
elif 'chassis' in locals() and chassis:
chassis.reset_motors()
logger.info("✅ Motors stopped.")
except Exception as e:
logger.warning(f"⚠️ Could not stop motors: {e}")
# Save the map
try:
slam.getmap(mapbytes)
final_map = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS))
np.save("slam_occupancy_grid.npy", final_map)
plt.imsave("slam_map_final.png", final_map, cmap=cmap, origin='lower')
print("💾 Map saved to slam_map_final.png and slam_occupancy_grid.npy.")
logger.info("💾 Map saved successfully.")
except Exception as e:
logger.error(f"❌ Failed to save final map: {e}")
except Exception as e:
logger.exception(f"🚨 An unexpected error occurred in the main loop: {e}")
finally:
print("🧹 Cleaning up...")
logger.info("🧹 Cleaning up resources...")
# Ensure motors are stopped
try:
if 'wall_follower' in locals():
wall_follower.stop()
elif 'chassis' in locals() and chassis:
chassis.reset_motors()
logger.info("✅ Motors stopped (from finally block).")
except Exception as e:
logger.warning(f"⚠️ Could not stop motors in finally: {e}")
# Stop LIDAR
if 'lidar' in locals() and lidar:
try:
lidar.stop()
logger.info("✅ LIDAR stopped.")
except Exception as e:
logger.error(f"❌ Error stopping LIDAR: {e}")
# Shutdown visualization
if visualization_enabled and 'plt' in locals():
try:
if not RUNNING_IN_GUI:
plt.ioff()
logger.info(" Visualization ended cleanly.")
# Avoid showing the plot interactively
except Exception as e:
logger.error(f"❌ Error handling plot closure: {e}")