Ось задача: у вас є добре налагоджена програма, яка обробляє один потік. Вона використовує призначене ядро процесора і цикли, щоб досягти понад мілісекундну точність. Потім настає розуміння, що програма просто зациклює ядро процесора більшість часу чекаючи на момент часу, коли виконати маленький шматок роботи. Занадто грубо, правда? Тож ви вирішуєте вдосконалити програму, щоб обробляла кілька таких потоків одним і тим самим ядром: хай чекає менше, але виконує більше корисної роботи. Ось як це зробив я і які уроки виніс.

Початок

Щоб говорити предметно, розгляньмо головний цикл програми:

void StreamLoop() {
    while (true) {
        auto work = GetNext();
        busyWaitUntil(work.GetTime());
        work.DoIt();
    }
}

Пам’ятаймо, що це дуже велике спрощення. Насправді є кілька етапів виконання задачі з очікуваннями у різних місцях в межах однієї ітерації циклу. І код старий; напевне, в ньому розв’язано багато тонких проблем. Тож переписування — останнє до чого можна вдатися.

Підготовка

Отже, подумалось, мені напевне знадобиться вивернути програму навиворіт навколо циклів очікування. І використати boost coroutine2 замість викликання функцій, щоб передавати нитку виконання у потік і з потоку. Як то:

CoroT::pull_type stream(StreamLoop);  // запустити співфункцію
auto nextTime = stream.get();  // коли її виконати наступного разу

while (true) {
    busyWaitUntil(nextTime);   // зачекати, доки не прийде час
    stream.pull();             // віддати виконання у потік
    nextTime = stream.get();   // поновити очікування
}

Головний цикл потоку здебільшого залишається таким самим, проте, цикли очікування перетворюються на передачу в головну співфункцію:

void StreamLoop(CoroT::push_type yield) {
    while (true) {
        auto work = GetNext();
        yield(work.GetTime());  // <- єдиний змінений рядок
                                // віддати керування у зовнішній цикл
        work.DoIt();
    }
}

Все це простий рефакторинг. Кілька виправлень у системі збірці, і код проходить автоматичні тести. Поки що функціональність не змінилася.

Багатопотокове планування

Тепер легко налаштувати програму для обробки кількох циклів потоків. Просто треба вирішувати, який потік виконувати на кожній ітерації зовнішнього циклу. Оскільки нам відомий бажаний час виконання, можна використати чергу з пріоритетами як розклад чи план.

struct Task {
    CoroT::pull_type stream;
    TimeT nextTime;

    // Ініціалізувати співфункцію
    Task(int): stream(StreamLoop), nextTime(stream.get())
    {}

    // Впорядкувати за зростанням (нижчий пріоритет спочатку)
    bool operator<(const Task &o) const { return nextTime > o.nextTime; }
};

std::priority_queue<Task> schedule;
for (int i = 0; i < nStreams, ++i) {
    schedule.emplace(i);
}

while (true) {
    auto task = Dequeue(schedule);  // пересунути найранішу задачу

    busyWaitUntil(task.nextTime);   // зачекати, доки не настане бажаний час
    task.stream.pull();             // віддати виконання потокові
    task.nextTime = task.stream.get();   // поновити час очікування

    Enqueue(task, schedule);  // перепланувати задачу
}

Знову ж, кілька годин і тести проходять. Зважте, що старий код все ще не потрібно змінювати!

Ускладнення

Насправді це міг бути щасливий кінець історії. У ідеальному світі. Але в нашому випадку програма не була однонитковою. Щоб обійти обмеження планувальника процесів Windows, запускається кілька ниток. Цикл потоку працював в контексті цих різних ниток. Одна нитка 20 мс, інша 20 мс, потім перемикання назад до першої і так далі. У цьому випадку більше не можна використовувати boost coroutine: співфункції і ниткова безпека.

Тепер стає зрозуміло, що неминуче доведеться змінювати старий код. Проте все ж, це можна зробити охайно. Ціль — провести рефакторинг функції циклу потоку у функтор з притаманним станом.

struct StreamLoop {
    std::function<TimeT()> handler;  // активний обробник
    WorkT work;                      // автоматичні змінні стають частиною функтору

    StreamLoop() {
        // початковий стан
        handler = std::bind(&StreamLoop::FirstHalf, this);
    }

    TimeT operator()() {
        return handler();
    }

private:
    TimeT FirstHalf() {
        work = GetNext();
        handler = std::bind(&StreamLoop::SecondHalf, this);
        return work.GetTime();
    }

    TimeT SecondHalf() {
        work.DoIt();
        handler = std::bind(&StreamLoop::FirstHalf, this);
        return TimeT::now();
    }
}

Якість обслуговування

Компроміс, про який треба пам’ятати — якість обслуговування (QoS). Оскільки кілька незалежних потоків обробляються послідовно, може статися, що їхні часові інтервали невдало співпадають. Отже, так можуть з’явитися небажані затримки. Цю проблему можна насправді вирішити, придивившись до потоків ретельніше. Якщо можна рознести роботу потоків вибором початкової фази, нам пощастило.

Висновки

  • Задачу було розв’язано поступово, крок за кроком з ретельним тестуванням між різними етапами
  • Неоціненно мати модульні тести
  • Навіть якщо кофункції boost не можна було вжити у остаточному варіанті, вони допомогли створити прототип розв’язку і перевірити планувальник
  • Навіть коли доведеться переробити код, все одно може бути більш практично його дороблювати, а не розпочинати із самого початку.