Source code for tom_pittgoogle.consumer_stream_python

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Consumer class to pull Pub/Sub messages via a Python client, and work with data.

Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html

Used by `BrokerStreamPython`, but can be called independently.

Use-case: Save alerts to a database

    The demo for this implementation assumes that the real use-case is to save alerts
    to a database rather than view them through a TOM site.
    Therefore, the `Consumer` currently saves the alerts in real time,
    and then simply returns a list of alerts after all messages are processed.
    That list is then coerced into an iterator by the `Broker`.
    If the user really cares about the `Broker`'s iterator, `stream_alerts` can
    be tweaked to yield the alerts in real time.

Basic workflow:

.. code:: python

    consumer = ConsumerStreamPython(subscription_name)

    alert_dicts_list = consumer.stream_alerts(
        user_filter=user_filter,
        **kwargs,
    )
    # alerts are processed and saved in real time. the list is returned for convenience.

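For example, a `user_filter` might look like the following minimal sketch
(the field names come from the default `ztf_fields`; the cuts themselves are
illustrative only):

.. code:: python

    def user_filter(alert_dict, **kwargs):
        # return the alert dict if it passes the cuts, else None
        if alert_dict["classtar"] > 0.9 and alert_dict["magpsf"] < 20.0:
            return alert_dict
        return None
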
See especially:

.. autosummary::
   :nosignatures:

   ConsumerStreamPython.authenticate
   ConsumerStreamPython.touch_subscription
   ConsumerStreamPython.stream_alerts
   ConsumerStreamPython.callback
   ConsumerStreamPython.save_alert

"""

# from concurrent.futures.thread import ThreadPoolExecutor
import queue
from queue import Empty

from django.conf import settings
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
# from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google import auth
from google.cloud import logging as gc_logging
from google_auth_oauthlib.helpers import credentials_from_session
from requests_oauthlib import OAuth2Session

from .utils.templatetags.utility_tags import avro_to_dict


PITTGOOGLE_PROJECT_ID = "ardent-cycling-243415"


class ConsumerStreamPython:
    """Consumer class to manage Pub/Sub connections and work with messages.

    Initialization does the following:

    - Authenticate the user via OAuth 2.0.

    - Create a `google.cloud.pubsub_v1.SubscriberClient` object.

    - Create a `queue.Queue` object to communicate with the background thread
      running the streaming pull.

    - Make sure the subscription exists and we can connect. Create it, if needed.

    To view logs, visit: https://console.cloud.google.com/logs

    - Make sure you are logged in, and your project is selected in the dropdown
      at the top.

    - Click the "Log name" dropdown and select the subscription name you
      instantiate this consumer with.

    TODO: Give the user a standard logger.
    """

    def __init__(self, subscription_name, ztf_fields=None):
        """Authenticate user; create client; set subscription path; check connection."""
        user_project = settings.GOOGLE_CLOUD_PROJECT

        self.database_list = []  # list of dicts. fake database for demo.

        # authenticate the user
        self.get_credentials(user_project)

        # instantiate client
        self.client = pubsub_v1.SubscriberClient(credentials=self.credentials)

        # logger
        log_client = gc_logging.Client(
            project=user_project, credentials=self.credentials
        )
        self.logger = log_client.logger(subscription_name)

        # subscription
        self.subscription_name = subscription_name
        self.subscription_path = (
            f"projects/{user_project}/subscriptions/{subscription_name}"
        )
        # Topic to connect the subscription to, if it needs to be created.
        # If the subscription already exists but is connected to a different topic,
        # the user will be notified and this topic_path will be updated for consistency.
        self.topic_path = f"projects/{PITTGOOGLE_PROJECT_ID}/topics/{subscription_name}"
        self.touch_subscription()

        self._set_ztf_fields(ztf_fields)

        self.queue = queue.Queue()  # queue for communication between threads

        # for the TOM `GenericAlert`. this won't be very helpful without instructions.
        self.pull_url = "https://pubsub.googleapis.com/v1/{subscription_path}"

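    # Example (an illustrative sketch, not part of the demo): instantiating with a
    # custom set of fields to keep from each alert. The dict structure mirrors the
    # defaults in `_set_ztf_fields`; the particular fields chosen here are assumptions.
    #
    #     consumer = ConsumerStreamPython(
    #         "ztf-loop",  # hypothetical subscription name
    #         ztf_fields={
    #             "top-level": ["objectId", "candid"],
    #             "candidate": ["jd", "ra", "dec", "magpsf"],
    #             "metadata": ["message_id", "publish_time"],
    #         },
    #     )
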
    def get_credentials(self, user_project):
        """Create user credentials from a service account key file or an OAuth2 session.

        Try service account credentials first. Fall back to OAuth.
        """
        try:
            self.credentials, project = auth.load_credentials_from_file(
                settings.GOOGLE_APPLICATION_CREDENTIALS
            )
            assert project == user_project  # TODO: handle this better
        except auth.exceptions.DefaultCredentialsError:
            self.authenticate_with_oauth()
            self.credentials = credentials_from_session(self.oauth2)

    def authenticate_with_oauth(self):
        """Guide user through authentication; create `OAuth2Session` for credentials.

        The user will need to visit a URL, authenticate themselves, and authorize
        `PittGoogleConsumer` to make API calls on their behalf.

        The user must have a Google account that is authorized to make API calls
        through the project defined by the `GOOGLE_CLOUD_PROJECT` variable in the
        Django `settings.py` file. Any project can be used, as long as the user
        has access.

        Additional requirement because this is still in dev: The OAuth is restricted
        to users registered with Pitt-Google, so contact us.

        TODO: Integrate this with Django. For now, the user interacts via command line.
        """
        # create an OAuth2Session
        client_id = settings.PITTGOOGLE_OAUTH_CLIENT_ID
        client_secret = settings.PITTGOOGLE_OAUTH_CLIENT_SECRET
        authorization_base_url = "https://accounts.google.com/o/oauth2/auth"
        redirect_uri = "https://ardent-cycling-243415.appspot.com/"  # TODO: better page
        scopes = [
            "https://www.googleapis.com/auth/logging.write",
            "https://www.googleapis.com/auth/pubsub",
        ]
        oauth2 = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)

        # instruct the user to authorize
        authorization_url, state = oauth2.authorization_url(
            authorization_base_url,
            access_type="offline",
            # access_type="online",
            # prompt="select_account",
        )
        print((
            "Please visit this URL to authenticate yourself and authorize "
            "PittGoogleConsumer to make API calls on your behalf:"
            f"\n\n{authorization_url}\n"
        ))
        authorization_response = input(
            "After authorization, you should be directed to the Pitt-Google Alert "
            "Broker home page. Enter the full URL of that page (it should start with "
            "https://ardent-cycling-243415.appspot.com/):\n"
        )

        # complete the authentication
        _ = oauth2.fetch_token(
            "https://accounts.google.com/o/oauth2/token",
            authorization_response=authorization_response,
            client_secret=client_secret,
        )
        self.oauth2 = oauth2

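    # The Django settings this consumer reads (values shown are placeholders only;
    # set real ones in the project's settings.py):
    #
    #     GOOGLE_CLOUD_PROJECT = "my-gcp-project"                   # user's project
    #     GOOGLE_APPLICATION_CREDENTIALS = "/path/to/keyfile.json"  # service account route
    #     PITTGOOGLE_OAUTH_CLIENT_ID = "..."                        # OAuth fallback
    #     PITTGOOGLE_OAUTH_CLIENT_SECRET = "..."
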
    def stream_alerts(self, user_filter=None, user_callback=None, **kwargs):
        """Execute a streaming pull and process alerts through the `callback`.

        The streaming pull happens in a background thread. A `queue.Queue` is used
        to communicate between threads and enforce the stopping condition(s).

        Args:
            user_filter (Callable): Used by `callback` to filter alerts before saving.
                It should accept a single alert as a dictionary (flat dict with fields
                determined by the `ztf_fields` attribute). It should return the alert
                dict if it passes the filter, else None.

            user_callback (Callable): Used by `callback` to process alerts. It should
                accept a single alert as a dictionary (flat dict with fields determined
                by the `ztf_fields` attribute). It should return True if the processing
                was successful; else False.

            kwargs (dict): User's parameters. Should include the parameters defined in
                `BrokerStreamPython`'s `FilterAlertsForm`. There must be at least one
                stopping condition (`max_results` or `timeout`), else the streaming
                pull will run forever.
        """
        # callback doesn't accept kwargs. set attribute instead.
        self.callback_kwargs = {
            "user_filter": user_filter,
            "user_callback": user_callback,
            **kwargs,
        }

        # avoid pulling down a large number of alerts that don't get processed
        flow_control = pubsub_v1.types.FlowControl(max_messages=kwargs["max_backlog"])

        # Google API has a thread scheduler that can run multiple background threads
        # and includes a queue, but I (Troy) haven't gotten it working yet.
        # self.scheduler = ThreadScheduler(ThreadPoolExecutor(max_workers))
        # self.scheduler.schedule(self.callback, lighten_alerts=lighten_alerts)

        # start pulling and processing msgs using the callback, in a background thread
        self.streaming_pull_future = self.client.subscribe(
            self.subscription_path,
            self.callback,
            flow_control=flow_control,
            # scheduler=self.scheduler,
            # await_callbacks_on_shutdown=True,
        )

        try:
            # Use the queue to count saved messages and
            # stop when we hit a max_results or timeout stopping condition.
            num_saved = 0
            while True:
                try:
                    num_saved += self.queue.get(block=True, timeout=kwargs["timeout"])
                except Empty:
                    break
                else:
                    self.queue.task_done()
                    if (
                        kwargs["max_results"] is not None
                        and num_saved >= kwargs["max_results"]
                    ):
                        break
            self._stop()

        except KeyboardInterrupt:
            self._stop()
            raise

        self._log_and_print(f"Saved {num_saved} messages from {self.subscription_path}")

        return self.database_list

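    # Example (an illustrative sketch): a call with both stopping conditions set.
    # The keyword names follow the Args description above; the values are assumptions.
    #
    #     alert_dicts_list = consumer.stream_alerts(
    #         user_filter=user_filter,
    #         max_results=100,   # stop after 100 alerts are saved
    #         timeout=30,        # or after 30 seconds with no new messages
    #         max_backlog=1000,  # flow control: cap on unprocessed messages
    #     )
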
    def _stop(self):
        """Shut down the streaming pull in the background thread gracefully."""
        self.streaming_pull_future.cancel()  # Trigger the shutdown.
        self.streaming_pull_future.result()  # Block until the shutdown is complete.

    def callback(self, message):
        """Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.

        Used as the callback for the streaming pull.
        """
        kwargs = self.callback_kwargs

        # unpack
        try:
            alert_dict = self._unpack(message)
        except Exception as e:
            self._log_and_print(f"Error unpacking message: {e}", severity="DEBUG")
            message.nack()  # nack so message does not leave subscription
            return

        # run user filter
        if kwargs["user_filter"] is not None:
            try:
                alert_dict = kwargs["user_filter"](alert_dict, **kwargs)
            except Exception as e:
                self._log_and_print(f"Error running user_filter: {e}", severity="DEBUG")
                message.nack()
                return

        # run user callback
        if kwargs["user_callback"] is not None:
            try:
                success = kwargs["user_callback"](alert_dict)  # bool
            except Exception as e:
                success = False
                msg = f"Error running user_callback: {e}"
            else:
                if not success:
                    msg = "user_callback reported it was unsuccessful."
            finally:
                if not success:
                    self._log_and_print(msg, severity="DEBUG")
                    message.nack()
                    return

        if alert_dict is not None:
            self.save_alert(alert_dict)

            # communicate with the main thread
            self.queue.put(1)  # 1 alert successfully processed
            if kwargs["max_results"] is not None:
                # block til main thread acknowledges so we don't ack msgs that get lost
                self.queue.join()  # single background thread => one-in-one-out

        else:
            self._log_and_print("alert_dict is None")

        message.ack()

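    # Example (an illustrative sketch): a `user_callback` must accept the alert dict
    # and return True on success, False otherwise. `MyTarget` is a hypothetical
    # Django model standing in for whatever the user wants to do with the alert.
    #
    #     def user_callback(alert_dict):
    #         try:
    #             MyTarget.objects.create(**alert_dict)
    #         except Exception:
    #             return False
    #         return True
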
    def _unpack(self, message):
        alert_dict = avro_to_dict(message.data)
        alert_dict = self._lighten_alert(alert_dict)  # flat dict with self.ztf_fields
        alert_dict.update(self._extract_metadata(message))
        return alert_dict

    def save_alert(self, alert):
        """Save the alert to a database."""
        self.database_list.append(alert)  # fake database for demo

    def _set_ztf_fields(self, fields=None):
        """Fields to save in the `_unpack` method."""
        if fields is not None:
            self.ztf_fields = fields
        else:
            self.ztf_fields = {
                "top-level": ["objectId", "candid", ],
                "candidate": ["jd", "ra", "dec", "magpsf", "classtar", ],
                "metadata": ["message_id", "publish_time", "kafka.timestamp"],
            }

    def _extract_metadata(self, message):
        # TOM wants to serialize this and has trouble with the dates.
        # Just make everything strings for now.
        # attributes includes Kafka attributes from originating stream:
        # kafka.offset, kafka.partition, kafka.timestamp, kafka.topic
        attributes = {k: str(v) for k, v in message.attributes.items()}
        metadata = {
            "message_id": message.message_id,
            "publish_time": str(message.publish_time),
            **attributes,
        }
        return {k: v for k, v in metadata.items() if k in self.ztf_fields["metadata"]}

    def _lighten_alert(self, alert_dict):
        alert_lite = {k: alert_dict[k] for k in self.ztf_fields["top-level"]}
        alert_lite.update(
            {k: alert_dict["candidate"][k] for k in self.ztf_fields["candidate"]}
        )
        return alert_lite

    def touch_subscription(self):
        """Make sure the subscription exists and we can connect.

        If the subscription doesn't exist, try to create one (in the user's project)
        that is attached to a topic of the same name in the Pitt-Google project.

        Note that messages published before the subscription is created are not
        available.
        """
        try:
            # check if subscription exists
            sub = self.client.get_subscription(subscription=self.subscription_path)

        except NotFound:
            self._create_subscription()

        else:
            self.topic_path = sub.topic
            print(f"Subscription exists: {self.subscription_path}")
            print(f"Connected to topic: {self.topic_path}")

    def _create_subscription(self):
        """Try to create the subscription."""
        try:
            self.client.create_subscription(
                name=self.subscription_path, topic=self.topic_path
            )
        except NotFound:
            # suitable topic does not exist in the Pitt-Google project
            raise ValueError(
                (
                    f"A subscription named {self.subscription_name} does not exist "
                    "in the Google Cloud Platform project "
                    f"{settings.GOOGLE_CLOUD_PROJECT}, "
                    "and one cannot be automatically created because Pitt-Google "
                    "does not publish a public topic with the same name."
                )
            )
        else:
            self._log_and_print(f"Created subscription: {self.subscription_path}")

    def delete_subscription(self):
        """Delete the subscription.

        This is provided for the user's convenience, but it is not necessary and is
        not automatically called.

        - Storage of unacknowledged Pub/Sub messages does not result in fees.

        - Unused subscriptions automatically expire; default is 31 days.
        """
        try:
            self.client.delete_subscription(subscription=self.subscription_path)
        except NotFound:
            pass
        else:
            self._log_and_print(f"Deleted subscription: {self.subscription_path}")

    def _log_and_print(self, msg, severity="INFO"):
        self.logger.log_text(msg, severity=severity)
        print(msg)