Потоки
Поток — это ELT процесс и основной исполнитель. Поток состоит из нод. Тип потока определяет, какие ноды используются и какие настройки доступны пользователю.
Типы потоков:
- Integration flow
- Transformation flow
В таблице представлены допустимые комбинации потоков и нод:
| Flow type | Node | PostgreSQL | ClickHouse | Python |
|---|---|---|---|---|
| Integration flow | Source node | Yes | No | Yes |
| Integration flow | Target node | Yes | Yes | No |
| Transformation flow | Source & Target node | Yes | Yes | No |
Если в имени потока задать слеши, то поток сохранится в папку:
my_flow/flow_1_integration
Integration flow
Интеграционный поток предназначен для поставки данных из одной системы в другую. Основная цель данного типа потока поставка, не трансформация. Трансформацию можно выполнять на стороне источника, но только в необходимых случаях. Интеграционный поток состоит из двух нод: ноды источника и ноды цели. В нодах создаются настройки. Source node → Target node.
Source Node
Source Node предназначена для задания настроек источника. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.
PostgresSQL
Для типа подключения PostgreSQL можно настроить инкрементальную загрузку, с помощью поля Mode. Далее необходимо определить колонку для дельты, задать ее тип. Дату с которой будет начинаться перваначальная загрузка, размер пакета во временном измерении и при необходимости расширение слева для временного интервала. Так же рекомендуем задавать интервал справа для присоединения "хвостов" в основной пакет. Данный интервал задается в поле Tail merge seconds.
Описание полей
| Поле | Описание |
|---|---|
| Connection | Имя подключения к базе данных источника (например, pg_Source) |
| Source schema name | Имя схемы в базе данных, где находится исходная таблица |
| Source table name | Наименование таблицы-источника для загрузки данных |
| Columns | Список извлекаемых колонок из таблицы (можно обновить через интерфейс) |
| Source SQL | При необходимости прописывается SQL трансформация, которая выполняется в источнике |
| Mode | Режим загрузки данных: full (полная) или delta (инкрементальная) |
| Delta column | Название колонки, по которой отслеживаются изменения для дельты |
| Delta column type | Тип данных дельта-колонки (например, date или timestamp) |
| Delta init start | Начальная дата или значение, с которого запустится первая инкрементальная загрузка |
| Batch grain | Гранулярность пакетирования данных при загрузке (например, по дням) |
| Tail merge seconds | Временной интервал в секундах для присоединения "хвоста" пакета к основному пакету |
| Safety seconds | Количество секунд для расширения интервала влево с целью перезаписи возможных опоздавших данных |
Python
Для типа подключения Python источник данных задается через пользовательский Python-скрипт. Такой вариант используется, когда данные нужно получить не напрямую из базы данных, а через API, файл, внешний сервис или другую нестандартную логику. Пользователь выбирает Python connection, затем выбирает один из загруженных файлов и функцию, которая будет первая запущена при запуске потока. Функция должна вернуть данные в виде обычного списка или DataFrame.
Описание полей
| Поле | Описание |
|---|---|
| Connection | Python-подключение, через которое будет выполняться скрипт |
| Python script | Существующий Python-скрипт, выбранный из рабочей папки скриптов |
| Name for new Python script | Имя или путь для нового скрипта, если пользователь загружает файл |
| Function name | Имя функции внутри Python-скрипта, которую должен вызвать Belt |
| Output columns | Список колонок, которые возвращает Python-функция |
| Params JSON | Дополнительные параметры в формате JSON, которые будут переданы в функцию |
| Mode | Режим загрузки данных: full или delta |
| Delta column | Логическая колонка, по которой строится инкрементальная загрузка |
| Delta column type | Тип delta-колонки: например date, timestamp или int |
| Delta init start | Начальное значение для первой инкрементальной загрузки |
| Batch grain | Размер одного пакета данных, например day |
| Safety days | Расширение интервала влево для повторного захвата данных и защиты от опоздавших записей |
Target Node
Target Node предназначена для задания настроек цели. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.
PostgresSQL
Для типа подключения PostgreSQL нужно указать схему и целевую таблицу, а все остальные настройаки опциональны.
PostgreSQL
| Поле | Обязательность | Описание |
|---|---|---|
| Connection | Да | Имя подключения к целевой базе данных (например, pg_Target) |
| Target schema name | Да | Имя схемы в целевой базе данных, куда будут записываться данные |
| Journal table name | Наименование таблицы для ведения журнала или логов загрузки (опционально) | |
| Target table name | Да | Наименование целевой таблицы, в которую сохраняются данные |
| Truncate table before load | Да | Флаг полной очистки целевой таблицы перед началом каждой загрузки данных |
| Additional info(text, json) | Дополнительная информация в JSON для сохранения в журнале (опционально) |
ClickHouse
Для типа подключения ClickHouse нужно указать целевую таблицу, а все остальные настройаки опциональны.
ClickHouse
| Поле | Обязательность | Описание |
|---|---|---|
| Connection | Да | Имя подключения к целевой базе данных |
| Target table name | Да | Наименование целевой таблицы, в которую сохраняются данные |
| Truncate table before load | Да | Флаг полной очистки целевой таблицы перед началом каждой загрузки данных |
| Additional info(text, json) | Дополнительная информация в JSON для сохранения в журнале (опционально) |
Transformation flow
Трансформационный поток предназначен для преобразования данных с помощью SQL в целевой системе. Основной принцип платформы оставить свободу для обработки данных. Поэтому трансформации выполняются в цели с помощью чистого SQL.
Интеграционный поток состоит из трех нод: источник, трансформация и цель. В трансформационном потоке данные перегружаются пакетами. Source node → SQL node → Target node
Source Node
Source node предназначена для задания настроек источника. Есть два режима delta и full. При delta режиме будут забираться только новые, еще не загруженные пакеты. При full режиме каждый раз забираются все данные.
PostgresSQL
Для источника типа Postgres необходимо указать схему, таблицу и режим загрузки.
ClickHouse
Для источника типа ClickHouse необходимо указать таблицу и режим загрузки.
Transform Node
В данной ноде одно единственное поле для SQL кода. Полная свобода действий. Кроме того, что для таблицы источника должно быть добавлено системное условие {where_cond_source}. Это условие содержит условие выбора конкретного пакета данных. Так же есть якорь, внутри которого SQL код выполняется перед выполнением основного SQL трансформации -- @pre_hook_start -- @pre_hook_end
Ниже представлен пример реализации модели трансформации SCD2 (Slowly Changing Dimension Type 2) на SQL:
-- @pre_hook_start
-- Закрываем старые версии записей, если данные изменились
UPDATE public.price_20 d
SET valid_to = n."date"::date
FROM (
SELECT
code,
"date",
value
FROM public.price_10
{where_cond_source}
) n
WHERE d.code = n.code
AND d.valid_to = '9999-12-31'::date
AND d.value IS DISTINCT FROM n.value::float4
AND n."date"::date > d.valid_from;
-- @pre_hook_end
-- Выбираем новые записи для вставки в целевую таблицу
SELECT
code,
value::float4 AS value,
"date"::date AS valid_from,
'9999-12-31'::date AS valid_to
FROM public.price_10
{where_cond_source}
Target Node
Target Node предназначена для задания настроек цели. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.
PostgresSQL
Для цели типа PostgreSQL можно настроить действия при конфликте ключевых колонок: обновить запись или ничего не делать. Если в трансформации дельта колонка изменила название, то это обязательно нужно указать в соответсвующем поле.
PostgresSQL
| Поле | Описание |
|---|---|
| Connection | Имя подключения к целевой базе данных (например, pg_Target) |
| Target schema name | Имя схемы в целевой базе данных, куда будут записываться результаты трансформации |
| Journal table name | Наименование таблицы для ведения журнала или логов загрузки (опционально) |
| Target table name | Наименование результирующей таблицы, в которую сохраняются данные |
| Columns | Список колонок целевой таблицы (можно обновить через интерфейс) |
| Truncate table before load | Флаг полной очистки целевой таблицы перед началом выполнения трансформации |
| Additional info(text, json) | Дополнительные параметры конфигурации в свободном текстовом формате или JSON (опционально) |
| Postgres version | Версия используемой СУБД PostgreSQL для корректного синтаксиса запросов |
| Target delta column | Колонка в целевой таблице для отслеживания инкрементальных изменений (опционально) |
| Key columns | Ключевые колонки (бизнес-ключи) для определения уникальности записей при обновлении (опционально) |
| On conflict | Стратегия разрешения конфликтов при совпадении ключевых значений, например nothing или update (опционально) |
ClickHouse
Для цели типа ClickHouse настрйки простые. Как известно основные механизмы обработки закладываются в самом движке ClickHouse при создании таблицы. Если в трансформации дельта колонка изменила название, то это обязательно нужно указать в соответсвующем поле.
ClickHouse
| Поле | Описание |
|---|---|
| Connection | Имя подключения к целевой базе данных ClickHouse (например, ch_mart) |
| Target table name | Наименование целевой таблицы в ClickHouse, в которую сохраняются данные |
| Target delta column | Колонка в целевой таблице для отслеживания инкрементальных изменений, если она отличается от исходной батч-колонки (опционально) |
| Columns | Список колонок целевой таблицы (можно обновить через интерфейс) |
| Additional info(text, json) | Дополнительные параметры конфигурации в свободном текстовом формате или JSON (опционально) |