Hello,
I wrote an arduino software to control each WS2814 led with an python script (pyseril, serial_for_url) via ethernet. The board I use is an Olimex ESP32 POE-Board.
Now, if I send an command to enable an LED it only works sporadically. It looks like some commands are lost, but I have no idea why. This is my first time using the ethernet library but connect to the router and getting an IP from DHCP works fine.
I hope someone can give me advice to my code:
/*
Software to control WS2815 leds with python via ethernet
*/
#include <Adafruit_NeoPixel.h>
// === DEFINITIONS FOR NETWORK COMMUNICATION ===
// Important to be defined BEFORE including ETH.h for ETH.begin() to work.
// Example RMII LAN8720 (Olimex, etc.)
#ifndef ETH_PHY_TYPE
#define ETH_PHY_TYPE ETH_PHY_LAN8720
#define ETH_PHY_ADDR 0
#define ETH_PHY_MDC 23
#define ETH_PHY_MDIO 18
#define ETH_PHY_POWER -1
#define ETH_CLK_MODE ETH_CLOCK_GPIO0_IN
#endif
// Include network library
#include <ETH.h>
// Ethernet server definitions
#define ETH_SERVER_PORT 8123
#define DATA_LEN_MAX 3
#define CMD_LED_OFF 1
#define CMD_SET_LED_COLOR 2
// === DEFINITIONS FOR LED CONTROL ===
// LED data pin
#define PIN 2
#define PIXEL_COUNT 20
// === VARIABLES FOR NETWORK COMMUNICATION ===
// Global variable which indicates if network is connected
static bool eth_connected = false;
// Initialize the Ethernet server library
NetworkServer server(ETH_SERVER_PORT);
// Typedef for data structure
typedef struct {
unsigned char cmd;
unsigned char data_length;
unsigned char data_buffer[DATA_LEN_MAX];
} CMD_t;
CMD_t rx_data;
// === VARIABLES FOR LED CONTROL ===
Adafruit_NeoPixel strip = Adafruit_NeoPixel(PIXEL_COUNT, PIN, NEO_GRB + NEO_KHZ800);
// WARNING: onEvent is called from a separate FreeRTOS task (thread)!
// Handles ethernet state machine
void onEvent(arduino_event_id_t event) {
switch (event) {
case ARDUINO_EVENT_ETH_START:
Serial.println("ETH Started");
// The hostname must be set after the interface is started, but needs
// to be set before DHCP, so set it from the event handler thread.
ETH.setHostname("esp32-ethernet");
break;
case ARDUINO_EVENT_ETH_CONNECTED: Serial.println("ETH Connected"); break;
case ARDUINO_EVENT_ETH_GOT_IP:
Serial.println("ETH Got IP");
Serial.println(ETH);
eth_connected = true;
break;
case ARDUINO_EVENT_ETH_LOST_IP:
Serial.println("ETH Lost IP");
eth_connected = false;
break;
case ARDUINO_EVENT_ETH_DISCONNECTED:
Serial.println("ETH Disconnected");
eth_connected = false;
break;
case ARDUINO_EVENT_ETH_STOP:
Serial.println("ETH Stopped");
eth_connected = false;
break;
default: break;
}
}
// Calculates modbus crc16
uint16_t crc16_modbus(uint8_t *data, uint16_t length) {
uint16_t crc = 0xFFFF;
uint16_t i, j;
for (i = 0; i < length; i++)
{
crc ^= (uint16_t)data[i]; // XOR byte into least sig. byte of crc
for (j = 0; j < 8; j++) // Loop over each bit
{
if (crc & 0x0001) // If the LSB is set
{
crc >>= 1; // Shift right and XOR 0xA001
crc ^= 0xA001;
}
else // Else LSB is not set
{
crc >>= 1; // Just shift right
}
}
}
return crc;
}
// Receive and save data from client
bool recv_data(NetworkClient* client) {
// Check if data is available
if (client->available() >= 4) {
Serial.println("Received data");
// Read first two bytes
rx_data.cmd = client->read();
rx_data.data_length = client->read();
// Check remaining data
if (rx_data.data_length + 2 >= client->available()) {
// Read remaining data
client->readBytes(rx_data.data_buffer, rx_data.data_length);
}
else {
client->flush();
return false;
}
// Combine the received CRC bytes
uint16_t crc = client->read() + (client->read() << 8);
// Calculate CRC
uint16_t crc_check = crc16_modbus((uint8_t*) &rx_data, 2 + rx_data.data_length);
// Flush remaining data
client->flush();
if (crc == crc_check) {
Serial.println("CRC OK");
return true;
}
else {
Serial.println("CRC NOK");
return false;
}
return (crc == crc_check);
}
return false;
}
// Parse data received from client
void parse_data(NetworkClient* client) {
uint32_t color;
switch(rx_data.cmd) {
case CMD_LED_OFF:
Serial.println("LED OFF");
// Check if data length is valid
if (rx_data.data_length != 1) {
break;
}
// Check of all leds should be turned off
if (rx_data.data_buffer[0] == 0xFF) {
// Turn all leds off
strip.clear();
break;
}
// Turn specific led off
strip.setPixelColor(uint16_t(rx_data.data_buffer[0]), strip.Color(0, 0, 0));
break;
case CMD_SET_LED_COLOR:
Serial.println("SET LED COLOR");
// Check if data length is valid
if (rx_data.data_length != 4) {
break;
}
// Get color from data
color = strip.Color(uint8_t(rx_data.data_buffer[1]), uint8_t(rx_data.data_buffer[2]), uint8_t(rx_data.data_buffer[3]));
// Check of all leds should be turned on
if (rx_data.data_buffer[0] == 0xFF) {
// Turn all leds on
for (uint16_t i = 0; i < strip.numPixels(); i++) {
strip.setPixelColor(i, color);
strip.show();
break;
}
}
Serial.print("Setting led with id:");
Serial.println(rx_data.data_buffer[0]);
Serial.print("Color:");
Serial.println(rx_data.data_buffer[1]);
Serial.println(rx_data.data_buffer[2]);
Serial.println(rx_data.data_buffer[3]);
// Turn specific led on
strip.setPixelColor(uint16_t(rx_data.data_buffer[0]), color);
strip.show();
break;
default:
break;
}
}
void setup() {
// Initialize serial logging
Serial.begin(115200);
// Initialize led strip
strip.begin();
strip.setBrightness(100);
strip.setPixelColor(0x01, strip.Color(255, 0, 0));
strip.show();
// Pass connection event handler to network library
Network.onEvent(onEvent);
// Start network handling
ETH.begin();
// Start server
server.begin();
}
void loop() {
// if an incoming client connects, there will be bytes available to read:
NetworkClient client = server.available();
if (client) {
// Check if data was received
if(recv_data(&client)) {
Serial.println("Parsing data...");
// Parse data
parse_data(&client);
}
}
}
The python script to controll the leds:
#!/usr/bin/python
# -*- coding: utf-8 -*-
##
# @file servo.py
#
# @brief Provides functions talk to arduino servo controller.
#
# @section author_main Author(s)
# - Created by Fabian Plaimauer.
# - Modified by ???.
"""! @brief Provides functions talk to arduino servo controller."""
import logging
import threading
import serial
CMD_LED_OFF = 0x01
CMD_LED_OFF_DATA_LENGTH = 0x01
CMD_SET_LED_COLOR = 0x02
CMD_SET_LED_COLOR_DATA_LENGTH = 0x04
class LEDController:
"""! LED controller class handles controlling the ws2815 led strip via sockets."""
def __new__(cls, configuration: dict):
"""! Singleton constructor."""
if not hasattr(cls, 'instance'):
cls.instance = super(LEDController, cls).__new__(cls)
return cls.instance
def __init__(self, configuration: dict) -> None:
"""! Initializes led settings.
@param led_config dictionary with config parameters
"""
logging.debug("called")
# Save parameters
self._config = configuration
self._url = f'socket://{self._config["ip"]}:{self._config["port"]}'
# Get mutex lock
self._lock = threading.Lock()
# Create empty serial handler
self._serial_handler = None
def connect(self) -> bool:
"""! Connect to arduino com port."""
logging.debug("called")
# Try opening serial port (thread-safe)
try:
# Lock this code section
with self._lock:
# Check if handler already exists
if not self._serial_handler:
self._serial_handler = serial.serial_for_url(
url = self._url,
timeout = self._config["timeout"],
write_timeout = self._config["write_timeout"]
)
except Exception as e:
logging.critical(e)
return False
return True
def check_connection(self) -> bool:
"""! Check if connected to arduino com port."""
logging.debug("called")
with self._lock:
return self._serial_handler.is_open if self._serial_handler else False
def disable(self, id = 0xFF):
"""! Disabled provided led, if no paramter is provided, will be disabled."""
logging.debug("called")
def set_color(self, id = 0xFF, green = 0x00, red = 0x00, blue = 0x00) -> bool:
"""! Sets color for provided led."""
logging.debug("called")
# Create data packet
packet = bytearray()
packet.append(CMD_SET_LED_COLOR)
packet.append(CMD_SET_LED_COLOR_DATA_LENGTH)
packet.append(id)
packet.append(green)
packet.append(red)
packet.append(blue)
# Calculate CRC
packet.extend(self._modbus_crc16(packet).to_bytes(2, 'little'))
return self._send_recv_data(packet, 0)
##### HELPER FUNCTIONS #####
# Helper function to send servo data and check response
def _send_recv_data(self, packet: bytearray, reply_length: int) -> bool | bytearray:
"""! Helper function to send data and check response."""
logging.debug("called")
try:
# Get mutex
with self._lock:
# Reset buffers
self._serial_handler.reset_input_buffer()
self._serial_handler.reset_output_buffer()
# Send packet
logging.error("write: %s", " ".join(f"0x{b:02X}" for b in packet))
self._serial_handler.write(packet)
if reply_length == 0:
return True
# Read data and wait until timeout
reply = self._serial_handler.read(reply_length)
logging.debug("received: %s", " ".join(f"0x{b:02X}" for b in reply))
# Check if timeout occured
if len(reply) == reply_length:
return reply
logging.debug("invalid reply length")
except Exception as e:
logging.error(e)
# Out of tries without success
logging.debug("failed sending data")
return False
# Helper function to calculate modbus crc16
def _modbus_crc16(self, data: bytes) -> int:
"""! Calculate the Modbus CRC16."""
crc = 0xFFFF
for _, byte in enumerate(data):
crc ^= byte
for _ in range(8):
if crc & 1:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc
config = {
"ip": "192.168.88.244",
"port": 8123,
"timeout": 3,
"write_timeout": 3,
}
import time
led = LEDController(config)
if led.connect():
led.set_color(0x03, 0xFF, 0xFF)
for i in range(1, 10):
led.set_color(i, 0x00, 0xFF)
time.sleep(1)
else:
print(led.check_connection())
print("FAiled to connect")
