I have written a python code that continuously plots live pressure data that communicates with an Arduino where the pressure transducer is connected. The code is continuously refreshes to plot the data but I'm having concerns on whether or not the data is being plotted in real-time. For example, when I increase pressure in my system, sometimes it will take 60s - 180s of delay to see the actual spike in pressure on the live-plot.
I believe its a code-related issue because I experience decent feedback speed in the first initial minute. However once the data logging has gone beyond 60s, I seem to lose the fast response rate and will have to wait a couple minutes to see said-spike in pressure on the live-data plot. More so, the Arduino is programmed to sample every 50 milliseconds, which makes me think I've written an inefficient code, as opposed to dealing with a hardware issue from the Arduino and/or pressure transducer.
The code can be seen below! Also, here's a great YouTube video link that I based my script off of: https://www.youtube.com/watch?v=PhDPnjF3_tA
#%% https://www.youtube.com/watch?v=PhDPnjF3_tA
%reset
#%% Importing Necessary Packages
import time
import serial
import csv
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from serial import Serial
from matplotlib.animation import FuncAnimation
from tqdm import tqdm
from IPython.display import FileLink
#%matplotlib inline
#%matplotlib notebook
#%matplotlib ipympl
#from matplotlib.pyplot import animation.FuncAnimation
#from tqdm import tqdm
import os
destination = r"C:\Users\kiera\OneDrive - UCB-O365\Desktop\CU_Boulder\Research_YifuDing\Pneumatic Bubble Testing\Device2 Testing\Plugged Device"
os.chdir(destination)
#%% Defining Arduino Communication
print(f'Saving data in folder "Live Data Recording"')
# Continue with the rest of your script...
# Constants
SERIAL_PORT = 'COM3'
BAUD_RATE = 9600
delay = 0.05 # 50 millisecond delay on Arduino
# Initialize serial connection
ser = Serial(SERIAL_PORT, BAUD_RATE)
print(f'Connected to Arduino port: {SERIAL_PORT}')
# Initialize empty lists for data storage
x_vals = []
sensorValue1_data = []
#%% Defining Data Calibration Function
# Data collection code block
def collect_data(samples, duration, file, stop_condition=None):
with tqdm(total=samples) as pbar: # Initialize tqdm with total samples
start_time = time.time()
line = 0
last_value = None
while line < samples:
getdata = ser.readline().decode('latin1')
data = getdata.rstrip('\r\n')
# Split data into separate values
values = data.split('\t')
# Convert each value to float
float_values = [float(value) for value in values]
# Write the float values to file
file.write('\t'.join(map(str, float_values)) + '\n')
line += 1
# Calculate progress based on actual samples collected
progress = line / samples * 100
pbar.n = line
pbar.refresh()
if stop_condition is not None:
if last_value is not None:
percent_drop = (last_value - float_values[0]) / last_value * 100
if percent_drop >= stop_condition:
break
last_value = float_values[0]
elapsed_time = time.time() - start_time
#if elapsed_time > duration / 1000:
#break
# Define filename and data collection parameters
filename = 'raw_data.csv'
calib_samples = 200 # number of calibration samples to be collected
calib_dur = calib_samples * 50 # Frequency of data collection is 100 ms
# Open file for writing
file = open(filename, 'w')
print('Created file.')
# Collect calibration data
print('Collecting calibration data...')
collect_data(calib_samples, calib_dur, file)
print('Calibration data collection complete.')
# Close and reopen file for appending
file.close()
file = open(filename, 'a')
#%% Plotting Calibrated Pressure Data
# Calibration code block
sensor1_range = 30 # psi
clean_calib = pd.read_csv("raw_data.csv", nrows=calib_samples).dropna()
P1_calib = clean_calib.iloc[:, 0]
P1_calib_mean = P1_calib.mean()
P1_calib_scaled = (P1_calib/1024)*sensor1_range
P1_mean = P1_calib_scaled.mean()
xaxis_calib = [(val+1)/((val+1)/((val+1)*50/1000)) for val in range(len(P1_calib_scaled))]
fig, ax = plt.subplots(1, dpi=300)
ax.scatter(xaxis_calib, P1_calib_scaled, label='P1 Calibration Data', linewidth=1.5)
ax.set_xlabel('Time (s)')
ax.set_ylabel('Pressure (psi)')
ax.set_title('Pressure Calibration Data')
ax.legend(loc='best')
ax.grid(True)
plt.savefig('Calibration Data.png', transparent=True, dpi=300)
FileLink('Calibration Data.png')
plt.show()
print('The mean calibration value for Sensor 1 is {0:.2f} psi.'.format(P1_mean))
#%% Live Pressure Data Recording
#x_vals = []
sensorValue1_data = []
psi_pressure_data = []
t_step = delay
FSO_to_psi = (30/1024)
P_calib_ref = P1_calib_mean
# Create function to read/process data from Arduino
def read_and_process_data():
line = ser.readline().decode('utf-8').strip()
sensorValues = line.split(', ')
sensorValue1_data.append(int(sensorValues[0]))
psi_pressure_data.append(int(sensorValues[0])*FSO_to_psi-P1_mean)
#psi_pressure_data = [x*FSO_to_psi for x in sensorValue1_data]
# Print the received values
#print(f'Time: {sensorValues[0]}, Pressure Transducer 1: {sensorValues[1]}')
# Create a function to update the plot
def update_plot(frame):
read_and_process_data()
plt.cla()
plt.plot(
np.array(range(len(sensorValue1_data)))*t_step,
psi_pressure_data, label='Pressure Transducer')
plt.xlabel('Time (s)')
plt.ylabel('Pressure (psi)')
plt.legend()
# Create a function to save data to a CSV file when plot window is closed
def on_close(event):
with open('live_pressure_data.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Time', 'Sensor1'])
for x, s1 in zip(np.array(range(len(sensorValue1_data)))*t_step,
psi_pressure_data):
writer.writerow([x, s1])
# Register the callback function for when the plot window is closed
fig, ax = plt.subplots()
fig.canvas.mpl_connect('close_event', on_close)
nvals=100
repeat_animation = 1
# Create a variable to hold the animation object
#ani = animation.FuncAnimation(
# fig, update_plot, cache_frame_data=False, blit=True,
# repeat=repeat_animation, interval=10, frames = nvals)
ani = animation.FuncAnimation(
fig, update_plot, interval=0.001)
# Display the animation
plt.show()
# saving to m4 using ffmpeg writer
#writervideo = animation.FFMpegWriter(fps=60)
#ani.save('LivePressureData_MOVIE.mp4', writer=writervideo)
#plt.close()