Skip to content

CAN service bus

CAN - сервисная шина управления внешними worker'ами VideoGrace. Она не переносит media и не заменяет пользовательский CommandLoop.

Основная задача CAN: головной сервер выдает задания сервисам и получает от них heartbeat, статус выполнения и события артефактов.

CAN используется там, где головному серверу нужен управляемый внешний исполнитель:

  • RTC nodes для WebRTC endpoint'ов;
  • recorder/transcoder/transcriber workers;
  • диагностические и эксплуатационные workers;
  • будущие consolidator/media сервисы.

Для клиента CAN не виден напрямую. Web-клиент работает с обычным CommandLoop, а core уже сам выбирает локальный gateway или внешнюю CAN-ноду.

Транспорт

Сервисы сами открывают исходящее WSS-соединение к головному серверу:

wss://<server>/can

Такой порядок важен эксплуатационно: recorder, tolmach, consolidator или RTC node могут находиться за NAT/firewall и не требуют входящих портов.

Авторизация

CAN использует отдельный service token для подключения worker'а к /can. Это не пользовательский access_token и не JWT пользователя.

Токен можно передать в HTTP upgrade:

Authorization: Bearer <service_token>

или в первом hello сообщении:

{
  "type": "hello",
  "token": "service-token",
  "node_id": "recorder-01",
  "role": "recorder",
  "version": "3.0.2605xx",
  "capabilities": ["recording", "storage_upload", "vad"]
}

Конфигурация:

[CAN]
Enabled = 1
ServiceToken = change-me
MaxJobs = 1000
MaxEventsPerJob = 100
FinishedJobTtlSec = 3600

Эквивалентные переменные окружения:

VG_CAN_ENABLED=1
VG_CAN_SERVICE_TOKEN=change-me
VG_CAN_MAX_JOBS=1000
VG_CAN_MAX_EVENTS_PER_JOB=100
VG_CAN_FINISHED_JOB_TTL_SEC=3600

Если ServiceToken пустой, CAN считается недоступным для production-сервисов. Если задан ServiceToken, endpoint /can может быть включен даже без явного Enabled = 1.

MaxJobs ограничивает количество задач в памяти сервера. Значение 0 или отрицательное значение отключает лимит, но для production это не рекомендуется. MaxEventsPerJob ограничивает историю событий одной задачи. FinishedJobTtlSec задает TTL для финальных задач completed, failed, cancelled.

Пользовательский токен для service jobs

CAN service token авторизует сам worker, но не дает ему права входить в пользовательские конференции. Если worker должен подключиться к CommandLoop как сервисный пользователь, сервер выдает отдельный короткий bearer token на основании записи в таблице service_accounts.

Для transcriber.start сервер поддерживает безопасный production-flow:

  1. Администратор создает сервисного пользователя, например transcriber.
  2. В БД добавляется service_accounts.name = 'transcriber', привязанная к client_id этого пользователя.
  3. Администратор ставит CAN job transcriber.start без payload.auth.
  4. Core выпускает короткий JWT для связанного пользователя и добавляет его в payload.auth.
  5. Transcriber логинится в обычный CommandLoop по auth.access_token, входит в конференцию и работает как receiver/AI observer.

Пример payload после инъекции токена:

{
  "conference_tag": "ops",
  "auth": {
    "type": "bearer",
    "access_token": "eyJ..."
  },
  "source": {
    "type": "conference_audio"
  }
}

Такой flow разделяет два уровня доверия:

  • VG_CAN_SERVICE_TOKEN разрешает worker'у принимать задания;
  • пользовательский JWT из service_accounts разрешает сервису выполнять действия внутри конференции.

Тестовый worker

Для проверки CAN-протокола есть диагностический сервис:

cmake --build build-mac --target CanWorker -j 4
./build-mac/Services/CanWorker/CanWorker \
  --url wss://localhost/can \
  --token change-me \
  --node-id recorder-dev \
  --role recorder \
  --capabilities recording,storage_upload

Можно использовать переменные окружения:

VG_CAN_URL=wss://localhost/can \
VG_CAN_SERVICE_TOKEN=change-me \
VG_CAN_NODE_ID=recorder-dev \
VG_CAN_ROLE=recorder \
./build-mac/Services/CanWorker/CanWorker

Worker регистрируется, шлёт heartbeat, принимает job, отвечает job_ack и тестовым job_event.

RTC translator worker

Первый внешний RTC-worker собирается отдельным сервисом RtcTranslator. Он регистрируется в CAN как role=rtc-translator с capability rtc,webrtc,rtp_bridge.

Запуск локально:

cmake --build build-mac --target RtcTranslator -j 4
./build-mac/Services/RtcTranslator/RtcTranslator \
  --url wss://localhost/can \
  --token change-me \
  --node-id rtc-dev \
  --bind 0.0.0.0

Переменные окружения:

VG_CAN_URL=wss://localhost/can \
VG_CAN_SERVICE_TOKEN=change-me \
VG_CAN_NODE_ID=rtc-dev \
VG_WEBRTC_BIND_ADDRESS=0.0.0.0 \
VG_WEBRTC_ADVERTISE_ADDRESS=rtc-node.example.com \
VG_WEBRTC_PORT_RANGE_BEGIN=43000 \
VG_WEBRTC_PORT_RANGE_END=43999 \
./build-mac/Services/RtcTranslator/RtcTranslator

ICE address и UDP ports

RtcTranslator подключается к головному серверу сам, исходящим WSS на /can. Поэтому головному серверу не нужен IP-адрес RTC-ноды: он видит только node_id и открытую CAN-сессию.

Адрес, по которому браузер будет подключаться к RTC-ноде, появляется позже - в ICE candidate, который генерирует libdatachannel внутри RtcTranslator. Для локальной сети может хватить host candidate. Для production, NAT и VPS нужно явно задать публичный адрес RTC-ноды:

VG_WEBRTC_ADVERTISE_ADDRESS=rtc-node.example.com

Также нужно открыть и зафиксировать UDP-диапазон WebRTC:

VG_WEBRTC_PORT_RANGE_BEGIN=43000
VG_WEBRTC_PORT_RANGE_END=43999

Если нужно привязать ICE socket к конкретному локальному интерфейсу:

VG_WEBRTC_BIND_ADDRESS=0.0.0.0

Важно различать настройки:

  • VG_CAN_URL - адрес головного сервера, куда RTC-нода сама подключается по WSS.
  • VG_CAN_NODE_ID - логический route handle внутри CAN, не сетевой адрес.
  • VG_WEBRTC_BIND_ADDRESS - bind для WebRTC/ICE sockets.
  • VG_WEBRTC_ADVERTISE_ADDRESS - публичный адрес, который будет подставлен в ICE candidate и должен быть достижим браузерами.
  • VG_WEBRTC_PORT_RANGE_BEGIN/END - UDP-порты WebRTC/ICE, которые должны быть открыты на firewall/NAT.
  • translator_host - адрес core/RTP translator, который core передает внешним RTC-нодам в rtc.endpoint.start; берется из VG_ADDR / [Network] Address.
  • translator_port - не настройка RTC-ноды; порт приходит в каждом rtc.endpoint.start от core и соответствует конкретному translator/media binding.

Если VG_WEBRTC_ADVERTISE_ADDRESS не задан, bridge попробует использовать VG_ADDR / [Network] Address. Если и они не заданы, клиенту уйдет адрес, который нашел libdatachannel; за NAT это часто приватный 192.168.x.x/10.x.x.x, и браузер снаружи до RTC-ноды не достучится.

RTP path и adjust packet

Внешний RtcTranslator не выбирает порт RTP-транслятора сам. Для каждого endpoint core присылает:

  • translator_host - адрес core/RTP translator, достижимый с RTC-ноды;
  • translator_port - UDP-порт конкретного media translator binding.

Для subscribe-направления RTC-нода отправляет на translator_host:translator_port service RTP adjust packet с правильным receiver_ssrc. Core translator запоминает source address этого packet'а и дальше отправляет media на этот адрес. Поэтому адрес RTC-ноды не настраивается на core и не передается в конфиге.

Для publish-направления browser RTP после нормализации также уходит с RTC-ноды на translator_host:translator_port с author_ssrc, выданным core.

Если Network.Address не задан, core использует 127.0.0.1 как последний fallback. Это корректно только для локального/in-process режима, но не для удаленной RTC-ноды.

MVP job types:

  • rtc.probe - health/capability probe;
  • rtc.endpoint.start - зарегистрировать RTC endpoint;
  • rtc.endpoint.ice - добавить remote ICE candidate от браузера;
  • rtc.endpoint.stop - остановить endpoint;
  • rtc.endpoint.list - вернуть текущие endpoints.

RtcTranslator использует общий WebRTC/RTP bridge из Engine/RTC. Он принимает signaling через CAN, генерирует SDP answer и ICE candidates, а media path держит отдельно от головного HTTPS-сервера.

Архитектурная граница для media bridge:

  • libdatachannel/WebRTC/RTP bridge живет в Engine/RTC;
  • bridge не знает про session_t, Proto::*, ControlWS или CAN;
  • серверный in-process gateway адаптирует ControlWS signaling к Engine bridge;
  • внешний RtcTranslator адаптирует CAN signaling к тому же Engine bridge;
  • дублировать WebRTC stack в Server и Services нельзя.

Базовые сообщения

CAN принимает JSON text frames. Binary frames для CAN-протокола не используются.

Успешная регистрация:

{
  "type": "hello_ok",
  "server_time": 1779000000000,
  "heartbeat_ms": 10000,
  "node_id": "recorder-01",
  "role": "recorder"
}

Heartbeat:

{
  "type": "heartbeat",
  "load": {
    "cpu": 12,
    "jobs": 1
  }
}

Ответ сервера:

{
  "type": "heartbeat_ok",
  "server_time": 1779000005000
}

Сервисные события:

{
  "type": "job_event",
  "job_id": "uuid",
  "status": "artifact_ready",
  "artifact": {
    "blob_id": "uuid",
    "kind": "mixed_recording",
    "content_type": "video/mp4",
    "size": 734003200
  }
}

Задание от сервера сервису:

{
  "type": "job",
  "job_id": "8f6c6e...",
  "job_type": "recording.start",
  "server_time": 1779000007000,
  "payload": {
    "conference_tag": "ops",
    "mode": "mix",
    "storage": "https"
  }
}

Подтверждение получения:

{
  "type": "job_ack",
  "job_id": "8f6c6e...",
  "status": "accepted"
}

Статус job_ack становится текущим статусом задачи. Для обычного worker'а это чаще всего accepted, для быстрых задач допустимо сразу вернуть completed или failed.

Событие выполнения:

{
  "type": "job_event",
  "job_id": "8f6c6e...",
  "status": "progress",
  "message": "segment uploaded",
  "data": {
    "segment": 12
  }
}

Сервер сохраняет status, message и data в истории задачи. Если status равен completed, failed или cancelled, задача становится финальной.

Отмена задания сервером:

{
  "type": "job_cancel",
  "job_id": "8f6c6e...",
  "job_type": "recording.start",
  "reason": "cancelled_by_admin",
  "server_time": 1779000009000
}

Admin API MVP

Админский API использует обычный пользовательский bearer token с правами platform.admin или platform.owner.

Получить состояние CAN:

GET /api/v1.0?admin_can
Authorization: Bearer <user_access_token>

Поставить задачу:

POST /api/v1.0?admin_can_submit_job
Authorization: Bearer <user_access_token>
Content-Type: application/json

{
  "job_type": "recording.start",
  "target_role": "recorder",
  "required_capability": "recording",
  "payload": {
    "conference_tag": "ops",
    "mode": "mix"
  }
}

Маршрутизация задания:

  • target_node_id - отправить конкретной ноде;
  • target_role - выбрать online-ноду с нужной ролью;
  • required_capability - дополнительно требовать capability;
  • если target_node_id задан, он имеет приоритет над выбором по роли;
  • если подходящей ноды нет, задача остаётся pending;
  • при следующем hello подходящего сервиса pending-задача будет отправлена автоматически.

Ответ успешной постановки содержит фактический статус задачи. Если worker уже online и подходит под фильтры, статус обычно сразу станет dispatched; иначе останется pending.

Отменить задачу:

POST /api/v1.0?admin_can_cancel_job
Authorization: Bearer <user_access_token>
Content-Type: application/json

{
  "job_id": "8f6c6e...",
  "reason": "cancelled_by_admin"
}

Семантика очереди:

  • очередь находится в памяти процесса core;
  • CAN не является Kafka/ZeroMQ/durable broker;
  • после рестарта core live jobs теряются и должны быть пересобраны из актуального состояния конференций и worker'ов;
  • MaxJobs защищает сервер от бесконечного роста очереди;
  • MaxEventsPerJob защищает от бесконечной истории событий в одной задаче;
  • финальные задачи удаляются после FinishedJobTtlSec;
  • pending задача отменяется локально без отправки worker'у;
  • dispatched/accepted задача получает job_cancel, если assigned-нода сейчас online;
  • completed, failed, cancelled - финальные состояния;
  • поздние события от worker'а после финального состояния сохраняются в истории, но не меняют итоговый статус.

Жизненный цикл live job:

Статус Источник Значение
pending core Задача создана, подходящего online worker'а пока нет.
dispatched core Задача отправлена выбранному worker'у.
accepted worker Worker подтвердил получение через job_ack.
completed worker Задача завершена успешно.
failed worker Задача завершена ошибкой.
cancelled core/worker Задача отменена администратором или сервисом.

Worker может использовать промежуточные статусы в job_event, например progress, artifact_ready, answer, ice_candidate. Для core финальными считаются только completed, failed, cancelled.

WebRTC через CAN

WebRTC routing описан отдельно в WebRTC transport. На уровне CAN внешний RTC worker регистрируется так:

{
  "type": "hello",
  "token": "service-token",
  "node_id": "rtc-fornex-1",
  "role": "rtc-translator",
  "capabilities": ["rtc", "webrtc", "rtp_bridge"]
}

Core публикует такие ноды клиентам как rtc_routes. Когда клиент отправляет webrtc_offer с выбранным rtc_node_id, core ставит CAN job rtc.endpoint.start.

Пример payload rtc.endpoint.start:

{
  "job_type": "rtc.endpoint.start",
  "target_node_id": "rtc-fornex-1",
  "required_capability": "rtc",
  "payload": {
    "endpoint_id": "server-generated-endpoint-key",
    "rtc_node_id": "rtc-fornex-1",
    "client_endpoint_id": "video-subscribe-uuid",
    "peer_id": "remote-video-1006-228800002",
    "conference_tag": "radio",
    "scope": "video-subscribe",
    "sdp": "v=0...",
    "device_id": 1006,
    "author_ssrc": 1042,
    "receiver_ssrc": 1044,
    "translator_host": "core.example.com",
    "translator_port": 5060
  }
}

RTC worker должен поднять endpoint, связать WebRTC с RTP translator и вернуть SDP answer:

{
  "type": "job_event",
  "job_id": "rtc-offer-...",
  "status": "answer",
  "data": {
    "endpoint_id": "server-generated-endpoint-key",
    "peer_id": "remote-video-1006-228800002",
    "conference_tag": "radio",
    "scope": "video-subscribe",
    "sdp": "v=0..."
  }
}

Core преобразует это событие обратно в клиентскую команду webrtc_answer.

Remote ICE candidate от браузера передается RTC worker'у отдельной задачей:

{
  "job_type": "rtc.endpoint.ice",
  "target_node_id": "rtc-fornex-1",
  "required_capability": "rtc",
  "payload": {
    "endpoint_id": "server-generated-endpoint-key",
    "rtc_node_id": "rtc-fornex-1",
    "peer_id": "remote-video-1006-228800002",
    "conference_tag": "radio",
    "scope": "video-subscribe",
    "candidate": "candidate:...",
    "sdp_mid": "0",
    "sdp_mline_index": 0
  }
}

ICE candidate от RTC worker'а возвращается через job_event:

{
  "type": "job_event",
  "job_id": "rtc-offer-...",
  "status": "ice_candidate",
  "data": {
    "endpoint_id": "server-generated-endpoint-key",
    "candidate": "candidate:...",
    "sdp_mid": "0",
    "sdp_mline_index": 0
  }
}

При закрытии клиентской сессии или endpoint'а core отправляет:

{
  "job_type": "rtc.endpoint.stop",
  "target_node_id": "rtc-fornex-1",
  "required_capability": "rtc",
  "payload": {
    "endpoint_id": "server-generated-endpoint-key",
    "rtc_node_id": "rtc-fornex-1"
  }
}

Если внешний RTC worker недоступен или endpoint не удается создать, core может использовать локальный WebRTC gateway или WSMedia fallback, в зависимости от текущей конфигурации транспорта.

Граница ответственности

CAN переносит только control plane:

  • регистрация сервисной ноды;
  • heartbeat и health;
  • задания start_recording, stop_recording, start_transcription, join_rtc_node;
  • статусы выполнения;
  • события о создании артефактов.

CAN не переносит RTP/RTCP media, записи, большие blob'ы и пользовательские сообщения.

Media идет через RTP/WSM/WebRTC или будущий encrypted RTP tunnel. Файлы и записи сервисы загружают через HTTPS storage API.

Live jobs и artifact jobs

CAN jobs для RTC и записи являются live-командами. Они живут в памяти головного сервера и привязаны к текущему состоянию online-ноды:

  • RTC node: поднять endpoint, остановить endpoint, переключить маршрут, запросить health;
  • recorder: стартовать live-запись, остановить live-запись, сообщить готовность сырого артефакта;
  • routing/moderation сервисы: выполнить текущую оперативную команду.

Такие задачи не persist'ятся в БД. После рестарта сервера или потери worker'а media-состояние считается потерянным и пересобирается из текущих подключений, а не из старой очереди.

Persistent pipeline начинается только после появления сырья в storage. Когда recorder прислал artifact_ready, сервер создает уже не live CAN job, а задачу постобработки артефакта:

  • транскодинг;
  • разбор дорожек;
  • VAD-разметка;
  • транскрипция;
  • summary;
  • экспорт.

Эти задачи должны храниться в БД, потому что их можно продолжить или повторить после рестарта. CAN в этом flow остается транспортом доставки задачи свободному worker'у, но источником истины является artifact pipeline.

Recorder flow

sequenceDiagram
    participant R as Recorder
    participant S as VGServer
    participant Store as HTTPS Storage
    participant P as Postprocess Worker

    R->>S: WSS /can hello
    S-->>R: hello_ok
    S-->>R: job start_recording(session_id, mode, upload policy)
    R-->>S: job_ack
    R->>Store: POST /api/storage/uploads
    R->>Store: PUT chunks
    R->>Store: complete
    R-->>S: job_event artifact_ready(blob_id, metadata)
    S->>S: create persistent artifact jobs
    S-->>P: CAN job transcode/transcribe/export

Текущее состояние

Реализован каркас:

  • endpoint /can;
  • bearer/token auth;
  • hello;
  • heartbeat;
  • прием job_ack, job_event, error;
  • registry online CAN sessions в CanManager;
  • in-memory job registry;
  • dispatch задач по target_node_id, target_role, required_capability;
  • pending job dispatch при подключении подходящего worker'а;
  • GET admin_can;
  • POST admin_can_submit_job;
  • POST admin_can_cancel_job;
  • CAN stats в admin dashboard.
  • очередь CAN jobs в admin dashboard с cancel для незавершенных задач.

Следующие шаги:

  • service-token model с несколькими ключами и capability policy;
  • recorder worker;
  • запись артефактов в conference_sessions / conference_artifacts / timeline_events;
  • persistent artifact pipeline для postprocess/transcription/export.