Getting reliable 'timestamps' using pyfirmata

Hello all.

I’m entirely new to programming so bear with me. I only know (a bit of) python language so I’m trying to control my arduino nano through pyfirmata, because I am completely unfamiliar with the arduino IDE language. I know, I’m sorry, let me work on that.
I uploaded the sketch ‘StandardFirmata’ onto the arduino.
This post may look like a python-forum-question in the beginning, but it will be arduino-related, I promise.

Now: I’m measuring analog IN values and I need the moments in time with every measurement, so I can plot a graph.
I’m using a while-loop to store measurements in a list using read(), and use the python time.perf_counter() to store the ‘timestamps’ in another list.
The loop also checks some if-statements so it breaks after a maximum and minimum value is detected, along with the reading returning to begin value.
The begin value is the variable ‘equilibrium’.

I made sure there is a minimum of commands inside the loop, so voltages and time differences are all computed outside of the loop to make it more efficient. Time is measured in nanoseconds instead of seconds because I let somebody tell me that an integer is less CPU-consuming than a float.

the main measuring-loop function looks like this:

def collect_data(analog_in, equilibrium, times_ns, readings, main_flag=500):
“”"
Collect data from an analog IN pin from Arduino, with timestamps.

When min and max peaks are collected, will collect some more data
to show a return to equilibrium in the graph.

Parameters

analog_in : Analog IN pin on Arduino
equilibrium : Equilibrium reading (float)
times_ns : list for storing timestamps
readings : list for storing datapoints
main_flag : int, optional. Default is 500.

Returns

None.

“”"

Set some flags.

min_flag = equilibrium - 0.2
max_flag = equilibrium + 0.2
equilibrium_flag = round(equilibrium, ndigits=1)

while True:

Get time and reading.

reading = analog_in.read()
t_x = time.perf_counter_ns()

if main_flag <= 0:

When main flag reaches zero, measurements stop.

break

elif (
min(readings) < min_flag
and max(readings) > max_flag
and round(reading, ndigits=1) == equilibrium_flag
):

When a minimum and maximum reading is collected

and reading goes back to equilibrium: collect some more

data and reduce the main flag.

main_flag -= 1
times_ns.append(t_x)
readings.append(reading)

else:

Collect data.

times_ns.append(t_x)
readings.append(reading)

This all works pretty good, and it gives me a nice plot afterwards (figure_1, attached tot this post).
Nevermind the Dutch language axes labels, above the figure is the calculated time delta_t between the minumum and maximum reading. This is not correct. I timed the real time with a stopwatch at 5 seconds, more or less. So if the delta_t was around 5 seconds I would have bought it. A difference of 15 seconds however, is somewhat more than my reaction time I reckon.

I made these functions to calculate the readings and delta_t:

Convert nanoseconds in times_ns to seconds.

times =
for t in times_ns:
times.append(
(t - t_0) / (10**9)
)

Convert readings into voltages.

voltages =
for r in readings:
voltages.append(
r * 5.0
)


def get_delta_t(voltages, times):
“”"
Get the time gap between low voltage and high voltage.

Parameters

voltages : list of voltage readings.
times : list of time readings

Returns

delta_t : time gap in seconds (float)

“”"

Copy and sort voltages list.

voltages_sorted = voltages.copy()
voltages_sorted.sort()

Get index of highest value.

index_high = voltages.index(voltages_sorted[-1])

Get index of lowest value.

index_low = voltages.index(voltages_sorted[0])

Get time interval.

t_low = times[index_low]
t_high = times[index_high]
delta_t = t_high - t_low

return delta_t

After this I plot the voltages list as a function of the time list, and display delta_t above the figure. I will save you that function, as it is not relevant to the questions.

With everything I have shown, it comes down to this: I read an analog measurement from the arduino, and immediately register the time from my computer. When I do this once or twice, it turns out okay, with probably a little error.
When I do it a couple of million(?) times, like in the main loop, I get a vast error (around 300%) in measured times and calculated delta_t. This is unacceptable for my project so it’s an issue.

So here they come: the arduino questions.

-Is this error because of some or other delay in the serial connection? I tried to measure the time before the analog_in, and the other way around. This did not help.

-Is it maybe because I ‘overflow’ something? (I nicked the term ‘overflow’ from the internet, no idea what it means) So maybe I should give the poor little arduino some time to recover between looping?

-Should I use pyserial to flush the serial buffer in the loop? (no idea what it does. Again, internet wisdom) Or does board.exit() do that for me? Is once enough?

-In the arduino IDE language, I read about the millis() function, which I think is similar to python’s time.perf_counter() function, only with millis() it is the time registered by the arduino instead of my computer. Might this be a way to eliminate the delay in the serial connection? Is there a way to read millis() in pyfirmata? I can’t find it in the docs.

-If not, is there some kind of timestamp related to the values of read()? If so, how can I obtain them?
Like maybe a function that looks like millis.read()?

Thanks for reading this. If I posted this in the wrong category please let me know.

Figure_1.png

Try using pymata4 . It timestamps automatically and is is currently maintained and supported!

I didn’t mention it here yet, but I solved the error! I added a microsecond of delay in the main measuring loop.
This made all the difference for the accuracy in the timing measurements.

Only it raises the question of why the error in measurements in time are related to the quantity (or timing?) of the analog pin measurements?
Can somebody explain this? My guess is that some process consumes too much CPU? But is that a cap related to the arduino or my computer? Or maybe a data transfer overflow on the serial bus?

In a few days I am going to research how small I can make the delay, maybe this way I can find a ‘threshold’ in the delay time, where the measurements start to diverge with real time.
If I get a value on this threshold, maybe I can compare it with the CPU cycle times of the arduino or my computer. Then we might be able to find the culprit in this error.
I will post any progress on the matter here.

And thankyou MrY for the reply! I will definitely explore this lib. Does it have similar functions as pyfirmata? I hope I won’t have to rewrite too much code.
Else, I’m still gonna try. The description looks promising.

For those who are interested, the main measuring function now looks like this:

def collect_data(
board, analog_in, equilibrium, times_ns, readings, main_flag=100
):
“”"
Collect data from an analog IN pin from Arduino, with timestamps.

When min and max peaks are collected, will collect some more data
to show a return to equilibrium in the graph.

Parameters

analog_in : Analog IN pin on Arduino
equilibrium : Equilibrium reading (float)
times_ns : list for storing timestamps
readings : list for storing datapoints
main_flag : int, optional. Default is 500.

Returns

None.

“”"

Set some flags.

min_flag = equilibrium - 0.2
max_flag = equilibrium + 0.2
equilibrium_flag = round(equilibrium, ndigits=1)

while True:

Get time and reading.

reading = analog_in.read()
t_x = time.perf_counter_ns()
board.pass_time(0.000001) # This is the delay that solved my error.

if main_flag <= 0:

When main flag reaches zero, measurements stop.

break

elif (
min(readings) < min_flag
and max(readings) > max_flag
and round(reading, ndigits=1) == equilibrium_flag
):

When a minimum and maximum reading is collected

and reading goes back to equilibrium: collect some more

data and reduce the main flag.

main_flag -= 1
times_ns.append(t_x)
readings.append(reading)

else:

Collect data.

times_ns.append(t_x)
readings.append(reading)

The API is different. IMHO it is more consistent and easier to use. Below, I have an adapted version of your code. I am not doing your calculations with equalibrium, etc, but just wanted to demonstrate pymata4 and that the conversion would be simple. You could place your calculations within the callback or outside - it is up to you.

All the data is captured in the callback. I added a 0.25 second sleep just so that you could see the time differential in the printed output. Normally, you would not need a sleep within the callback.

import sys
import time
from pymata4 import pymata4


class CollectData:

   def __init__(self, analog_in=2, equilibrium=0, times_ns=None,
                readings=None, main_flag=100, differential=0):
       """
       :param analog_in: arduino analog input pin number
       :param differential: difference between current value and
                            previous value to consider the change
                            in value a change event
       """
       self.analog_in = analog_in
       self.equilibrium = equilibrium
       self.times_ns = times_ns
       self.readings = readings
       self.main_flag = main_flag

       self.differential = differential

       # Callback data indices
       self.CB_PIN_MODE = 0  # pin mode (see pin modes in private_constants.py)
       self.CB_PIN = 1  # pin number
       self.CB_VALUE = 2  # reported value
       self.CB_TIME = 3  # raw time stamp

       # instantiate pymata4
       self.board = pymata4.Pymata4()

       # set the pin mode for analog input
       self.board.set_pin_mode_analog_input(self.analog_in, self.the_callback, self.differential)
       while True:
           try:
               time.sleep(1)
           except KeyboardInterrupt:
               self.board.shutdown()
               sys.exit(0)

   def the_callback(self, data):
       self.times_ns.append(data[self.CB_TIME])
       self.readings.append(data[self.CB_VALUE])

       # This line just adds some delay so that the printed
       # output will demonstrate time differential.
       # It should probably be removed.
       time.sleep(0.25)

       # stop after 10 readings
       if len(self.readings) >= 10:
           formatted_time_list = []
           for timex in self.times_ns:
               formatted_time_list.append(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timex)))
           print(formatted_time_list)
           print(self.readings)
           self.board.shutdown()
           sys.exit(0)


my_times_ns = []
my_readings = []

cd = CollectData(analog_in=2, times_ns=my_times_ns, readings=my_readings)

Here is the output:

pymata4:  Version 1.02

Copyright (c) 2020 Alan Yorinks All Rights Reserved.

Opening all potential serial ports...
/dev/ttyACM0

Waiting 4 seconds(arduino_wait) for Arduino devices to reset...

Searching for an Arduino configured with an arduino_instance = 1
Arduino compatible device found and connected to /dev/ttyACM0

Retrieving Arduino Firmware ID...
Arduino Firmware ID: 2.5 FirmataExpress.ino

Retrieving analog map...
Auto-discovery complete. Found 20 Digital Pins and 6 Analog Pins


['2020-04-20 16:05:42', '2020-04-20 16:05:42', '2020-04-20 16:05:43', '2020-04-20 16:05:43', '2020-04-20 16:05:43', '2020-04-20 16:05:43', '2020-04-20 16:05:44', '2020-04-20 16:05:44', '2020-04-20 16:05:44', '2020-04-20 16:05:44']
[481, 481, 481, 481, 481, 481, 481, 481, 481, 481]

Any questions, I will be happy to answer them.

I forgot to mention that the example I provided assumes you have loaded [FirmataExpress](http://I forgot to mention that the example I provided assumes you have loaded FirmataExpress on to your Arduino.) on to your Arduino.

If you want to use StandardFirmata, just set the baud rate to 57600 when you create the board.

Thank you so much for the help. I made a lot of progress.
I made a prototype of my project last week, and everything worked really good at low operating speeds.

The Arduino bit however is too slow in registering data values change when I operate the construction at higher speeds. It will 'miss' voltage spikes completely, so I cannot get a proper reading.
I used the firmata express sketch along with the CollectData class that MrY made (thanks again) but, sadly, to no avail.

I guess the code and/or data transfer is still bottlenecking somewhere, somehow. The time has come for me to get acquainted with the Arduino language and see if I can write a 'protocol' made solely for this project. I hope this way I can make faster measurements, due to the arduino doing only what I need it to do.
My guess is that the firmata protocol (and any other protocol for that matter) is somewhat too versatile for my project, 'wasting' duty cycles on commands that I do not use. Is this correct?

I will learn the arduino language and see if I can build an efficient sketch to send data over the serial bus.
Maybe I can interpret it on the computer side with pyserial or something like that.

This learning process will take some time, any progress I make will be posted here.

You quite correct, Firmata is not a match for all projects. Just to let you know, the default cycle time of the StandardFirmata loop() function is about 20ms. With pymata4 you can adjust that down to 10ms.

Aha, there's my problem. I need measurements accurate to microseconds... I'm starting to doubt my weapon of choice.
I designed this project for measuring using an oscilloscope, which is ofcourse very accurate in timing.
Because of the COVID-19 going on I do not have access to an oscilloscope anymore, so I figured I could try to build a cheap alternative measurement system using Arduino.

I'm not giving up just yet, though. I came a long way already with the Arduino language, and I'm now struggling to interpret the serial prints in python.
When I can make sense of the HEX code, I can decide if Arduino has what it takes for these high speed measurements.

To be continued..

To convert the hex values you can use the ord() method. For example, if you are reading the data into a variable called c, you can get the decimal value by doing something like:

c = ord(self.serial_port.read())