I'm also working on this from a Python perspective. I'm close.
('fov_degrees', 42.0), # Field of view in degrees
('image_width_pixels', 35), # Width of image in pixels
import serial
import time
import numpy as np
class MatekOpticalFlowSensor:
FUNC_LIDAR = 7937
FUNC_FLOW = 7938
HEADER_SIZE = 8
def __init__(self, port, baudrate=115200, timeout=1):
self.serial_port = serial.Serial(port, baudrate, timeout=timeout)
self.data = {
'height': [],
'xm': [],
'ym': []
}
def crc8_dvb_s2(self, crc, a):
crc ^= a
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0xD5
else:
crc = crc << 1
crc &= 0xFF # Ensure crc is always 8 bits
return crc
def calculate_checksum(self, data):
crc = 0
for byte in data:
crc = self.crc8_dvb_s2(crc, byte)
return crc
def parse_lidar_payload(self, payload):
quality = payload[0]
range_data = int.from_bytes(payload[1:5], byteorder='little')
return quality, range_data
def parse_flow_payload(self, payload):
quality = payload[0]
x_velocity = int.from_bytes(payload[1:5], byteorder='little', signed=True)
y_velocity = int.from_bytes(payload[5:9], byteorder='little', signed=True)
print(f"parse_flow_payload payload: {[hex(x) for x in payload]} size: {len(payload)}, payload[1:5]: {payload[1:5]}, payload[5:9]: {payload[5:9]}")
print(f"parse_flow_payload x_velocity: {x_velocity}, y_velocity: {y_velocity}")
return quality, x_velocity, y_velocity
def poll_flow(self, num_samples=1):
self.data = {
'height': [],
'xm': [],
'ym': []
}
lidar_count = 0
flow_count = 0
start_time = time.time()
while len(self.data['height']) < num_samples or len(self.data['xm']) < num_samples or len(self.data['ym']) < num_samples:
while True:
if self.serial_port.read(1) == b'$':
break
while self.serial_port.in_waiting < self.HEADER_SIZE - 1:
time.sleep(0.01)
header = b'$' + self.serial_port.read(self.HEADER_SIZE - 1)
if header[1:2] != b'X' or header[2:3] not in b'<>!':
print(f"Invalid header: {header}")
continue
flag = header[3]
func = int.from_bytes(header[4:6], byteorder='little')
size = int.from_bytes(header[6:8], byteorder='little')
while self.serial_port.in_waiting < size + 1:
time.sleep(0.01)
payload = self.serial_port.read(size)
msg_crc = self.serial_port.read(1)[0]
checksum_data = header[3:] + payload
calculated_checksum = self.calculate_checksum(checksum_data)
if calculated_checksum != msg_crc:
print(f"Checksum mismatch: expected {msg_crc}, calculated {calculated_checksum}")
continue
if func == self.FUNC_LIDAR and size == 5:
lidar_count += 1
quality, range_data = self.parse_lidar_payload(payload)
if len(self.data['height']) < num_samples:
# millimeters
self.data['height'].append(range_data / 1000.)
elif func == self.FUNC_FLOW and size == 9:
flow_count += 1
quality, x_velocity, y_velocity = self.parse_flow_payload(payload)
if quality < 255:
if len(self.data['xm']) < num_samples:
self.data['xm'].append(x_velocity)
if len(self.data['ym']) < num_samples:
self.data['ym'].append(y_velocity)
else:
print(f"Unknown packet: func={func}, size={size}, payload={payload}")
end_time = time.time()
elapsed_time = end_time - start_time
lidar_frequency = lidar_count / elapsed_time
flow_frequency = flow_count / elapsed_time
# print(f"Lidar message frequency: {lidar_frequency} Hz")
# print(f"Optical flow message frequency: {flow_frequency} Hz")
return self.data
def poll_sensor(self):
current_time = self.get_clock().now().to_msg().sec
try:
self._sensor.poll_flow()
delta_time = current_time - self.last_time if self.last_time is not None else self._dt
# Get raw sensor value (average of data for more stable reading)
# 13.000000
# x_dps: -0.016250
xv = np.average(self._sensor.data['xm']) # x-axis velocity
yv = np.average(self._sensor.data['ym']) # y-axis velocity
self._pos_z = np.average(self._sensor.data['height']) # Meters off the ground
self.get_logger().info(f"xv: {xv:.6f}, yv: {yv:.6f}, _pos_z: {self._pos_z:.6f}")
fov_degrees = self.get_parameter('fov_degrees').value
fov_radians = np.radians(fov_degrees)
fov_width_meters = 2.0 * self._pos_z * np.tan(fov_radians / 2.0)
self.get_logger().info(f"fov_degrees: {fov_degrees}, fov_radians: {fov_radians}. fov_width_meters: {fov_width_meters:.6f}, self._pos_z: {self._pos_z}")
scale_factor = fov_width_meters / self.get_parameter('image_width_pixels').value
# Calculate the distance traveled using self._pos_z in meters over time
dist_x = xv * scale_factor
dist_y = yv * scale_factor
# Calculate velocities in meters per second
x_vel = dist_x / delta_time
y_vel = dist_y / delta_time
self.linear_velocity.x = 0.0
self.linear_velocity.y = 0.0
if not np.isnan(x_vel) and not np.isinf(x_vel):
self.linear_velocity.x = x_vel
if not np.isnan(y_vel) and not np.isinf(y_vel):
self.linear_velocity.y = y_vel
self.get_logger().info(f"linear_velocity: {self.linear_velocity}")
# Update positions
self._pos_x += dist_x
self._pos_y += dist_y
# self.get_logger().info(f"_pos_x: {self._pos_x:.6f}, _pos_y: {self._pos_y:.6f}")
except (RuntimeError, AttributeError) as e:
self.get_logger().error('Exception occurred during poll_sensor: ' + str(e))
self.last_time = current_time
Here is some payload during movement in the x axis:
fov_degrees: 42.0, fov_radians: 0.7330382858376184. fov_width_meters: 0.066792, self._pos_z: 0.087
linear_velocity: geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0)
payload: ['0x68', '0x2', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\x02\x00\x00\x00', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: 2, y_velocity: 0
payload: ['0x67', '0x1', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\x01\x00\x00\x00', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: 1, y_velocity: 0
payload: ['0x69', '0xff', '0xff', '0xff', '0xff', '0x1', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\xff\xff\xff\xff', payload[5:9]: b'\x01\x00\x00\x00'
x_velocity: -1, y_velocity: 1
payload: ['0x65', '0xa', '0x0', '0x0', '0x0', '0xff', '0xff', '0xff', '0xff'] size: 9, payload[1:5]: b'\n\x00\x00\x00', payload[5:9]: b'\xff\xff\xff\xff'
x_velocity: 10, y_velocity: -1
payload: ['0x65', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\x00\x00\x00\x00', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: 0, y_velocity: 0
payload: ['0x6a', '0xff', '0xff', '0xff', '0xff', '0x1', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\xff\xff\xff\xff', payload[5:9]: b'\x01\x00\x00\x00'
x_velocity: -1, y_velocity: 1
payload: ['0x66', '0xfe', '0xff', '0xff', '0xff', '0xfe', '0xff', '0xff', '0xff'] size: 9, payload[1:5]: b'\xfe\xff\xff\xff', payload[5:9]: b'\xfe\xff\xff\xff'
x_velocity: -2, y_velocity: -2
payload: ['0x65', '0xf7', '0xff', '0xff', '0xff', '0xfe', '0xff', '0xff', '0xff'] size: 9, payload[1:5]: b'\xf7\xff\xff\xff', payload[5:9]: b'\xfe\xff\xff\xff'
x_velocity: -9, y_velocity: -2
payload: ['0x67', '0x1', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\x01\x00\x00\x00', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: 1, y_velocity: 0
payload: ['0x6a', '0xff', '0xff', '0xff', '0xff', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\xff\xff\xff\xff', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: -1, y_velocity: 0
payload: ['0x68', '0x1', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\x01\x00\x00\x00', payload[5:9]: b'\x00\x00\x00\x00'
x_velocity: 1, y_velocity: 0
payload: ['0x65', '0xff', '0xff', '0xff', '0xff', '0x0', '0x0', '0x0', '0x0'] size: 9, payload[1:5]: b'\xff\xff\xff\xff', payload[5:9]: b'\x00\x00\x00\x00'