Im dłużej pracuję ze Snowflake, tym bardziej się w nim zakochuję. Na początku myślałem, że to tylko następna usługa hurtowni danych w chmurze. Dlaczego więc powinienem się nią przejmować, skoro mam usługi data lake i prawie każda z nich udostępnia jakąś końcówkę (endpoint) SQL? A w plikach .parque mogę nawet mieć transakcje ACID (jak użyję formatu delta).
W artykule nie będziemy rozstrzygać, kiedy Snowflake jest lepszym wyborem. Chodzi o to, że Snowflake jest zbudowany dla Inżynierów przez Inżynierów. Dlatego każdy doświadczony programista będzie nim zachwycony.
Taski i Task Graph
Jednym z takich przykładowych narzędzi jest funkcjonalność Tasków i Task Graph, który jest w sumie implementacją podejścia DAG (directed acyclic graph – skierowany graf acykliczny) w kontekście potoków danych.
Czym więc są Taski i Task Graph? Zgodnie z dokumentacją:
„Taski to funkcje zdefiniowane przez użytkownika do automatyzacji i harmonogramowania procesu”.
Mogą uruchamiać następujące elementy:
- pojedyncze zapytania SQL,
- procedury składowane,
- logikę, używając Snowflake Scripting.
Taski mogą być wyzwalane poprzez harmonogram lub poprzez strumienie (Streams) w tabelach, co daje dodatkową możliwość budowania bardziej dynamicznych przepływów ETL.
Task Graph jest używany do powiązania wielu Tasków razem w jeden graf DAG w celu zarządzania zależnościami. Każdy Task Graph składa się z Root Taska, który jest początkiem sekwencji, oraz Tasków przypiętych do niego.

Możesz stworzyć wiele Task Graphów dla różnych celów, takich jak ładowanie dzienne i miesięczne czy ładowanie danych sprzedażowych lub magazynowych.
Root Task
Jak już wspominałem, Root Task może być wyzwalany na wiele sposobów:
- uruchamiany zgodnie z ustalonym harmonogramem,
- wyzwolony przez stream, tak by uruchomić przetwarzanie w przypadku zmiany danych,
- uruchomiony ręcznie.
Aby stworzyć Root Taska, odwołamy się do oficjalnej dokumentacji Snowflake – CREATE TASK | Snowflake Documentation.
Create Root Task
CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK -- Name of the TASK objects
WAREHOUSE = SFPOCWH -- Warehouse that would be used to run tasks
COMMENT = 'Place your comments'
ALLOW_OVERLAPPING_EXECUTION = FALSE
AS
DECLARE
-- declaring variables and its starting values
run_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
root_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
status VARCHAR(30);
start_time timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
end_time timestamp;
BEGIN
-- log starting new batch run
INSERT INTO SFPOCDB.SFPOC_TWITTER_DATA.BATCH_LOG
VALUES (:run_task_id, :root_task_id,'START',:start_time,null);
END;
Root Task jest początkiem przetwarzania. Zazwyczaj używam go w celu stworzenia jakiegoś wstępnego logowania, na przykład o rozpoczęciu procesu ETL. Oczywiście można używać tabeli TASK_HISTORY, która jest wbudowana w Snowflake, ale zauważyłem, że wpisy tam pojawiają się z pewnym opóźnieniem.
SELECT * FROM TABLE(SFPOCDB.INFORMATION_SCHEMA.TASK_HISTORY());
Własna tabela do logowania
Zdecydowałem więc, że stworzę swoją własną tabelę do logowania, aby mieć większą elastyczność i zbierać wszystkie informacje o stanie zadań tak szybko, jak to możliwe
W tym celu wykorzystałem funkcję SYSTEM$TASK_RUNTIME_INFO:
SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID')– UUID obecnego uruchomienia ETLSYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID')– UUID identyfikujący ROOT TASK do którego TASK jest podpięty
Więcej informacji można znaleźć w dokumentacji Snowflake.
Stwórzmy kolejnego Taska w Task Graph, który jest zależny od Root Task. Nazwiemy go TASK_A.
Create TASK_A
CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.TASK_A
WAREHOUSE = SFPOCWH
COMMENT = 'Task A in a Task Graph'
AFTER SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK --names of the tasks, that are predecessors of current task, separated with comma (,)
AS
DECLARE
run_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME'));
query_id text;
status VARCHAR(30);
start_time timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
end_time timestamp;
affected_records int;
error_number int;
error_message text;
BEGIN
-- log procedure start
INSERT INTO DB_AT_DEV.STAGING.BATCHJOBRUNLOG
VALUES (:run_task_id, :task_id, null, 'START', :start_time, null, null, null, null );
-- call the procedure
CALL procedure_schema.procedure_name();
-- log end
query_id := SQLID;
end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
affected_records := sqlrowcount;
-- log procedure finish success
UPDATE DB_AT_DEV.STAGING.BATCHJOBRUNLOG
SET
QUERY_ID = :query_id,
STATUS = 'FINISHED',
END_TIME = :end_time,
AFFECTED_RECORDS = :affected_records
WHERE
RUN_ID = :run_task_id
AND TASK_ID = :task_id;
-- log procedure finish error
EXCEPTION
WHEN OTHER THEN
query_id := SQLID;
affected_records := sqlrowcount;
end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
UPDATE DB_AT_DEV.STAGING.BATCHJOBRUNLOG
SET
QUERY_ID = :query_id,
STATUS = 'FAILED',
END_TIME = :end_time,
AFFECTED_RECORDS = :affected_records,
ERROR_NUMBER = 1,
ERROR_MESSAGE = :sqlerrm
WHERE
RUN_ID = :run_task_id
AND TASK_ID = :task_id;
RAISE;
END;
W tym Tasku opakowałem procedurę SQL, która wykonuje właściwy ETL (polecenie CALL procedure_schema.procedure_name(); ) dodatkową logiką, tak aby utrzymywać logi dotyczące Tasków w osobnej tabeli.
Użyłem dodatkowo takich zmiennych jak:
SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME')– nazwa obecnego TaskaSQLID– UUID identyfikujący Root Task, do którego Task jest podpiętySQLROWCOUNT– ID ostatniego uruchomionego zapytania
Warto podkreślić, że to była specyfika mojego zadania – logować informację o statusie Taska do osobnej tabeli. Ale możesz użyć innej struktury logowania Taska, dodając dodatkowe informacje lub logikę do przetwarzania ETL, wciąż utrzymując logikę biznesową w osobnych procedurach. Możesz nawet dokonać refactoringu i opakować logikę techniczną w procedury lub funkcję.
Takie podejście pozwala na łatwiejsze utrzymanie kodu w przyszłości. Odseparowanie technicznej i biznesowej części rozwiązania naprawdę oszczędza czas podczas implementacji zmian i naprawiania błędów.
Każdy z Tasków musi być ustawiony w status RESUME, by został uruchomiony.
ALTER TASK DB_AT_DEV.STAGING.LOAD_INTEGRATION_DIM_CUSTOMER_TASK RESUME;
Zaraz po stworzeniu lub zmianie w Tasku, ten jest standardowo ustawiany w stan SUSPEND, co oznacza, że jest wstrzymany.
Możesz także zmienić status wszystkich zadań powiązanych z Root Task, używając SYSTEM$TASK_DEPENDENTS_ENABLE – więcej w dokumentacji Snowflake.
Tworzenie logu po uruchomieniu całego potoku ETL
Kolejną rzeczą, którą chcielibyśmy zrobić, jest stworzenie logu po uruchomieniu całego potoku ETL. W tym celu możemy użyć innego typu zadań – Finalize Task. To specjalny rodzaj zadań, który przyłącza się do Root Taska, ale uruchamia się zawsze po zakończeniu wszystkich zadań w Task Graph (niezależnie od tego, czy zakończą się sukcesem, czy błędem).
Create Finalize Task
CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.FINALIZE_TASK
WAREHOUSE = SFPOCWH
COMMENT = 'Finalize Task'
FINALIZE = SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK –- indicates it is finalize task
AS
DECLARE
run_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
root_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
status VARCHAR(30);
start_time timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
end_time timestamp;
nb_of_failed INT;
BEGIN
end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
--log starting new batch run
-- check if there are no tasks finished with FAILED status
nb_of_failed := (SELECT COUNT(1) FROM DB_AT_DEV.STAGING.BATCHJOBRUNLOG WHERE STATUS = 'FAILED' AND RUN_ID=:run_task_id);
status := (SELECT IFF(:nb_of_failed>=1,'FAILED','FINISHED'));
-- update log
UPDATE DB_AT_DEV.STAGING.BATCHRUNLOG
SET
STATUS = :status,
END_TIME = :end_time
WHERE RUN_ID = :run_task_id AND ROOT_TASK_ID = :root_task_id
;
END;
Zadania Finalize Task są świetne, gdy musimy posprzątać i zamknąć proces ETL, dlatego że zawsze uruchomią się jako ostatnie. Ja używam ich do logowania czasu zakończenia przetwarzania i jego statusu.
Podglądanie potoku
Mamy kilka możliwości podejrzenia naszego potoku.
Pierwsza z nich to użycie metody Show Tasks:
SHOW TASKS;

Może być użyteczna, ale do lepszego poglądu na wszystkie zadania lepszy jest wbudowany wizualizer Task Graph:

Tutaj możecie sprawdzić detale i definicję zadania, ale również historię uruchomień, status i inne parametry.
Uruchamianie zadań
Jak już wspomniałem, zadania mogą być uruchamiane według zadanego harmonogramu (jest to definiowane podczas tworzenia lub zmiany Root Taska), wyzwalane przez strumienie danych (streams) lub uruchamiane ręcznie. Mogą być również konfigurowane w taki sposób, by wiele instancji tego samego zadania przebiegało równolegle.
Aby uruchomić zadanie ręcznie, należy użyć polecenia:
EXECUTE TASK SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK

Podsumowanie
Jest też wiele innych powodów, dla których uważam Taski w Snowflake za świetne rozwiązanie do budowy procesów ETL po stronie bazy danych. Dodatkowo, procedury składowane mogą być pisane nie tylko w SQL, ale też w Javie, Javascript’cie, Pythonie lub Scali.
Jeżeli chciałbyś dowiedzieć się więcej, sprawdź dokumentację na Introduction to tasks | Snowflake Documentation.
***
Jeśli interesują Cię implementacje, przykłady i fragmenty kodu, zajrzyj również do innych artykułów z kategorii development na twardo.
Zostaw komentarz