Wayfinder/lidar_processor.py

75 lines
2.4 KiB
Python
Raw Normal View History

import subprocess
import signal
import numpy as np
from threading import Thread, Lock
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import Laser
MAP_SIZE_PIXELS = 800
MAP_SIZE_METERS = 8
MM_TO_PIXELS = MAP_SIZE_PIXELS / (MAP_SIZE_METERS * 1000)
CENTER = MAP_SIZE_PIXELS // 2
class RPLidarSensor(Laser):
def __init__(self):
super().__init__(scan_size=360, scan_rate_hz=5, detection_angle_degrees=360,
distance_no_detection_mm=6000, offset_mm=0)
class LidarProcessor:
def __init__(self):
self.lock = Lock()
self.scan = [0] * 360
self.pose = [0, 0, 0]
self.mapbytes = bytearray(MAP_SIZE_PIXELS * MAP_SIZE_PIXELS)
self.map_array = np.zeros((MAP_SIZE_PIXELS, MAP_SIZE_PIXELS), dtype=np.uint8)
self.slam = RMHC_SLAM(RPLidarSensor(), MAP_SIZE_PIXELS, MAP_SIZE_METERS)
self.angles_received = set()
self.proc = subprocess.Popen(
['/home/milo/robot_gui/rplidar/rplidar_sdk/output/Linux/Release/ultra_simple',
'--channel', '--serial', '/dev/ttyUSB0', '256000'],
stdout=subprocess.PIPE, text=True
)
self.thread = Thread(target=self._update_loop, daemon=True)
self.thread.start()
def _update_loop(self):
for line in self.proc.stdout:
line = line.strip()
if not line or ',' not in line:
continue
try:
angle, distance = map(float, line.split(','))
i = int(angle)
if 0 <= i < 360:
self.scan[i] = int(distance) if 100 < distance < 6000 else 0
self.angles_received.add(i)
if len(self.angles_received) >= 360:
self.slam.update(self.scan, self.pose)
self.slam.getmap(self.mapbytes)
with self.lock:
self.map_array = np.reshape(self.mapbytes, (MAP_SIZE_PIXELS, MAP_SIZE_PIXELS)).copy()
self.scan = [0] * 360
self.angles_received.clear()
except ValueError:
continue
def get_data(self):
with self.lock:
return self.map_array.copy(), self.pose.copy()
def stop(self):
self.proc.send_signal(signal.SIGINT)
try:
self.proc.wait(timeout=3)
except subprocess.TimeoutExpired:
self.proc.kill()