📖 Kafka — Формат сообщений (Transfer Request)
Формат предназначен для описания задания на копирование, перемещение или синхронизацию данных между устройствами. В каждом сообщении описывается один исходный объект (файл или папка): конкретное устройство + относительный путь, и одно или несколько назначений. Если destinations не указано, DFMS добавляет объект в очередь и подбирает назначение по внутренним правилам (TransferRule). DFMS обрабатывает сообщение так: валидность → доступность → (destinations? запуск : постановка в очередь + правила). Если ресурсы (USB/хранилище/путь) временно недоступны, сообщение остаётся в ожидании и будет обработано после появления ресурсов.
1) Структура JSON
Вариант A: с явными назначениями (destinations)
{
	  "operation": "copy | move | sync",
	  "source": {
	    "type": "usb | storage (опционально)",
	    "id": "04-1671 | WDZW5PDG | TS1",
	    "path": "relative/path/to/object"
	  },
	  "destinations": [
	    { "type": "storage", "id": "TS1", "path": "target/folder" },
	    { "type": "usb", "id": "any", "path": "target/folder" }
	  ],
	  "strategy": "max_free_space | first_available | round_robin | fill_sequentially",
	  "priority": 10,
	  "tags": ["backup", "finance"],
	  "callbackUrl": "https://example.local/dfms/callback",
	  "kafkaTopic": "ops_status"
	}
Вариант B: без destinations (автовыбор по внутренним правилам)
{
	  "operation": "copy",
	  "source": {
	    "device": "04-1671",
	    "path": "exports/daily.json"
	  },
	  "priority": 100,
	  "tags": ["needs-processing"],
	  "callback": "https://example.local/dfms/callback",
	  "kafka_topic": "ops_status"
	}
2) Поля
3) Стратегии
По умолчанию (если strategy не указана) DFMS ведёт себя как first_available.
4) Callback / уведомления
Если задан callbackUrl/callback и/или kafkaTopic/kafka_topic, DFMS будет отправлять уведомления на стадиях: on_start, on_progress, on_complete, on_error.
HTTP callback: DFMS делает POST на callbackUrl (или callback) и добавляет заголовок X-DFMS-Stage со значением on_start/on_progress/on_complete/on_error. Если указан kafkaTopic/kafka_topic, такие же отчёты публикуются в указанный Kafka-топик (key по умолчанию: имя проекта). По умолчанию on_progress отправляется не чаще, чем раз в 10% прогресса.
В payload поле kafka — это метаданные входящего Kafka сообщения (topic/partition/offset/key), которое инициировало задание. Отчёты публикуются в тот топик, который указан в kafkaTopic/kafka_topic в исходном запросе.
Примеры callback/отчётов (payload)
События: transfer_started, transfer_progress, transfer_completed, transfer_error.
transfer_started
{
	  "event": "transfer_started",
	  "timestamp": "2026-02-02T10:30:00+03:00",
	  "station_id": "P100",
	  "job_id": 123,
	  "folder": {
	    "id": 6180,
	    "name": "PROJECT_001",
	    "tags": [1, 2],
	    "tags_names": ["needs-processing", "processed"]
	  },
	  "source_path": "/media/konstantin/04-1671/PROJECT_001",
	  "dest_path": "/media/konstantin/disk_3_tb/test_storage_1/kafka/inbox/PROJECT_001",
	  "operation_type": "copy",
	  "target": { "name": "Тестовое Хранилище 1", "type": "storage" },
	  "progress": 0.0,
	  "total_size": 123456789,
	  "transferred_size": 0,
	  "files_count": 23,
	  "status": "running",
	  "error_message": "",
	  "kafka": { "message_id": 999, "topic": "DFMS-CONSUMER", "partition": 0, "offset": 123, "key": "dfms-test-1700000000000-abcd" }
	}
transfer_progress
{
	  "event": "transfer_progress",
	  "timestamp": "2026-02-02T10:30:05+03:00",
	  "station_id": "P100",
	  "job_id": 123,
	  "folder": { "id": 6180, "name": "PROJECT_001", "tags": [1, 2], "tags_names": ["needs-processing", "processed"] },
	  "source_path": "/media/konstantin/04-1671/PROJECT_001",
	  "dest_path": "/media/konstantin/disk_3_tb/test_storage_1/kafka/inbox/PROJECT_001",
	  "operation_type": "copy",
	  "target": { "name": "Тестовое Хранилище 1", "type": "storage" },
	  "progress": 42.5,
	  "total_size": 123456789,
	  "transferred_size": 52428800,
	  "files_count": 23,
	  "status": "running",
	  "error_message": "",
	  "kafka": { "message_id": 999, "topic": "DFMS-CONSUMER", "partition": 0, "offset": 123, "key": "dfms-test-1700000000000-abcd" }
	}
transfer_completed
{
	  "event": "transfer_completed",
	  "timestamp": "2026-02-02T10:30:20+03:00",
	  "station_id": "P100",
	  "job_id": 123,
	  "folder": { "id": 6180, "name": "PROJECT_001", "tags": [1, 2], "tags_names": ["needs-processing", "processed"] },
	  "source_path": "/media/konstantin/04-1671/PROJECT_001",
	  "dest_path": "/media/konstantin/disk_3_tb/test_storage_1/kafka/inbox/PROJECT_001",
	  "operation_type": "copy",
	  "target": { "name": "Тестовое Хранилище 1", "type": "storage" },
	  "progress": 100.0,
	  "total_size": 123456789,
	  "transferred_size": 123456789,
	  "files_count": 23,
	  "status": "completed",
	  "error_message": "",
	  "kafka": { "message_id": 999, "topic": "DFMS-CONSUMER", "partition": 0, "offset": 123, "key": "dfms-test-1700000000000-abcd" }
	}
transfer_error
{
	  "event": "transfer_error",
	  "timestamp": "2026-02-02T10:30:20+03:00",
	  "station_id": "P100",
	  "job_id": 123,
	  "folder": { "id": 6180, "name": "PROJECT_001", "tags": [1, 2], "tags_names": ["needs-processing", "processed"] },
	  "source_path": "/media/konstantin/04-1671/PROJECT_001",
	  "dest_path": "/media/konstantin/disk_3_tb/test_storage_1/kafka/inbox/PROJECT_001",
	  "operation_type": "copy",
	  "target": { "name": "Тестовое Хранилище 1", "type": "storage" },
	  "progress": 12.0,
	  "total_size": 123456789,
	  "transferred_size": 12345,
	  "files_count": 23,
	  "status": "error",
	  "error_message": "I/O error: device disconnected",
	  "kafka": { "message_id": 999, "topic": "DFMS-CONSUMER", "partition": 0, "offset": 123, "key": "dfms-test-1700000000000-abcd" }
	}
5) Статусы обработки входящего сообщения
6) JSON Schema (draft-07)
{
	  "$schema": "http://json-schema.org/draft-07/schema#",
	  "type": "object",
	  "required": ["operation", "source"],
	  "additionalProperties": false,
	  "properties": {
	    "operation": { "type": "string", "enum": ["copy", "move", "sync"] },
	    "source": {
	      "type": "object",
	      "required": ["path"],
	      "additionalProperties": false,
	      "properties": {
	        "type": { "type": "string", "enum": ["usb", "storage"] },
	        "id": { "type": "string" },
	        "device": { "type": "string" },
	        "path": { "type": "string", "minLength": 1 }
	      }
	    },
	    "destinations": {
	      "type": "array",
	      "minItems": 0,
	      "items": {
	        "type": "object",
	        "required": ["type", "id", "path"],
	        "additionalProperties": false,
	        "properties": {
	          "type": { "type": "string", "enum": ["usb", "storage"] },
	          "id": { "type": ["string", "null"] },
	          "path": { "type": "string", "minLength": 1 }
	        }
	      }
	    },
    "strategy": {
      "type": "string",
      "enum": ["max_free_space", "first_available", "round_robin", "fill_sequentially"]
    },
	    "priority": { "type": "integer" },
	    "tags": { "type": "array", "items": { "type": "string" } },
	    "callbackUrl": { "type": "string", "format": "uri" },
	    "callback": { "type": "string", "format": "uri" },
	    "kafkaTopic": { "type": "string" },
	    "kafka_topic": { "type": "string" }
	  }
	}
7) Примеры
Копирование без назначения (DFMS выберет цель по внутренним правилам)
{
	  "operation": "copy",
	  "source": { "device": "04-1671", "path": "exports/daily.json" },
	  "priority": 100,
	  "tags": ["needs-processing"],
	  "kafka_topic": "ops_status"
	}
Копирование файла с USB (label) в storage (код)
{
	  "operation": "copy",
	  "source": { "type": "usb", "id": "04-1671", "path": "docs/reports/report.pdf" },
  "destinations": [
    { "type": "storage", "id": "TS1", "path": "reports/2023" }
  ],
  "priority": 1,
  "tags": ["finance", "2023"],
  "callbackUrl": "https://example.local/dfms/callback"
}
Перемещение папки из storage на любой USB
{
  "operation": "move",
  "source": { "type": "storage", "id": "TS2", "path": "photos/trip" },
  "destinations": [
    { "type": "usb", "id": "any", "path": "Pictures/TripBackup" }
  ],
  "priority": 5,
  "kafkaTopic": "ops_status"
}
Синхронизация папки на два USB (round_robin)
{
  "operation": "sync",
  "source": { "type": "storage", "id": "TS1", "path": "media/video" },
  "destinations": [
    { "type": "usb", "id": "MediaUSB1", "path": "backup/video" },
    { "type": "usb", "id": "MediaUSB2", "path": "backup/video" }
  ],
  "strategy": "round_robin",
  "tags": ["media", "video"],
  "callbackUrl": "https://example.local/dfms/callback",
  "kafkaTopic": "ops_status"
}