# main.py (Complete Structure) import sys import os import time RUNNING_IN_GUI = os.getenv("RUNNING_IN_GUI") == "0" import numpy as np import matplotlib.pyplot as plt #if os.getenv("RUNNING_IN_GUI") == "1": # plt.switch_backend('Agg') from matplotlib.colors import LinearSegmentedColormap import subprocess # Ensure subprocess is imported if LidarReader might need it indirectly (though it's self-contained) from lidar_reader import LidarReader from imu_reader import IMUReader # Import necessary components from BreezySLAM from breezyslam.algorithms import RMHC_SLAM from breezyslam.sensors import Laser # <--- Import Laser base class # Add the parent of TurboPi to the path sys.path.append('/home/poebot/RPLidar/libs/rplidar_sdk/app/ultra_simple/TurboPi/HiwonderSDK/') from mecanum import MecanumChassis from power_monitor import PowerMonitor power = PowerMonitor() # Import logger if you have a separate logger setup # from logger import get_logger # logger = get_logger() # If not using a separate logger.py, configure basic logging: from logger import get_logger logger = get_logger() # Set higher level for noisy BreezySLAM logs if needed # logging.getLogger('breezyslam').setLevel(logging.WARNING) chassis = MecanumChassis() from wall_follower import WallFollower # --- Constants --- MAP_SIZE_PIXELS = 800 MAP_SIZE_METERS = 8 # Calculate MM_TO_PIXELS correctly MM_TO_PIXELS = MAP_SIZE_PIXELS / (MAP_SIZE_METERS * 1000.0) CENTER = MAP_SIZE_PIXELS // 2 LIDAR_UPDATE_INTERVAL = 0.1 # Optional: control loop frequency # --- Sensor model --- # Describes the LIDAR characteristics to BreezySLAM class RPLidarSensor(Laser): def __init__(self): super().__init__( scan_size=360, # Number of measurements per scan scan_rate_hz=5, # Scan rate (adjust if known, affects some SLAM aspects) detection_angle_degrees=360, # Angular range of the scan distance_no_detection_mm=6000, # Max range in mm or value for no detection, prev 6k offset_mm=0 # Offset from robot center (if any) ) # Should be a list of floats/ints from 0 to 359 self.scan_angles_degrees = list(range(360)) # --- IMU Initialization --- try: imu = IMUReader() except Exception as e: logger.error(f"❌ Failed to initialize IMU: {e}") imu = None # Allow running without for testing # --- LIDAR Initialization --- try: # Use the correct device path for your system lidar = LidarReader(device="/dev/lidar") logger.debug("πŸŒ€ Letting LIDAR warm up for 1 second...") time.sleep(1.0) logger.debug("⏳ Waiting for first valid LIDAR scan...") while True: scan, _ = lidar.get_scan() logger.debug(f"πŸ‘€ Current scan length: {len(scan) if scan else 0}") if scan and len(scan) == 360: logger.debug("βœ… First full LIDAR scan received. Proceeding with SLAM.") break else: logger.debug(f"πŸ•°οΈ Incomplete scan, retrying...") time.sleep(0.1) except Exception as e: logger.error(f"❌ Failed to initialize LIDAR: {e}") exit() # Exit if LIDAR fails # --- SLAM Initialization --- try: # Create an instance of sensor description class sensor_description = RPLidarSensor() # Initialize SLAM with the sensor description, map size in pixels, and map size in meters slam = RMHC_SLAM(sensor_description, MAP_SIZE_PIXELS, MAP_SIZE_METERS) except Exception as e: logger.error(f"❌ Failed to initialize SLAM: {e}") exit() # --- Visualization setup --- try: cmap = LinearSegmentedColormap.from_list("redmap", ["#FFD700", "#4C566A"]) # Black to red plt.ion() fig, ax = plt.subplots(figsize=(8, 8)) fig.patch.set_facecolor('#0a0a0a') ax.set_facecolor('#0a0a0a') map_im = ax.imshow(np.zeros((MAP_SIZE_PIXELS, MAP_SIZE_PIXELS), dtype=np.uint8), cmap=cmap, origin='lower', vmin=0, vmax=255) robot_dot, = ax.plot([], [], 'co', markersize=5) path_line, = ax.plot([], [], 'c-', linewidth=1, alpha=0.7) ax.set_title("SLAM Map (Press Ctrl+C to save & exit)", color="white") ax.set_aspect('equal', adjustable='box') # 🚫 Remove this if running from the GUI if not RUNNING_IN_GUI: plt.show() # Only show in standalone # Show the plot window logger.info("βœ… Visualization initialized.") except Exception as e: logger.error(f"❌ Failed to initialize visualization: {e}") visualization_enabled = False logger.warning("⚠️ Running without visualization due to error.") else: visualization_enabled = True wall_follower = WallFollower(chassis) # --- Main Loop --- mapbytes = bytearray(MAP_SIZE_PIXELS * MAP_SIZE_PIXELS) pose = [0.0, 0.0, 0.0] # [x_mm, y_mm, theta_degrees] path = [] viz_center_x = CENTER viz_center_y = CENTER try: print("πŸ—ΊοΈ Mapping started. Move the bot around to map.") logger.info("πŸ—ΊοΈ Mapping started.") last_map_update_time = time.time() while True: loop_start_time = time.time() voltage = power.get_voltage() if voltage is not None: logger.debug(f"πŸ”‹ Voltage = {voltage:.2f} V") if not power.voltage_ok(): logger.warning("πŸ›‘ Voltage too low! Stopping motors.") chassis.reset_motors() time.sleep(5) continue try: scan_data, scan_angles = lidar.get_scan() if scan_data is None or scan_angles is None: logger.warning("⚠️ LIDAR scan failed or incomplete, skipping iteration.") time.sleep(0.1) continue except Exception as e: logger.error(f"❌ Error getting LIDAR scan: {e}") time.sleep(0.5) continue if not scan_data or len(scan_data) != sensor_description.scan_size: logger.warning(f"Received invalid scan data (length {len(scan_data)}), skipping SLAM update.") time.sleep(0.1) continue heading = None if imu: try: heading = imu.get_heading() except Exception as e: logger.warning(f"⚠️ Could not get IMU heading: {e}") try: slam.update(scan_data, scan_angles_degrees=scan_angles) except Exception as e: logger.error(f"❌ Error updating SLAM: {e}") time.sleep(0.5) continue try: pose = slam.getpos() except Exception as e: logger.error(f"❌ Error getting SLAM pose: {e}") #Let the wall follower take control try: logger.debug("πŸ“‘ About to call wall_follower.step()") wall_follower.step(scan_data) except Exception as e: logger.error(f"⚠️ Wall follower error: {e}") current_time = time.time() if current_time - last_map_update_time > 0.1: try: slam.getmap(mapbytes) map_np = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS)) last_map_update_time = current_time if visualization_enabled: x_pix = int(pose[0] * MM_TO_PIXELS + viz_center_x) y_pix = int(pose[1] * MM_TO_PIXELS + viz_center_y) path.append((x_pix, y_pix)) max_path_points = 2000 if len(path) > max_path_points: path = path[-max_path_points:] path_x, path_y = zip(*path) if path else ([], []) map_im.set_data(map_np) robot_dot.set_data([x_pix], [y_pix]) path_line.set_data(path_x, path_y) plt.draw() plt.pause(0.001) except Exception as e: logger.error(f"❌ Error updating map/visualization: {e}") visualization_enabled = False loop_duration = time.time() - loop_start_time sleep_time = LIDAR_UPDATE_INTERVAL - loop_duration if sleep_time > 0: time.sleep(sleep_time) except KeyboardInterrupt: print("\nπŸ›‘ Mapping stopped by user.") logger.info("πŸ›‘ KeyboardInterrupt received, stopping mapping.") # Stop motors FIRST try: if 'wall_follower' in locals(): wall_follower.stop() elif 'chassis' in locals() and chassis: chassis.reset_motors() logger.info("βœ… Motors stopped.") except Exception as e: logger.warning(f"⚠️ Could not stop motors: {e}") # Save the map try: slam.getmap(mapbytes) final_map = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS)) np.save("slam_occupancy_grid.npy", final_map) plt.imsave("slam_map_final.png", final_map, cmap=cmap, origin='lower') print("πŸ’Ύ Map saved to slam_map_final.png and slam_occupancy_grid.npy.") logger.info("πŸ’Ύ Map saved successfully.") except Exception as e: logger.error(f"❌ Failed to save final map: {e}") except Exception as e: logger.exception(f"🚨 An unexpected error occurred in the main loop: {e}") finally: print("🧹 Cleaning up...") logger.info("🧹 Cleaning up resources...") # Ensure motors are stopped try: if 'wall_follower' in locals(): wall_follower.stop() elif 'chassis' in locals() and chassis: chassis.reset_motors() logger.info("βœ… Motors stopped (from finally block).") except Exception as e: logger.warning(f"⚠️ Could not stop motors in finally: {e}") # Stop LIDAR if 'lidar' in locals() and lidar: try: lidar.stop() logger.info("βœ… LIDAR stopped.") except Exception as e: logger.error(f"❌ Error stopping LIDAR: {e}") # Shutdown visualization if visualization_enabled and 'plt' in locals(): try: if not RUNNING_IN_GUI: plt.ioff() logger.info("ℹ️ Visualization ended cleanly.") # Avoid showing the plot interactively except Exception as e: logger.error(f"❌ Error handling plot closure: {e}")