W dzisiejszym świecie, gdzie dane generowane są w ogromnych ilościach w czasie rzeczywistym, przetwarzanie tych danych na bieżąco staje się kluczowe dla wielu firm. Google Cloud Storage i Google Pub/Sub to dwa potężne narzędzia, które mogą wspólnie efektywnie w tym pomóc.
W artykule omówię, jak zintegrować te usługi, aby stworzyć skalowalne i niezawodne rozwiązanie do przetwarzania danych na żywo.
Wprowadzenie do Google Cloud Storage i Pub/Sub
Google Cloud Storage to skalowalna usługa przechowywania obiektów, która umożliwia przechowywanie i dostęp do danych w dowolnym czasie i miejscu. Stanowi więc idealne miejsce do gromadzenia dużych ilości danych, takich jak pliki multimedialne, dokumenty i dane z aplikacji.
Google Cloud Pub/Sub to globalna usługa przesyłania wiadomości w czasie rzeczywistym, która umożliwia tworzenie, wysyłanie i odbieranie wiadomości między aplikacjami. Dzięki Pub/Sub można łatwo skalować przetwarzanie danych i tworzyć elastyczne architektury komunikacyjne.
Integracja Google Cloud Storage i Pub/Sub
Aby przetwarzać dane w czasie rzeczywistym za pomocą Google Cloud Storage i Pub/Sub, należy skonfigurować przepływ pracy, który pozwoli na przesyłanie danych z jednego systemu do drugiego.
Krok 1: Konfiguracja bucketa w Google Cloud Storage
- Zaloguj się do konsoli Google Cloud.
- Przejdź do sekcji „Storage” i utwórz nowy bucket.
- Skonfiguruj uprawnienia dostępu, aby zapewnić odpowiedni poziom bezpieczeństwa.
Krok 2: Konfiguracja tematu (ang. topic) w Google Pub/Sub
- Przejdź do sekcji „Pub/Sub” w konsoli Google Cloud.
- Utwórz nowy temat, który będzie służył do przesyłania wiadomości o nowych danych w bucketcie.
Krok 3: Skonfiguruj powiadomienia dla bucketa
- Przejdź do sekcji „Storage” i wybierz utworzony bucket.
- Skonfiguruj powiadomienia dla tego bucketa, aby wysyłał wiadomości do tematu Pub/Sub w przypadku dodania nowych plików.
Przykład konfiguracji za pomocą CLI:
gsutil notification create -t <PUBSUB_TOPIC> -f json gs://<BUCKET_NAME>
Przetwarzanie danych w czasie rzeczywistym
W tym punkcie omówię proces odbierania, przetwarzania i reagowania na dane w czasie rzeczywistym przy użyciu Google Cloud Storage i Pub/Sub.
Krok 4: Subskrypcja i przetwarzanie wiadomości
Aby przetwarzać dane w czasie rzeczywistym, musisz utworzyć subskrypcję na temat Pub/Sub i skonfigurować proces, który będzie odbierał i przetwarzał wiadomości.
Tworzenie subskrypcji:
- Przejdź do sekcji „Pub/Sub” i wybierz utworzony temat.
- W Google Cloud Console, na pasku nawigacyjnym po lewej stronie, kliknij „Pub/Sub”.
- Kliknij na nazwę tematu, który utworzyłeś wcześniej.
- Utwórz nową subskrypcję dla tego tematu.
- Kliknij „Create subscription”.
- Wprowadź nazwę subskrypcji.
- Wybierz typ dostarczania wiadomości (np.: „Pull” lub „Push”).
- Jeśli wybierzesz „Push”, podaj URL endpointu, do którego wiadomości mają być wysyłane.
- Kliknij „Create”.
Przykład konfiguracji za pomocą CLI:
gcloud pubsub subscriptions create <SUBSCRIPTION_NAME> --topic=<PUBSUB_TOPIC>
Krok 5: Przetwarzanie danych
Po utworzeniu subskrypcji, musisz skonfigurować aplikację, która będzie odbierać wiadomości z subskrypcji i przetwarzać dane w czasie rzeczywistym. Możesz użyć różnych narzędzi oraz frameworków, do których należą Cloud Functions, Dataflow, czy nawet własnych rozwiązań opartych na językach takich jak Python lub Java.
Przetwarzanie wiadomości w Pythonie
Poniżej przedstawię przykładowy kod w Pythonie, który odbiera wiadomości z subskrypcji i przetwarza je:
- Instalacja wymaganych bibliotek:
- Zainstaluj bibliotekę Google Cloud Pub/Sub:
pip install google-cloud-pubsub
- Przykład kodu do odbierania i przetwarzania wiadomości:
from google.cloud import pubsub_v1
# Inicjalizacja klienta Pub/Sub
subscriber = pubsub_v1.SubscriberClient()
subscription_path = 'projects/{project_id}/subscriptions/{subscription}'
def callback(message):
print(f'Received message: {message.data}')
# Przetwarzaj dane z wiadomości
# Na przykład, odczytaj dane z Google Cloud Storage i przeprowadź analizę
message.ack()
# Subskrybuj wiadomości i ustaw callback
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}')
try:
# Przetwarzaj wiadomości do momentu przerwania
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
W powyższym przykładzie funkcja callback jest wywoływana za każdym razem, gdy nowa wiadomość jest odbierana z subskrypcji. W tej funkcji można umieścić logikę przetwarzania danych, taką jak odczytanie danych z Google Cloud Storage, przetworzenie ich i zapisanie wyników.
Przetwarzanie danych w Google Cloud Functions
Google Cloud Functions to usługa bezserwerowa, która pozwala na uruchamianie kodu w odpowiedzi na zdarzenia. Możesz użyć Cloud Functions do przetwarzania wiadomości z Pub/Sub.
- Tworzenie funkcji w Cloud Functions:
- Przejdź do sekcji „Cloud Functions” w konsoli Google Cloud.
- Kliknij „Create function”.
- Wybierz typ wyzwalacza jako „Cloud Pub/Sub”.
- Wybierz temat Pub/Sub, który będzie wyzwalał funkcję.
- Przykładowy kod funkcji:
import json
from google.cloud import storage
def process_pubsub(event, context):
# Odbierz wiadomość z Pub/Sub
message_data = base64.b64decode(event['data']).decode('utf-8')
message_json = json.loads(message_data)
# Przetwarzaj dane (na przykład, odczytaj plik z Cloud Storage)
client = storage.Client()
bucket_name = message_json['bucket']
file_name = message_json['name']
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(file_name)
data = blob.download_as_string()
# Przeprowadź dalszą analizę danych
print(f'Przetwarzanie pliku: {file_name} z bucketa: {bucket_name}')
print(f'Dane: {data}')
W tym przykładzie funkcja process_pubsub jest wywoływana, gdy nowa wiadomość jest odbierana z tematu Pub/Sub. Funkcja odczytuje dane z wiadomości, pobiera odpowiedni plik z Google Cloud Storage i przetwarza go.
Monitorowanie i optymalizacja
Aby zapewnić niezawodność i wydajność rozwiązania, należy monitorować działanie systemu i optymalizować go w razie potrzeby.
Monitorowanie:
- Google Cloud Monitoring:
- Skorzystaj z Google Cloud Monitoring, aby śledzić metryki takie jak opóźnienia w przetwarzaniu wiadomości, wykorzystanie zasobów i błędy.
- Ustaw alerty, aby być powiadamianym o potencjalnych problemach.
- Logowanie:
- Skonfiguruj logowanie dla swoich aplikacji przetwarzających dane, aby mieć wgląd w przebieg przetwarzania i łatwo diagnozować problemy.
Optymalizacja:
- Skalowanie:
- Skaluj swoje instancje przetwarzające dane w zależności od obciążenia. Google Cloud Functions i Cloud Run automatycznie skalują się w odpowiedzi na ruch.
- Optymalizacja kodu:
- Optymalizuj kod przetwarzający, aby minimalizować opóźnienia i zwiększać wydajność. Zadbaj o efektywne zarządzanie zasobami, takie jak pamięć i CPU.
Podsumowanie
Przetwarzanie danych w czasie rzeczywistym z użyciem Google Cloud Storage i Pub/Sub pozwala na tworzenie skalowalnych i elastycznych rozwiązań, które mogą sprostać wymaganiom współczesnych aplikacji. Poprzez odpowiednią konfigurację i monitorowanie możesz zapewnić, że Twoje systemy działają efektywnie i niezawodnie, dostarczając wartościowe dane w czasie rzeczywistym.
***
Jeśli ciekawią Cię narzędzia wykorzystywane w branży IT, zajrzyj koniecznie do innych artykułów naszych specjalistów.
Zostaw komentarz