Задача о читателях и писателях. esp32

Про RWLock (Задача о читателях и писателях)

Подробно - в википедии Задача о читателях-писателях — Википедия

Вкратце:

У вас есть, к примеру, список длинный, с данными какими-то. И несколько задач, которые, скажем, каждую секунду бегают по этим спискам и читают данные из них.
А раз в час появляется задача-писатель, которая, например, удаляет или вставляет элементы в список.

Рано или поздно такая софтина повиснет, потому, что очередной читатель, идя по списку, внезапно обнаружит,
что ->next указывает уже в никуда

Можно было бы обойтись мутексом - захотел доступ - захвати мутекс! Но при таком подходе только один читатель или писатель будет иметь доступ,
остальные же будут ждать.

Решают данную проблему специальным объектом синхронизации, который называется rwlock (Readers-Writers Lock)

Его можно “захватывать” для чтения сколько угодно раз из разных задач, но на запись - только одной задачей

По какой-то необъяснимой причине во FreeRTOS нет такого примитива, как rwlock. В их мейллисте люди выражают удивление такому факту (2025 на дворе),
но бородатые авторы FreeRTOS отказываются реализовывать. Сами, говорят, делайте.

Ну и сделаем.

Для FreeRTOS я написал rwlock (приоритет - писателям).
Он простой и быстрый, но скорее всего не соберется компилятором Си++.
А может и соберется.
Кто его знает.
PS:
Оказывается, в Си++ “все уже придумано до нас”, но только начиная с C17: std::shared_mutex

// Реализация RWLock для ESP32. На Си, на Си++ скорее всего не скомпилируется.
// Можно использовать как в среде Arduino, так и в среде ESP-IDF
//
//  Инициализация:
//  rwlock_t rw = RWLOCK_INIT;
//
//  Задача-читатель:
//  ....
//  rw_lockr(&rw);
//  do_something();
//  rw_unlockr(&rw);
//
//  Задача-писатель:
//  ....
//  rw_lockw(&rw);
//  do_something();
//  rw_unlockw(&rw);

#include <stdatomic.h>
#include <stdint.h>

#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
#include <freertos/task.h>

// Подсказки компилятору для предсказания ветвлений
#undef likely
#undef unlikely
#define unlikely(_X)   __builtin_expect(!!(_X), 0)
#define likely(_X)     __builtin_expect(!!(_X), 1)

// Двоичный семафор
#define lock(_Name) \
  do { \
    if (unlikely(_Name == NULL)) { \
      _Name = xSemaphoreCreateBinary(); \
      break; /* создан уже в заблокированном состоянии */\
    } \
    if (likely(_Name != NULL)) \
      while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { \
      /* Прошло 1200 часов. Пробуем снова. */ \
      } \
  } while( 0 )

//
#define unlock(_Name) \
  do { \
    if (likely(_Name != NULL)) \
      xSemaphoreGive(_Name); \
  } while( 0 )

// Структура RWLock (блокировка чтения/записи)
typedef struct {
  _Atomic uint32_t  wreq; // Сколько запросов на запись ожидают своей очереди
  _Atomic int       cnt;  // <0 — запись, 0 — свободно, >0 — чтение
  SemaphoreHandle_t sem;  // двоичный семафор, используется как блокирующий объект
} rwlock_t;

// Инициализатор для rwlock:
// rwlock_t my_lock = RWLOCK_INIT;

#define RWLOCK_INIT { 0, 0, NULL } 

// Захват объекта "на запись".
//
// Если /уже есть/ активные читатели или писатели, функция блокируется (повисает) на /rw->sem/,
// а так же сигнализирует, что новые запросы на чтение пока не принимаются.
//
// Если нет ни читателей ни писателей, захватываем /rw->sem/ и устанавливаем /cnt/ в 
// отрицательное значение, что означает «получена блокировка на запись».
//
void rw_lockw(rwlock_t *rw) {

  // Ставим флаг «запрос на запись» до захвата /rw->sem/,
  // чтобы новые читатели не успели влезть
  rw->wreq++;
try_again:  
  // Пытаемся захватить основной синхронизирующий объект.
  // Если он занят, ждём.
  lock(rw->sem); 

  // Получили семафор, но проверим, не успел ли читатель таки влезть.
  // Если успел — отпускаем и пробуем снова.
  if (rw->cnt != 0) {
    unlock(rw->sem);
    vPortYield();
    goto try_again;
  }
  rw->cnt = -1; // активный писатель
  rw->wreq--;
}

// Освобождение блокировки на запись
// Ожидаем, что /cnt/ = -1. Если нет — ошибка в логике RWLock.
//
void rw_unlockw(rwlock_t *rw) {
  rw->cnt = 0;
  unlock(rw->sem);
}

// Захват блокировки на чтение.
// Если уже есть писатель или запрос на запись, ждём.
//
// Это основной, чаще всего используемый тип блокировки.
void rw_lockr(rwlock_t *rw) {

  int cnt;

  // Ждём, пока нет писателей и запросов на запись
  while (atomic_load_explicit(&rw->cnt, memory_order_acquire) < 0 ||
         atomic_load_explicit(&rw->wreq, memory_order_acquire) > 0)
    vPortYield(); 

  // Атомарно увеличиваем число читателей
  cnt = atomic_fetch_add_explicit(&rw->cnt, 1, memory_order_acq_rel);

  // Первый читатель захватывает /rw->sem/, чтобы писатели сразу блокировались.
  // Если писатель всё же успел вклиниться, мы подождём, пока он завершит цикл try_again.
  // Это вроде как баг, но он несущественный: просто один из читателей ошибочно будет опознан как писатель,
  // ну и подождет, не рассыпется.
  if (!cnt)
      lock(rw->sem);
}

// Освобождение блокировки на чтение
//
void rw_unlockr(rwlock_t *rw) {
  if (atomic_fetch_sub(&rw->cnt, 1) == 1)
    unlock(rw->sem);
}

1 лайк

По мне, выбранный механизм неадекватен задаче.

Классическая задача читателей и писателей ставится в более общем виде и там, да. Здесь же более узкая задача и, применяя решение для более общего случая, мы получаем лишние, (не необходимые) тормоза.

Ну, может и не самый лучший пример, да

+1 . Классическая задача с семафорами.

1 лайк

А читатель захватывает доступ на 3 часа?
Чтобы прям ОДНОВРЕМЕННО читатели читали, нужна многопроцессорная система, не?

Любая кривая программа повиснет, даже блинк)))

1 лайк

ну да. ESP32, два ядра.

На самом деле этого мало.
Физически, как правило, есть только одна шина данных и одна шина адреса. Поэтому, сколько бы ни было ядер, читать/писать одновременно в разные (или в одно) места одного пула памяти невозможно. Есть, правда, исключения. Например, RAM и ROM, а также пул регистров, хотя и находятся внутри одного адресного пространства, могут быть подключены физически к разным шинам. Кроме того, существует т.н. двухпортовая память, которая позволяет одновременно читать содержимое памяти видеоадаптеру и читать/писать в тот же объем памяти процессору, но со стороны ЦПУ все равно доступ осуществляется через “одну дырочку”.

Именно.
“Прямая” программа должна понимать, что next, указывающий в никуда, индицирует конец списка.

2 лайка

Код, который я привел выше - плохой, негодный код. Он глючит при высокой нагрузке на многоядерной системе.

Вот исправленный. Его можно скопировать в файл с расширением .c и использовать как есть. Ну, допилить напильником для Си++ (обхявить функции как extern “C“ , например)

Комментарии написал ЧатГПТ (перевел мой корявый английский)

// -- Блокировка Readers/Writer (читатели/писатель) --
//
// Реализует классическую схему «много читателей, один писатель» — с приоритетом записи.
// Синхронизация реализована без блокировок, но также используется бинарный семафор,
// чтобы заставлять писателей ожидать: вместо активного ожидания в цикле vPortYield()
// мы захватываем семафор, фактически убирая задачу из списка активных у планировщика.
//
// Правила RWLock:
//
// 1. Ожидающие запросы на запись (/rw->wreq/) не дают новым читателям получить блокировку
//    (читатели будут ждать на /rw->sem/).
// 2. Писатели в очереди блокируются на семафоре, если есть активные читатели или другие писатели.
//
// Легко адаптируется под другие системы многозадачности,
// заменив sem_lock(), sem_unlock() и vPortYield().
//
//  Инициализация RWLock:
//  rwlock_t rw = RWLOCK_INITIALIZER_UNLOCKED;
//
//  Потоки-читатели:
//  ....
//  rw_lockr(&rw);
//  do_something();
//  rw_unlockr(&rw);
//
//  Потоки-писатели:
//  ....
//  rw_lockw(&rw);
//  do_something();
//  rw_unlockw(&rw);

#include <stdatomic.h>
#include <stdint.h>

#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
#include <freertos/task.h>

// Тип RWLock:
// Объявление и инициализация: "static rwlock_t my_lock = RWLOCK_INITIALIZER_UNLOCKED;"

typedef struct {
  _Atomic uint32_t  wreq; // Число ожидающих запросов на запись (мягкий стоп для новых читателей)
  _Atomic int       cnt;  // <0 : блокировка записи, 0: свободно, >0: есть читатели
  SemaphoreHandle_t sem;  // бинарный семафор — объект ожидания
} rwlock_t;

#define RWLOCK_INITIALIZER_UNLOCKED { 0, 0, NULL }

#undef likely
#undef unlikely

#define unlikely(_X)   __builtin_expect(!!(_X), 0)
#define likely(_X)     __builtin_expect(!!(_X), 1)

// Захват семафора. Создаёт заблокированный семафор, если он ещё не создан.
//
#define sem_lock(_Name) { \
    if (unlikely(_Name == NULL)) \
      _Name = xSemaphoreCreateBinary(); \
    else \
      while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { } \
  }

// Освобождение семафора
//
#define sem_unlock(_Name) { \
    if (likely(_Name != NULL)) \
      xSemaphoreGive(_Name); \
  }

// void rw_lockw(rwlock_t *rw);
//
// Получение исключительного (писательского) доступа.
//
// Если есть активные читатели или писатели — поток блокируется на /rw->sem/,
// и выставляется намерение записи через /rw->wreq++/.
// Если никого нет — мы захватываем бинарный семафор /rw->sem/ и
// устанавливаем /cnt/ в отрицательное значение, указывая,
// что блокировка получена писателем.
//
void rw_lockw(rwlock_t *rw) {

  atomic_fetch_add_explicit(&rw->wreq, 1, memory_order_release); // Сигнал намерения записи
  sem_lock(rw->sem);                                             // Захват семафора (или блокировка)
  atomic_store_explicit(&rw->cnt, -1, memory_order_release);     // Отметить, что писатель получил блокировку
  atomic_fetch_sub_explicit(&rw->wreq, 1, memory_order_release); // Снять намерение записи
}

// void rw_unlockw(rwlock_t *rw)
//
// Освобождение писательской блокировки,
// ранее полученной rw_lockw(rwlock_t *).
//
// TODO: в DEBUG проверить, что /cnt/ == -1 (ожидаем такое значение здесь)
//
void rw_unlockw(rwlock_t *rw) {
  atomic_store_explicit(&rw->cnt, 0, memory_order_release); // /cnt/ == «нет читателей и писателей»
  sem_unlock(rw->sem);                                      // Разблокировать ожидающих читателей/писателей
}

// void rw_lockr(rwlock_t *rw)
//
// Получение общей (неисключительной) блокировки читателя.
// Первый читатель захватывает семафор, чтобы блокировать писателей.
//
// Этот тип блокировки используется чаще всего.
//
void rw_lockr(rwlock_t *rw) {

  int cnt;
  while ( true ) {

    // Цикл ожидания, если есть активный писатель или намерение записи
    //
    if (0 < atomic_load_explicit(&rw->wreq, memory_order_acquire) ||
        0 > (cnt = atomic_load_explicit(&rw->cnt, memory_order_acquire))) {
      // уступаем выполнение другим задачам
      vPortYield();
      continue;
    }

    // Пытаемся увеличить счетчик читателей
    if (atomic_compare_exchange_weak_explicit(&rw->cnt, &cnt, cnt + 1, memory_order_acq_rel, memory_order_relaxed)) {
      // первый читатель захватывает семафор
      if (cnt == 0)
        sem_lock(rw->sem);
      break;
    }
  }
}

// void rw_unlockr(rwlock_t *rw);
//
// Разблокировка читателя. Последний читатель освобождает семафор.
//
void rw_unlockr(rwlock_t *rw) {
  // Если мы были последним читателем — освободить семафор
  if (atomic_fetch_sub_explicit(&rw->cnt, 1, memory_order_acq_rel) == 1)
    sem_unlock(rw->sem);
}

Я недавно начал изучать FreeRTOS и тоже пытаюсь передать из одной задачи данные в несколько других задач.
Например, одна задача читает 4 датчика DS18B20 и помещает значения в очередь, и есть несколько задач, которые читают в разные моменты времени из очереди.
Пример. Задача читает далласы

float tempDS[4] = {0};
//читаем далласы и записываем значения в массив
xQueueOverwrite(tempDSQueue, tempDS); // перезаписываем данные поверх старых

в других задачах читаем из очереди, не удаляя данные, причем одни значения требуются в разных задачах.

float tempDS[4] = {0};
xQueuePeek(tempDSQueue, tempDS, 0);

Тестовый код работает нормально, но не уверен есть ли здесь подводные камни?
Будет ли это стабильно работать?

Нужна синхронизация. Потому что чтение и запись могут произойти одновременно. И тогда читатель может прочитать мусор

PS:

Наврал. Посмотрел сейчас, там все thread-safe. Не нужна дополнительная синхронизация.

А разве механизм очередей не так работает? Я глубоко не копал, но насколько знаю, запись в очередь одной задачей блокирует запись и чтение для других задач. Или я не прав?

1 лайк

прав-прав. я поспегил ответить, не уточнив

1 лайк

Так-то в esp-idf давно реализованы posix-threads со всеми плюшками, блэкджеком и девицами легкого поведения. Там по стандарту есть рид-райт лок.
Но каждый развлекается, как умеет! :wink: Светлой Хануки всем!

1 лайк

Т.е. ты очередь используешь просто как буффер, правильно?

Если так, то я бы сделал просто буфер (массив) и мутекс для синхронизации.

тамошний нам не подходит.

ну и попрактиковаться захотелось

PS: у меня с атомиками!

Ну это примерно как глобальный массив в скетче при процедурном программировании, к которому нет обращений в прерывании.
А с мутексом наверное проще объявить глобальные переменные для каждого далласа

float tempWater, tempAir, tempKotel;

или это колхоз?

Не очень понял о чём речь, но похоже на разумность. Общее правило – защищаемые (локируемые) сущности нужно дробить на как можно более мелкие части. Важно только, чтобы эти части были действительно самостоятельными.

Ну, вот, допустим, имеется массив и операции над его элементами. Если любая операция имеет дело только с одним (несколькими) элементами массива, то и локировать нужно один (несколько) элементов. А то некоторые всегда локируют массив целиком и процессы постоянно ждут друг друга, хотя и работают с разными элементами и ничуть не мешали бы друг другу, если бы работали параллельно.

2 лайка

Я вставлю 5 коп.
ЕСП32 - 32 разрядный процессор. В нем запись 32 битного слова и так атомарна. Вообще нет нужды ни в каких семафорах и бантиках с финтифлюшками, чтобы в одном потоке писать, а в другом читать такое значение. Даже трёх, если не имеет значение целостность всего комплекта, как выше написал наш ученый друг. НО речь идет о трёх Далласах… если одно значение будет на секунду старше… “второй свежести”, то мессир не расстроятся, надеюсь. :wink:

1 лайк

Вот главное применительно ко мне. Одна задача только пишет, а другие только читают. С этим теперь понятно-это одна операция.
Все оказывается просто.

добавлю: в AVR мы часто ловили глюки, если есть более одной задачи (например прерывания) которые работали даже с int, просто потому, что АВР-ки старые 8 битные. И “разрыв” потока управления мог прийтись посередине между записью первого и второго байта. И поток “читателя” - читал мусор.
Но новые МК, к счастью уже 32 разрядные и сохраняют слово атомарно, без разрыва в потоке управления.

1 лайк