Про списки, многозадачность, SMP и быстрый malloc(). Или не про это

Введение

Типичная ситуация:

Одна часть вашего кода аллоцирует структуру (назовём её “struct gpio_config”), заполняет её поля и передаёт указатель другой части кода. Та, в свою очередь, либо сразу обрабатывает полученную информацию, либо откладывает указатель “до лучших времён” и обрабатывает его позже.

Как бы то ни было, в конце концов этот указатель попадает в “free()”, и цикл повторяется:

malloc --> free --> malloc --> free --> ...

(Ну или “new/delete”, если вы пишете на C++.)

А что если нам нужна скорость?

Представим, что у нас сотни структур аллоцируются и освобождаются в секунду. В этом случае “malloc()” начинает давать о себе знать - он не быстрый. Плюс ко всему, при частом “malloc/free” можно упереться в фрагментацию кучи.

Поэтому в быстром или высоконагруженном коде так обычно не пишут.

А как тогда пишут?

Один из популярных подходов - повторное использование памяти.

Идея простая: память выделяют один раз, а вместо освобождения кладут “освобождённый” блок в список свободных. При следующей аллокации сначала смотрят в этот список, и если он не пуст - берут блок оттуда, не вызывая “malloc()”.

Так устроена работа с памятью, например:

  • в сетевом стеке FreeBSD (и производных, см. “mbuf”),
  • в Linux (см. “sk_buff”),
  • и даже lwIP делает что-то похожее (структура “pbuf”, если память не изменяет).

Примерно так это выглядит в псевдокоде:

// Тут мы храним ненужные структуры. Вдруг понадобятся?
static struct gpio_config *unused = NULL;


// Аллокация
//
static struct gpio_config *Get(void) {

  // если список пуст - malloc, если нет - берём из списка
  if (list_is_empty(unused))
    return malloc(...);
  else
    return list_get_first_element(unused);
}


// Освобождение
//
static void Put(struct gpio_config *gc) {
  list_push_item(unused, gc);   // кладём в список
}

Функция “Get()” создаёт структуры, функция “Put()” их “уничтожает”. Всё красиво и просто.

Реальность подкрадывается

А теперь внезапно задача усложняется, и у нас появляется:

  1. несколько потоков (задач / threads);
  2. несколько ядер.

Возникает логичный вопрос: как синхронизироваться?
Что будет, если две задачи на разных ядрах одновременно вызовут “Put()”?

Тут обычно сразу звучат ответы:

  • “Мутекс!”
  • “Семафор!”
  • или кто-нибудь с форума предложит “просто замаскировать прерывания” - тоже, в общем-то, синхронизация.

И да, в подавляющем большинстве случаев так и делают: вводят объект синхронизации - мутекс, семафор или критическую секцию.

Но есть нюанс

Мутексы и семафоры - не бесплатные:

  • они медленные
  • они потребляют память
  • на FreeRTOS (ESP32) каждый мутекс/семафор - это довольно крупная структура со своей мишурой
  • их нельзя захватывать из прерывания

Критическая секция работает быстро и почти не жрёт память, но выключает прерывания, что тоже далеко не всегда допустимо.

В итоге получается парадокс:
пытаясь уйти от медленного “malloc()”, мы упираемся в медленные объекты синхронизации.

А можно без всего этого?

Можно ли обойтись без мутексов и семафоров, при этом сохранив:

  • корректную синхронизацию,
  • высокую скорость,
  • возможность работы из прерываний?

Можно.

Для этого воспользуемся атомарными операциями из стандарта C11
(любителям C++ стоит смотреть в сторону “std::atomic”, по Си++ я живу в 2007 году).

Lock-free список свободных элементов

Наш псевдокод начинает выглядеть так:

// Список неиспользуемых / освобождённых структур (LIFO)
static _Atomic(struct gpio_config *) unused = NULL;

Аллокатор

static struct gpio_config *Get(void) {

  struct gpio_config *ret = NULL;

  // Пытаемся забрать элемент из списка unused.
  // Логика такая:
  //
  // if (unused != NULL) {
  //   ret = unused;
  //   unused = unused->next;
  //   return ret;
  // }
  //
  do {
    if (NULL == (ret = atomic_load_explicit(&unused, memory_order_relaxed)))
      break;

  } while (!atomic_compare_exchange_weak_explicit(
              &unused,
              &ret,
              ret->next,
              memory_order_acquire,
              memory_order_relaxed));

  // Если список был пуст - аллоцируем новую структуру
  if (!ret)
    ret = malloc(sizeof(struct gpio_config));

  return ret;
}

Деаллокатор

static void Put(struct gpio_config *gc) {

  // Кладём освобождаемую структуру в список unused
  // Логика вставки в голову списка:
  //
  // gc->next = unused;
  // unused = gc;
  //
  do {
    gc->next = atomic_load_explicit(&unused, memory_order_relaxed);
  } while (!atomic_compare_exchange_weak_explicit(
              &unused,
              &gc->next,
              gc,
              memory_order_release,
              memory_order_relaxed));
}

Что в итоге получилось?

  • Никаких мутексов, семафоров и критических секций

  • Синхронизация достигается за счёт CAS-циклов

  • Код корректно работает:

    • в многозадачном приложении\среде,
    • на нескольких ядрах,
    • и даже в прерывании

На основе этого кода можно написать generic-list, быстрый и без блокировок, работающий и в прерывании и где угодно.

Но это мы сделаем в следующий раз, когда будет время и желание :slight_smile:

Спасибо за внимание, если уж до сюда дочитали. Happy coding!

Вы не поверите, но именно так и работает пара malloc/free. В точности так – список свободных участков, берём всегда наименьший-достаточный из свободных, освобождение == подвешивание к списку свободных.

Вы перестарались в упрощении описания и “выплеснули ребёнка”. Описанный Вам менеджер памяти ничем не отличается от того, что есть в системе, который Вы называете “медленным”.

2 лайка

Ну как минимум отличается синхронизацией. Щас посмотрю, что там “на самом деле“. Во-вторых, я всегда забираю голову списка, а malloc() будет искать подходящий блок.

Но суть не в этом, а в безблокировочном подходе. Может и выплеснул. Может - нет. Надобно проверять.

Я посмотрел на ESP32. Даже если продраться сквозь врапперы, там в конце это:

/*
This function should not be called directly as it does not check for failure / call heap_caps_alloc_failed()
Note that this function does 'unaligned' alloc calls if alignment <= UNALIGNED_MEM_ALIGNMENT_BYTES (=4) as the
allocator will align to that value by default.
*/
HEAP_IRAM_ATTR NOINLINE_ATTR void *heap_caps_aligned_alloc_base(size_t alignment, size_t size, uint32_t caps)
{
    void *ret = NULL;

    // Alignment, size and caps may need to be modified because of hardware requirements.
    esp_heap_adjust_alignment_to_hw(&alignment, &size, &caps);

    // remove block owner size to HEAP_SIZE_MAX rather than adding the block owner size
    // to size to prevent overflows.
    if (size == 0 || size > MULTI_HEAP_REMOVE_BLOCK_OWNER_SIZE(HEAP_SIZE_MAX) ) {
        // Avoids int overflow when adding small numbers to size, or
        // calculating 'end' from start+size, by limiting 'size' to the possible range
        return NULL;
    }

    if (caps & MALLOC_CAP_EXEC) {
        //MALLOC_CAP_EXEC forces an alloc from IRAM. There is a region which has both this as well as the following
        //caps, but the following caps are not possible for IRAM.  Thus, the combination is impossible and we return
        //NULL directly, even although our heap capabilities (based on soc_memory_tags & soc_memory_regions) would
        //indicate there is a tag for this.
        if ((caps & MALLOC_CAP_8BIT) || (caps & MALLOC_CAP_DMA)) {
            return NULL;
        }
        caps |= MALLOC_CAP_32BIT; // IRAM is 32-bit accessible RAM
    }

    if (caps & MALLOC_CAP_32BIT) {
        /* 32-bit accessible RAM should allocated in 4 byte aligned sizes
         * (Future versions of ESP-IDF should possibly fail if an invalid size is requested)
         */
        size = (size + 3) & (~3); // int overflow checked above
    }

    for (int prio = 0; prio < SOC_MEMORY_TYPE_NO_PRIOS; prio++) {
        //Iterate over heaps and check capabilities at this priority
        heap_t *heap;
        SLIST_FOREACH(heap, &registered_heaps, next) {
            if (heap->heap == NULL) {
                continue;
            }
            if ((heap->caps[prio] & caps) != 0) {
                //Heap has at least one of the caps requested. If caps has other bits set that this prio
                //doesn't cover, see if they're available in other prios.
                if ((get_all_caps(heap) & caps) == caps) {
                    //This heap can satisfy all the requested capabilities. See if we can grab some memory using it.
                    // If MALLOC_CAP_EXEC is requested but the DRAM and IRAM are on the same addresses (like on esp32c6)
                    // proceed as for a default allocation.
                    if ((caps & MALLOC_CAP_EXEC) &&
                        ((!esp_dram_match_iram() && esp_ptr_in_diram_dram((void *)heap->start)) ||
                         (!esp_rtc_dram_match_rtc_iram() && esp_ptr_in_rtc_dram_fast((void *)heap->start)))) {
                        //This is special, insofar that what we're going to get back is a DRAM address. If so,
                        //we need to 'invert' it (lowest address in DRAM == highest address in IRAM and vice-versa) and
                        //add a pointer to the DRAM equivalent before the address we're going to return.
                        ret = aligned_or_unaligned_alloc(heap->heap, MULTI_HEAP_ADD_BLOCK_OWNER_SIZE(size) + 4,
                                                        alignment, MULTI_HEAP_BLOCK_OWNER_SIZE());  // int overflow checked above
                        if (ret != NULL) {
#if CONFIG_HEAP_TASK_TRACKING
                            heap_caps_update_per_task_info_alloc(heap,
                                                                 MULTI_HEAP_ADD_BLOCK_OWNER_OFFSET(ret),
                                                                 multi_heap_get_full_block_size(heap->heap, ret),
                                                                 get_all_caps(heap));
#endif

                            MULTI_HEAP_SET_BLOCK_OWNER(ret);
                            ret = MULTI_HEAP_ADD_BLOCK_OWNER_OFFSET(ret);
                            uint32_t *iptr = dram_alloc_to_iram_addr(ret, size + 4);  // int overflow checked above
                            CALL_HOOK(esp_heap_trace_alloc_hook, iptr, size, caps);
                            return iptr;
                        }
                    } else {
                        //Just try to alloc, nothing special.
                        ret = aligned_or_unaligned_alloc(heap->heap, MULTI_HEAP_ADD_BLOCK_OWNER_SIZE(size),
                                                        alignment, MULTI_HEAP_BLOCK_OWNER_SIZE());
                        if (ret != NULL) {
#if CONFIG_HEAP_TASK_TRACKING
                            heap_caps_update_per_task_info_alloc(heap,
                                                                 MULTI_HEAP_ADD_BLOCK_OWNER_OFFSET(ret),
                                                                 multi_heap_get_full_block_size(heap->heap, ret),
                                                                 get_all_caps(heap));
#endif

                            MULTI_HEAP_SET_BLOCK_OWNER(ret);
                            ret = MULTI_HEAP_ADD_BLOCK_OWNER_OFFSET(ret);
                            CALL_HOOK(esp_heap_trace_alloc_hook, ret, size, caps);
                            return ret;
                        }
                    }
                }
            }
        }
    }

    //Nothing usable found.
    return NULL;
}

Как-то некорректно сравнивать это и мою функцию Get(). Да еще и с мутексами там все в их malloc(). Может в linux/glibc и более продвинутый аллокатор, но у нас вот то, что выше. А не то, что в книжках.

malloc() медленный.

непозволительно медленный.

настолько медленный, что во всех почти операционках используются вышеописанные аллокаторы. Да, собственно, это можно замерить.

Нет. Посмотрите ту цитату, что я привёл и найдите там хоть что-то про синхронизацию. Она была ещё ДО разговоров о синхронизации, которые появились потом. Там же Вы там просто описали алгоритм стандартного менеджера памяти, подав его как нечто другое:

Но это в чистом виде malloc / free. Никаких отличий.

Ваше “кладут “освобождённый” блок в список свободных” – это ровно то же самое, что делает free.

А я и не сравниваю. Я обсуждаю то, что процитировал.

Вы всерьез полагаете, что такая ситуация типична для Ардуино?
Нет, я не портив обсуждения на специализированном форуме общих вопросов программирования, но давайте посмотрим, каковы же рекомендации?

То есть, “если Вы хотите, чтобы Ваши программы работали быстро, пользуйтесь конкретной версией конкретного языка программирования”.

А как мне воспользоваться этим советом, если я вообще использую другой ЯП?
Мне почему-то казалось, что общие подходы к оптимизации программ не должны зависеть от используемого языка. Если же они зависят, то это уже явно “не в ту степь”.

В принципе - да, подтверждаю, что задача по распределению памяти может сжирать львиную долю (читай: в т.ч. более 99%) процессорного времени, что выливается в замедлении выполняемого кода на порядки (притом, нередко больше, чем на один).

Могу рассказать на собственном примере участия в сетевом проекте, когда меня попросили написать программу переноса базы данных из одного формата в другой. Специфика заключалась в том, что если выходной формат был вполне стандартным - XML, то входной - нет. А современные программисты, привыкшие работать “крупнопанельными” методами, работать с нестандартными форматами не умеют.
Написал программу, она отработала у меня 3.5 секунды, отослал ее коллеге (исходная база была размера явно непригодного для пересылки по e-mail, но зато более или менее общедоступна, поэтому пересылал только саму программу).
Получаю ответ (опять же, по e-mail, не в чате): результат работы моей программы подвесил его программу - она ни на что не реагирует. Через несколько минут получаю следующее письмо: все нормально, программа “повисела” 10 минут, после чего стала исправно работать с новыми данными.
Ну то есть, чтобы прочитать некоторый объем данных из файла, преобразовать его к новому формату и записать новый файл моей программе потребовалось примерно в 170 раз меньше времени, чем другой программе, от которой требовалась лишь половина работы.
Анализ показал, что если я в своей программе использовал самописный класс для работы с XML, то коллега - стандартный майкрософтовский.
Правда, коллега не очень внимательно читал доки Майкрософт и не знал, что в Windows менеджер памяти может работать в одном из двух режимов: кроме режима по умолчанию, когда приоритет отводится экономии памяти, есть еще режим, который работает существенно быстрее, на за счет перерасхода памяти.
Но я думаю, если бы даже знал, времени работы сопоставимого с 3.5 секунды (1.75 секунды?) он бы все равно не добился.
Так из-за чего такая разница?
Дело в том, что я проектировал свой класс, исходя из вполне конкретного назначения - переноса информации из одной БД в другую. А Майкрософт писал универсальный класс, который должен поддерживать полный объем средств редактирования.
Существенная разница - в наличии или отсутствии функции удаления. Отказ от этой функции позволил не просто упростить код. Если у Майкрософт экземпляр класса - это каждая закорючка в файле, то у меня - файл целиком.
В результате на файл, содержащий несколько десятков тысяч блоков, каждый из которых состоит из тысяч элементов, у Майкрософта приходится столько же обращений к менеджеру памяти, а у меня - всего единицы: первый раз выделяется объем в 64К, после чего он при необходимости несколько раз удваивается.

Отсюда вывод: если хотите, чтобы Ваши программы работали быстро, тщательно анализируйте стоящую задачу и проектируйте решение наиболее пригодное для конкретного случая. А не пытайтесь воспользоваться тайными функциями специальной версии конкретного языка программирования.

2 лайка

C11 поддерживается практически всеми компиляторами языка Си. Это старый стандарт. Язык Си - стандарт для embedded.

Для Си++ все то же самое есть в стандарте C++11. Стандарту сто лет в обед :).

Если вы используете другие ЯП, то тогда нужно использовать их аналоги атомных операций. Я не уверен, есть ли в Python таковые. Впрочем, если вы используете другие (отличные от Си\Си++ и ассемблера) языки, то о скорости можно забыть. Ну Раст, разве что. Если ваш компилятор не компилирует в машинный код для прямого исполнения процессором - не нужны вам ни атомики ни оптимизация аллокаций: у вас и помимо этого будет, что оптимизировать.

ну конечно.

принял пакетик данных, обработал, выкинул.

Не нуачо?
Вполне себе ответ на вопрос.

Но, вообще-то, мой спич был о том, что прежде, чем садиться писать код, неплохо бы сначала пораскинуть извилинами. Хотя бы для того, чтобы получившаяся программа не дергала менеджер памяти по 1000000 раз каждую минуту.

Это в корне не согласуется с тем, что Вы писали ранее.
В последнем варианте “слабым звеном” будет канал передачи данных, поэтому об издержках менеджера памяти здесь можно забыть.
Не следует оптимизировать ту часть кода, которая не влияет на производительность.

Ой, не скажите! Особенно, если речь идёт о параллельности. Вы тут на очень тонкий лёд вступаете!

2 лайка

Да даже без параллельности: наивно считать, что инструмент вместо тебя решит твою задачу.

Я не об этом, а об отсутствии других быстрых языков.

А бывают быстрые языки? Я не знал.

Хотя может я не прав.

Теперь знаете.

Понятно.

Но я всегда считал, что люди, которые гордятся знанием С и заявляют, что занимаются системным программированием - просто сикают.

Нет. Проверка показала, что на 4х ядрах, при сильной нагрузке, аллокатор глючит. Глючит страшно (фэйл менее чем через 100 аллокаций\free).

Там ABA случается.

А именно: один поток (задача, на CPU0) начинает делать Put() :

do {
    gc->next = atomic_load_explicit(&unused, memory_order_relaxed);

и в этот самый момент (нагрузка большая), второе ядро делает :


a = Get(); // взяли памяти
b = Get(); // и еще чуток
Put(a);    // вернули первый блок назад

С точки зрения CPU0 ничего не поменялось - головой списка как был так и остался A. Произойдет CAS , но новая голова будет иметь неправильный →next. Но по факту, →next уже указывает куда попало: можно получить закольцованный список, или потерять блок из списка или в самом плохом раскладе список разрушается вовсе.

CAS на CPU0 пройдет без ошибок: ведь блок А с точки зрения CPU0 никуда не пропадал. Только CPU1 знает, что пока CPU0 раздумывал, список успели поменять.

Пока не придумал, как сделать чистый lockless менеджер. Остановился на гибриде: от атомиков отказался, сделал списки свободных блоков per-CPU. Используется критическая секция для списка блоков.

Правда, при таком подходе, если Get() будет делать всегда ядро 1 а Put() - ядро 0, то вся память утечет ко второму ядру и кончится. Чтобы этого не происходило, пришлось еще вводить переодическую балансировку свободных списков. Сейчас поотлаживаюсь и выложу уже рабочий код тогда, вместе с таймингами для стандартного malloc()/free() и моего put()/get()

Вот очередная итерация. lockless и след простыл, но зато все работает.

Быстрее, чем malloc()/free() в 6 раз на ESP32-S3 @ЕвгенийП


// Аллокатор блоков фиксированного размера для multitask/SMP.
//
// SMP/Thread safety:
// Протестирован на ESP32-S3 в среде Arduino (4 задачи на 2 ядра, миллион итераций, каждая итерация - рандомные 
//  get, put, delay и yield())
//
// Скорость: четыре задачи аллоцируют\освобождают серии по 7 блоков, 64 байта каждый, на двух ядрах, 10000 раз:
//
//  malloc()/free() => 1445 миллисекунд, в среднем
//  mb_get()/mb_put() => 245 миллисекунд, в среднем
//
// Объявление:
// static mb_pool my_pool;
//
// Инициализация:
// mb_initialize(&my_pool, sizeof(MyMessage), X, Y);  // X - ноль или максимальное кол-во блоков.
//                                                    // Y - ноль или количество блоков, которые нужно преаллоцировать
//
// Аллокация:
// a = mb_get(&my_pool);
// b = mb_get(&my_pool);
//
// Освобождение:
// mb_put(&my_pool, a);
//
//

#include <stdlib.h>
#include <string.h>
#include <stdatomic.h>

#include <Arduino.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>


// MB - сокращение от "memory block" (блок памяти).
//
// Когда MB возвращается в пул (через mb_put()),  его содержимое перезаписывается (overlay) структурой node_link.
// Это означает, что минимальный размер MB должен быть не меньше sizeof(void *).
//
struct node_link {
    struct node_link *next;
};


// Пул MB.
//
// Содержит:
//  per-CPU freelists (по одному списку на каждое ядро)
//  per-CPU критические секции (по одному spinlockу на список)
//
// Идея:
//  каждое ядро работает со своим списком
//  возврат блока всегда идёт в список владельца
//  никогда не берём больше одного lockа за раз
//
struct mb_pool {
    size_t            size;                        // Размер одного MB (без служебного байта)
    _Atomic(size_t)   count;                       // Сколько MB уже выделено через malloc()
    size_t            max_count;                   // Максимально допустимое число malloc()
    struct node_link *local[portNUM_PROCESSORS];  // Per-CPU freelist
    portMUX_TYPE      lock[portNUM_PROCESSORS];    // Per-CPU критические секции (на ESP32 это spinlockи)
};

#if __GNUC__
// Константный инициализатор.
//
// Пример использования:
//   static MB_POOL(tcpip_pool, 128, 0);
// size = 128 означает, что размер блоков = 128 байт
// max_count = 0 означает "без ограничения по вызову malloc".
//
#define MB_POOL(_Name, _Size, _MaxCount)               \
    struct mb_pool _Name = {                           \
        .size = (_Size),                               \
        .count = 0,                                    \
        .max_count = (_MaxCount),                      \
        .local[0 ... portNUM_PROCESSORS-1] = NULL,     \
        .lock[0 ... portNUM_PROCESSORS-1] =             \
            portMUX_INITIALIZER_UNLOCKED,              \
    }
#endif // в других компиляторах нету троеточия.

void mb_put(struct mb_pool *, void *);

// Динамическая инициализация пула.
//
// Аргументы:
//  /mb_size/   - размер одного блока
//  /max_count/ - если >0, ограничивает общее число malloc()
//  /reserve/   - сколько блоков зарезервировать заранее
//
// reserve + max_count ---> пул не зависимый от системного malloc().
//
//  Если malloc() на этапе reserve не сработал - это не ошибка, просто у вас кончилась память.
//  Данная функция не ждет память при OOM событиях, а просто пропускает резервирование очередного блока
//
bool mb_initialize(struct mb_pool *pool,
                   size_t mb_size,
                   size_t max_count,
                   size_t reserve) {

    if (!pool)
        return false;

    // Блок должен быть достаточного размера, чтобы вместить struct node_link
    // при возврате в пул.
    if (mb_size < sizeof(struct node_link))
        return false;

    // Если max_count задан - ограничиваем reserve
    if (max_count && reserve > max_count)
        reserve = max_count;

    pool->size = mb_size;
    pool->max_count = max_count;

    for (int i = 0; i < portNUM_PROCESSORS; i++) {
        pool->local[i] = NULL;
        //pool->lock[i]  = portMUX_INITIALIZER_UNLOCKED;
        spinlock_initialize(&pool->lock[i]);
    }

    // Предварительно резервируем блоки, если было попрошено.
    // Блоки равномерно распределяются между CPU, чтобы избежать перекоса freelistов.
    int cpu = 0;
    while (reserve-- > 0) {
        uint8_t *buf = (uint8_t *)malloc(pool->size + 1);
        if (buf) {
            buf[pool->size] = cpu++;   // записываем owner CPU
            if (cpu >= portNUM_PROCESSORS)
                cpu = 0;

            mb_put(pool, buf);
        }
    }

    return true;
}


// Выделение MB.
//
// Алгоритм:
//  1) Пытаемся взять блок из локального freelistа текущего CPU
//  2) Если список пуст  увеличиваем счётчик и идём в malloc()
// Вызывать из прерывания можно лишь тогда, когда исключен вызов к malloc() : (mb_initialize() вызван с лимитом, 
// и с преаллоком равным лимиту - память будет зарезервирована заранее и тогда вызывать из прерывания можно)
//
void *mb_get(struct mb_pool *mb) {

    struct node_link *n;
    uint8_t *ret = NULL;

    int cpu = xPortGetCoreID();

    // Fast-path: пробуем взять блок из локального списка
    portENTER_CRITICAL(&mb->lock[cpu]);

    if ((n = mb->local[cpu]) != NULL) {
        mb->local[cpu] = n->next;
        portEXIT_CRITICAL(&mb->lock[cpu]);

        ret = (uint8_t *)n;
        ret[mb->size] = cpu;   // помечаем владельца блока
        return ret;
    }

    portEXIT_CRITICAL(&mb->lock[cpu]);

    // Slow-path: freelist пуст, идём в malloc().
    // Атомарно увеличиваем счётчик маллоков, проверяя, не превышен ли лимит.
    size_t old;
    do {
        old = atomic_load(&mb->count); // TODO: relaxed?
        if (mb->max_count && old >= mb->max_count)
            return NULL;
    } while (!atomic_compare_exchange_weak(&mb->count, &old, old + 1));

    // Выделяем блок + 1 служебный байт под owner CPU.
    // Служебный байт расположен в конце, чтобы не ломать выравнивание указателя (malloc() возвращает выровненные адреса).
    ret = (uint8_t *)malloc(mb->size + 1);
    if (ret) {
        ret[mb->size] = cpu;
        return ret;
    }

    // malloc() не удался - откатываем счётчик
    atomic_fetch_sub(&mb->count, 1);
    return NULL;
}


// Возврат MB в пул.
//
// Указатель /p/ может быть NULL - в этом случае ничего не делаем.
//
// Блок возвращается в freelist того CPU, который его выделял (owner CPU), даже если mb_put()
// вызывается с другого ядра.
//
// Примечание:
//  В Xtensa-порту FreeRTOS portENTER_CRITICAL эквивалентен
//  portENTER_CRITICAL_ISR, поэтому функцию можно вызывать из ISR,
//  при условии что MB находится во внутренней RAM (а не в SPIRAM).
//
void mb_put(struct mb_pool *mb, void *p) {

    if (!p)
        return;

    struct node_link *n = (struct node_link *)p;
    uint8_t *c = (uint8_t *)p;
    uint8_t cpu = c[mb->size];

    // Проверяем корректность owner CPU
    if (cpu < portNUM_PROCESSORS) {

        portENTER_CRITICAL(&mb->lock[cpu]);

        n->next = mb->local[cpu];
        mb->local[cpu] = n;

        portEXIT_CRITICAL(&mb->lock[cpu]);

    }
#ifdef DEBUG
    else {
        abort();
    }
#endif
}

А меня-то чего в суе упоминать, я то чем виноват?

Про скорость замечание.

Пруф того, что моя реализация и то, что делает простой malloc()/free() - немного разные вещи.

А я где-то утверждал обратное?

Вы бы перечитали, что я Вам там писал, похоже Вы вообще не поняли о чём это было.