Thread-safe data transfer interlock using std::atomic and std::memory_order

After reading this post, I decided I’d try to understand the C++ Memory Model stuff, at least at a basic level. I followed a link posted by @PieterP as well as some Google search results.

As a usage example, I came up with the ESP32 code below. The idea is to have a writer task (simulates handling sensors and gathering their data) that writes to a large, shared global data structure and an independent reader task that processes data from that structure.

It uses two atomic bool variables for the synchronization interlock, newDataAvailable and dataAck, as well as two state machines. The goal was to do the task / data synchronization without using FreeRTOS structures such as queues. The interlock sequence is:

  • Writer writes data to the shared struct
  • Writer raises newDataAvailable flag
  • Reader sees newDataAvailable flag
  • Reader accesses data from struct
  • Reader raises dataAck flag
  • Writer sees dataAck flag
  • Writer lowers newDataAvailable flag
  • Reader sees lowered newDataAvailable flag
  • Reader lowers dataAck flag
  • Cycle repeats

If all the memory accesses are consistent, the reader will print a series of incrementing integers, starting at 0, without gaps. The code has worked for several combinations of relative task priorities and core assignments, but I have a couple of questions:

  • Is the code as written guaranteed to work under all conditions given the C++ memory model?
  • The code seems rather complex; is there a simpler way to do it?

Thanks.

#include "Arduino.h"
#include <atomic>

void writerTask(void *parms);
void readerTask(void *parms);

struct SensorData {
  uint32_t a = 0;
  uint32_t b = 0;
  uint32_t c = 0;
  uint32_t d = 0;
};

SensorData sensorData;

std::atomic<bool> newDataAvailable = { false };
std::atomic<bool> dataAck = { false };

void setup() {
  Serial.begin(115200);
  vTaskDelay(4000);
  Serial.println("Starting");

  BaseType_t returnCode = xTaskCreatePinnedToCore(writerTask, "Writer Task", 2000, NULL, 5, NULL, 1);
  if (returnCode != pdPASS) {
    log_e("Failed to create Writer Task");
    vTaskDelete(NULL);
  }

  returnCode = xTaskCreatePinnedToCore(readerTask, "Reader Task", 2000, NULL, 5, NULL, 1);
  if (returnCode != pdPASS) {
    log_e("Failed to create Reader Task");
    vTaskDelete(NULL);
  }
}

void writerTask(void *parms) {
  uint32_t sensorValue = 0;
  bool newSensorData = true;
  uint32_t lastDataTime = 0;
  uint32_t waitTime = 0;
  log_i("Starting writerTask");

  enum WriterState {
    READY_TO_SEND, WAITING_ACK_SET, WAITING_ACK_RESET
  };

  WriterState state = READY_TO_SEND;

  for (;;) {
    switch (state) {
      case READY_TO_SEND:
        if (newSensorData) {
          sensorData.a = sensorValue++;
          sensorData.b = sensorValue++;
          sensorData.c = sensorValue++;
          sensorData.d = sensorValue++;
          state = WAITING_ACK_SET;
          newDataAvailable.store(true, std::memory_order_release);
          newSensorData = false;
          waitTime = random(100, 500);
          lastDataTime = millis();
        } else {
          if (millis() - lastDataTime >= waitTime) {
            // Simulate time needed to acquire new sensor data
            newSensorData = true;
          }
        }
        break;

      case WAITING_ACK_SET:
        // Wait for Reader to Set Ack signal
        if (dataAck.load(std::memory_order_acquire)) {
          state = WAITING_ACK_RESET;
          newDataAvailable.store(false, std::memory_order_release);
        }
        break;

      case WAITING_ACK_RESET:
        // Wait for Reader to Reset Ack signal
        if (!dataAck.load(std::memory_order_acquire)) {
          state = READY_TO_SEND;
        }
        break;

      default:
        break;
    }

    // Do other processing stuff here
    vTaskDelay(1);
  }
}

void readerTask(void *parms) {
  log_i("Starting readerTask");

  enum ReaderState {
    READY_FOR_DATA, WAITING_WRITER_RESET
  };

  ReaderState state = READY_FOR_DATA;

  for (;;) {
    switch (state) {
      case READY_FOR_DATA:
        // Wait for Writer to Set Available Signal
        if (newDataAvailable.load(std::memory_order_acquire)) {
          Serial.println(sensorData.a);
          Serial.println(sensorData.b);
          Serial.println(sensorData.c);
          Serial.println(sensorData.d);
          state = WAITING_WRITER_RESET;
          dataAck.store(true, std::memory_order_release);
        }
        break;

      case WAITING_WRITER_RESET:
        // Wait for Writer to Reset Available Signal
        if (!newDataAvailable.load(std::memory_order_acquire)) {
          state = READY_FOR_DATA;
          dataAck.store(false, std::memory_order_release);
        }
        break;

      default:
        break;
    }

    // Do other processing stuff here
    vTaskDelay(1);
  }
}

void loop() {
}

What the atomic sequence does is stop any other process from accessing that variable while it is being updated. It may shut off interrupts (perhaps only internal ones); that depends on the structure of the processor.

The reason is that the memory location(s) is being used as a semaphore register (see Semaphore (programming) - Wikipedia) and cannot be touched by any other process while it is in use. When setting or clearing it, the sequence is "read, modify, and write back." If something else reads it or writes to it in the middle, it will corrupt the value. This is used in almost all multitasking systems. Back in the old days, processors were sometimes chosen because they had an "atomic" instruction in their repertoire. Yes, I am an old assembler person.

Try this link and follow the TAS instruction: assembly - TAS instruction 68000 - Stack Overflow. Like I said, I am an old assembler person.
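
In C++, the same test-and-set idea is exposed directly as std::atomic_flag. A minimal spinlock sketch (illustrative only, not part of the code above):

#include <atomic>

// Minimal test-and-set spinlock built on std::atomic_flag (sketch only).
class SpinLock {
  std::atomic_flag locked = ATOMIC_FLAG_INIT;

public:
  void lock() {
    // test_and_set atomically reads the old value and sets the flag;
    // spin while the previous value was already set (someone holds the lock).
    while (locked.test_and_set(std::memory_order_acquire)) {
      // busy-wait
    }
  }

  void unlock() {
    locked.clear(std::memory_order_release);
  }
};

The acquire on test_and_set and the release on clear are what make the work done inside the critical section visible to the next thread that takes the lock.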


I completely understand what "atomic" access means. But in C++ it's a lot more involved than your simple explanation and goes beyond an assembly language understanding. It's inextricably intertwined with the C++ Memory Model and with optimization at both compile time and run time. For example, see:
https://stackoverflow.com/questions/12346487/what-do-each-memory-order-mean
http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync
https://rigtorp.se/spinlock/
https://en.cppreference.com/w/cpp/atomic/atomic
https://en.cppreference.com/w/cpp/atomic/atomic/store
https://en.cppreference.com/w/cpp/atomic/atomic/load
https://en.cppreference.com/w/cpp/language/memory_model

It does look correct.

I'm struggling to see a recovery path if either task has to be restarted. If that's important.

I think you can simplify the state machine a bit. If that's important.

Yeah. I think what you're doing works with a single std::atomic<bool>. The flag would indicate whether the buffered data is valid or not. The writer sets it (data is valid) then waits for it to clear (no longer valid). The reader sees it set, uses the buffered data, then clears it.

I considered that. But, I couldn't come up with a way to do it with a single std::atomic<bool> without having both tasks do accesses with both std::memory_order_release and std::memory_order_acquire on the same atomic variable. Wasn't sure if that would be a good thing.

What you're doing is essentially a simplified spin lock. The differences from a spin lock are...

  • A single pair of contenders instead of N
  • There is a dedicated writer
  • There is a dedicated reader
  • Flag instead of a lock

Which, in my mind, means a single bool is a reasonable choice.

The memory order argument has to do with what happens before or after the store or load. In other words, if your code was only concerned with the bool value then the memory order argument is irrelevant (memory_order_relaxed would be the best choice in that case).

Because you want the bool to be set after filling in sensorData, and sensorData to be read after the bool is set, you're concerned with the correct memory order argument. As long as you get that correct, which I believe you do, then a single bool is fine.

If you're feeling paranoid you can always use sequentially consistent (memory_order_seq_cst).
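
To make that concrete, here's a tiny illustration (my own, not from the original code): a value that is the only thing shared can use relaxed ordering, while the flag that publishes sensorData cannot.

#include <atomic>
#include <cstdint>

// Only the counter's value itself is shared; no other data is "published"
// when it changes, so relaxed ordering is sufficient.
std::atomic<uint32_t> loopCounter{0};

void countLoop() {
  loopCounter.fetch_add(1, std::memory_order_relaxed);
}

// By contrast, newDataAvailable guards sensorData, so the writer must use
// store(true, std::memory_order_release) and the reader must use
// load(std::memory_order_acquire), exactly as in the original sketch.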

I agree with Coding_Badly that the state machine is overkill, and a single boolean flag that is set by the producer and cleared by the consumer should suffice.

You can also view this problem as a Single-Producer/Single-Consumer (SPSC) queue with one element.
A lock-free SPSC queue can be implemented with a single atomic variable that counts the number of elements in the queue.
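
As a sketch (my own illustration with made-up names, not a vetted implementation), such a single-counter SPSC ring buffer can look like this. The producer only ever writes head, the consumer only ever writes tail, and the atomic count is the only variable shared between them:

#include <array>
#include <atomic>
#include <cstddef>

template <typename T, std::size_t N>
class SpscQueue {
  std::array<T, N> buffer{};
  std::size_t head = 0;              // written only by the producer
  std::size_t tail = 0;              // written only by the consumer
  std::atomic<std::size_t> count{0}; // the single shared atomic

public:
  bool push(const T &item) {          // producer side
    if (count.load(std::memory_order_acquire) == N)
      return false;                   // full
    buffer[head] = item;
    head = (head + 1) % N;
    count.fetch_add(1, std::memory_order_release); // publish the element
    return true;
  }

  bool pop(T &item) {                 // consumer side
    if (count.load(std::memory_order_acquire) == 0)
      return false;                   // empty
    item = buffer[tail];
    tail = (tail + 1) % N;
    count.fetch_sub(1, std::memory_order_release); // free the slot
    return true;
  }
};

With a capacity of 1, this reduces to exactly the single-flag handshake discussed above: count is 0 (no valid data) or 1 (valid data).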

FTFY :smile:

https://www.youtube.com/watch?v=KeLBd2EJLOU&t=5075s

As a result of this [incorrect use of explicit memory order release], we did a scrub of all uses of memory_order in our entire [Microsoft Visual C++] code base. There were three uses: one was incorrect, one was questionable (so we just switched it to sequential consistency), and one was okay.
And we're also instituting a rule that nobody may use relaxed atomics, even on ARM, without a thorough code review and getting an approved exception for using it.

I'm still on the fence though: when I read code that uses sequential consistency for all memory orders, does this mean that it's safer, or does it mean that the author did not know anything about memory order?


I think atomics are a fascinating topic, and I'd strongly encourage studying them further if you're interested, but I would add the following disclaimer: you probably shouldn't use them in real-world code.
They're just too error prone, it's incredibly hard to reason about the correctness of a program that uses them, and it's so easy to mess up. Instead, use existing patterns and implementations (that have peer-reviewed papers backing them up): for example, use a SPSC queue datastructure if you need to pass data from one task to another, or use a futex, condition variable, or monitor abstraction. A good implementation will use atomics under the hood, but you usually don't want to have to deal with that while writing application logic.

You might think that a SPSC queue is simple enough to get right. At least I thought so, until I pulled up the Wikipedia page on the bounded-buffer problem: the lock-free C++ implementation it lists has the wrong memory orders. And node-based atomic data structures are even harder to get right (see the ABA problem).
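
For instance, on the ESP32 the same handoff can be done with a FreeRTOS queue, which is an existing, well-tested implementation. A rough sketch along the lines of the original tasks (names and timing are illustrative, not tested):

#include "Arduino.h"

// Mirrors the struct from the original sketch.
struct SensorData {
  uint32_t a = 0;
  uint32_t b = 0;
  uint32_t c = 0;
  uint32_t d = 0;
};

QueueHandle_t sensorQueue;

void writerTask(void *parms) {
  uint32_t sensorValue = 0;
  for (;;) {
    SensorData d;
    d.a = sensorValue++;
    d.b = sensorValue++;
    d.c = sensorValue++;
    d.d = sensorValue++;
    // Copies the struct into the queue; blocks while the queue is full,
    // which replaces the newDataAvailable / dataAck handshake.
    xQueueSend(sensorQueue, &d, portMAX_DELAY);
    vTaskDelay(random(100, 500));  // simulate time needed to acquire new sensor data
  }
}

void readerTask(void *parms) {
  SensorData d;
  for (;;) {
    // Blocks until the writer has deposited a new struct.
    if (xQueueReceive(sensorQueue, &d, portMAX_DELAY) == pdTRUE) {
      Serial.println(d.a);
      Serial.println(d.b);
      Serial.println(d.c);
      Serial.println(d.d);
    }
  }
}

void setup() {
  Serial.begin(115200);
  sensorQueue = xQueueCreate(1, sizeof(SensorData));  // depth 1: one handoff in flight
  xTaskCreatePinnedToCore(writerTask, "Writer Task", 2000, NULL, 5, NULL, 1);
  xTaskCreatePinnedToCore(readerTask, "Reader Task", 2000, NULL, 5, NULL, 1);
}

void loop() {
}

The blocking queue calls remove the need for the state machines, and the queue takes care of all the memory ordering internally.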

Some more resources you may find useful:


Could you please post an updated version of my code using that paradigm so I can compare it against my original? Thanks.

Noted. Thanks.

// Headers and shared state needed to make this compile on its own; the
// struct and flag mirror the ones from the original sketch.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <stop_token>
#include <thread>

using namespace std::chrono_literals;

struct SensorData { uint32_t a = 0, b = 0, c = 0, d = 0; };
SensorData sensorData;
std::atomic<bool> newDataAvailable{false};

void writerTask(std::stop_token stop) {
    uint32_t sensorValue = 0;
    while (!stop.stop_requested()) {
        bool ready_to_send = !newDataAvailable.load(std::memory_order_acquire);
        if (ready_to_send) {
            sensorData.a = sensorValue++; sensorData.b = sensorValue++; sensorData.c = sensorValue++; sensorData.d = sensorValue++;
            newDataAvailable.store(true, std::memory_order_release);
        }
        // Do other processing stuff here
        std::this_thread::sleep_for(10ms);
    }
}

void readerTask(std::stop_token stop) {
    while (!stop.stop_requested()) {
        // Wait for Writer to Set Available Signal
        if (newDataAvailable.load(std::memory_order_acquire)) {
            auto local_copy = sensorData;
            newDataAvailable.store(false, std::memory_order_release);
            std::cout << local_copy.a << '\t' << local_copy.b << '\t' << local_copy.c << '\t' << local_copy.d << '\n';
        }
        // Don't eat up all the CPU time
        std::this_thread::sleep_for(1ms);
    }
}

You want the completion of the local copy in the consumer to happen-before the next write in the producer. This is achieved by release-acquire synchronization: the consumer's newDataAvailable.store(false, std::memory_order_release) synchronizes with the producer's newDataAvailable.load(std::memory_order_acquire).
You also want the writing of the data in the producer to happen-before the copy in the consumer (this is the same as before), so that's the other release-acquire pair: the producer's newDataAvailable.store(true, std::memory_order_release) synchronizes with the consumer's newDataAvailable.load(std::memory_order_acquire).
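
For completeness, these two functions could be driven like this (a sketch assuming C++20 std::jthread, which passes the std::stop_token as the first argument and requests stop and joins in its destructor):

#include <chrono>
#include <thread>

int main() {
    std::jthread writer{writerTask};  // stop_token is supplied automatically
    std::jthread reader{readerTask};
    std::this_thread::sleep_for(std::chrono::seconds(10));
    // jthread destructors call request_stop() and then join()
}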


Thanks, that's pretty much what I expected it to look like. I actually began with that, then I started thinking too much. Maybe I misinterpreted or read too much into some of the statements on https://en.cppreference.com/w/cpp/atomic/memory_order, in particular the one about synchronization being established only between the threads releasing and acquiring the same atomic variable.

I wondered what would happen if the writerTask ended up loading the atomic value it just set to 'true' before the readerTask loaded it. Same question if the readerTask read the atomic value it just set to 'false' before the writerTask read it. Would those actions somehow break the "promise" of synchronization between the two tasks?


Continuing my OCD pursuit of understanding, I'm still stuck on the phrase I quoted in Post 10:

The synchronization is established only between the threads releasing and acquiring the same atomic variable. Other threads can see different order of memory accesses than either or both of the synchronized threads.

Let's say the writerTask in @PieterP's example first does the store operation: newDataAvailable.store(true, std::memory_order_release);. Then (due to relative thread priority or whatever reason) writerTask happens to do its load operation: bool ready_to_send = !newDataAvailable.load(std::memory_order_acquire); before the readerTask does its load operation: if (newDataAvailable.load(std::memory_order_acquire)) {.

So, did the first load in writerTask "use up" the release / acquire pair synchronization and thus leave the load in readerTask dangling as an "unpaired" acquire operation?

Am I over thinking this?

That sentence strikes me as poorly written. For example, "other threads can see different order" applies only to threads not participating in the synchronization, so why bother mentioning them? If they are not participating and they access that memory, their behaviour is undefined. Unless ordering is irrelevant to those threads, in which case why bother mentioning them?

For what it's worth, that sentence confuses me as well. I see it as unnecessary annoying noise. Especially where it's placed.

Yeah. Nothing is getting used up. The atomic operations are not locks. They can be used to implement locks. It's all about ordering and consistency.

The operations are not strictly paired. For example, the Arc code in Rust mixes release / acquire and sequential consistency. The pairing is common but it is not required.

You may be. But it's still a good discussion.


No, nothing is used up, you don't need uniquely coupled pairs of release-acquires, you can have one thread release data and many threads acquiring it.

The condition for synchronization is simply that the load-acquire reads the value stored by the store-release. It does not matter if there are any other reads of the value in between; reading doesn't change the value, so the condition is still satisfied.

You should view the store-release as a way to make sure that all changes to variables that were made before the store-release are published to memory before the store-release itself is published to memory.
Similarly, the load-acquire ensures that any load operations that follow it are performed after the load-acquire itself.

For example:

store sensorData.d
store sensorData.b
store sensorData.a
store sensorData.c
▲ wait for all stores to complete, write any cached values to memory ▲
store-release newDataAvailable
load-acquire newDataAvailable
▼ wait for load-acquire to complete, clear any values cached in registers ▼
load sensorData.a
load sensorData.c
load sensorData.d
load sensorData.b

You don't know anything about the order of the individual stores to sensorData, but you do know that if the store-release of newDataAvailable has completed, then all stores to sensorData have completed as well.
Similarly, you don't know anything about the order of the loads of sensorData, but you do know that the values you load are at least as new as the value you loaded from newDataAvailable.

Combining the two, you know that the sensor data was written to memory before the atomic flag was set to true, so if you then read the flag as true, and you get the sensor data from memory after loading the flag, you know for sure that you read the full data that was written by the producer. After the acquire, all values stored by the producer before the release are visible to the consumer.

If you did not use a store-release of the atomic flag, for example, you could have a situation where the new value of the flag reaches the consumer thread before the new values of the sensor data:

Thread 1                       Thread 2

store sensorData.d
store sensorData.b
store newDataAvailable
                               load-acquire newDataAvailable
                               ▼ wait for load-acquire to complete ▼
                               load sensorData.a
                               load sensorData.c
                               load sensorData.d
                               load sensorData.b
store sensorData.a
store sensorData.c

This is bad, because you did not receive the new values for a and c yet. Similar problems arise when you use a normal load instead of a load-acquire in the consumer.

I'd recommend reading Jeff Preshing’s Memory Barriers Are Like Source Control Operations.


@PieterP, @Coding_Badly:
Thanks for the replies. What you described is how I thought it worked until I got wrapped around the axle reading that section from https://en.cppreference.com/w/cpp/atomic/memory_order.


Sorry to drag this up again, but I have one more question. I understand that load(std::memory_order_acquire) and store(x, std::memory_order_release); each provide a one-way block against memory access instructions being moved before or after (respectively) the atomic operation. A "diode" if you will.
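
In code terms, this is how I picture that one-way behaviour (my own illustrative sketch):

#include <atomic>

int data = 0;
std::atomic<bool> flag{false};

void producer() {
    data = 42;                                    // (1)
    flag.store(true, std::memory_order_release);  // (2) (1) may not sink below (2),
                                                  //     but later code may float above it
}

void consumer() {
    if (flag.load(std::memory_order_acquire)) {   // (3) (4) may not float above (3),
                                                  //     but earlier code may sink below it
        int local = data;                         // (4) guaranteed to see 42 if (3) saw true
        (void)local;
    }
}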

So, what about std::memory_order_seq_cst? Does it block in both directions? Or is the direction of the diode still determined by whether it's a load or store operation?

What does std::memory_order_seq_cst do that the other two don't?

Yes.

I believe the rest are implementation details. For example, on some processors sequential consistency might be implemented using a memory barrier (a big hammer).

Specifically, it disallows reordering store-releases and load-acquires of different atomic variables. That is

▲ store-release A
▼ load-acquire B

cannot be reordered to

▼ load-acquire B
▲ store-release A

As a consequence, all sequentially consistent atomic operations have a total ordering that all threads can agree on, and the order of these SC atomic operations is an interleaved version of the sequences of SC atomic operations of all individual threads. This makes them much easier to reason about, because this essentially serializes all SC atomic operations, and it is often necessary for correctness.

For example:

std::atomic<bool> stop{false};
std::thread t{[&stop] {
    while (!stop.load(std::memory_order_relaxed))
        compute(...);
}};
stop.store(true, std::memory_order_???);
t.join();

Internally, the thread will perform some store-release operation when the user-provided function returns (let's call this atomic flag done), and joining the thread will load-acquire the done flag while waiting for the thread to finish, in order to synchronize with it.
If you store stop with memory_order_release, it might get reordered with this hidden acquire of done, causing livelock. You need sequential consistency to avoid that.
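
The classic store-buffering litmus test (a standard textbook example, not specific to this thread) shows what sequential consistency rules out:

#include <atomic>
#include <thread>

std::atomic<int> x{0}, y{0};
int r1 = 0, r2 = 0;

void threadA() {
    x.store(1, std::memory_order_seq_cst);
    r1 = y.load(std::memory_order_seq_cst);
}

void threadB() {
    y.store(1, std::memory_order_seq_cst);
    r2 = x.load(std::memory_order_seq_cst);
}

int main() {
    std::thread a(threadA), b(threadB);
    a.join();
    b.join();
    // With seq_cst there is a single total order over the four operations,
    // so r1 == 0 && r2 == 0 is impossible.
    // With release stores and acquire loads instead, each thread's store may
    // be reordered after its own load, and r1 == r2 == 0 becomes legal.
}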


Which one of my questions is that the answer to?

Sorry about that...

But, as usual, @PieterP provides a better answer. Post #17 is the better choice.
