Inital commit
Last bit of the data from the bot
This commit is contained in:
commit
8adbe8a491
10 changed files with 1438 additions and 0 deletions
74
lidar_processor.py
Normal file
74
lidar_processor.py
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
import subprocess
|
||||
import signal
|
||||
import numpy as np
|
||||
from threading import Thread, Lock
|
||||
from breezyslam.algorithms import RMHC_SLAM
|
||||
from breezyslam.sensors import Laser
|
||||
|
||||
MAP_SIZE_PIXELS = 800
|
||||
MAP_SIZE_METERS = 8
|
||||
MM_TO_PIXELS = MAP_SIZE_PIXELS / (MAP_SIZE_METERS * 1000)
|
||||
CENTER = MAP_SIZE_PIXELS // 2
|
||||
|
||||
class RPLidarSensor(Laser):
|
||||
def __init__(self):
|
||||
super().__init__(scan_size=360, scan_rate_hz=5, detection_angle_degrees=360,
|
||||
distance_no_detection_mm=6000, offset_mm=0)
|
||||
|
||||
class LidarProcessor:
|
||||
def __init__(self):
|
||||
self.lock = Lock()
|
||||
self.scan = [0] * 360
|
||||
self.pose = [0, 0, 0]
|
||||
self.mapbytes = bytearray(MAP_SIZE_PIXELS * MAP_SIZE_PIXELS)
|
||||
self.map_array = np.zeros((MAP_SIZE_PIXELS, MAP_SIZE_PIXELS), dtype=np.uint8)
|
||||
|
||||
self.slam = RMHC_SLAM(RPLidarSensor(), MAP_SIZE_PIXELS, MAP_SIZE_METERS)
|
||||
self.angles_received = set()
|
||||
|
||||
self.proc = subprocess.Popen(
|
||||
['/home/milo/robot_gui/rplidar/rplidar_sdk/output/Linux/Release/ultra_simple',
|
||||
'--channel', '--serial', '/dev/ttyUSB0', '256000'],
|
||||
stdout=subprocess.PIPE, text=True
|
||||
)
|
||||
|
||||
self.thread = Thread(target=self._update_loop, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def _update_loop(self):
|
||||
for line in self.proc.stdout:
|
||||
line = line.strip()
|
||||
if not line or ',' not in line:
|
||||
continue
|
||||
|
||||
try:
|
||||
angle, distance = map(float, line.split(','))
|
||||
i = int(angle)
|
||||
|
||||
if 0 <= i < 360:
|
||||
self.scan[i] = int(distance) if 100 < distance < 6000 else 0
|
||||
self.angles_received.add(i)
|
||||
|
||||
if len(self.angles_received) >= 360:
|
||||
self.slam.update(self.scan, self.pose)
|
||||
self.slam.getmap(self.mapbytes)
|
||||
|
||||
with self.lock:
|
||||
self.map_array = np.reshape(self.mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS)).copy()
|
||||
|
||||
self.scan = [0] * 360
|
||||
self.angles_received.clear()
|
||||
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
def get_data(self):
|
||||
with self.lock:
|
||||
return self.map_array.copy(), self.pose.copy()
|
||||
|
||||
def stop(self):
|
||||
self.proc.send_signal(signal.SIGINT)
|
||||
try:
|
||||
self.proc.wait(timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.proc.kill()
|
||||
146
lidar_reader_V2.py
Normal file
146
lidar_reader_V2.py
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
# lidar_reader.py (Modified Again)
|
||||
|
||||
import time
|
||||
import subprocess
|
||||
import signal
|
||||
from logger import get_logger
|
||||
import math # Import math for float('nan') if preferred
|
||||
|
||||
logger = get_logger("lidar_reader")
|
||||
|
||||
# Define a constant for invalid/filtered readings
|
||||
INVALID_READING_VALUE = -1.0 # Or use float('nan') if your processing logic handles it
|
||||
|
||||
class LidarReader:
|
||||
def __init__(self, device='/dev/lidar', baud='256000'):
|
||||
logger.info(f"Initializing LIDAR on device {device} with baud rate {baud}")
|
||||
# Note: self.scan is now primarily managed within get_scan
|
||||
self.proc = None
|
||||
self.running = True
|
||||
|
||||
try:
|
||||
self.proc = subprocess.Popen(
|
||||
[
|
||||
'/home/poebot/RPLidar/libs/rplidar_sdk/output/Linux/Release/ultra_simple',
|
||||
'--channel', '--serial', device, baud
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, # Consider PIPE stderr too for logging errors
|
||||
universal_newlines=True,
|
||||
bufsize=1 # Line buffering
|
||||
)
|
||||
logger.info("✅ LIDAR subprocess started successfully.")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to start LIDAR subprocess: {e}")
|
||||
self.running = False # Ensure running flag is false if Popen fails
|
||||
raise
|
||||
|
||||
# Give the LIDAR process a moment to start up fully
|
||||
logger.debug("🌀 Letting LIDAR warm up for 1 second...")
|
||||
time.sleep(1.0) # Keep the warmup delay
|
||||
|
||||
def get_scan(self, timeout=2.0):
|
||||
"""
|
||||
Attempts to read a full 360-degree scan from the LIDAR subprocess.
|
||||
|
||||
Args:
|
||||
timeout (float): Maximum time in seconds to wait for a full scan.
|
||||
|
||||
Returns:
|
||||
tuple: (list[float], list[int]) containing the scan data and angles (0-359)
|
||||
if a full scan is received within the timeout. Returns (None, None)
|
||||
if a timeout occurs or the process stops.
|
||||
list[float]: Scan data where each index corresponds to the angle.
|
||||
INVALID_READING_VALUE indicates a filtered or missing reading.
|
||||
list[int]: List of angles [0, 1, ..., 359].
|
||||
"""
|
||||
if not self.running or self.proc is None or self.proc.poll() is not None:
|
||||
logger.warning("⚠️ Attempted to get scan but LIDAR process is not running.")
|
||||
return None, None
|
||||
|
||||
# Initialize scan list with the invalid value placeholder for this attempt
|
||||
current_scan = [INVALID_READING_VALUE] * 360
|
||||
angles_received_this_scan = set()
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
# Check if the process terminated unexpectedly
|
||||
if self.proc.poll() is not None:
|
||||
logger.error("❌ LIDAR subprocess terminated unexpectedly.")
|
||||
self.running = False
|
||||
return None, None
|
||||
|
||||
line = self.proc.stdout.readline()
|
||||
if not line:
|
||||
# No output ready, small sleep to prevent busy-waiting 100% CPU
|
||||
# Adjust sleep time based on expected data rate vs CPU usage.
|
||||
time.sleep(0.001)
|
||||
continue
|
||||
|
||||
line = line.strip()
|
||||
if ',' not in line:
|
||||
# logger.debug(f"Skipping line without comma: {line}") # Optional debug
|
||||
continue
|
||||
|
||||
try:
|
||||
angle_str, distance_str = line.split(',')
|
||||
angle = float(angle_str)
|
||||
distance = float(distance_str)
|
||||
|
||||
# Convert angle to integer index (0-359)
|
||||
# Consider rounding instead of truncating if angles are slightly off
|
||||
i = int(round(angle)) % 360 # Use round() and modulo for robustness
|
||||
|
||||
# Apply distance filter
|
||||
if 100 < distance < 6000:
|
||||
current_scan[i] = distance # Store as float for consistency
|
||||
else:
|
||||
# Already initialized to INVALID_READING_VALUE, but explicit is fine
|
||||
current_scan[i] = INVALID_READING_VALUE
|
||||
|
||||
angles_received_this_scan.add(i)
|
||||
|
||||
# Check if we have potentially received a full scan
|
||||
# --- THIS IS THE MODIFIED LINE ---
|
||||
# Changed >= 360 to >= 359
|
||||
if len(angles_received_this_scan) >= 359:
|
||||
# logger.debug(f"✅ Scan received ({len(angles_received_this_scan)} angles) in {time.time() - start_time:.3f}s") # Optional debug
|
||||
return current_scan, list(range(360))
|
||||
|
||||
except ValueError:
|
||||
# logger.debug(f"Skipping malformed line (ValueError): {line}") # Optional debug
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error processing line '{line}': {e}")
|
||||
continue
|
||||
|
||||
|
||||
# --- Timeout occurred ---
|
||||
logger.warning(f"⏳ Scan timeout after {timeout:.1f}s, received {len(angles_received_this_scan)}/360 angles.")
|
||||
# Return None to indicate failure to get a full scan
|
||||
return None, None
|
||||
|
||||
def stop(self):
|
||||
logger.info("🛑 Stopping LIDAR...")
|
||||
self.running = False # Signal get_scan loop to stop trying
|
||||
|
||||
if self.proc and self.proc.poll() is None: # Check if process exists and is running
|
||||
try:
|
||||
# Try interrupting gracefully first
|
||||
self.proc.send_signal(signal.SIGINT)
|
||||
self.proc.wait(timeout=3.0) # Wait 3 seconds
|
||||
logger.info("✅ LIDAR subprocess terminated successfully.")
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning("⚠️ LIDAR subprocess did not exit via SIGINT, killing...")
|
||||
self.proc.kill() # Force kill if it didn't respond
|
||||
try:
|
||||
# Wait a moment after kill to ensure it's gone
|
||||
self.proc.wait(timeout=1.0)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error("❌ Failed to confirm LIDAR subprocess killed.")
|
||||
logger.warning("⚠️ LIDAR subprocess killed due to timeout.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error during LIDAR stop: {e}")
|
||||
else:
|
||||
logger.info("✅ LIDAR subprocess already stopped or never started.")
|
||||
logger.info("✅ LIDAR stopped.")
|
||||
80
log.log
Normal file
80
log.log
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
[11:07:31] DEBUG: ✅ Loaded saved IMU calibration.
|
||||
[11:07:32] DEBUG: ✅ IMU initialized successfully.
|
||||
[11:07:32] INFO: Initializing LIDAR on device /dev/lidar with baud rate 256000
|
||||
[11:07:32] INFO: ✅ LIDAR subprocess started successfully.
|
||||
[11:07:32] DEBUG: 🌀 Letting LIDAR warm up for 1 second...
|
||||
[11:07:33] DEBUG: 🌀 Letting LIDAR warm up for 1 second...
|
||||
[11:07:34] DEBUG: ⏳ Waiting for first valid LIDAR scan...
|
||||
[11:07:36] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:36] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:36] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:38] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:38] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:38] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:40] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:40] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:40] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:42] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:42] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:42] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:44] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:44] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:44] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:48] DEBUG: ✅ Loaded saved IMU calibration.
|
||||
[11:07:49] DEBUG: ✅ IMU initialized successfully.
|
||||
[11:07:49] INFO: Initializing LIDAR on device /dev/lidar with baud rate 256000
|
||||
[11:07:49] INFO: ✅ LIDAR subprocess started successfully.
|
||||
[11:07:49] DEBUG: 🌀 Letting LIDAR warm up for 1 second...
|
||||
[11:07:50] DEBUG: 🌀 Letting LIDAR warm up for 1 second...
|
||||
[11:07:51] DEBUG: ⏳ Waiting for first valid LIDAR scan...
|
||||
[11:07:53] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:53] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:53] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:55] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:55] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:55] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:57] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:57] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:57] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:07:59] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:07:59] DEBUG: 👀 Current scan length: 0
|
||||
[11:07:59] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:02] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:02] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:02] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:04] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:04] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:04] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:06] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:06] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:06] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:08] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:08] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:08] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:10] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:10] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:10] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:12] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:12] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:12] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:15] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:15] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:15] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:17] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:17] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:17] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:19] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:19] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:19] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:21] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:21] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:21] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:23] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:23] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:23] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:25] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:25] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:25] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
[11:08:28] WARNING: ⏳ Scan timeout after 2.0s, received 359/360 angles.
|
||||
[11:08:28] DEBUG: 👀 Current scan length: 0
|
||||
[11:08:28] DEBUG: 🕰️ Incomplete scan, retrying...
|
||||
27
logger.py
Normal file
27
logger.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
# logger.py
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
LOG_FILENAME = 'log.log'
|
||||
|
||||
def get_logger(name="wayfinder"):
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
# Always remove existing handlers (hotfix for your current bug)
|
||||
if logger.hasHandlers():
|
||||
logger.handlers.clear()
|
||||
|
||||
file_handler = logging.FileHandler(LOG_FILENAME)
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
'[%(asctime)s] %(levelname)s: %(message)s',
|
||||
datefmt='%H:%M:%S'
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
|
||||
logger.addHandler(file_handler)
|
||||
|
||||
return logger
|
||||
303
main_V2.py
Normal file
303
main_V2.py
Normal file
|
|
@ -0,0 +1,303 @@
|
|||
# main.py (Complete Structure)
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
RUNNING_IN_GUI = os.getenv("RUNNING_IN_GUI") == "0"
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
#if os.getenv("RUNNING_IN_GUI") == "1":
|
||||
# plt.switch_backend('Agg')
|
||||
from matplotlib.colors import LinearSegmentedColormap
|
||||
import subprocess # Ensure subprocess is imported if LidarReader might need it indirectly (though it's self-contained)
|
||||
|
||||
|
||||
from lidar_reader import LidarReader
|
||||
from imu_reader import IMUReader
|
||||
|
||||
# Import necessary components from BreezySLAM
|
||||
from breezyslam.algorithms import RMHC_SLAM
|
||||
from breezyslam.sensors import Laser # <--- Import Laser base class
|
||||
|
||||
# Add the parent of TurboPi to the path
|
||||
sys.path.append('/home/poebot/RPLidar/libs/rplidar_sdk/app/ultra_simple/TurboPi/HiwonderSDK/')
|
||||
from mecanum import MecanumChassis
|
||||
|
||||
from power_monitor import PowerMonitor
|
||||
power = PowerMonitor()
|
||||
|
||||
|
||||
# Import logger if you have a separate logger setup
|
||||
# from logger import get_logger
|
||||
# logger = get_logger()
|
||||
# If not using a separate logger.py, configure basic logging:
|
||||
from logger import get_logger
|
||||
logger = get_logger()
|
||||
# Set higher level for noisy BreezySLAM logs if needed
|
||||
# logging.getLogger('breezyslam').setLevel(logging.WARNING)
|
||||
|
||||
chassis = MecanumChassis()
|
||||
|
||||
from wall_follower import WallFollower
|
||||
|
||||
# --- Constants ---
|
||||
MAP_SIZE_PIXELS = 800
|
||||
MAP_SIZE_METERS = 8
|
||||
# Calculate MM_TO_PIXELS correctly
|
||||
MM_TO_PIXELS = MAP_SIZE_PIXELS / (MAP_SIZE_METERS * 1000.0)
|
||||
CENTER = MAP_SIZE_PIXELS // 2
|
||||
LIDAR_UPDATE_INTERVAL = 0.1 # Optional: control loop frequency
|
||||
|
||||
# --- Sensor model ---
|
||||
# Describes the LIDAR characteristics to BreezySLAM
|
||||
class RPLidarSensor(Laser):
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
scan_size=360, # Number of measurements per scan
|
||||
scan_rate_hz=5, # Scan rate (adjust if known, affects some SLAM aspects)
|
||||
detection_angle_degrees=360, # Angular range of the scan
|
||||
distance_no_detection_mm=6000, # Max range in mm or value for no detection, prev 6k
|
||||
offset_mm=0 # Offset from robot center (if any)
|
||||
)
|
||||
# Should be a list of floats/ints from 0 to 359
|
||||
self.scan_angles_degrees = list(range(360))
|
||||
|
||||
# --- IMU Initialization ---
|
||||
try:
|
||||
imu = IMUReader()
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to initialize IMU: {e}")
|
||||
imu = None # Allow running without for testing
|
||||
|
||||
# --- LIDAR Initialization ---
|
||||
try:
|
||||
# Use the correct device path for your system
|
||||
lidar = LidarReader(device="/dev/lidar")
|
||||
|
||||
logger.debug("🌀 Letting LIDAR warm up for 1 second...")
|
||||
time.sleep(1.0)
|
||||
|
||||
logger.debug("⏳ Waiting for first valid LIDAR scan...")
|
||||
while True:
|
||||
scan, _ = lidar.get_scan()
|
||||
logger.debug(f"👀 Current scan length: {len(scan) if scan else 0}")
|
||||
if scan and len(scan) == 360:
|
||||
logger.debug("✅ First full LIDAR scan received. Proceeding with SLAM.")
|
||||
break
|
||||
else:
|
||||
logger.debug(f"🕰️ Incomplete scan, retrying...")
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to initialize LIDAR: {e}")
|
||||
exit() # Exit if LIDAR fails
|
||||
|
||||
|
||||
# --- SLAM Initialization ---
|
||||
try:
|
||||
# Create an instance of sensor description class
|
||||
sensor_description = RPLidarSensor()
|
||||
# Initialize SLAM with the sensor description, map size in pixels, and map size in meters
|
||||
slam = RMHC_SLAM(sensor_description, MAP_SIZE_PIXELS, MAP_SIZE_METERS)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to initialize SLAM: {e}")
|
||||
exit()
|
||||
|
||||
# --- Visualization setup ---
|
||||
try:
|
||||
cmap = LinearSegmentedColormap.from_list("redmap", ["#FFD700", "#4C566A"]) # Black to red
|
||||
|
||||
plt.ion()
|
||||
fig, ax = plt.subplots(figsize=(8, 8))
|
||||
fig.patch.set_facecolor('#0a0a0a')
|
||||
ax.set_facecolor('#0a0a0a')
|
||||
|
||||
map_im = ax.imshow(np.zeros((MAP_SIZE_PIXELS, MAP_SIZE_PIXELS), dtype=np.uint8),
|
||||
cmap=cmap, origin='lower', vmin=0, vmax=255)
|
||||
robot_dot, = ax.plot([], [], 'co', markersize=5)
|
||||
path_line, = ax.plot([], [], 'c-', linewidth=1, alpha=0.7)
|
||||
ax.set_title("SLAM Map (Press Ctrl+C to save & exit)", color="white")
|
||||
ax.set_aspect('equal', adjustable='box')
|
||||
|
||||
# 🚫 Remove this if running from the GUI
|
||||
if not RUNNING_IN_GUI:
|
||||
plt.show() # Only show in standalone
|
||||
|
||||
|
||||
|
||||
# Show the plot window
|
||||
logger.info("✅ Visualization initialized.")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to initialize visualization: {e}")
|
||||
visualization_enabled = False
|
||||
logger.warning("⚠️ Running without visualization due to error.")
|
||||
else:
|
||||
visualization_enabled = True
|
||||
|
||||
wall_follower = WallFollower(chassis)
|
||||
|
||||
# --- Main Loop ---
|
||||
mapbytes = bytearray(MAP_SIZE_PIXELS * MAP_SIZE_PIXELS)
|
||||
pose = [0.0, 0.0, 0.0] # [x_mm, y_mm, theta_degrees]
|
||||
path = []
|
||||
viz_center_x = CENTER
|
||||
viz_center_y = CENTER
|
||||
|
||||
|
||||
|
||||
try:
|
||||
print("🗺️ Mapping started. Move the bot around to map.")
|
||||
logger.info("🗺️ Mapping started.")
|
||||
last_map_update_time = time.time()
|
||||
|
||||
while True:
|
||||
loop_start_time = time.time()
|
||||
|
||||
voltage = power.get_voltage()
|
||||
if voltage is not None:
|
||||
logger.debug(f"🔋 Voltage = {voltage:.2f} V")
|
||||
if not power.voltage_ok():
|
||||
logger.warning("🛑 Voltage too low! Stopping motors.")
|
||||
chassis.reset_motors()
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
|
||||
try:
|
||||
scan_data, scan_angles = lidar.get_scan()
|
||||
|
||||
if scan_data is None or scan_angles is None:
|
||||
logger.warning("⚠️ LIDAR scan failed or incomplete, skipping iteration.")
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error getting LIDAR scan: {e}")
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
if not scan_data or len(scan_data) != sensor_description.scan_size:
|
||||
logger.warning(f"Received invalid scan data (length {len(scan_data)}), skipping SLAM update.")
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
heading = None
|
||||
if imu:
|
||||
try:
|
||||
heading = imu.get_heading()
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Could not get IMU heading: {e}")
|
||||
|
||||
try:
|
||||
slam.update(scan_data, scan_angles_degrees=scan_angles)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error updating SLAM: {e}")
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
try:
|
||||
pose = slam.getpos()
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error getting SLAM pose: {e}")
|
||||
|
||||
#Let the wall follower take control
|
||||
try:
|
||||
logger.debug("📡 About to call wall_follower.step()")
|
||||
wall_follower.step(scan_data)
|
||||
except Exception as e:
|
||||
logger.error(f"⚠️ Wall follower error: {e}")
|
||||
|
||||
current_time = time.time()
|
||||
if current_time - last_map_update_time > 0.1:
|
||||
try:
|
||||
slam.getmap(mapbytes)
|
||||
map_np = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS))
|
||||
last_map_update_time = current_time
|
||||
|
||||
if visualization_enabled:
|
||||
x_pix = int(pose[0] * MM_TO_PIXELS + viz_center_x)
|
||||
y_pix = int(pose[1] * MM_TO_PIXELS + viz_center_y)
|
||||
|
||||
path.append((x_pix, y_pix))
|
||||
max_path_points = 2000
|
||||
if len(path) > max_path_points:
|
||||
path = path[-max_path_points:]
|
||||
path_x, path_y = zip(*path) if path else ([], [])
|
||||
|
||||
map_im.set_data(map_np)
|
||||
robot_dot.set_data([x_pix], [y_pix])
|
||||
path_line.set_data(path_x, path_y)
|
||||
|
||||
plt.draw()
|
||||
plt.pause(0.001)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error updating map/visualization: {e}")
|
||||
visualization_enabled = False
|
||||
|
||||
loop_duration = time.time() - loop_start_time
|
||||
sleep_time = LIDAR_UPDATE_INTERVAL - loop_duration
|
||||
if sleep_time > 0:
|
||||
time.sleep(sleep_time)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n🛑 Mapping stopped by user.")
|
||||
logger.info("🛑 KeyboardInterrupt received, stopping mapping.")
|
||||
|
||||
# Stop motors FIRST
|
||||
try:
|
||||
if 'wall_follower' in locals():
|
||||
wall_follower.stop()
|
||||
elif 'chassis' in locals() and chassis:
|
||||
chassis.reset_motors()
|
||||
logger.info("✅ Motors stopped.")
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Could not stop motors: {e}")
|
||||
|
||||
# Save the map
|
||||
try:
|
||||
slam.getmap(mapbytes)
|
||||
final_map = np.reshape(mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS))
|
||||
np.save("slam_occupancy_grid.npy", final_map)
|
||||
plt.imsave("slam_map_final.png", final_map, cmap=cmap, origin='lower')
|
||||
print("💾 Map saved to slam_map_final.png and slam_occupancy_grid.npy.")
|
||||
logger.info("💾 Map saved successfully.")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to save final map: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"🚨 An unexpected error occurred in the main loop: {e}")
|
||||
|
||||
finally:
|
||||
print("🧹 Cleaning up...")
|
||||
logger.info("🧹 Cleaning up resources...")
|
||||
|
||||
# Ensure motors are stopped
|
||||
try:
|
||||
if 'wall_follower' in locals():
|
||||
wall_follower.stop()
|
||||
elif 'chassis' in locals() and chassis:
|
||||
chassis.reset_motors()
|
||||
logger.info("✅ Motors stopped (from finally block).")
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Could not stop motors in finally: {e}")
|
||||
|
||||
# Stop LIDAR
|
||||
if 'lidar' in locals() and lidar:
|
||||
try:
|
||||
lidar.stop()
|
||||
logger.info("✅ LIDAR stopped.")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error stopping LIDAR: {e}")
|
||||
|
||||
# Shutdown visualization
|
||||
if visualization_enabled and 'plt' in locals():
|
||||
try:
|
||||
if not RUNNING_IN_GUI:
|
||||
plt.ioff()
|
||||
logger.info("ℹ️ Visualization ended cleanly.")
|
||||
# Avoid showing the plot interactively
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error handling plot closure: {e}")
|
||||
|
||||
|
||||
36
power_monitor.py
Normal file
36
power_monitor.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
# power_monitor.py
|
||||
|
||||
from HiwonderSDK.ros_robot_controller_sdk import Board
|
||||
import time
|
||||
from logger import get_logger
|
||||
|
||||
logger = get_logger("power_monitor")
|
||||
|
||||
class PowerMonitor:
|
||||
def __init__(self):
|
||||
self.board = Board()
|
||||
self.board.enable_reception()
|
||||
self.last_voltage = None
|
||||
self.last_check_time = 0
|
||||
self.cache_interval = 0.5 # seconds
|
||||
|
||||
def get_voltage(self):
|
||||
now = time.time()
|
||||
if self.last_voltage is not None and now - self.last_check_time < self.cache_interval:
|
||||
return self.last_voltage # return cached value
|
||||
raw = self.board.get_battery()
|
||||
if raw is None:
|
||||
logger.warning("⚠️ Failed to read battery voltage!")
|
||||
return None
|
||||
self.last_voltage = raw / 1000.0
|
||||
self.last_check_time = now
|
||||
return self.last_voltage
|
||||
|
||||
def voltage_ok(self, threshold=6.8):
|
||||
voltage = self.get_voltage()
|
||||
if voltage is None:
|
||||
return False
|
||||
if voltage < threshold:
|
||||
logger.warning(f"🔋 LOW VOLTAGE: {voltage:.2f}V! (Threshold: {threshold:.2f}V)")
|
||||
return False
|
||||
return True
|
||||
183
robot_gui.py
Normal file
183
robot_gui.py
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
import time
|
||||
import sys
|
||||
import os
|
||||
from PyQt6.QtWidgets import (
|
||||
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel,
|
||||
QFrame, QGridLayout, QSpacerItem, QSizePolicy
|
||||
)
|
||||
from PyQt6.QtGui import QPixmap, QImage, QFont
|
||||
from PyQt6.QtCore import Qt, QTimer
|
||||
import numpy as np
|
||||
import cv2
|
||||
from lidar_processor import LidarProcessor # Import LIDAR processing
|
||||
|
||||
def hex_to_bgr(hex_color):
|
||||
"""Convert HEX to OpenCV BGR"""
|
||||
hex_color = hex_color.lstrip("#")
|
||||
r, g, b = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
|
||||
return (b, g, r)
|
||||
|
||||
class RobotGUI(QWidget):
|
||||
def __init__(self, lidar_processor):
|
||||
super().__init__()
|
||||
self.lidar_processor = lidar_processor
|
||||
self.robot_path = []
|
||||
|
||||
self.setWindowTitle("Robot Control GUI")
|
||||
self.setGeometry(100, 100, 800, 480)
|
||||
self.showFullScreen()
|
||||
|
||||
# Colors
|
||||
self.bg_color = hex_to_bgr("#2E3440")
|
||||
self.wall_color = hex_to_bgr("#FFD700")
|
||||
|
||||
# Layout
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# Top Bar
|
||||
status_bar = QHBoxLayout()
|
||||
self.icon_label = QLabel("🤖")
|
||||
self.icon_label.setFont(QFont("Noto Color Emoji", 20))
|
||||
self.battery_label = QLabel("Battery: 100%")
|
||||
self.speed_label = QLabel("Speed: 0 m/s")
|
||||
self.power_label = QLabel("Power: 0W")
|
||||
for widget in [self.icon_label, self.battery_label, self.speed_label, self.power_label]:
|
||||
status_bar.addWidget(widget)
|
||||
layout.addLayout(status_bar)
|
||||
|
||||
# Main Content
|
||||
main_content = QHBoxLayout()
|
||||
layout.addLayout(main_content)
|
||||
|
||||
# Buttons
|
||||
button_container = QVBoxLayout()
|
||||
button_grid = QGridLayout()
|
||||
self.start_btn = QPushButton("Start")
|
||||
self.stop_btn = QPushButton("Stop")
|
||||
self.feature_btn = QPushButton("Feature")
|
||||
self.drift_btn = QPushButton("Drift")
|
||||
button_size = 150
|
||||
for btn in [self.start_btn, self.stop_btn, self.feature_btn, self.drift_btn]:
|
||||
btn.setObjectName("controlButton")
|
||||
btn.setMinimumSize(button_size, button_size)
|
||||
|
||||
button_grid.addWidget(self.start_btn, 0, 0)
|
||||
button_grid.addWidget(self.stop_btn, 0, 1)
|
||||
button_grid.addWidget(self.feature_btn, 1, 0)
|
||||
button_grid.addWidget(self.drift_btn, 1, 1)
|
||||
button_container.addSpacerItem(QSpacerItem(20, 40, QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Expanding))
|
||||
button_container.addLayout(button_grid)
|
||||
main_content.addLayout(button_container)
|
||||
|
||||
# Map Display
|
||||
self.map_label = QLabel()
|
||||
self.map_label.setObjectName("mapLabel")
|
||||
self.map_label.setFrameStyle(QFrame.Shape.Box)
|
||||
self.map_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
self.map_label.setMinimumSize(350, 350)
|
||||
main_content.addWidget(self.map_label)
|
||||
|
||||
# Timer
|
||||
self.timer = QTimer(self)
|
||||
self.timer.timeout.connect(self.update_map)
|
||||
self.timer.start(100)
|
||||
|
||||
self.show()
|
||||
|
||||
# Load stylesheet
|
||||
self.load_stylesheet()
|
||||
|
||||
def load_stylesheet(self):
|
||||
try:
|
||||
qss_path = os.path.join(os.path.dirname(__file__), "styles.qss")
|
||||
with open(qss_path, "r") as f:
|
||||
self.setStyleSheet(f.read())
|
||||
except FileNotFoundError:
|
||||
print("Warning: styles.qss not found. Running without stylesheet.")
|
||||
|
||||
def update_map(self):
|
||||
map_img, pose = self.lidar_processor.get_data()
|
||||
height, width = map_img.shape
|
||||
|
||||
# --- Colors ---
|
||||
nord_bgr = (64, 52, 46) # Nord Background (#2E3440)
|
||||
yellow_bgr = (0, 215, 255) # Cyberpunk Yellow (#FFD700)
|
||||
white_bgr = (255, 255, 255)
|
||||
green_bgr = (0, 255, 128) # Cyberpunk Green (custom pick)
|
||||
|
||||
# --- Start with yellow background ---
|
||||
color_map = np.full((height, width, 3), yellow_bgr, dtype=np.uint8)
|
||||
|
||||
# --- Threshold walls from map ---
|
||||
_, wall_mask = cv2.threshold(map_img, 100, 255, cv2.THRESH_BINARY)
|
||||
|
||||
# --- Apply Nord Background where wall mask is present
|
||||
color_map[wall_mask == 255] = nord_bgr
|
||||
|
||||
# --- Convert robot pose to pixel coordinates ---
|
||||
MM_TO_PIXELS = 100
|
||||
CENTER = height // 2
|
||||
x_pix = int(pose[0] * MM_TO_PIXELS + CENTER)
|
||||
y_pix = int(pose[1] * MM_TO_PIXELS + CENTER)
|
||||
|
||||
# Determine movement speed (basic)
|
||||
if len(self.robot_path) > 1:
|
||||
dx = self.robot_path[-1][0] - self.robot_path[-2][0]
|
||||
dy = self.robot_path[-1][1] - self.robot_path[-2][1]
|
||||
movement = np.hypot(dx, dy)
|
||||
else:
|
||||
movement = 0
|
||||
|
||||
# Breathing pulse (only if moving)
|
||||
base_radius = 6
|
||||
pulse_amplitude = 2 # how big the pulse gets
|
||||
pulse_speed = 2.0 # Hz
|
||||
|
||||
if movement > 1:
|
||||
# Animate based on time
|
||||
t = time.time()
|
||||
pulse = int(pulse_amplitude * (0.5 + 0.5 * np.sin(2 * np.pi * pulse_speed * t)))
|
||||
dot_radius = base_radius + pulse
|
||||
else:
|
||||
dot_radius = base_radius
|
||||
|
||||
# --- Draw robot as white dot ---
|
||||
cv2.circle(color_map, (x_pix, y_pix), dot_radius, white_bgr, -1)
|
||||
|
||||
# --- Trail logic ---
|
||||
self.robot_path.append((x_pix, y_pix))
|
||||
if len(self.robot_path) > 50:
|
||||
self.robot_path.pop(0)
|
||||
|
||||
# --- Draw fading green trail ---
|
||||
for i in range(1, len(self.robot_path)):
|
||||
alpha = int(255 * (i / len(self.robot_path))) # Fade effect
|
||||
trail_color = (
|
||||
int(green_bgr[0] * (alpha / 255)),
|
||||
int(green_bgr[1] * (alpha / 255)),
|
||||
int(green_bgr[2] * (alpha / 255))
|
||||
)
|
||||
cv2.line(color_map, self.robot_path[i - 1], self.robot_path[i], trail_color, 1)
|
||||
|
||||
# --- Convert to RGB for Qt display ---
|
||||
rgb_image = cv2.cvtColor(color_map, cv2.COLOR_BGR2RGB)
|
||||
qimg = QImage(rgb_image.data, width, height, width * 3, QImage.Format.Format_RGB888)
|
||||
self.map_label.setPixmap(QPixmap.fromImage(qimg))
|
||||
|
||||
def close_application(self):
|
||||
print("🔌 Stopping LIDAR and closing GUI...")
|
||||
self.timer.stop()
|
||||
self.lidar_processor.stop()
|
||||
self.close()
|
||||
sys.exit()
|
||||
|
||||
def closeEvent(self, event):
|
||||
self.close_application()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication(sys.argv)
|
||||
lidar_processor = LidarProcessor()
|
||||
window = RobotGUI(lidar_processor)
|
||||
sys.exit(app.exec())
|
||||
32
styles.qss
Normal file
32
styles.qss
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
/* Set the background color */
|
||||
QWidget {
|
||||
background-color: #2E3440;
|
||||
}
|
||||
|
||||
/* Style buttons */
|
||||
QPushButton#controlButton {
|
||||
background-color: #4C566A;
|
||||
color: white;
|
||||
border-radius: 10px;
|
||||
font-size: 18px;
|
||||
padding: 10px;
|
||||
}
|
||||
QPushButton#controlButton:hover {
|
||||
background-color: #5E81AC;
|
||||
}
|
||||
QPushButton#controlButton:pressed {
|
||||
background-color: #88C0D0;
|
||||
}
|
||||
|
||||
/* Style Labels */
|
||||
QLabel {
|
||||
color: white;
|
||||
font-size: 16px;
|
||||
}
|
||||
|
||||
/* Style Map Display */
|
||||
QLabel#mapLabel {
|
||||
background-color: #3B4252;
|
||||
border: 2px solid #5E81AC;
|
||||
border-radius: 10px;
|
||||
}
|
||||
284
wall_follower.py
Normal file
284
wall_follower.py
Normal file
|
|
@ -0,0 +1,284 @@
|
|||
import time
|
||||
from logger import get_logger
|
||||
|
||||
# Constants for state management
|
||||
STATE_SEARCHING = 0
|
||||
STATE_ALIGNING = 1
|
||||
STATE_FOLLOWING = 2
|
||||
STATE_RIGHT_TURNING = 3
|
||||
STATE_LEFT_TURNING = 4
|
||||
|
||||
|
||||
# alignment parameters
|
||||
DESIRED_DISTANCE_FROM_WALL = 280 # mm (200 seems to work very well)
|
||||
SIDE_ALIGNMENT_TOLERANCE = 20 # mm (for side distance correction)
|
||||
ALIGNMENT_ERROR_THRESHOLD = 50 # mm (for angle correction)
|
||||
STRAFE_SPEED = 30 # mm/s
|
||||
CORRECTION_AMOUNT = 2 # direction only, gets clamped anyway
|
||||
|
||||
# Front obstacle detection
|
||||
FRONT_OBSTACLE_THRESHOLD = 500 # mm
|
||||
CORRECTION_TIME = 0.5 # seconds (alignment timing)
|
||||
FORWARD_SPEED = 30 # mm/s
|
||||
|
||||
# Right turn params
|
||||
RIGHT_TURN_THRESHOLD = 800 # mm
|
||||
RIGHT_TURN_SPEED = 30 # mm/s
|
||||
TURN_RESET_DISTANCE = 800 # mm
|
||||
|
||||
logger = get_logger("wall_follower")
|
||||
|
||||
# 💫 Rotation helper using dumb timing
|
||||
def rotate_degrees(chassis, degrees):
|
||||
DEG_PER_SEC = 90 # confirmed from tuning test
|
||||
duration = abs(degrees) / DEG_PER_SEC
|
||||
direction = 1 if degrees > 0 else -1
|
||||
logger.info(f"🔁 Rotating {degrees}° ({'CW' if direction > 0 else 'CCW'}) for {duration:.2f}s")
|
||||
chassis.set_velocity(0, 0, direction)
|
||||
time.sleep(duration)
|
||||
chassis.reset_motors()
|
||||
|
||||
def sanitize_reading(val):
|
||||
return val if isinstance(val, (int, float)) and val > 0 else 0
|
||||
|
||||
class WallFollower:
|
||||
def __init__(self, chassis):
|
||||
self.chassis = chassis
|
||||
self.state = STATE_SEARCHING
|
||||
self.last_alignment_time = time.time()
|
||||
self.realign_interval = 5.0 # seconds between realignment checks
|
||||
self._turning_phase_done = False # Used to track turn phases
|
||||
self.last_correction_time = 0
|
||||
self.alignment_cooldown = 0.3 # seconds
|
||||
|
||||
def stop(self):
|
||||
logger.info("🛑 WallFollower stopping motors.")
|
||||
self.chassis.reset_motors()
|
||||
|
||||
def step(self, scan):
|
||||
front = sanitize_reading(scan[0]) #
|
||||
side = sanitize_reading(scan[90]) #
|
||||
diag_front = sanitize_reading(scan[60]) #
|
||||
diag_back = sanitize_reading(scan[120]) #
|
||||
|
||||
|
||||
|
||||
if self.state == STATE_SEARCHING:
|
||||
self._searching(side)
|
||||
elif self.state == STATE_ALIGNING:
|
||||
self._aligning(side, diag_front, diag_back)
|
||||
elif self.state == STATE_FOLLOWING:
|
||||
self._following(front, diag_front, diag_back, side)
|
||||
elif self.state == STATE_RIGHT_TURNING:
|
||||
self._right_turning(diag_front, side)
|
||||
elif self.state == STATE_LEFT_TURNING:
|
||||
self._left_turning()
|
||||
|
||||
def _searching(self, side_distance):
|
||||
logger.info("🔍 SEARCHING for wall...")
|
||||
|
||||
# Immediately transition to ALIGNING state for development/testing
|
||||
logger.info("✅ Skipping search — switching to ALIGNING mode")
|
||||
self.state = STATE_ALIGNING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _aligning(self, side_distance, dist_45, dist_135):
|
||||
|
||||
logger.info("📐 ALIGNING to wall...")
|
||||
|
||||
# Safety check
|
||||
if side_distance == 0 or dist_45 == 0 or dist_135 == 0:
|
||||
logger.warning("⚠️ Invalid LIDAR reading(s). Holding position.")
|
||||
#self.chassis.reset_motors()
|
||||
return
|
||||
|
||||
# Calculate errors
|
||||
angle_error = dist_45 - dist_135
|
||||
distance_error = side_distance - DESIRED_DISTANCE_FROM_WALL
|
||||
|
||||
# 🔄 Step 1: Correct angle first
|
||||
if abs(angle_error) > ALIGNMENT_ERROR_THRESHOLD:
|
||||
logger.info(f"🔄 Alignment error: {angle_error:.1f} mm")
|
||||
|
||||
P_GAIN = 0.1
|
||||
correction_degrees = P_GAIN * angle_error
|
||||
correction_degrees = max(min(correction_degrees, 2), -2)
|
||||
|
||||
logger.debug(f"🎯 Rotating by {correction_degrees:.1f}° for alignment")
|
||||
rotate_degrees(self.chassis, correction_degrees)
|
||||
time.sleep(0.1)
|
||||
return
|
||||
|
||||
# ↔️ Step 2: Then fix distance to wall
|
||||
if abs(distance_error) > SIDE_ALIGNMENT_TOLERANCE:
|
||||
if distance_error > 0:
|
||||
logger.debug("➡️ Strafe Right to correct wall distance")
|
||||
logger.debug(f"📏 Distance: {side_distance:.1f} mm")
|
||||
logger.debug(f"📏 Distance error: {distance_error:.1f} mm")
|
||||
self.chassis.set_velocity(STRAFE_SPEED, 0, 0)
|
||||
else:
|
||||
logger.debug("⬅️ Strafe Left to correct wall distance")
|
||||
self.chassis.set_velocity(-STRAFE_SPEED, 0, 0)
|
||||
time.sleep(0.3)
|
||||
self.chassis.reset_motors()
|
||||
time.sleep(0.2)
|
||||
return
|
||||
|
||||
# ✅ If both are good
|
||||
logger.info("✅ Aligned successfully — switching to FOLLOWING mode")
|
||||
self.state = STATE_FOLLOWING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
|
||||
# ✅ If both are good
|
||||
logger.info("✅ Aligned successfully — switching to FOLLOWING mode")
|
||||
self.state = STATE_FOLLOWING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _following(self, front_distance, dist_45, dist_135, side):
|
||||
|
||||
#time.sleep(1.0) # Allow LIDAR to settle
|
||||
|
||||
logger.info("🚧 FOLLOWING state: driving forward...")
|
||||
|
||||
# 🧭 Check for possible left-hand turn
|
||||
if 0 < front_distance < FRONT_OBSTACLE_THRESHOLD:
|
||||
logger.info("🧭 Potential left turn detected — entering LEFT_TURNING state")
|
||||
logger.debug(f"📏 Front distance: {front_distance:.1f} mm")
|
||||
self.state = STATE_LEFT_TURNING
|
||||
return
|
||||
|
||||
# 🔁 Timed angle + distance correction
|
||||
if time.time() - self.last_alignment_time > CORRECTION_TIME:
|
||||
if dist_45 > 0 and dist_135 > 0 and side > 0:
|
||||
angle_error = dist_45 - dist_135
|
||||
distance_error = side - DESIRED_DISTANCE_FROM_WALL
|
||||
|
||||
logger.info(f"🛠️ Periodic corrections | Angle error: {angle_error:.1f} mm | Distance error: {distance_error:.1f} mm")
|
||||
logger.debug(f"📏 dist_45 = {dist_45} | dist_135 = {dist_135} | side = {side}")
|
||||
|
||||
# 📐 Angle correction (rotation)
|
||||
if abs(angle_error) > ALIGNMENT_ERROR_THRESHOLD:
|
||||
P_GAIN = 0.1
|
||||
correction_degrees = max(min(P_GAIN * angle_error, 5), -5)
|
||||
logger.debug(f"🎯 FOLLOWING correction: rotating by {correction_degrees:.2f}°")
|
||||
rotate_degrees(self.chassis, correction_degrees)
|
||||
|
||||
# ↔️ Distance correction: strafe to maintain target distance
|
||||
if abs(distance_error) > SIDE_ALIGNMENT_TOLERANCE:
|
||||
if distance_error > 0:
|
||||
logger.debug("📏 FOLLOWING correction: strafing right (too far from wall)")
|
||||
self.chassis.set_velocity(STRAFE_SPEED, 0, 0)
|
||||
else:
|
||||
logger.debug("📏 FOLLOWING correction: strafing left (too close to wall)")
|
||||
self.chassis.set_velocity(-STRAFE_SPEED, 0, 0)
|
||||
|
||||
time.sleep(0.2)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
# 🧭 Check for possible right-hand turn
|
||||
if dist_45 > RIGHT_TURN_THRESHOLD and front_distance > FRONT_OBSTACLE_THRESHOLD:
|
||||
logger.info("🧭 Potential right turn detected — entering RIGHT_TURNING state")
|
||||
self.state = STATE_RIGHT_TURNING
|
||||
self.turn_stage = "checking_forward"
|
||||
return
|
||||
|
||||
#if 0 < front_distance < FRONT_OBSTACLE_THRESHOLD:
|
||||
# logger.warning("🛑 Obstacle detected ahead! Stopping.")
|
||||
# self.chassis.reset_motors()
|
||||
# return
|
||||
|
||||
# 🚗 Continue forward motion
|
||||
logger.debug("🚗 Continuing forward after (or without) correction.")
|
||||
self.chassis.set_velocity(FORWARD_SPEED, 90, 0)
|
||||
|
||||
def _right_turning(self, dist_45, dist_90):
|
||||
|
||||
#time.sleep(1.0) # Allow LIDAR to settle
|
||||
"""
|
||||
State 4: RIGHT_TURNING
|
||||
Handles logic for detecting and confirming a 90-degree right-hand turn.
|
||||
The robot first reverses until the 45° distance returns to normal,
|
||||
then moves forward slowly until both 45° and 90° are large,
|
||||
confirming a valid turn. It stops just past the corner.
|
||||
"""
|
||||
logger.info("↪️ RIGHT_TURNING state: verifying turn...")
|
||||
"""
|
||||
if self.turn_stage == "backing_up":
|
||||
logger.debug("🔙 Backing up slowly to detect return of wall...")
|
||||
self.chassis.set_velocity(30, 270, 0) # reverse slowly
|
||||
|
||||
if 0 < dist_45 < TURN_RESET_DISTANCE:
|
||||
logger.info("🧱 Wall detected again at 45° — switching to forward check")
|
||||
self.turn_stage = "checking_forward"
|
||||
time.sleep(0.5)
|
||||
"""
|
||||
|
||||
"""
|
||||
if self.turn_stage == "checking_forward":
|
||||
logger.debug("🚶♂️ Moving forward slowly to confirm turn...")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
|
||||
logger.debug(f"📈 dist_45 = {dist_45} | dist_90 = {dist_90}")
|
||||
#if dist_45 > RIGHT_TURN_THRESHOLD and dist_90 > RIGHT_TURN_THRESHOLD:
|
||||
if dist_45 > RIGHT_TURN_THRESHOLD:
|
||||
logger.info("✅ Verified right-hand turn — initiating reposition sequence")
|
||||
"""
|
||||
|
||||
# Step 1: Drive forward to clear the corner
|
||||
logger.debug("🚶♂️ Moving forward to clear corner...")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
time.sleep(0.25)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# Step 2: Rotate 90° to face next wall
|
||||
logger.debug("🔁 Rotating 90° (CW) to face new wall")
|
||||
rotate_degrees(self.chassis, 70)
|
||||
|
||||
# Step 3: Drive forward into new alignment zone
|
||||
logger.debug("🚶♂️ Moving forward to align with new wall")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
time.sleep(3.0)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# After repositioning
|
||||
time.sleep(2.0) # Let LIDAR settle
|
||||
|
||||
# Step 4: Switch to ALIGNING
|
||||
logger.info("📐 Reposition complete — switching to ALIGNING state")
|
||||
self.state = STATE_FOLLOWING # was previously set to STATE_ALIGNING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _left_turning(self):
|
||||
#time.sleep(1.0) # Allow LIDAR to settle
|
||||
"""
|
||||
STATE_LEFT_TURNING — reverses, rotates left, then aligns.
|
||||
"""
|
||||
logger.info("↩️ LEFT_TURNING: initiating left turn sequence")
|
||||
|
||||
# Step 1: Backup to clear corner
|
||||
logger.debug("🔙 Reversing slightly before turning")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 270, 0)
|
||||
time.sleep(0.5)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# Step 2: Rotate left (CCW) with extra compensation
|
||||
logger.info("🔁 Rotating -90° (CCW) to face new wall")
|
||||
rotate_degrees(self.chassis, -90)
|
||||
|
||||
#step3: Move foward to avoid angle errors
|
||||
logger.debug("🚘 Move foward to avoid angle errors")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
time.sleep(1.0)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# Step 3: Pause to let LIDAR stabilize
|
||||
logger.debug("⏸️ Pausing briefly after turn...")
|
||||
#time.sleep(1.0)
|
||||
|
||||
# Step 4: Switch to ALIGNING
|
||||
logger.info("📐 Left turn complete — switching to ALIGNING")
|
||||
self.state = STATE_FOLLOWING # was previously set to STATE_ALIGNING
|
||||
#self.last_alignment_time = time.time()
|
||||
273
wall_follower_borked.py
Normal file
273
wall_follower_borked.py
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
import time
|
||||
from logger import get_logger
|
||||
|
||||
# Constants for state management
|
||||
STATE_SEARCHING = 0
|
||||
STATE_ALIGNING = 1
|
||||
STATE_FOLLOWING = 2
|
||||
STATE_RIGHT_TURNING = 3
|
||||
STATE_LEFT_TURNING = 4
|
||||
|
||||
|
||||
# alignment parameters
|
||||
DESIRED_DISTANCE_FROM_WALL = 250 # mm (200 seems to work very well)
|
||||
SIDE_ALIGNMENT_TOLERANCE = 20 # mm (for side distance correction)
|
||||
ALIGNMENT_ERROR_THRESHOLD = 50 # mm (for angle correction)
|
||||
STRAFE_SPEED = 30 # mm/s
|
||||
CORRECTION_AMOUNT = 2 # direction only, gets clamped anyway
|
||||
|
||||
# Front obstacle detection
|
||||
FRONT_OBSTACLE_THRESHOLD = 400 # mm
|
||||
CORRECTION_TIME = 1.0 # seconds (alignment timing)
|
||||
FORWARD_SPEED = 30 # mm/s
|
||||
|
||||
# Right turn params
|
||||
RIGHT_TURN_THRESHOLD = 500 # mm
|
||||
RIGHT_TURN_SPEED = 30 # mm/s
|
||||
TURN_RESET_DISTANCE = 500 # mm
|
||||
|
||||
logger = get_logger("wall_follower")
|
||||
|
||||
# 💫 Rotation helper using dumb timing
|
||||
def rotate_degrees(chassis, degrees):
|
||||
DEG_PER_SEC = 90 # confirmed from tuning test
|
||||
duration = abs(degrees) / DEG_PER_SEC
|
||||
direction = 1 if degrees > 0 else -1
|
||||
logger.info(f"🔁 Rotating {degrees}° ({'CW' if direction > 0 else 'CCW'}) for {duration:.2f}s")
|
||||
chassis.set_velocity(0, 0, direction)
|
||||
time.sleep(duration)
|
||||
chassis.reset_motors()
|
||||
|
||||
def sanitize_reading(val):
|
||||
return val if isinstance(val, (int, float)) and val > 0 else 0
|
||||
|
||||
class WallFollower:
|
||||
def __init__(self, chassis):
|
||||
self.chassis = chassis
|
||||
self.state = STATE_SEARCHING
|
||||
self.last_alignment_time = time.time()
|
||||
self.realign_interval = 5.0 # seconds between realignment checks
|
||||
self._turning_phase_done = False # Used to track turn phases
|
||||
self.last_correction_time = 0
|
||||
self.alignment_cooldown = 0.3 # seconds
|
||||
|
||||
def stop(self):
|
||||
logger.info("🛑 WallFollower stopping motors.")
|
||||
self.chassis.reset_motors()
|
||||
|
||||
def step(self, scan):
|
||||
front = sanitize_reading(scan[0]) #
|
||||
side = sanitize_reading(scan[90]) #
|
||||
diag_front = sanitize_reading(scan[45]) #
|
||||
diag_back = sanitize_reading(scan[135]) #
|
||||
|
||||
|
||||
|
||||
if self.state == STATE_SEARCHING:
|
||||
self._searching(side)
|
||||
elif self.state == STATE_ALIGNING:
|
||||
self._aligning(side, diag_front, diag_back)
|
||||
elif self.state == STATE_FOLLOWING:
|
||||
self._following(front, diag_front, diag_back, side)
|
||||
elif self.state == STATE_RIGHT_TURNING:
|
||||
self._right_turning(diag_front, side)
|
||||
elif self.state == STATE_LEFT_TURNING:
|
||||
self._left_turning()
|
||||
|
||||
|
||||
def _searching(self, side_distance):
|
||||
logger.info("🔍 SEARCHING for wall...")
|
||||
|
||||
# Immediately transition to ALIGNING state for development/testing
|
||||
logger.info("✅ Skipping search — switching to ALIGNING mode")
|
||||
self.state = STATE_ALIGNING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _aligning(self, side_distance, dist_45, dist_135):
|
||||
|
||||
logger.info("📐 ALIGNING to wall...")
|
||||
|
||||
# Safety check
|
||||
if side_distance == 0 or dist_45 == 0 or dist_135 == 0:
|
||||
logger.warning("⚠️ Invalid LIDAR reading(s). Holding position.")
|
||||
#self.chassis.reset_motors()
|
||||
return
|
||||
|
||||
# Calculate errors
|
||||
angle_error = dist_45 - dist_135
|
||||
distance_error = side_distance - DESIRED_DISTANCE_FROM_WALL
|
||||
|
||||
# 🔄 Step 1: Correct angle first
|
||||
if abs(angle_error) > ALIGNMENT_ERROR_THRESHOLD:
|
||||
logger.info(f"🔄 Alignment error: {angle_error:.1f} mm")
|
||||
|
||||
P_GAIN = 0.1
|
||||
correction_degrees = P_GAIN * angle_error
|
||||
correction_degrees = max(min(correction_degrees, 2), -2)
|
||||
|
||||
logger.debug(f"🎯 Rotating by {correction_degrees:.1f}° for alignment")
|
||||
rotate_degrees(self.chassis, correction_degrees)
|
||||
time.sleep(0.1)
|
||||
return
|
||||
|
||||
# ↔️ Step 2: Then fix distance to wall
|
||||
if abs(distance_error) > SIDE_ALIGNMENT_TOLERANCE:
|
||||
if distance_error > 0:
|
||||
logger.debug("➡️ Strafe Right to correct wall distance")
|
||||
logger.debug(f"📏 Distance: {side_distance:.1f} mm")
|
||||
logger.debug(f"📏 Distance error: {distance_error:.1f} mm")
|
||||
self.chassis.set_velocity(STRAFE_SPEED, 0, 0)
|
||||
else:
|
||||
logger.debug("⬅️ Strafe Left to correct wall distance")
|
||||
self.chassis.set_velocity(-STRAFE_SPEED, 0, 0)
|
||||
time.sleep(0.3)
|
||||
self.chassis.reset_motors()
|
||||
time.sleep(0.2)
|
||||
return
|
||||
|
||||
# ✅ If both are good
|
||||
logger.info("✅ Aligned successfully — switching to FOLLOWING mode")
|
||||
self.state = STATE_FOLLOWING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
|
||||
# ✅ If both are good
|
||||
logger.info("✅ Aligned successfully — switching to FOLLOWING mode")
|
||||
self.state = STATE_FOLLOWING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _following(self, front_distance, dist_45, dist_135, side):
|
||||
|
||||
time.sleep(1.0) # Allow LIDAR to settle
|
||||
|
||||
logger.info("🚧 FOLLOWING state: driving forward...")
|
||||
|
||||
# 🧭 Check for possible left-hand turn
|
||||
if 0 < front_distance < FRONT_OBSTACLE_THRESHOLD:
|
||||
logger.info("🧭 Potential left turn detected — entering LEFT_TURNING state")
|
||||
logger.debug(f"📏 Front distance: {front_distance:.1f} mm")
|
||||
self.state = STATE_LEFT_TURNING
|
||||
return
|
||||
|
||||
# 🔁 Timed angle + distance correction
|
||||
if time.time() - self.last_alignment_time > CORRECTION_TIME:
|
||||
if dist_45 > 0 and dist_135 > 0 and side > 0:
|
||||
angle_error = dist_45 - dist_135
|
||||
distance_error = side - DESIRED_DISTANCE_FROM_WALL
|
||||
|
||||
logger.info(f"🛠️ Periodic corrections | Angle error: {angle_error:.1f} mm | Distance error: {distance_error:.1f} mm")
|
||||
logger.debug(f"📏 dist_45 = {dist_45} | dist_135 = {dist_135} | side = {side}")
|
||||
|
||||
# 📐 Angle correction (rotation)
|
||||
if abs(angle_error) > ALIGNMENT_ERROR_THRESHOLD:
|
||||
P_GAIN = 0.05
|
||||
correction_degrees = max(min(P_GAIN * angle_error, 5), -5)
|
||||
logger.debug(f"🎯 FOLLOWING correction: rotating by {correction_degrees:.2f}°")
|
||||
rotate_degrees(self.chassis, correction_degrees)
|
||||
|
||||
# ➡️ Distance correction: strafe right if too far from wall
|
||||
if distance_error > SIDE_ALIGNMENT_TOLERANCE:
|
||||
logger.debug("📏 FOLLOWING correction: strafing right to re-approach wall")
|
||||
self.chassis.set_velocity(STRAFE_SPEED, 0, 0)
|
||||
time.sleep(0.2)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
# 🧭 Check for possible right-hand turn
|
||||
if dist_45 > RIGHT_TURN_THRESHOLD and front_distance > FRONT_OBSTACLE_THRESHOLD:
|
||||
logger.info("🧭 Potential right turn detected — entering RIGHT_TURNING state")
|
||||
self.state = STATE_RIGHT_TURNING
|
||||
self.turn_stage = "checking_forward"
|
||||
return
|
||||
|
||||
#if 0 < front_distance < FRONT_OBSTACLE_THRESHOLD:
|
||||
# logger.warning("🛑 Obstacle detected ahead! Stopping.")
|
||||
# self.chassis.reset_motors()
|
||||
# return
|
||||
|
||||
# 🚗 Continue forward motion
|
||||
logger.debug("🚗 Continuing forward after (or without) correction.")
|
||||
self.chassis.set_velocity(FORWARD_SPEED, 90, 0)
|
||||
|
||||
|
||||
|
||||
def _right_turning(self, dist_45, dist_90):
|
||||
|
||||
time.sleep(1.0) # Allow LIDAR to settle
|
||||
"""
|
||||
State 4: RIGHT_TURNING
|
||||
Handles logic for detecting and confirming a 90-degree right-hand turn.
|
||||
The robot first reverses until the 45° distance returns to normal,
|
||||
then moves forward slowly until both 45° and 90° are large,
|
||||
confirming a valid turn. It stops just past the corner.
|
||||
"""
|
||||
logger.info("↪️ RIGHT_TURNING state: verifying turn...")
|
||||
"""
|
||||
if self.turn_stage == "backing_up":
|
||||
logger.debug("🔙 Backing up slowly to detect return of wall...")
|
||||
self.chassis.set_velocity(30, 270, 0) # reverse slowly
|
||||
|
||||
if 0 < dist_45 < TURN_RESET_DISTANCE:
|
||||
logger.info("🧱 Wall detected again at 45° — switching to forward check")
|
||||
self.turn_stage = "checking_forward"
|
||||
time.sleep(0.5)
|
||||
"""
|
||||
|
||||
"""
|
||||
if self.turn_stage == "checking_forward":
|
||||
logger.debug("🚶♂️ Moving forward slowly to confirm turn...")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
|
||||
logger.debug(f"📈 dist_45 = {dist_45} | dist_90 = {dist_90}")
|
||||
#if dist_45 > RIGHT_TURN_THRESHOLD and dist_90 > RIGHT_TURN_THRESHOLD:
|
||||
if dist_45 > RIGHT_TURN_THRESHOLD:
|
||||
logger.info("✅ Verified right-hand turn — initiating reposition sequence")
|
||||
"""
|
||||
|
||||
# Step 1: Drive forward to clear the corner
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
time.sleep(0.25)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# Step 2: Rotate 90° to face next wall
|
||||
rotate_degrees(self.chassis, 70)
|
||||
|
||||
# Step 3: Drive forward into new alignment zone
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 90, 0)
|
||||
time.sleep(3.0)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# After repositioning
|
||||
time.sleep(2.0) # Let LIDAR settle
|
||||
|
||||
# Step 4: Switch to ALIGNING
|
||||
logger.info("📐 Reposition complete — switching to ALIGNING state")
|
||||
self.state = STATE_ALIGNING # was previously set to STATE_ALIGNING
|
||||
self.last_alignment_time = time.time()
|
||||
|
||||
def _left_turning(self):
|
||||
time.sleep(1.0) # Allow LIDAR to settle
|
||||
"""
|
||||
STATE_LEFT_TURNING — reverses, rotates left, then aligns.
|
||||
"""
|
||||
logger.info("↩️ LEFT_TURNING: initiating left turn sequence")
|
||||
|
||||
# Step 1: Backup to clear corner
|
||||
logger.debug("🔙 Reversing slightly before turning")
|
||||
self.chassis.set_velocity(RIGHT_TURN_SPEED, 270, 0)
|
||||
time.sleep(1.0)
|
||||
self.chassis.reset_motors()
|
||||
|
||||
# Step 2: Rotate left (CCW) with extra compensation
|
||||
logger.info("🔁 Rotating -100° (CCW) to face new wall")
|
||||
rotate_degrees(self.chassis, -95)
|
||||
|
||||
# Step 3: Pause to let LIDAR stabilize
|
||||
logger.debug("⏸️ Pausing briefly after turn...")
|
||||
time.sleep(2.5)
|
||||
|
||||
# Step 4: Switch to ALIGNING
|
||||
logger.info("📐 Left turn complete — switching to ALIGNING")
|
||||
self.state = STATE_FOLLOWING # was previously set to STATE_ALIGNING
|
||||
self.last_alignment_time = time.time()
|
||||
Loading…
Reference in a new issue