StreamPython
Note
The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.
BrokerStreamPython
TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.
Relies on ConsumerStreamPython to manage the connections and work with data.
See especially:
- fetch_alerts: Entry point to pull and filter alerts.
- user_filter: Apply the filter indicated by the form's parameters.
- class tom_pittgoogle.broker_stream_python.BrokerStreamPython
Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.
Base class: tom_alerts.alerts.GenericBroker
- fetch_alerts(parameters)
Entry point to pull and filter alerts.
Pull alerts using a Python client, unpack, apply user filter.
This demo assumes that the real use case is saving alerts to a database rather than viewing them through a TOM site. The Consumer therefore saves alerts in real time and returns a list of alerts only after all messages are processed. That list is then coerced into an iterator here. If alerts need to reach the iterator as they arrive, ConsumerStreamPython.stream_alerts can be modified to yield them in real time.
- form
alias of tom_pittgoogle.broker_stream_python.FilterAlertsForm
- static user_filter(alert_dict, parameters)
Apply the filter indicated by the form’s parameters.
Used as the callback to BrokerStreamPython.unpack_and_ack_messages.
- Parameters
alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamPython.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True the dict is flattened and extra fields are dropped.
parameters – parameters submitted by the user through the form.
- Returns
alert_dict if it passes the filter, else None
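A minimal sketch of a filter that follows this contract. It assumes a flattened alert (lighten_alerts=True) that carries a classtar key, and it assumes classtar_gt_lt takes the strings "gt" or "lt". Neither the key name nor the choice values are confirmed by the form definition, and the example alert is made up.

```python
def user_filter(alert_dict, parameters):
    """Return alert_dict if it passes the classtar cut, else None."""
    threshold = parameters.get("classtar_threshold")
    if threshold is None:
        # No threshold submitted: nothing to filter on, so keep the alert.
        return alert_dict

    classtar = alert_dict.get("classtar")  # assumed key in the flattened alert
    if classtar is None:
        # Cannot evaluate the cut without a score, so reject the alert.
        return None

    if parameters.get("classtar_gt_lt") == "lt":  # assumed choice value
        passes = classtar < threshold
    else:
        passes = classtar > threshold
    return alert_dict if passes else None


# Hypothetical alert and form parameters, for illustration only.
alert = {"objectId": "example-object", "classtar": 0.92}
kept = user_filter(alert, {"classtar_threshold": 0.5, "classtar_gt_lt": "gt"})
dropped = user_filter(alert, {"classtar_threshold": 0.5, "classtar_gt_lt": "lt"})
```

Returning the same dict object, rather than a copy, keeps the filter cheap and lets the caller test the result with a simple `is not None` check.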
- class tom_pittgoogle.broker_stream_python.FilterAlertsForm(*args, **kwargs)
Basic form for filtering alerts.
Fields:
subscription_name (CharField)
classtar_threshold (FloatField)
classtar_gt_lt (ChoiceField)
max_results (IntegerField)
timeout (IntegerField)
max_backlog (IntegerField)
- property media
Return all media required to render the widgets on this form.
ConsumerStreamPython
Consumer class to pull Pub/Sub messages via a Python client, and work with data.
Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html
Used by BrokerStreamPython, but can be called independently.
Use-case: Save alerts to a database
The demo for this implementation assumes that the real use case is saving alerts to a database rather than viewing them through a TOM site. The Consumer therefore saves alerts in real time and returns a list of alerts only after all messages are processed. That list is then coerced into an iterator by the Broker. If alerts need to reach the Broker's iterator as they arrive, stream_alerts can be modified to yield them in real time.
Basic workflow:
from tom_pittgoogle.consumer_stream_python import ConsumerStreamPython

consumer = ConsumerStreamPython(subscription_name)
alert_dicts_list = consumer.stream_alerts(
    user_filter=user_filter,
    **kwargs,
)
# Alerts are processed and saved in real time. The list is returned for convenience.
See especially:
- touch_subscription: Make sure the subscription exists and we can connect.
- stream_alerts: Execute a streaming pull and process alerts through the callback.
- callback: Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.
- Save the alert to a database.
- class tom_pittgoogle.consumer_stream_python.ConsumerStreamPython(subscription_name, ztf_fields=None)
Consumer class to manage Pub/Sub connections and work with messages.
Initialization does the following:
Authenticate the user via OAuth 2.0.
Create a google.cloud.pubsub_v1.SubscriberClient object.
Create a queue.Queue object to communicate with the background thread running the streaming pull.
Make sure the subscription exists and we can connect. Create it, if needed.
To view logs, visit: https://console.cloud.google.com/logs
Make sure you are logged in, and your project is selected in the dropdown at the top.
Click the “Log name” dropdown and select the subscription name you instantiate this consumer with.
TODO: Give the user a standard logger.
- authenticate_with_oauth()
Guide user through authentication; create OAuth2Session for credentials.
The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.
The user must have a Google account that is authorized to make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.
Additional requirement while this is still in development: OAuth access is restricted to users registered with Pitt-Google, so contact us.
TODO: Integrate this with Django. For now, the user interacts via command line.
- callback(message)
Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.
Used as the callback for the streaming pull.
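The four steps named above (unpack, filter, save, acknowledge) can be sketched as follows. This is an illustration, not the package's code: FakeMessage is a hypothetical stand-in that imitates only the .data and .ack() members of a Pub/Sub message, the payload is assumed to be JSON (the real stream may use a different encoding), save_alert is any callable supplied by the caller, and acknowledging alerts that fail the filter is an assumption.

```python
import json


class FakeMessage:
    """Stand-in for a Pub/Sub message: exposes .data (bytes) and .ack()."""

    def __init__(self, payload):
        self.data = json.dumps(payload).encode("utf-8")
        self.acked = False

    def ack(self):
        self.acked = True


def make_callback(user_filter, save_alert):
    """Build a callback that unpacks, filters, saves, then acknowledges."""

    def callback(message):
        alert_dict = json.loads(message.data)  # assumption: JSON payload
        if user_filter is not None:
            alert_dict = user_filter(alert_dict)
        if alert_dict is not None:
            save_alert(alert_dict)
        # Acknowledge either way so Pub/Sub does not redeliver the message.
        message.ack()
        return alert_dict

    return callback


saved = []  # in-memory stand-in for a database
callback = make_callback(
    user_filter=lambda alert: alert if alert["classtar"] > 0.5 else None,
    save_alert=saved.append,
)
star = FakeMessage({"objectId": "a", "classtar": 0.9})
galaxy = FakeMessage({"objectId": "b", "classtar": 0.1})
callback(star)
callback(galaxy)
```

Only the first alert ends up in `saved`, while both messages are acknowledged.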
- delete_subscription()
Delete the subscription.
This is provided for the user’s convenience, but it is not necessary and is not automatically called.
Storage of unacknowledged Pub/Sub messages does not result in fees.
Unused subscriptions automatically expire; default is 31 days.
- get_credentials(user_project)
Create a user credentials object from service account credentials or an OAuth session.
Try service account credentials first. Fall back to OAuth.
- stream_alerts(user_filter=None, user_callback=None, **kwargs)
Execute a streaming pull and process alerts through the callback.
The streaming pull happens in a background thread. A queue.Queue is used to communicate between threads and enforce the stopping condition(s).
- Parameters
user_filter (Callable) – Used by callback to filter alerts before saving. It should accept a single alert as a dictionary (flat dict with fields determined by ztf_fields attribute). It should return the alert dict if it passes the filter, else None.
user_callback (Callable) – Used by callback to process alerts. It should accept a single alert as a dictionary (flat dict with fields determined by ztf_fields attribute). It should return True if the processing was successful; else False.
kwargs (dict) – User’s parameters. Should include the parameters defined in BrokerStreamPython’s FilterAlertsForm. There must be at least one stopping condition (max_results or timeout), else the streaming pull will run forever.
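The threading pattern described above, a background worker feeding a queue.Queue while the calling thread enforces the stopping conditions, can be sketched with the standard library alone. The function name stream and the iterable source are hypothetical stand-ins for the streaming pull, and timeout is treated here as an overall deadline in seconds, which may differ from how the package defines it.

```python
import itertools
import queue
import threading
import time


def stream(source, max_results=None, timeout=None):
    """Collect items produced in a background thread until a stop condition."""
    if max_results is None and timeout is None:
        raise ValueError("need max_results or timeout, else this never stops")

    results_queue = queue.Queue()
    stop = threading.Event()

    def worker():
        # Stand-in for the streaming pull: push each item onto the queue.
        for item in source:
            if stop.is_set():
                break
            results_queue.put(item)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    results = []
    deadline = None if timeout is None else time.monotonic() + timeout
    while max_results is None or len(results) < max_results:
        wait = 0.1 if deadline is None else min(0.1, deadline - time.monotonic())
        if wait <= 0:
            break  # overall timeout reached
        try:
            results.append(results_queue.get(timeout=wait))
        except queue.Empty:
            if not thread.is_alive() and results_queue.empty():
                break  # finite stand-in source is exhausted
    stop.set()  # tell the worker to stop producing
    return results


# An endless source still terminates because max_results is set.
first_five = stream(itertools.count(), max_results=5)
```

Polling the queue in short intervals keeps the calling thread responsive to the deadline even when no items arrive.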
- touch_subscription()
Make sure the subscription exists and we can connect.
If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.
Note that messages published before the subscription is created are not available.
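The connect-or-create logic can be sketched as below. To keep the example runnable without Google credentials, FakeSubscriberClient and NotFound are hypothetical in-memory stand-ins for google.cloud.pubsub_v1.SubscriberClient and google.api_core.exceptions.NotFound. The project IDs and the subscription name are made up, and the function signature is an illustration rather than the package's own.

```python
class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


class FakeSubscriberClient:
    """In-memory stand-in exposing the two client calls used below."""

    def __init__(self):
        self.subscriptions = {}

    def get_subscription(self, subscription):
        if subscription not in self.subscriptions:
            raise NotFound(subscription)
        return self.subscriptions[subscription]

    def create_subscription(self, name, topic):
        self.subscriptions[name] = {"name": name, "topic": topic}
        return self.subscriptions[name]


def touch_subscription(client, user_project, broker_project, subscription_name):
    """Return (subscription, created) after connecting or creating."""
    sub_path = f"projects/{user_project}/subscriptions/{subscription_name}"
    # The topic shares the subscription's name but lives in the broker's project.
    topic_path = f"projects/{broker_project}/topics/{subscription_name}"
    try:
        return client.get_subscription(subscription=sub_path), False
    except NotFound:
        return client.create_subscription(name=sub_path, topic=topic_path), True


client = FakeSubscriberClient()
# Hypothetical project IDs and subscription name.
first, created_first = touch_subscription(
    client, "my-project", "broker-project", "example-stream"
)
second, created_second = touch_subscription(
    client, "my-project", "broker-project", "example-stream"
)
```

The first call creates the subscription and the second finds it, which mirrors why messages published before the first call are not available.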