Sii Polska

SII UKRAINE

SII SWEDEN

  • Szkolenia
  • Kariera
Dołącz do nas Kontakt
Wstecz

Sii Polska

SII UKRAINE

SII SWEDEN

Wstecz
Orkiestracja za pomocą Snowflake Tasks

Im dłużej pracuję ze Snowflake, tym bardziej się w nim zakochuję. Na początku myślałem, że to tylko następna usługa hurtowni danych w chmurze. Dlaczego więc powinienem się nią przejmować, skoro mam usługi data lake i prawie każda z nich udostępnia jakąś końcówkę (endpoint) SQL? A w plikach .parque mogę nawet mieć transakcje ACID (jak użyję formatu delta).

W artykule nie będziemy rozstrzygać, kiedy Snowflake jest lepszym wyborem. Chodzi o to, że Snowflake jest zbudowany dla Inżynierów przez Inżynierów. Dlatego każdy doświadczony programista będzie nim zachwycony.

Taski i Task Graph

Jednym z takich przykładowych narzędzi jest funkcjonalność Tasków i Task Graph, który jest w sumie implementacją podejścia DAG (directed acyclic graph – skierowany graf acykliczny) w kontekście potoków danych.

Czym więc są Taski i Task Graph? Zgodnie z dokumentacją:

„Taski to funkcje zdefiniowane przez użytkownika do automatyzacji i harmonogramowania procesu”.

Mogą uruchamiać następujące elementy:

  • pojedyncze zapytania SQL,
  • procedury składowane,
  • logikę, używając Snowflake Scripting.

Taski mogą być wyzwalane poprzez harmonogram lub poprzez strumienie (Streams) w tabelach, co daje dodatkową możliwość budowania bardziej dynamicznych przepływów ETL.

Task Graph jest używany do powiązania wielu Tasków razem w jeden graf DAG w celu zarządzania zależnościami. Każdy Task Graph składa się z Root Taska, który jest początkiem sekwencji, oraz Tasków przypiętych do niego.

Task Graph składający się z Root Taska i Tasków przypiętych do niego
Ryc. 1 Task Graph składający się z Root Taska i Tasków przypiętych do niego

Możesz stworzyć wiele Task Graphów dla różnych celów, takich jak ładowanie dzienne i miesięczne czy ładowanie danych sprzedażowych lub magazynowych.

Root Task

Jak już wspominałem, Root Task może być wyzwalany na wiele sposobów:

  • uruchamiany zgodnie z ustalonym harmonogramem,
  • wyzwolony przez stream, tak by uruchomić przetwarzanie w przypadku zmiany danych,
  • uruchomiony ręcznie.

Aby stworzyć Root Taska, odwołamy się do oficjalnej dokumentacji Snowflake – CREATE TASK | Snowflake Documentation.

Create Root Task

CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK -- Name of the TASK objects
    WAREHOUSE = SFPOCWH -- Warehouse that would be used to run tasks 
    COMMENT = 'Place your comments'
    ALLOW_OVERLAPPING_EXECUTION = FALSE
    AS
        DECLARE
	-- declaring variables and its starting values
            run_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
            root_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
            status VARCHAR(30);
            start_time     timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
            end_time       timestamp;
        BEGIN
            -- log starting new batch run
            INSERT INTO SFPOCDB.SFPOC_TWITTER_DATA.BATCH_LOG
                VALUES (:run_task_id, :root_task_id,'START',:start_time,null);
        END;

Root Task jest początkiem przetwarzania. Zazwyczaj używam go w celu stworzenia jakiegoś wstępnego logowania, na przykład o rozpoczęciu procesu ETL. Oczywiście można używać tabeli TASK_HISTORY, która jest wbudowana w Snowflake, ale zauważyłem, że wpisy tam pojawiają się z pewnym opóźnieniem.

SELECT * FROM TABLE(SFPOCDB.INFORMATION_SCHEMA.TASK_HISTORY());

Własna tabela do logowania

Zdecydowałem więc, że stworzę swoją własną tabelę do logowania, aby mieć większą elastyczność i zbierać wszystkie informacje o stanie zadań tak szybko, jak to możliwe

W tym celu wykorzystałem funkcję SYSTEM$TASK_RUNTIME_INFO:

  • SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID') – UUID obecnego uruchomienia ETL
  • SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID') – UUID identyfikujący ROOT TASK do którego TASK jest podpięty

Więcej informacji można znaleźć w dokumentacji Snowflake.

Stwórzmy kolejnego Taska w Task Graph, który jest zależny od Root Task. Nazwiemy go TASK_A.

Create TASK_A

CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.TASK_A
    WAREHOUSE = SFPOCWH
    COMMENT = 'Task A in a Task Graph'
    AFTER SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK --names of the tasks, that are predecessors of current task, separated with comma (,)
    AS
        DECLARE
            run_task_id         text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
            task_id             text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME'));
            query_id            text;
            status              VARCHAR(30);
            start_time          timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
            end_time            timestamp;
            affected_records    int;
            error_number        int;
            error_message       text;
        BEGIN
            -- log procedure start
            INSERT INTO DB_AT_DEV.STAGING.BATCHJOBRUNLOG
                VALUES (:run_task_id, :task_id, null, 'START', :start_time, null, null, null, null );      
 
            -- call the procedure
            CALL procedure_schema.procedure_name();
 
            -- log end
            query_id := SQLID;
            end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
            affected_records := sqlrowcount;
            
            -- log procedure finish success
            
            UPDATE DB_AT_DEV.STAGING.BATCHJOBRUNLOG
                SET 
                    QUERY_ID = :query_id,
                    STATUS = 'FINISHED',
                    END_TIME = :end_time,
                    AFFECTED_RECORDS = :affected_records
                WHERE 
                    RUN_ID = :run_task_id
                    AND TASK_ID = :task_id;
                           
            -- log procedure finish error
        EXCEPTION
            WHEN OTHER THEN
                query_id := SQLID;
                affected_records := sqlrowcount; 
                end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
 
                UPDATE DB_AT_DEV.STAGING.BATCHJOBRUNLOG
                    SET 
                        QUERY_ID = :query_id,
                        STATUS = 'FAILED',
                        END_TIME = :end_time,
                        AFFECTED_RECORDS = :affected_records,
                        ERROR_NUMBER = 1,
                        ERROR_MESSAGE = :sqlerrm
                    WHERE 
                        RUN_ID = :run_task_id
                        AND TASK_ID = :task_id;
                           RAISE;               
        END;

W tym Tasku opakowałem procedurę SQL, która wykonuje właściwy ETL (polecenie CALL procedure_schema.procedure_name(); ) dodatkową logiką, tak aby utrzymywać logi dotyczące Tasków w osobnej tabeli.

Użyłem dodatkowo takich zmiennych jak:

  • SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME') – nazwa obecnego Taska
  • SQLID – UUID identyfikujący Root Task, do którego Task jest podpięty
  • SQLROWCOUNT – ID ostatniego uruchomionego zapytania

Warto podkreślić, że to była specyfika mojego zadania – logować informację o statusie Taska do osobnej tabeli. Ale możesz użyć innej struktury logowania Taska, dodając dodatkowe informacje lub logikę do przetwarzania ETL, wciąż utrzymując logikę biznesową w osobnych procedurach. Możesz nawet dokonać refactoringu i opakować logikę techniczną w procedury lub funkcję.

Takie podejście pozwala na łatwiejsze utrzymanie kodu w przyszłości. Odseparowanie technicznej i biznesowej części rozwiązania naprawdę oszczędza czas podczas implementacji zmian i naprawiania błędów.

Każdy z Tasków musi być ustawiony w status RESUME, by został uruchomiony.

ALTER TASK DB_AT_DEV.STAGING.LOAD_INTEGRATION_DIM_CUSTOMER_TASK RESUME;

Zaraz po stworzeniu lub zmianie w Tasku, ten jest standardowo ustawiany w stan SUSPEND, co oznacza, że jest wstrzymany.

Możesz także zmienić status wszystkich zadań powiązanych z Root Task, używając SYSTEM$TASK_DEPENDENTS_ENABLE – więcej w dokumentacji Snowflake.

Tworzenie logu po uruchomieniu całego potoku ETL

Kolejną rzeczą, którą chcielibyśmy zrobić, jest stworzenie logu po uruchomieniu całego potoku ETL. W tym celu możemy użyć innego typu zadań – Finalize Task. To specjalny rodzaj zadań, który przyłącza się do Root Taska, ale uruchamia się zawsze po zakończeniu wszystkich zadań w Task Graph (niezależnie od tego, czy zakończą się sukcesem, czy błędem).

Create Finalize Task

CREATE OR REPLACE TASK SFPOCDB.SFPOC_TWITTER_DATA.FINALIZE_TASK
    WAREHOUSE = SFPOCWH
    COMMENT = 'Finalize Task'
    FINALIZE = SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK –- indicates it is finalize task
    AS
        DECLARE
            run_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID'));
            root_task_id text DEFAULT (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
            status VARCHAR(30);
            start_time     timestamp DEFAULT (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
            end_time       timestamp;
            nb_of_failed   INT;
        BEGIN
            end_time := (SELECT CONVERT_TIMEZONE('UTC', 'Europe/Vienna', CURRENT_TIMESTAMP()));
            --log starting new batch run
            -- check if there are no tasks finished with FAILED status
            nb_of_failed := (SELECT COUNT(1) FROM DB_AT_DEV.STAGING.BATCHJOBRUNLOG WHERE STATUS = 'FAILED' AND RUN_ID=:run_task_id);
            status := (SELECT IFF(:nb_of_failed>=1,'FAILED','FINISHED'));
            -- update log
            UPDATE DB_AT_DEV.STAGING.BATCHRUNLOG
                SET 
                    STATUS = :status,
                    END_TIME = :end_time
                WHERE RUN_ID = :run_task_id AND ROOT_TASK_ID = :root_task_id
                ;
        END;

Zadania Finalize Task są świetne, gdy musimy posprzątać i zamknąć proces ETL, dlatego że zawsze uruchomią się jako ostatnie. Ja używam ich do logowania czasu zakończenia przetwarzania i jego statusu.

Podglądanie potoku

Mamy kilka możliwości podejrzenia naszego potoku.

Pierwsza z nich to użycie metody Show Tasks:

SHOW TASKS;
Podglądanie potoku – metoda Show Tasks
Ryc. 2 Podglądanie potoku – metoda Show Tasks

Może być użyteczna, ale do lepszego poglądu na wszystkie zadania lepszy jest wbudowany wizualizer Task Graph:

Podglądanie potoku – wykorzystanie wbudowanego wizualizera Task Graph
Ryc. 3 Podglądanie potoku – wykorzystanie wbudowanego wizualizera Task Graph

Tutaj możecie sprawdzić detale i definicję zadania, ale również historię uruchomień, status i inne parametry.

Uruchamianie zadań

Jak już wspomniałem, zadania mogą być uruchamiane według zadanego harmonogramu (jest to definiowane podczas tworzenia lub zmiany Root Taska), wyzwalane przez strumienie danych (streams) lub uruchamiane ręcznie. Mogą być również konfigurowane w taki sposób, by wiele instancji tego samego zadania przebiegało równolegle.

Aby uruchomić zadanie ręcznie, należy użyć polecenia:

EXECUTE TASK SFPOCDB.SFPOC_TWITTER_DATA.ROOT_TASK 
oferty pracy

Podsumowanie

Jest też wiele innych powodów, dla których uważam Taski w Snowflake za świetne rozwiązanie do budowy procesów ETL po stronie bazy danych. Dodatkowo, procedury składowane mogą być pisane nie tylko w SQL, ale też w Javie, Javascript’cie, Pythonie lub Scali.

Jeżeli chciałbyś dowiedzieć się więcej, sprawdź dokumentację na Introduction to tasks | Snowflake Documentation.

***

Jeśli interesują Cię implementacje, przykłady i fragmenty kodu, zajrzyj również do innych artykułów z kategorii development na twardo.

4.9/5
Ocena
4.9/5
Avatar

O autorze

Krzysztof Saniak

Krzysztof jest architektem rozwiązań informatycznych z prawie 20-letnim doświadczeniem, specjalizującym się w platformach danych. Dostarcza innowacyjne rozwiązania w zakresie Big Data, uczenia maszynowego i technologii chmurowych, takich jak AWS i Azure. Pracował dla klientów najwyższej klasy, takich jak Philip Morris International, Danone i Hapag Lloyd, koncentrując się na budowaniu skalowalnych hurtowni danych i platform analitycznych. Pasjonuje się wykorzystywaniem nowych technologii do rozwiązywania rzeczywistych problemów biznesowych, zapewniając optymalną dostawę techniczną. Krzysztof jest również silnym liderem, znanym ze strategicznego podejmowania decyzji i podejścia opartego na współpracy w promowaniu innowacji i wzrostu

Wszystkie artykuły autora

Zostaw komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *

Może Cię również zainteresować

Dołącz do nas

Sprawdź oferty pracy

Pokaż wyniki
Dołącz do nas Kontakt

This content is available only in one language version.
You will be redirected to home page.

Are you sure you want to leave this page?