Hey gang, new here and new to programming too lol.
I have an arduino script that I'm using to toggle a couple of relays and the script serial output is being read by a python script I have, which then plots data. I'm trying to make it so the python script can change some variables in the arduino script, but I'm having serious trouble. Any tips?
Arduino Script:
///// Fatigue Testing Script /////
// By: Luna Mircea
// V3-Changelog: Added dynamic parameter updates via serial input
// Pin definitions
const int InPin = 4; // Digital pin controlling relay 1 (input)
const int OutPin = 5; // Digital pin controlling relay 2 (output)
#define HI_PSI_PIN A7 // High-pressure sensor pin (analog pin A7)
// Pressure calibration
#define AGREE_SLOPE 1.005
#define AGREE_OFFSET 0
#define HIGHPSI 300.0
#define PSI_HV_COUNT 920
#define PSI_LV_COUNT 102
// Timing and cycle variables
unsigned long InActivationTime = 500; // Time input relay is active (in milliseconds)
unsigned long OutActivationTime = 500; // Time output relay is active (ms)
unsigned int totalCycles = 10; // Total number of cycles
unsigned int cycleCount = 0; // Counter for completed cycles
unsigned int TDelay = 500; // Time between relay activations
// Logging and pressure variables
bool logging = false;
float pressure = 0.0;
// Function prototypes
void logPressure();
void performRelayCycleWithLogging();
void delayWithPressureLogging(unsigned long duration);
void updateParameter(String input);
void setup() {
Serial.begin(250000); // Start serial communication
pinMode(InPin, OUTPUT);
pinMode(OutPin, OUTPUT);
pinMode(HI_PSI_PIN, INPUT);
digitalWrite(InPin, LOW);
digitalWrite(OutPin, LOW);
Serial.println("Welcome to the Fatigue Testing System!");
Serial.println(__DATE__); // Compilation date
Serial.println(__TIME__); // Compilation time
Serial.println("--------------------------------------------------");
}
void loop() {
// Handle incoming serial commands
if (Serial.available() > 0) {
String command = Serial.readStringUntil('\n');
updateParameter(command);
}
// Toggle logging with spacebar
if (logging) {
// Log pressure continuously
logPressure();
// Perform relay cycle with logging
if (cycleCount < totalCycles) {
performRelayCycleWithLogging();
} else {
Serial.println("Test complete. Logging stopped.");
logging = false; // Stop logging
digitalWrite(InPin, LOW);
digitalWrite(OutPin, LOW);
}
} else {
// Ensure both relays are off when logging is stopped
digitalWrite(InPin, LOW);
digitalWrite(OutPin, LOW);
}
// Check for user input to toggle logging
if (Serial.available() > 0)
{
int incomingByte = Serial.read();
if (incomingByte == ' ')
{
logging = !logging;
if (logging)
{
Serial.println();
Serial.println("--------------- LOGGING STARTED ----------------------");
Serial.print("millis,pressure");
Serial.println(); // Print newline
}
}
}
}
void updateParameter(String input) {
if (input.startsWith("InActivationTime=")) {
InActivationTime = input.substring(17).toInt();
Serial.print("Updated InActivationTime to ");
Serial.println(InActivationTime);
} else if (input.startsWith("OutActivationTime=")) {
OutActivationTime = input.substring(18).toInt();
Serial.print("Updated OutActivationTime to ");
Serial.println(OutActivationTime);
} else if (input.startsWith("TDelay=")) {
TDelay = input.substring(7).toInt();
Serial.print("Updated TDelay to ");
Serial.println(TDelay);
} else if (input.startsWith("totalCycles=")) {
totalCycles = input.substring(12).toInt();
Serial.print("Updated totalCycles to ");
Serial.println(totalCycles);
} else {
Serial.print("Unknown command: ");
Serial.println(input);
}
}
void logPressure() {
static unsigned long lastLogTime = 0;
unsigned long currentTime = millis();
if (currentTime - lastLogTime >= 10) { // Log every 10ms
lastLogTime = currentTime;
int value = analogRead(HI_PSI_PIN); // Declare value here
if (value < PSI_LV_COUNT) {
pressure = 0.0;
} else if (value > PSI_HV_COUNT) {
pressure = HIGHPSI;
} else {
pressure = AGREE_SLOPE * (HIGHPSI * ((float)(value - PSI_LV_COUNT) / (PSI_HV_COUNT - PSI_LV_COUNT))) + AGREE_OFFSET;
}
Serial.print(millis());
Serial.print(",");
Serial.println(pressure);
}
}
void performRelayCycleWithLogging() {
// Relay 1 Activation
Serial.println("Activating Relay 1...");
digitalWrite(InPin, HIGH);
delayWithPressureLogging(InActivationTime); // Wait while logging pressure
digitalWrite(InPin, LOW);
Serial.println("Deactivating Relay 1...");
delayWithPressureLogging(TDelay); // Delay between relay activations
// Relay 2 Activation
Serial.println("Activating Relay 2...");
digitalWrite(OutPin, HIGH);
delayWithPressureLogging(OutActivationTime); // Wait while logging pressure
digitalWrite(OutPin, LOW); // Ensure this turns off Relay 2
Serial.println("Deactivating Relay 2...");
delayWithPressureLogging(TDelay); // Delay before next cycle
// Increment cycle count
cycleCount++;
Serial.print("Cycle ");
Serial.print(cycleCount);
Serial.println(" complete.");
}
void delayWithPressureLogging(unsigned long duration) {
unsigned long startTime = millis();
while (millis() - startTime < duration) {
logPressure(); // Log pressure repeatedly during the delay
}
}
Python script:
"""
@author: Luna Mircea
Version 7 - Print All Serial Data
"""
########## Pressure Arduino Serial Plot Script ##########
import serial
import csv
import os
import matplotlib.pyplot as plt
import pandas as pd
from threading import Thread, Lock
import msvcrt
import time
# Set up serial connection
arduino_port = 'COM6' # Replace with the port where your Arduino is connected
baud_rate = 250000 # Baud rate must match the rate defined in your Arduino code
# Global stop flag
stop_flag = False
# Lock for serial port access
serial_lock = Lock()
# Generate a unique filename
def get_unique_filename(folder_path, prefix):
counter = 1
filename = f"{folder_path}/{prefix}_{counter}.csv"
while os.path.exists(filename):
counter += 1
filename = f"{folder_path}/{prefix}_{counter}.csv"
return filename
def plot_csv_data(filename):
# Skip the header row and read data
df = pd.read_csv(filename, names=["millis", "pressure"], skiprows=1)
# Convert 'millis' and 'pressure' columns to numeric, coercing errors
df["millis"] = pd.to_numeric(df["millis"], errors='coerce')
df["pressure"] = pd.to_numeric(df["pressure"], errors='coerce')
# Drop rows with NaN values
df = df.dropna()
# Calculate min and max for x-axis
xmin = int(df["millis"].min())
xmax = int(df["millis"].max())
# Plot the data
plt.plot(df["millis"], df["pressure"], color='black', linewidth=0.8)
plt.xlabel('Milliseconds')
plt.ylabel('Pressure (PSI)')
plt.title('Pressure Log')
# Set x-axis ticks from min to max in steps of 500
plt.xticks(range(xmin, xmax + 1, 500))
plt.grid(True)
# Save the plot as an image
plot_filename = filename.replace(".csv", "_Log.png")
plt.savefig(plot_filename, bbox_inches='tight')
plt.close()
# Read data from serial and save to CSV, while printing all serial output to terminal
def read_from_serial(ser, writer):
global stop_flag
last_print_time = time.time() # Track the last print time
while not stop_flag:
with serial_lock:
if ser.in_waiting > 0:
line = ser.readline().decode('utf-8').strip()
print(line) # Print all serial data to the terminal
if "," in line:
writer.writerow(line.split(","))
# Send updated parameters to Arduino
def send_parameters_to_arduino(ser):
parameters = {
"InActivationTime": input("Enter Input Valve Open time (ms): ").strip(),
"OutActivationTime": input("Enter Output Valve Open time (ms): ").strip(),
"TDelay": input("Enter Valve Delay (ms): ").strip(),
"totalCycles": input("Enter total number of cycles: ").strip()
}
for key, value in parameters.items():
command = f"{key}={value}\n"
arduino.write(command.encode())
# Main function
def main():
global stop_flag
username = input("Hi! Please enter your name: ").strip()
testname = input("Thanks! Now enter the test name: ").strip()
folder_name = f"Fatigue_Test_{username}_{testname}"
os.makedirs(folder_name, exist_ok=True)
filename = get_unique_filename(folder_name, "pressure_log")
try:
with serial.Serial(arduino_port, baud_rate, timeout=1) as ser:
print(f"Connected to {arduino_port}")
# Send parameters to Arduino
send_parameters_to_arduino(ser)
input("Got it! Parameters sent to Arduino! Press Enter to start the test...")
ser.write(b' ') # Send a space character to the Arduino to start the test
with open(filename, "w", newline="") as file:
writer = csv.writer(file)
thread = Thread(target=read_from_serial, args=(ser, writer))
thread.start()
print("Press ESC to stop the test and generate the plot.")
while not stop_flag:
if msvcrt.kbhit() and msvcrt.getch() == b'\x1b': # Check for ESC key
print("\nESC pressed. Stopping the test...")
stop_flag = True
thread.join()
plot_csv_data(filename)
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()
Excuse my shit coding plz!