Source code for tom_pittgoogle.consumer_stream_python

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Consumer class to manage Pub/Sub connections via a Python client, and work with data.

Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html

Used by `BrokerStreamPython`, but can be called independently.

Use-case: Save alerts to a database

    This demo assumes that the real use-case is to save alerts to a database
    rather than view them through a TOM site.
    Therefore, the `Consumer` currently saves the alerts in real time,
    and then simply returns a list of alerts after all messages are processed.
    That list is then coerced into an iterator by the `Broker`.
    If the user really cares about the `Broker`'s iterator, `stream_alerts` can
    be tweaked to yield the alerts in real time.

Basic workflow:

.. code:: python

    consumer = ConsumerStreamPython(subscription_name)

    alert_dicts_list = consumer.stream_alerts(
        lighten_alerts=True,
        user_filter=user_filter,
        parameters=parameters,
    )
    # alerts are processed and saved in real time. the list is returned for convenience.

See especially:

.. autosummary::
   :nosignatures:

   ConsumerStreamPython.authenticate
   ConsumerStreamPython.get_create_subscription
   ConsumerStreamPython.stream_alerts
   ConsumerStreamPython.callback
   ConsumerStreamPython.save_alert

"""

# from concurrent.futures.thread import ThreadPoolExecutor
from django.conf import settings
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
# from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google.cloud import logging as gc_logging
from google_auth_oauthlib.helpers import credentials_from_session
from requests_oauthlib import OAuth2Session
import queue
from queue import Empty

from .utils.templatetags.utility_tags import avro_to_dict


PITTGOOGLE_PROJECT_ID = "ardent-cycling-243415"


[docs]class ConsumerStreamPython: """Consumer class to manage Pub/Sub connections and work with messages. Initialization does the following: - Authenticate the user via OAuth 2.0. - Create a `google.cloud.pubsub_v1.SubscriberClient` object. - Create a `queue.Queue` object to communicate with the background thread running the streaming pull. - Make sure the subscription exists and we can connect. Create it, if needed. To view logs, visit: https://console.cloud.google.com/logs - Make sure you are logged in, and your project is selected in the dropdown at the top. - Click the "Log name" dropdown and select the subscription name you instantiate this consumer with. TODO: Give the user a standard logger. """ def __init__(self, subscription_name): """Authenticate user; create client; set subscription path; check connection.""" user_project = settings.GOOGLE_CLOUD_PROJECT self.database_list = [] # list of dicts. fake database for demo. self.authenticate() self.credentials = credentials_from_session(self.oauth2) self.client = pubsub_v1.SubscriberClient(credentials=self.credentials) # logger log_client = gc_logging.Client( project=user_project, credentials=self.credentials ) self.logger = log_client.logger(subscription_name) # subscription self.subscription_name = subscription_name self.subscription_path = f"projects/{user_project}/subscriptions/{subscription_name}" # Topic to connect the subscription to, if it needs to be created. # If the subscription already exists but is connected to a different topic, # the user will be notified and this topic_path will be updated for consistency. self.topic_path = f"projects/{PITTGOOGLE_PROJECT_ID}/topics/{subscription_name}" self.get_create_subscription() self.queue = queue.Queue() # queue for communication between threads # for the TOM `GenericAlert`. this won't be very helpful without instructions. self.pull_url = "https://pubsub.googleapis.com/v1/{subscription_path}"
[docs] def authenticate(self): """Guide user through authentication; create `OAuth2Session` for credentials. The user will need to visit a URL, authenticate themselves, and authorize `PittGoogleConsumer` to make API calls on their behalf. The user must have a Google account that is authorized make API calls through the project defined by the `GOOGLE_CLOUD_PROJECT` variable in the Django `settings.py` file. Any project can be used, as long as the user has access. Additional requirement because this is still in dev: The OAuth is restricted to users registered with Pitt-Google, so contact us. TODO: Integrate this with Django. For now, the user interacts via command line. """ # create an OAuth2Session client_id = settings.PITTGOOGLE_OAUTH_CLIENT_ID client_secret = settings.PITTGOOGLE_OAUTH_CLIENT_SECRET authorization_base_url = "https://accounts.google.com/o/oauth2/auth" redirect_uri = "https://ardent-cycling-243415.appspot.com/" # TODO: better page scopes = [ "https://www.googleapis.com/auth/logging.write", "https://www.googleapis.com/auth/pubsub", ] oauth2 = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes) # instruct the user to authorize authorization_url, state = oauth2.authorization_url( authorization_base_url, access_type="offline", # access_type="online", # prompt="select_account", ) print(( "Please visit this URL to authenticate yourself and authorize " "PittGoogleConsumer to make API calls on your behalf:" f"\n\n{authorization_url}\n" )) authorization_response = input( "After authorization, you should be directed to the Pitt-Google Alert " "Broker home page. Enter the full URL of that page (it should start with " "https://ardent-cycling-243415.appspot.com/):\n" ) # complete the authentication _ = oauth2.fetch_token( "https://accounts.google.com/o/oauth2/token", authorization_response=authorization_response, client_secret=client_secret, ) self.oauth2 = oauth2
[docs] def stream_alerts(self, lighten_alerts=False, user_filter=None, parameters=None): """Execute a streaming pull and process alerts through the `callback`. The streaming pull happens in a background thread. A `queue.Queue` is used to communicate between threads and enforce the stopping condition(s). Args: lighten_alerts (bool): If True, drop extra fields and flatten the alert dict user_filter (Callable): Used by `callback` to filter alerts before saving. It should accept a single alert (ZTF packet data) as a dictionary. The schema depends on the value of `lighten_alerts`. If `lighten_alerts=False` it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If `lighten_alerts=True` the dict is flattened and extra fields are dropped. It should return the alert dict if it passes the filter, else None. parameters (dict): User's parameters. Must include the parameters defined in the `Broker`'s `FilterAlertsForm`. """ # callback doesn't currently accept kwargs. set attributes instead. self.user_filter = user_filter self.parameters = {**parameters, 'lighten_alerts': lighten_alerts} # avoid pulling down a large number of alerts that don't get processed flow_control = pubsub_v1.types.FlowControl(max_messages=parameters['max_backlog']) # Google API has a thread scheduler that can run multiple background threads # and includes a queue, but I (Troy) haven't gotten it working yet. # self.scheduler = ThreadScheduler(ThreadPoolExecutor(max_workers)) # self.scheduler.schedule(self.callback, lighten_alerts=lighten_alerts) # start pulling and processing msgs using the callback, in a background thread self.streaming_pull_future = self.client.subscribe( self.subscription_path, self.callback, flow_control=flow_control, # scheduler=self.scheduler, # await_callbacks_on_shutdown=True, ) # Use the queue to count saved messages and # stop when we hit a max_messages or timeout stopping condition. num_saved = 0 while True: try: num_saved += self.queue.get(block=True, timeout=parameters['timeout']) except Empty: break else: self.queue.task_done() if parameters['max_results'] & num_saved >= parameters['max_results']: break self._stop() self._log_and_print(f"Saved {num_saved} messages from {self.subscription_path}") return self.database_list
def _stop(self): """Shutdown the streaming pull in the background thread gracefully. Implemented as a separate function so the developer can quickly shut it down if things get out of control during dev. :) """ self.streaming_pull_future.cancel() # Trigger the shutdown. self.streaming_pull_future.result() # Block until the shutdown is complete.
[docs] def callback(self, message): """Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg. Used as the callback for the streaming pull. """ params = self.parameters alert_dict = avro_to_dict(message.data) if params['lighten_alerts']: alert_dict = self._lighten_alert(alert_dict) if self.user_filter is not None: alert_dict = self.user_filter(alert_dict, params) # save alert if alert_dict is not None: if params['save_metadata'] == "yes": # nest inside the alert so we don't break the broker alert_dict['metadata'] = self._extract_metadata(message) self.save_alert(alert_dict) num_saved = 1 else: num_saved = 0 # communicate with the main thread self.queue.put(num_saved) if params['max_results'] is not None: # block until main thread acknowledges so we don't ack msgs that get lost self.queue.join() # single background thread => one-in-one-out message.ack()
[docs] def save_alert(self, alert): """Save the alert to a database.""" self.database_list.append(alert) # fake database for demo
def _extract_metadata(self, message): # TOM wants to serialize this and has trouble with the dates. # Just make everything strings for now. return { "message_id": message.message_id, "publish_time": str(message.publish_time), # attributes includes the originating 'kafka.timestamp' from ZTF "attributes": {k: str(v) for k, v in message.attributes.items()}, } def _lighten_alert(self, alert_dict): keep_fields = { "top-level": ["objectId", "candid", ], "candidate": ["jd", "ra", "dec", "magpsf", "classtar", ], } alert_lite = {k: alert_dict[k] for k in keep_fields["top-level"]} alert_lite.update( {k: alert_dict["candidate"][k] for k in keep_fields["candidate"]} ) return alert_lite
[docs] def get_create_subscription(self): """Make sure the subscription exists and we can connect. If the subscription doesn't exist, try to create one (in the user's project) that is attached to a topic of the same name in the Pitt-Google project. Note that messages published before the subscription is created are not available. """ try: # check if subscription exists sub = self.client.get_subscription(subscription=self.subscription_path) except NotFound: self._create_subscription() else: self.topic_path = sub.topic print(f"Subscription exists: {self.subscription_path}") print(f"Connected to topic: {self.topic_path}")
def _create_subscription(self): """Try to create the subscription.""" try: self.client.create_subscription( name=self.subscription_path, topic=self.topic_path ) except NotFound: # suitable topic does not exist in the Pitt-Google project raise ValueError( ( f"A subscription named {self.subscription_name} does not exist" "in the Google Cloud Platform project " f"{settings.GOOGLE_CLOUD_PROJECT}, " "and one cannot be automatically create because Pitt-Google " "does not publish a public topic with the same name." ) ) else: self._log_and_print(f"Created subscription: {self.subscription_path}")
[docs] def delete_subscription(self): """Delete the subscription. This is provided for the user's convenience, but it is not necessary and is not automatically called. - Storage of unacknowledged Pub/Sub messages does not result in fees. - Unused subscriptions automatically expire; default is 31 days. """ try: self.client.delete_subscription(subscription=self.subscription_path) except NotFound: pass else: self._log_and_print(f'Deleted subscription: {self.subscription_path}')
def _log_and_print(self, msg, severity="INFO"): # request = { # 'logName': self.log_name, # 'resource': { # 'type': 'pubsub_subscription', # 'labels': { # 'project_id': settings.GOOGLE_CLOUD_PROJECT, # 'subscription_id': self.subscription_name # }, # }, # 'entries': [{'textPayload': msg, 'severity': severity}], # } # response = self.oauth.post(self.logging_url, json=json.dumps(request)) # print(response.content) # response.raise_for_status() print(msg)