StreamPython

Note

The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.

BrokerStreamPython

TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.

Relies on ConsumerStreamPython to manage the connections and work with data.

See especially:

BrokerStreamPython.fetch_alerts

Entry point to pull and filter alerts.

BrokerStreamPython.user_filter

Apply the filter indicated by the form's parameters.

class tom_pittgoogle.broker_stream_python.BrokerStreamPython[source]

Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.

Base class: tom_alerts.alerts.GenericBroker

fetch_alerts(parameters)[source]

Entry point to pull and filter alerts.

Pull alerts using a Python client, unpack, apply user filter.

This demo assumes that the real use case is saving alerts to a database rather than viewing them through a TOM site. The Consumer therefore saves the alerts in real time and returns a list of alerts only after all messages have been processed. That list is then coerced into an iterator here. If the iterator needs to deliver alerts as they arrive, ConsumerStreamPython.stream_alerts can be modified to yield the alerts in real time.
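The difference between returning a list and yielding in real time can be sketched as follows. This is a minimal illustration, not the package's code: `process`, `stream_as_list`, and `stream_as_generator` are hypothetical stand-ins, and the alert contents are made up.

```python
from typing import Iterator


def process(message: dict) -> dict:
    """Hypothetical stand-in for unpacking, filtering, and saving one alert."""
    return {"objectId": message["objectId"]}


def stream_as_list(messages: list) -> list:
    """List behavior: process everything, then return the full list."""
    return [process(m) for m in messages]


def stream_as_generator(messages: list) -> Iterator[dict]:
    """Alternative: yield each alert as soon as it has been processed."""
    for m in messages:
        yield process(m)


messages = [{"objectId": "ZTF-a"}, {"objectId": "ZTF-b"}]

# Broker-side coercion: the list is complete before iteration starts.
alerts_iter = iter(stream_as_list(messages))

# The generator variant produces alerts lazily, one per next() call.
alerts_gen = stream_as_generator(messages)
```

Both objects can be consumed with a `for` loop; only the generator lets the caller see the first alert before the last message has been processed.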

form

alias of tom_pittgoogle.broker_stream_python.FilterAlertsForm

to_generic_alert(alert_dict)[source]

Map the Pitt-Google alert to a TOM GenericAlert.

to_target(alert_dict)[source]

Map the Pitt-Google alert to a TOM Target.

static user_filter(alert_dict, parameters)[source]

Apply the filter indicated by the form’s parameters.

Used as the callback to BrokerStreamPython.unpack_and_ack_messages.

Parameters
  • alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamPython.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True the dict is flattened and extra fields are dropped.

  • parameters – parameters submitted by the user through the form.

Returns

alert_dict if it passes the filter, else None
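A filter with this contract might look like the sketch below. It assumes a flattened alert (lighten_alerts=True) that carries the ZTF `classtar` field at the top level, and it assumes that the `classtar_gt_lt` choice is submitted as the string "gt" or "lt"; the actual choice values and the handling of missing fields are not documented here.

```python
from typing import Optional


def user_filter(alert_dict: dict, parameters: dict) -> Optional[dict]:
    """Return alert_dict if its classtar passes the threshold, else None."""
    threshold = parameters.get("classtar_threshold")
    if threshold is None:
        return alert_dict  # no threshold requested, so nothing to filter

    classtar = alert_dict.get("classtar")
    if classtar is None:
        return None  # assumption: alerts without a classtar value are dropped

    # Assumption: the form's choices are the strings "gt" and "lt".
    if parameters.get("classtar_gt_lt") == "lt":
        passed = classtar < threshold
    else:
        passed = classtar > threshold
    return alert_dict if passed else None


alert = {"objectId": "ZTF-example", "classtar": 0.95}  # made-up values
kept = user_filter(alert, {"classtar_threshold": 0.5, "classtar_gt_lt": "gt"})
dropped = user_filter(alert, {"classtar_threshold": 0.5, "classtar_gt_lt": "lt"})
```

Here `kept` is the original dictionary and `dropped` is None, matching the return convention described above.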

class tom_pittgoogle.broker_stream_python.FilterAlertsForm(*args, **kwargs)[source]

Basic form for filtering alerts.

Fields:

subscription_name (CharField)

classtar_threshold (FloatField)

classtar_gt_lt (ChoiceField)

max_results (IntegerField)

timeout (IntegerField)

max_backlog (IntegerField)

property media

Return all media required to render the widgets on this form.

ConsumerStreamPython

Consumer class to pull Pub/Sub messages via a Python client, and work with data.

Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html

Used by BrokerStreamPython, but can be called independently.

Use-case: Save alerts to a database

The demo for this implementation assumes that the real use case is saving alerts to a database rather than viewing them through a TOM site. The Consumer therefore saves the alerts in real time and returns a list of alerts only after all messages have been processed. That list is then coerced into an iterator by the Broker. If the Broker's iterator needs to deliver alerts as they arrive, stream_alerts can be modified to yield the alerts in real time.

Basic workflow:

from tom_pittgoogle.consumer_stream_python import ConsumerStreamPython

consumer = ConsumerStreamPython(subscription_name)

alert_dicts_list = consumer.stream_alerts(
    user_filter=user_filter,
    **kwargs,
)
# Alerts are processed and saved in real time. The list is returned for convenience.

See especially:

ConsumerStreamPython.touch_subscription

Make sure the subscription exists and we can connect.

ConsumerStreamPython.stream_alerts

Execute a streaming pull and process alerts through the callback.

ConsumerStreamPython.callback

Process a single alert; run user filter; save alert; acknowledge Pub/Sub message.

ConsumerStreamPython.save_alert

Save the alert to a database.

class tom_pittgoogle.consumer_stream_python.ConsumerStreamPython(subscription_name, ztf_fields=None)[source]

Consumer class to manage Pub/Sub connections and work with messages.

Initialization does the following:

  • Authenticate the user via OAuth 2.0.

  • Create a google.cloud.pubsub_v1.SubscriberClient object.

  • Create a queue.Queue object to communicate with the background thread running the streaming pull.

  • Make sure the subscription exists and we can connect. Create it, if needed.

To view logs, visit: https://console.cloud.google.com/logs

  • Make sure you are logged in, and your project is selected in the dropdown at the top.

  • Click the “Log name” dropdown and select the subscription name you instantiate this consumer with.

TODO: Give the user a standard logger.

authenticate_with_oauth()[source]

Guide user through authentication; create OAuth2Session for credentials.

The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.

The user must have a Google account that is authorized to make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.

Additional requirement while this is still in development: OAuth access is restricted to users registered with Pitt-Google, so contact us for access.

TODO: Integrate this with Django. For now, the user interacts via command line.

callback(message)[source]

Process a single alert; run user filter; save alert; acknowledge Pub/Sub message.

Used as the callback for the streaming pull.
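The sequence of steps in such a callback can be sketched as below. This is an illustration under stated assumptions, not the package's implementation: `FakeMessage` and `make_callback` are hypothetical, the payload is JSON to keep the sketch dependency-free (real alert payloads may use a different encoding), and acknowledging filtered-out messages is an assumption.

```python
import json
from typing import Callable, Optional


class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message (exposes .data and .ack())."""

    def __init__(self, data: bytes):
        self.data = data
        self.acked = False

    def ack(self) -> None:
        self.acked = True


def make_callback(user_filter: Optional[Callable], save_alert: Callable) -> Callable:
    """Build a callback that unpacks, filters, saves, then acknowledges."""

    def callback(message) -> None:
        # Assumption: JSON payload, chosen only to keep the sketch simple.
        alert_dict = json.loads(message.data)
        if user_filter is not None:
            alert_dict = user_filter(alert_dict)
        if alert_dict is not None:
            save_alert(alert_dict)
        # Acknowledge so that Pub/Sub does not redeliver the message.
        message.ack()

    return callback


saved = []
callback = make_callback(
    user_filter=lambda a: a if a["classtar"] > 0.5 else None,
    save_alert=saved.append,
)
msg_keep = FakeMessage(json.dumps({"objectId": "A", "classtar": 0.9}).encode())
msg_drop = FakeMessage(json.dumps({"objectId": "B", "classtar": 0.1}).encode())
callback(msg_keep)
callback(msg_drop)
```

After both calls, only the first alert is in `saved`, and both messages have been acknowledged.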

delete_subscription()[source]

Delete the subscription.

This is provided for the user’s convenience, but it is not necessary and is not automatically called.

  • Storage of unacknowledged Pub/Sub messages does not result in fees.

  • Unused subscriptions automatically expire; default is 31 days.

get_credentials(user_project)[source]

Create a user credentials object from service account credentials or OAuth.

Try service account credentials first. Fall back to OAuth.
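The try-first-then-fall-back order can be sketched generically. Everything in this block is hypothetical: `first_available_credentials`, `load_service_account`, and `load_oauth` are illustrative names, and the loaders only simulate the two outcomes rather than calling any Google library.

```python
from typing import Callable, Sequence


def first_available_credentials(loaders: Sequence[Callable[[], object]]) -> object:
    """Return the result of the first loader that succeeds."""
    errors = []
    for load in loaders:
        try:
            return load()
        except Exception as exc:  # broad on purpose: any failure triggers the fallback
            errors.append(exc)
    raise RuntimeError(f"no credentials available: {errors}")


def load_service_account() -> object:
    """Hypothetical loader; here it simulates a missing key file."""
    raise FileNotFoundError("no service account key file configured")


def load_oauth() -> object:
    """Hypothetical loader; a real flow would prompt the user to authorize."""
    return "oauth-credentials"


# Service account first, OAuth second.
credentials = first_available_credentials([load_service_account, load_oauth])
```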

save_alert(alert)[source]

Save the alert to a database.

stream_alerts(user_filter=None, user_callback=None, **kwargs)[source]

Execute a streaming pull and process alerts through the callback.

The streaming pull happens in a background thread. A queue.Queue is used to communicate between threads and enforce the stopping condition(s).

Parameters
  • user_filter (Callable) – Used by callback to filter alerts before saving. It should accept a single alert as a dictionary (flat dict with fields determined by ztf_fields attribute). It should return the alert dict if it passes the filter, else None.

  • user_callback (Callable) – Used by callback to process alerts. It should accept a single alert as a dictionary (flat dict with fields determined by ztf_fields attribute). It should return True if the processing was successful; else False.

  • kwargs (dict) – User’s parameters. Should include the parameters defined in BrokerStreamPython’s FilterAlertsForm. There must be at least one stopping condition (max_results or timeout), else the streaming pull will run forever.
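The interplay between the background thread, the queue, and the stopping conditions can be sketched with the standard library alone. This is a simplified model, not the package's code: `fake_streaming_pull` and the `n_available` parameter are hypothetical, and `timeout` is treated here as the maximum number of seconds to wait for the next alert, which is an assumption.

```python
import queue
import threading
import time


def fake_streaming_pull(results: queue.Queue, stop: threading.Event, n_available: int) -> None:
    """Hypothetical stand-in for the background streaming pull."""
    for n in range(n_available):
        if stop.is_set():
            return
        results.put({"objectId": f"alert-{n}"})
        time.sleep(0.01)


def stream_alerts(n_available: int, max_results=None, timeout=None) -> list:
    """Collect alerts until max_results is reached or none arrives within timeout."""
    if max_results is None and timeout is None:
        raise ValueError("need at least one stopping condition")

    results: queue.Queue = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(
        target=fake_streaming_pull, args=(results, stop, n_available), daemon=True
    )
    worker.start()

    alerts = []
    try:
        while max_results is None or len(alerts) < max_results:
            try:
                # Block until the next alert; give up after `timeout` seconds.
                alerts.append(results.get(timeout=timeout))
            except queue.Empty:
                break
    finally:
        stop.set()  # tell the background thread to stop pulling
        worker.join()
    return alerts


capped = stream_alerts(n_available=10, max_results=5, timeout=1)  # stops on max_results
drained = stream_alerts(n_available=3, timeout=0.2)  # stops when the stream goes quiet
```

With neither `max_results` nor `timeout`, the main loop would block on the queue indefinitely, which is why the sketch rejects that combination up front.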

touch_subscription()[source]

Make sure the subscription exists and we can connect.

If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.

Note that messages published before the subscription is created are not available.
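The check-then-create logic can be sketched against an in-memory stand-in. `FakeSubscriber` and `NotFound` are hypothetical and only mimic the shape of a subscriber client, and the project IDs and stream name are placeholders; the resource path formats (`projects/<project>/subscriptions/<name>` and `projects/<project>/topics/<name>`) are the standard Pub/Sub ones.

```python
class NotFound(Exception):
    """Stand-in for the client's 'resource not found' error."""


class FakeSubscriber:
    """Hypothetical in-memory stand-in for a Pub/Sub subscriber client."""

    def __init__(self):
        self.subscriptions = {}

    def get_subscription(self, subscription: str) -> str:
        if subscription not in self.subscriptions:
            raise NotFound(subscription)
        return self.subscriptions[subscription]

    def create_subscription(self, name: str, topic: str) -> str:
        self.subscriptions[name] = topic
        return topic


def touch_subscription(client, user_project: str, broker_project: str, name: str) -> bool:
    """Ensure the subscription exists; return True if it had to be created."""
    subscription_path = f"projects/{user_project}/subscriptions/{name}"
    # Topic of the same name, but in the broker's project.
    topic_path = f"projects/{broker_project}/topics/{name}"
    try:
        client.get_subscription(subscription=subscription_path)
        return False
    except NotFound:
        client.create_subscription(name=subscription_path, topic=topic_path)
        return True


client = FakeSubscriber()
created_first = touch_subscription(client, "my-project", "broker-project", "example-stream")
created_again = touch_subscription(client, "my-project", "broker-project", "example-stream")
```

The first call creates the subscription and the second finds it, so repeated calls are safe.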