CAN service bus
CAN - сервисная шина управления внешними worker'ами VideoGrace. Она не переносит media и не заменяет пользовательский CommandLoop.
Основная задача CAN: головной сервер выдает задания сервисам и получает от них heartbeat, статус выполнения и события артефактов.
CAN используется там, где головному серверу нужен управляемый внешний исполнитель:
- RTC nodes для WebRTC endpoint'ов;
- recorder/transcoder/transcriber workers;
- диагностические и эксплуатационные workers;
- будущие consolidator/media сервисы.
Для клиента CAN не виден напрямую. Web-клиент работает с обычным CommandLoop, а core уже сам выбирает локальный gateway или внешнюю CAN-ноду.
Транспорт
Сервисы сами открывают исходящее WSS-соединение к головному серверу:
wss://<server>/can
Такой порядок важен эксплуатационно: recorder, tolmach, consolidator или RTC node могут находиться за NAT/firewall и не требуют входящих портов.
Авторизация
CAN использует отдельный service token для подключения worker'а к /can. Это не пользовательский access_token и не JWT пользователя.
Токен можно передать в HTTP upgrade:
Authorization: Bearer <service_token>
или в первом hello сообщении:
{
"type": "hello",
"token": "service-token",
"node_id": "recorder-01",
"role": "recorder",
"version": "3.0.2605xx",
"capabilities": ["recording", "storage_upload", "vad"]
}
Конфигурация:
[CAN]
Enabled = 1
ServiceToken = change-me
MaxJobs = 1000
MaxEventsPerJob = 100
FinishedJobTtlSec = 3600
Эквивалентные переменные окружения:
VG_CAN_ENABLED=1
VG_CAN_SERVICE_TOKEN=change-me
VG_CAN_MAX_JOBS=1000
VG_CAN_MAX_EVENTS_PER_JOB=100
VG_CAN_FINISHED_JOB_TTL_SEC=3600
Если ServiceToken пустой, CAN считается недоступным для production-сервисов. Если задан ServiceToken, endpoint /can может быть включен даже без явного Enabled = 1.
MaxJobs ограничивает количество задач в памяти сервера. Значение 0 или отрицательное значение отключает лимит, но для production это не рекомендуется. MaxEventsPerJob ограничивает историю событий одной задачи. FinishedJobTtlSec задает TTL для финальных задач completed, failed, cancelled.
Пользовательский токен для service jobs
CAN service token авторизует сам worker, но не дает ему права входить в пользовательские конференции. Если worker должен подключиться к CommandLoop как сервисный пользователь, сервер выдает отдельный короткий bearer token на основании записи в таблице service_accounts.
Для transcriber.start сервер поддерживает безопасный production-flow:
- Администратор создает сервисного пользователя, например
transcriber. - В БД добавляется
service_accounts.name = 'transcriber', привязанная кclient_idэтого пользователя. - Администратор ставит CAN job
transcriber.startбезpayload.auth. - Core выпускает короткий JWT для связанного пользователя и добавляет его в
payload.auth. - Transcriber логинится в обычный
CommandLoopпоauth.access_token, входит в конференцию и работает как receiver/AI observer.
Пример payload после инъекции токена:
{
"conference_tag": "ops",
"auth": {
"type": "bearer",
"access_token": "eyJ..."
},
"source": {
"type": "conference_audio"
}
}
Такой flow разделяет два уровня доверия:
VG_CAN_SERVICE_TOKENразрешает worker'у принимать задания;- пользовательский JWT из
service_accountsразрешает сервису выполнять действия внутри конференции.
Тестовый worker
Для проверки CAN-протокола есть диагностический сервис:
cmake --build build-mac --target CanWorker -j 4
./build-mac/Services/CanWorker/CanWorker \
--url wss://localhost/can \
--token change-me \
--node-id recorder-dev \
--role recorder \
--capabilities recording,storage_upload
Можно использовать переменные окружения:
VG_CAN_URL=wss://localhost/can \
VG_CAN_SERVICE_TOKEN=change-me \
VG_CAN_NODE_ID=recorder-dev \
VG_CAN_ROLE=recorder \
./build-mac/Services/CanWorker/CanWorker
Worker регистрируется, шлёт heartbeat, принимает job, отвечает job_ack и тестовым job_event.
RTC translator worker
Первый внешний RTC-worker собирается отдельным сервисом RtcTranslator. Он регистрируется в CAN как role=rtc-translator с capability rtc,webrtc,rtp_bridge.
Запуск локально:
cmake --build build-mac --target RtcTranslator -j 4
./build-mac/Services/RtcTranslator/RtcTranslator \
--url wss://localhost/can \
--token change-me \
--node-id rtc-dev \
--bind 0.0.0.0
Переменные окружения:
VG_CAN_URL=wss://localhost/can \
VG_CAN_SERVICE_TOKEN=change-me \
VG_CAN_NODE_ID=rtc-dev \
VG_WEBRTC_BIND_ADDRESS=0.0.0.0 \
VG_WEBRTC_ADVERTISE_ADDRESS=rtc-node.example.com \
VG_WEBRTC_PORT_RANGE_BEGIN=43000 \
VG_WEBRTC_PORT_RANGE_END=43999 \
./build-mac/Services/RtcTranslator/RtcTranslator
ICE address и UDP ports
RtcTranslator подключается к головному серверу сам, исходящим WSS на /can. Поэтому головному серверу не нужен IP-адрес RTC-ноды: он видит только node_id и открытую CAN-сессию.
Адрес, по которому браузер будет подключаться к RTC-ноде, появляется позже - в ICE candidate, который генерирует libdatachannel внутри RtcTranslator. Для локальной сети может хватить host candidate. Для production, NAT и VPS нужно явно задать публичный адрес RTC-ноды:
VG_WEBRTC_ADVERTISE_ADDRESS=rtc-node.example.com
Также нужно открыть и зафиксировать UDP-диапазон WebRTC:
VG_WEBRTC_PORT_RANGE_BEGIN=43000
VG_WEBRTC_PORT_RANGE_END=43999
Если нужно привязать ICE socket к конкретному локальному интерфейсу:
VG_WEBRTC_BIND_ADDRESS=0.0.0.0
Важно различать настройки:
VG_CAN_URL- адрес головного сервера, куда RTC-нода сама подключается по WSS.VG_CAN_NODE_ID- логический route handle внутри CAN, не сетевой адрес.VG_WEBRTC_BIND_ADDRESS- bind для WebRTC/ICE sockets.VG_WEBRTC_ADVERTISE_ADDRESS- публичный адрес, который будет подставлен в ICE candidate и должен быть достижим браузерами.VG_WEBRTC_PORT_RANGE_BEGIN/END- UDP-порты WebRTC/ICE, которые должны быть открыты на firewall/NAT.translator_host- адрес core/RTP translator, который core передает внешним RTC-нодам вrtc.endpoint.start; берется изVG_ADDR/[Network] Address.translator_port- не настройка RTC-ноды; порт приходит в каждомrtc.endpoint.startот core и соответствует конкретному translator/media binding.
Если VG_WEBRTC_ADVERTISE_ADDRESS не задан, bridge попробует использовать VG_ADDR / [Network] Address. Если и они не заданы, клиенту уйдет адрес, который нашел libdatachannel; за NAT это часто приватный 192.168.x.x/10.x.x.x, и браузер снаружи до RTC-ноды не достучится.
RTP path и adjust packet
Внешний RtcTranslator не выбирает порт RTP-транслятора сам. Для каждого endpoint core присылает:
translator_host- адрес core/RTP translator, достижимый с RTC-ноды;translator_port- UDP-порт конкретного media translator binding.
Для subscribe-направления RTC-нода отправляет на translator_host:translator_port service RTP adjust packet с правильным receiver_ssrc. Core translator запоминает source address этого packet'а и дальше отправляет media на этот адрес. Поэтому адрес RTC-ноды не настраивается на core и не передается в конфиге.
Для publish-направления browser RTP после нормализации также уходит с RTC-ноды на translator_host:translator_port с author_ssrc, выданным core.
Если Network.Address не задан, core использует 127.0.0.1 как последний fallback. Это корректно только для локального/in-process режима, но не для удаленной RTC-ноды.
MVP job types:
rtc.probe- health/capability probe;rtc.endpoint.start- зарегистрировать RTC endpoint;rtc.endpoint.ice- добавить remote ICE candidate от браузера;rtc.endpoint.stop- остановить endpoint;rtc.endpoint.list- вернуть текущие endpoints.
RtcTranslator использует общий WebRTC/RTP bridge из Engine/RTC. Он принимает signaling через CAN, генерирует SDP answer и ICE candidates, а media path держит отдельно от головного HTTPS-сервера.
Архитектурная граница для media bridge:
- libdatachannel/WebRTC/RTP bridge живет в
Engine/RTC; - bridge не знает про
session_t,Proto::*, ControlWS или CAN; - серверный in-process gateway адаптирует ControlWS signaling к Engine bridge;
- внешний
RtcTranslatorадаптирует CAN signaling к тому же Engine bridge; - дублировать WebRTC stack в Server и Services нельзя.
Базовые сообщения
CAN принимает JSON text frames. Binary frames для CAN-протокола не используются.
Успешная регистрация:
{
"type": "hello_ok",
"server_time": 1779000000000,
"heartbeat_ms": 10000,
"node_id": "recorder-01",
"role": "recorder"
}
Heartbeat:
{
"type": "heartbeat",
"load": {
"cpu": 12,
"jobs": 1
}
}
Ответ сервера:
{
"type": "heartbeat_ok",
"server_time": 1779000005000
}
Сервисные события:
{
"type": "job_event",
"job_id": "uuid",
"status": "artifact_ready",
"artifact": {
"blob_id": "uuid",
"kind": "mixed_recording",
"content_type": "video/mp4",
"size": 734003200
}
}
Задание от сервера сервису:
{
"type": "job",
"job_id": "8f6c6e...",
"job_type": "recording.start",
"server_time": 1779000007000,
"payload": {
"conference_tag": "ops",
"mode": "mix",
"storage": "https"
}
}
Подтверждение получения:
{
"type": "job_ack",
"job_id": "8f6c6e...",
"status": "accepted"
}
Статус job_ack становится текущим статусом задачи. Для обычного worker'а это чаще всего accepted, для быстрых задач допустимо сразу вернуть completed или failed.
Событие выполнения:
{
"type": "job_event",
"job_id": "8f6c6e...",
"status": "progress",
"message": "segment uploaded",
"data": {
"segment": 12
}
}
Сервер сохраняет status, message и data в истории задачи. Если status равен completed, failed или cancelled, задача становится финальной.
Отмена задания сервером:
{
"type": "job_cancel",
"job_id": "8f6c6e...",
"job_type": "recording.start",
"reason": "cancelled_by_admin",
"server_time": 1779000009000
}
Admin API MVP
Админский API использует обычный пользовательский bearer token с правами platform.admin или platform.owner.
Получить состояние CAN:
GET /api/v1.0?admin_can
Authorization: Bearer <user_access_token>
Поставить задачу:
POST /api/v1.0?admin_can_submit_job
Authorization: Bearer <user_access_token>
Content-Type: application/json
{
"job_type": "recording.start",
"target_role": "recorder",
"required_capability": "recording",
"payload": {
"conference_tag": "ops",
"mode": "mix"
}
}
Маршрутизация задания:
target_node_id- отправить конкретной ноде;target_role- выбрать online-ноду с нужной ролью;required_capability- дополнительно требовать capability;- если
target_node_idзадан, он имеет приоритет над выбором по роли; - если подходящей ноды нет, задача остаётся
pending; - при следующем
helloподходящего сервиса pending-задача будет отправлена автоматически.
Ответ успешной постановки содержит фактический статус задачи. Если worker уже online и подходит под фильтры, статус обычно сразу станет dispatched; иначе останется pending.
Отменить задачу:
POST /api/v1.0?admin_can_cancel_job
Authorization: Bearer <user_access_token>
Content-Type: application/json
{
"job_id": "8f6c6e...",
"reason": "cancelled_by_admin"
}
Семантика очереди:
- очередь находится в памяти процесса core;
- CAN не является Kafka/ZeroMQ/durable broker;
- после рестарта core live jobs теряются и должны быть пересобраны из актуального состояния конференций и worker'ов;
MaxJobsзащищает сервер от бесконечного роста очереди;MaxEventsPerJobзащищает от бесконечной истории событий в одной задаче;- финальные задачи удаляются после
FinishedJobTtlSec; pendingзадача отменяется локально без отправки worker'у;dispatched/acceptedзадача получаетjob_cancel, если assigned-нода сейчас online;completed,failed,cancelled- финальные состояния;- поздние события от worker'а после финального состояния сохраняются в истории, но не меняют итоговый статус.
Жизненный цикл live job:
| Статус | Источник | Значение |
|---|---|---|
pending |
core | Задача создана, подходящего online worker'а пока нет. |
dispatched |
core | Задача отправлена выбранному worker'у. |
accepted |
worker | Worker подтвердил получение через job_ack. |
completed |
worker | Задача завершена успешно. |
failed |
worker | Задача завершена ошибкой. |
cancelled |
core/worker | Задача отменена администратором или сервисом. |
Worker может использовать промежуточные статусы в job_event, например progress, artifact_ready, answer, ice_candidate. Для core финальными считаются только completed, failed, cancelled.
WebRTC через CAN
WebRTC routing описан отдельно в WebRTC transport. На уровне CAN внешний RTC worker регистрируется так:
{
"type": "hello",
"token": "service-token",
"node_id": "rtc-fornex-1",
"role": "rtc-translator",
"capabilities": ["rtc", "webrtc", "rtp_bridge"]
}
Core публикует такие ноды клиентам как rtc_routes. Когда клиент отправляет webrtc_offer с выбранным rtc_node_id, core ставит CAN job rtc.endpoint.start.
Пример payload rtc.endpoint.start:
{
"job_type": "rtc.endpoint.start",
"target_node_id": "rtc-fornex-1",
"required_capability": "rtc",
"payload": {
"endpoint_id": "server-generated-endpoint-key",
"rtc_node_id": "rtc-fornex-1",
"client_endpoint_id": "video-subscribe-uuid",
"peer_id": "remote-video-1006-228800002",
"conference_tag": "radio",
"scope": "video-subscribe",
"sdp": "v=0...",
"device_id": 1006,
"author_ssrc": 1042,
"receiver_ssrc": 1044,
"translator_host": "core.example.com",
"translator_port": 5060
}
}
RTC worker должен поднять endpoint, связать WebRTC с RTP translator и вернуть SDP answer:
{
"type": "job_event",
"job_id": "rtc-offer-...",
"status": "answer",
"data": {
"endpoint_id": "server-generated-endpoint-key",
"peer_id": "remote-video-1006-228800002",
"conference_tag": "radio",
"scope": "video-subscribe",
"sdp": "v=0..."
}
}
Core преобразует это событие обратно в клиентскую команду webrtc_answer.
Remote ICE candidate от браузера передается RTC worker'у отдельной задачей:
{
"job_type": "rtc.endpoint.ice",
"target_node_id": "rtc-fornex-1",
"required_capability": "rtc",
"payload": {
"endpoint_id": "server-generated-endpoint-key",
"rtc_node_id": "rtc-fornex-1",
"peer_id": "remote-video-1006-228800002",
"conference_tag": "radio",
"scope": "video-subscribe",
"candidate": "candidate:...",
"sdp_mid": "0",
"sdp_mline_index": 0
}
}
ICE candidate от RTC worker'а возвращается через job_event:
{
"type": "job_event",
"job_id": "rtc-offer-...",
"status": "ice_candidate",
"data": {
"endpoint_id": "server-generated-endpoint-key",
"candidate": "candidate:...",
"sdp_mid": "0",
"sdp_mline_index": 0
}
}
При закрытии клиентской сессии или endpoint'а core отправляет:
{
"job_type": "rtc.endpoint.stop",
"target_node_id": "rtc-fornex-1",
"required_capability": "rtc",
"payload": {
"endpoint_id": "server-generated-endpoint-key",
"rtc_node_id": "rtc-fornex-1"
}
}
Если внешний RTC worker недоступен или endpoint не удается создать, core может использовать локальный WebRTC gateway или WSMedia fallback, в зависимости от текущей конфигурации транспорта.
Граница ответственности
CAN переносит только control plane:
- регистрация сервисной ноды;
- heartbeat и health;
- задания
start_recording,stop_recording,start_transcription,join_rtc_node; - статусы выполнения;
- события о создании артефактов.
CAN не переносит RTP/RTCP media, записи, большие blob'ы и пользовательские сообщения.
Media идет через RTP/WSM/WebRTC или будущий encrypted RTP tunnel. Файлы и записи сервисы загружают через HTTPS storage API.
Live jobs и artifact jobs
CAN jobs для RTC и записи являются live-командами. Они живут в памяти головного сервера и привязаны к текущему состоянию online-ноды:
- RTC node: поднять endpoint, остановить endpoint, переключить маршрут, запросить health;
- recorder: стартовать live-запись, остановить live-запись, сообщить готовность сырого артефакта;
- routing/moderation сервисы: выполнить текущую оперативную команду.
Такие задачи не persist'ятся в БД. После рестарта сервера или потери worker'а media-состояние считается потерянным и пересобирается из текущих подключений, а не из старой очереди.
Persistent pipeline начинается только после появления сырья в storage. Когда recorder прислал artifact_ready, сервер создает уже не live CAN job, а задачу постобработки артефакта:
- транскодинг;
- разбор дорожек;
- VAD-разметка;
- транскрипция;
- summary;
- экспорт.
Эти задачи должны храниться в БД, потому что их можно продолжить или повторить после рестарта. CAN в этом flow остается транспортом доставки задачи свободному worker'у, но источником истины является artifact pipeline.
Recorder flow
sequenceDiagram
participant R as Recorder
participant S as VGServer
participant Store as HTTPS Storage
participant P as Postprocess Worker
R->>S: WSS /can hello
S-->>R: hello_ok
S-->>R: job start_recording(session_id, mode, upload policy)
R-->>S: job_ack
R->>Store: POST /api/storage/uploads
R->>Store: PUT chunks
R->>Store: complete
R-->>S: job_event artifact_ready(blob_id, metadata)
S->>S: create persistent artifact jobs
S-->>P: CAN job transcode/transcribe/export
Текущее состояние
Реализован каркас:
- endpoint
/can; - bearer/token auth;
hello;heartbeat;- прием
job_ack,job_event,error; - registry online CAN sessions в
CanManager; - in-memory job registry;
- dispatch задач по
target_node_id,target_role,required_capability; - pending job dispatch при подключении подходящего worker'а;
GET admin_can;POST admin_can_submit_job;POST admin_can_cancel_job;- CAN stats в admin dashboard.
- очередь CAN jobs в admin dashboard с cancel для незавершенных задач.
Следующие шаги:
- service-token model с несколькими ключами и capability policy;
- recorder worker;
- запись артефактов в
conference_sessions / conference_artifacts / timeline_events; - persistent artifact pipeline для postprocess/transcription/export.