Перейти к содержанию

Потоки

Поток — это ELT процесс и основной исполнитель. Поток состоит из нод. Тип потока определяет, какие ноды используются и какие настройки доступны пользователю.

Типы потоков:

  • Integration flow
  • Transformation flow

В таблице представлены допустимые комбинации потоков и нод:

Flow type Node PostgreSQL ClickHouse Python
Integration flow Source node Yes No Yes
Integration flow Target node Yes Yes No
Transformation flow Source & Target node Yes Yes No

Если в имени потока задать слеши, то поток сохранится в папку:

my_flow/flow_1_integration

Integration flow

Интеграционный поток предназначен для поставки данных из одной системы в другую. Основная цель данного типа потока поставка, не трансформация. Трансформацию можно выполнять на стороне источника, но только в необходимых случаях. Интеграционный поток состоит из двух нод: ноды источника и ноды цели. В нодах создаются настройки. Source node → Target node.

Source Node

Source Node предназначена для задания настроек источника. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.

PostgresSQL

Для типа подключения PostgreSQL можно настроить инкрементальную загрузку, с помощью поля Mode. Далее необходимо определить колонку для дельты, задать ее тип. Дату с которой будет начинаться перваначальная загрузка, размер пакета во временном измерении и при необходимости расширение слева для временного интервала. Так же рекомендуем задавать интервал справа для присоединения "хвостов" в основной пакет. Данный интервал задается в поле Tail merge seconds.

Описание полей
Поле Описание
Connection Имя подключения к базе данных источника (например, pg_Source)
Source schema name Имя схемы в базе данных, где находится исходная таблица
Source table name Наименование таблицы-источника для загрузки данных
Columns Список извлекаемых колонок из таблицы (можно обновить через интерфейс)
Source SQL При необходимости прописывается SQL трансформация, которая выполняется в источнике
Mode Режим загрузки данных: full (полная) или delta (инкрементальная)
Delta column Название колонки, по которой отслеживаются изменения для дельты
Delta column type Тип данных дельта-колонки (например, date или timestamp)
Delta init start Начальная дата или значение, с которого запустится первая инкрементальная загрузка
Batch grain Гранулярность пакетирования данных при загрузке (например, по дням)
Tail merge seconds Временной интервал в секундах для присоединения "хвоста" пакета к основному пакету
Safety seconds Количество секунд для расширения интервала влево с целью перезаписи возможных опоздавших данных

Python

Для типа подключения Python источник данных задается через пользовательский Python-скрипт. Такой вариант используется, когда данные нужно получить не напрямую из базы данных, а через API, файл, внешний сервис или другую нестандартную логику. Пользователь выбирает Python connection, затем выбирает один из загруженных файлов и функцию, которая будет первая запущена при запуске потока. Функция должна вернуть данные в виде обычного списка или DataFrame.

Описание полей
Поле Описание
Connection Python-подключение, через которое будет выполняться скрипт
Python script Существующий Python-скрипт, выбранный из рабочей папки скриптов
Name for new Python script Имя или путь для нового скрипта, если пользователь загружает файл
Function name Имя функции внутри Python-скрипта, которую должен вызвать Belt
Output columns Список колонок, которые возвращает Python-функция
Params JSON Дополнительные параметры в формате JSON, которые будут переданы в функцию
Mode Режим загрузки данных: full или delta
Delta column Логическая колонка, по которой строится инкрементальная загрузка
Delta column type Тип delta-колонки: например date, timestamp или int
Delta init start Начальное значение для первой инкрементальной загрузки
Batch grain Размер одного пакета данных, например day
Safety days Расширение интервала влево для повторного захвата данных и защиты от опоздавших записей

Target Node

Target Node предназначена для задания настроек цели. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.

PostgresSQL

Для типа подключения PostgreSQL нужно указать схему и целевую таблицу, а все остальные настройаки опциональны.

PostgreSQL
Поле Обязательность Описание
Connection Да Имя подключения к целевой базе данных (например, pg_Target)
Target schema name Да Имя схемы в целевой базе данных, куда будут записываться данные
Journal table name Наименование таблицы для ведения журнала или логов загрузки (опционально)
Target table name Да Наименование целевой таблицы, в которую сохраняются данные
Truncate table before load Да Флаг полной очистки целевой таблицы перед началом каждой загрузки данных
Additional info(text, json) Дополнительная информация в JSON для сохранения в журнале (опционально)

ClickHouse

Для типа подключения ClickHouse нужно указать целевую таблицу, а все остальные настройаки опциональны.

ClickHouse
Поле Обязательность Описание
Connection Да Имя подключения к целевой базе данных
Target table name Да Наименование целевой таблицы, в которую сохраняются данные
Truncate table before load Да Флаг полной очистки целевой таблицы перед началом каждой загрузки данных
Additional info(text, json) Дополнительная информация в JSON для сохранения в журнале (опционально)

Transformation flow

Трансформационный поток предназначен для преобразования данных с помощью SQL в целевой системе. Основной принцип платформы оставить свободу для обработки данных. Поэтому трансформации выполняются в цели с помощью чистого SQL.

Интеграционный поток состоит из трех нод: источник, трансформация и цель. В трансформационном потоке данные перегружаются пакетами. Source node → SQL node → Target node

Source Node

Source node предназначена для задания настроек источника. Есть два режима delta и full. При delta режиме будут забираться только новые, еще не загруженные пакеты. При full режиме каждый раз забираются все данные.

PostgresSQL

Для источника типа Postgres необходимо указать схему, таблицу и режим загрузки.

ClickHouse

Для источника типа ClickHouse необходимо указать таблицу и режим загрузки.

Transform Node

В данной ноде одно единственное поле для SQL кода. Полная свобода действий. Кроме того, что для таблицы источника должно быть добавлено системное условие {where_cond_source}. Это условие содержит условие выбора конкретного пакета данных. Так же есть якорь, внутри которого SQL код выполняется перед выполнением основного SQL трансформации -- @pre_hook_start -- @pre_hook_end

Ниже представлен пример реализации модели трансформации SCD2 (Slowly Changing Dimension Type 2) на SQL:

-- @pre_hook_start
-- Закрываем старые версии записей, если данные изменились
UPDATE public.price_20 d
SET valid_to = n."date"::date
FROM (
    SELECT 
        code, 
        "date", 
        value
    FROM public.price_10
    {where_cond_source}
) n
WHERE d.code = n.code
  AND d.valid_to = '9999-12-31'::date
  AND d.value IS DISTINCT FROM n.value::float4
  AND n."date"::date > d.valid_from;
-- @pre_hook_end

-- Выбираем новые записи для вставки в целевую таблицу
SELECT
    code,
    value::float4 AS value,
    "date"::date AS valid_from,
    '9999-12-31'::date AS valid_to
FROM public.price_10
{where_cond_source}

Target Node

Target Node предназначена для задания настроек цели. Сначала нужно выбрать одно из Подключений, после система сгененрирует дополнительные поля.

PostgresSQL

Для цели типа PostgreSQL можно настроить действия при конфликте ключевых колонок: обновить запись или ничего не делать. Если в трансформации дельта колонка изменила название, то это обязательно нужно указать в соответсвующем поле.

PostgresSQL
Поле Описание
Connection Имя подключения к целевой базе данных (например, pg_Target)
Target schema name Имя схемы в целевой базе данных, куда будут записываться результаты трансформации
Journal table name Наименование таблицы для ведения журнала или логов загрузки (опционально)
Target table name Наименование результирующей таблицы, в которую сохраняются данные
Columns Список колонок целевой таблицы (можно обновить через интерфейс)
Truncate table before load Флаг полной очистки целевой таблицы перед началом выполнения трансформации
Additional info(text, json) Дополнительные параметры конфигурации в свободном текстовом формате или JSON (опционально)
Postgres version Версия используемой СУБД PostgreSQL для корректного синтаксиса запросов
Target delta column Колонка в целевой таблице для отслеживания инкрементальных изменений (опционально)
Key columns Ключевые колонки (бизнес-ключи) для определения уникальности записей при обновлении (опционально)
On conflict Стратегия разрешения конфликтов при совпадении ключевых значений, например nothing или update (опционально)

ClickHouse

Для цели типа ClickHouse настрйки простые. Как известно основные механизмы обработки закладываются в самом движке ClickHouse при создании таблицы. Если в трансформации дельта колонка изменила название, то это обязательно нужно указать в соответсвующем поле.

ClickHouse
Поле Описание
Connection Имя подключения к целевой базе данных ClickHouse (например, ch_mart)
Target table name Наименование целевой таблицы в ClickHouse, в которую сохраняются данные
Target delta column Колонка в целевой таблице для отслеживания инкрементальных изменений, если она отличается от исходной батч-колонки (опционально)
Columns Список колонок целевой таблицы (можно обновить через интерфейс)
Additional info(text, json) Дополнительные параметры конфигурации в свободном текстовом формате или JSON (опционально)