Асинхронность в NodeJS

Асинхронность в NodeJS

КодингКодинг

Для того, чтобы разобраться в многопоточности, для начала, нужно вообще понять что такое "поток". Если открыть википедию, дам 99% на то, что вы так и не поймёте что там описано. Какие-то ядра, какие-то задачи и тп. Поэтому попробую выразить всё что там описано простыми словами. Изначально термин thread (поток) произошёл от термина task (задача). Есть операционная система, она что-то должна сделать. Это и есть задача. Со временем развития, задачи усложнялись. Это уже не было разовой операцией - последовательное выполнение нескольких задач. Вот это и стали называть "потоком". И, как следствие, когда у нас появились многоядерные процессоры - мы смогли раскидывать такие задачи по свободным ядрам. Это если вкратце.

Если попробовать нарисовать пример в масштабах вебсайтов, то, представьте себе социальную сеть в которой вы одновременно смтрите видео, пишете другим пользователям сообщения и у вас обновляются ставки в букмекерской конторе. Что-то такое. Понятно, от жизни это далеко и, скорее всего, неприменимо. Почти все сайты обходятся без мультипоточности на внешней стороне. Но вот back end как раз таки обычно обрабатывает все запросы в отдельных потоках. Почему? Перечитайте написаное выше и сделайте выводы сами.

Синхронная натура NodeJS

Но вот природа самого JavaScript'a - однопоточна. Изначально это вообще был язык, который выполнял скрипт, делая элементарные манипуляции над вещами на страничках сайтов. Возможно, валидации и ещё что-то. В любом случае это не подразумевало то, что мультипоточность когда-то понадобится.

В одно прекрасное утро, а, может быть, вечер, Ryan Dahl встал и решил придумать как использовать JavaScript на back end. При этом важно подчеркнуть, что задумка была - не изменять концепт языка - однопоточность и асинхронность. Конечно, главная фишка в том, что теперь разработчики front end и back end писали на одном языке. Но и сама асинхронность открывала новые возможности.

Я всё это описал потому, чтобы сделался вывод: "Поскольку главная идея NodeJS - сохранить природу JavaScript - он никогда не станет многопоточным". Получается все приложения в Node.js однопоточные? Некоторые - да. На самом деле мы можем управлять процессами не изменяя природу JS. И об этом - эта статья. Научиться максимально пользоваться асинхронностью и потоками в NodeJS.

Контроль асинхронности

Первое и самое важное, что делает JS таким сильным - асинхронность. Мы можем последовательно выполнять код нашей программы, помещая асинхронные запросы в специальную Message Queue. Ещё это называют неблокирующим воодом-выводом (non blocking I/O). То есть, если у нас есть асинхронная операция вида I/O (например, запрос к базе данных) - она будет помещена в Message Queue. При этом JavaScript продолжит выполнять все операции, как и раньше, одним потоком, но мы уже не ждём конца операции - мы ждём события когда операция завершится. Когда закончится очередной Event Loop, сработает обработчик этого события из коллбека (если он есть). В итоге получается, что основной код выполняется в рамках одного потока, а всё в программе где мы ждём каких-то ответо - выполняется параллельно. Это по-простому. Все мы знаем что такое коллбеки и промисы.

Но тут есть два важных аспекта.

1. Не блокируй то что не нужно блокировать
Сегодня на Node.js часто встречаешь приложения, в которых код написан корректно, но природа асинхронности используется неверно. В общем, смотрите, есть такой код:

async function updateUser(userId, update) {
  const result = await UserModel.updateById(userId, update);
  await sendNotificationToUser(userId, 'USER_UPDATE', update);
  return { updatedRowsCount: result }; 
}

И у нас получается вот что. Мы обновляем пользователя - это необходимая операция. А потом отправляем ему нотификацию. И вот тут получается такой момент, что, пользователь - обновлён, мы уже готовы ему отдать ответ, но, вместо этого, мы ждём пока отправится нотификация. А пользователь подключил к нотификациям смс. Поэтому наш сервис пробует отправить смс через где-то-в-сети провайдера, тот пробует обработать, но у него там тоже сбой и операция повисла. А мы могли ещё 500 секунд назад отдать пользователю ответ! Но если убрать await - мы можем не дожидаться ответа.

async function updateUser(userId, update) {
  const result = await UserModel.updateById(userId, update);
  sendNotificationToUser(userId, 'USER_UPDATE', update);
  return { updatedRowsCount: result }; 
}

Исходя из этого я обращаю внимание на то, что асинхронность - это не "способ получить результат от стороннего API", а именно возможность выполнять I/O операции параллельно в коде программы.

2. Обработка синхронных нагрузок в Message Queue
Если вернуться к синхронности - тут всё кажется неоднозначным. Дело в том, что, когда мы забиваем в синхронность большие операции - наш код может надолго зависнуть. Представьте, ваш сервер заблокирован на 10 секунд! Самый популярный пример - работа с преобразованием данных полученных из базы или стороннего API. Например, мы получили данные с зашифрованными email.

async function getUsers() {
  const users = await UserModel.findAll();
  for (const user of users) {
    user.email = decryptEmail(user.email);
  }
}

Пока наша функция не доработает - никакой новый код не начнёт работу. Просто представим что мы получили пользователей миллион. Всё. Программа стала.

Решение - разбить код на небольшие кусочки и выполнять их после основной логики. Чтобы код выполнился после EventLoop'a - достаточно положить его в setTimeout с 0. Осталось нарубить его на кусочки. В Node.js мы можем использовать функцию setImmediate. По поведению она полностью аналогична в браузерах:

setTimeout(() => {}, 0) или setImmediate(myFunc)

То есть, каждый раз вызов нашего метода будет попадать в отдельный тик, выполнять небольшую операцию, при этом не блокируя код. В итоге это должно выглядеть как-то так:

async function getUsers() {
  const users = await UserModel.findAll();
  decryptUsersEmail(users);
}

function decryptUsersEmail(users) {
  if (!decryptUsersEmail.startIndex) {
    decryptUsersEmail.startIndex = 0;
  }
  const start = decryptUsersEmail.startIndex;
  const end = decryptUsersEmail.startIndex + 10;
  for (let i = start; i < end; i += 1) {
    if(!users[i]) {
      return true;
    }
    user[i].email = decryptEmail(user.email);
  }
  decryptUsersEmail.startIndex += 10;
  return setImmediate(decryptUsersEmail);
}

Там где мы проверяем есть ли стартовый индекс в начале - это мемоизация (мало ли кто-то не знает). Ну и вычисления наши всё равно будут занимать время. Поэтому такой ситуации следует избегать. Однако теперь наш код не блокируется вычислениями. Другие параллельные операции будут происходить как обычно. Это нам и было нужно.

p.s. Вот с промисами такое не работает. Как только промис выполнится - логика его then'able функции попадёт в Job Queue. А оттуда оно достанется сразу после окончания кода текущей функции и попадёт в call stack. То есть логика выполнится не дожидаясь завершения Event Loop'a и заблокирует нас :( Promise для нас не подходит. То же касается и async/await. Такая очередь называется Microtask Queue (также называют: Job Queue или Promise Jobs Queue)

Как стать фоновым процессом

Если вы пробовали самостоятельно воспроизвести описанное выше, вы уже могли ощутить, что любые вычисления существенно съедают память. И, даже если мы не блокируем ими - всё равно нагрузку хотелось бы вынести из нашего потока. То есть, нам нужен фоновый процесс, чтобы запустить наши вычисления, как отдельный task (см. выше) который отработает на любом из ядер нашего процессора и просто вернёт нам результат в коллбек. В идеале нам нужно получить что-то такое:

// Логика нашего `script.js` со своим environment без "общей" памяти.
const service = createService('script.js')
// Отправляем входные данные в data
service.compute(data, function(err, result) {
  // Получаем результат в коллбеке
})

В Node.js для этого есть background process'ы, которые мы можем получить через fork. Создадим логику, которую хотим вынести в отдельный процесс:

function longComputation(message) {
  const { someString } = JSON.parse(message);
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
    console.log(someString);
  };
  return sum;
};

process.on('spawn', longComputation);

Теперь достаточно его форкнуть в нашу программу.

const { fork } = require('child_process');
const data = { someString: 'Hello world!' };
const forked = fork('script.js', [JSON.stringify(data)], { detached: true });

Ещё сильной стороной у fork'a будет наличие каналов .send и .on через которые мы можем передавать данные между нашим основным и дочерним процессом. Например, если бы мы из нашего дочернего процесса захотели передавать в родительский каждую тысячную иттерацию её номер. Отправим из дочернего процесса сообщение:

function longComputation(message) {
  const { someString } = JSON.parse(message);
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
    console.log(someString);
    if (i % 1000 === 0) {
      process.send(`Hello world ${i} times`);
    }
  };
  return sum;
};

process.on('spawn', longComputation);

Теперь осталось лишь поймать наше сообщение в основной программе:

const { fork } = require('child_process');
const data = { someString: 'Hello world!' };
const forked = fork('script.js', [JSON.stringify(data)], { detached: true });
forked.on('message', (ourMessage) => {
  console.log(ourMessage);
});

Иногда ещё ловят код ответа дочернего процесса:

child.on('exit', code => {
  console.log(`Exit code is: ${code}`);
});

В общем, проблема вроде бы решена.

Масштабируемся в worker-farm

С форками есть одна незаметная проблема. Для того, чтобы создать форк - мы съедаем память. Сам форк - тоже съедает память. И у нас может возникнуть такая ситуация, когда в параллель пошли одинаковые запросы на большие вычисления. Часто это ещё и перевязано с запросами в базу данных. Как итог - создаются отдельные соединения с базой данных, ORM не справляется с нагрузкой, база данных - не успевает обрабатывать данные и выпадает. И процесс наш выпадает. И, что самое плохое, ++всё что мы делали - не завершилось++.

Логично, что полностью избежать проблемы не получится - мы всё таки хотим в вычислениях выносить логику в подпроцессы. Очевидным решением будет, чтобы однотипные task'и, которые мы создаём в подпроцессах, выполнялись друг за другом. И ещё хорошо было бы, чтобы они не пересоздавались - мы же используем всё те же настройки, зачем нам тратить на это память. Для этого в Node.js есть worker-farm.

npm i worker-farm

Принцип тот же. Мы подключаем модуль worker-farm и запрашиваем наш скрипт с помощью него. Столько раз, сколько захотим.

const workerFarm = require('worker-farm');

const forked = workerFarm(require.resolve('./script.js'));
for (let i = 0; i < 10; i+= 1) {
  const data = { someString: `Hello world ${i}!` };
  forked(JSON.stringify(data), function (err, result) {
    console.log(result);
    if (result === 5) {
      workerFarm.end(forked);
    }
  });
}

Я думаю код говорит сам за себя. Где это применять - тут уж на ваше усмотрение. Любое место где вам нужно или потенциально может быть много подпроцессов.

Да здравствуют worker_threads!

В январе 2020 года с версии 10.15.0 в Node.js появились worker_threads. Но до версии 11.7.0, чтобы его включить - вам понадобится включить флаг --experimental-worker. Иначе они будут недоступны. Почему это прорыв? Дело в том, что создание worker_thread'a дешевле создания fork'a. Намного!

Но тут есть один важный подводный камень. Они настолько удобны что мы можем их насоздавать столько, что они полностью забьют нашу систему. Поэтому даже в руководстве сказано - их вызовы нужно держать под контролем. Самый лучший способ, чем изобретать велосипед - взять готовый npm пакет, например, workerpool или thread-loader. Всё зависит от того, для чего вам нужен pool для worker'ов.

Как и для форков - в worker_thread'ах есть event-driven канал, а, значит, можно обмениваться сообщениями между кодом нашей основной программы и воркером. По аналогии с worker-farm мы создаём и worker_threads:

const { Worker } = require('worker_threads');

for (let i = 0; i < 10; i+= 1) {
  const data = { someString: `Hello world ${i}!` };
  const workerForTask = new Worker(require.resolve('./script.js'), data);
  workerForTask.on('message', (result) => {
    const data = result;
  });
  workerForTask.on('error', errorHandler);
  workerForTask.on('exit', (code) => {
    const logMessage = code ? `${data} ends with the error` : `${data} success`;
    logger.log(logMessage);
  });
}

Важный момент - данные которые ушли в воркер - это отдельная программа и они связи с нашими данными не имеют. Поэтому можно с ними делать что угодно. Отправим ответ назад из script.js.

const { workerData, parentPort } = require('worker_threads')

function longComputation() {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
    if (i % 1000 === 0) {
      workerData.someString += i;
    }
  };
  return sum;
};

longComputation();
parentPort.postMessage({ resultString: workerData.someString });

Понятно, что кейс когда мы обрабатываем тучу изображений и отслеживаем прогресс отсылая сообщения на каждый процент - это дело обычное. И так - с чем угодно.

Ещё в документации в примере вызов worker_treads обёрнут в Promise. В принципе, так делать вполне нормально, но всё - ситуационно. Всё таки, каждая работа с worker_treads требует изначальной проектировки. Поэтому стоит обязательно почитать документацию про них.

Что же про браузеры?

В браузерах у нас есть Web Workers, которые поддерживаются всеми современными браузерами. Это не тема этой статьи, поэтому описывать как они работают тут я не буду. Возможно позже посвящу этому статью. Скажу только, что их поведение похоже с поведением worker'ов в Node.js, но технический подход в работе с ними иной.

Итог

Правильно пользоваться асинхронностью - крайне важное умение для NodeJS разработчика. Но если мы делаем форк или воркер - вам сразу нужен хорошо настроенный логгер.