StreamPython
Note
The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.
BrokerStreamPython
TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.
Relies on ConsumerStreamPython to manage the connections and work with data.
See especially:
BrokerStreamPython.fetch_alerts: Entry point to pull and filter alerts.
BrokerStreamPython.user_filter: Apply the filter indicated by the form's parameters.
- class tom_pittgoogle.broker_stream_python.BrokerStreamPython[source]
Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.
Base class: tom_alerts.alerts.GenericBroker
- fetch_alerts(parameters)[source]
Entry point to pull and filter alerts.
Pull alerts using a Python client, unpack, apply user filter.
This demo assumes that the real use-case is to save alerts to a database rather than view them through a TOM site. Therefore, the Consumer currently saves the alerts in real time, and then simply returns a list of alerts after all messages are processed. That list is then coerced into an iterator here. If the user really cares about the iterator, ConsumerStreamPython.stream_alerts can be tweaked to yield the alerts in real time.
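The difference between the two delivery styles described above can be sketched as follows. This is a minimal illustration, not the package's code: `collect_then_iterate` and `yield_as_processed` are hypothetical names, and `process` stands in for whatever per-alert handling the consumer performs.

```python
from typing import Callable, Iterable, Iterator, List, Optional


def collect_then_iterate(alerts: List[dict]) -> Iterator[dict]:
    """Current behavior as described: a finished list is wrapped in an iterator."""
    return iter(alerts)


def yield_as_processed(
    messages: Iterable[dict],
    process: Callable[[dict], Optional[dict]],
) -> Iterator[dict]:
    """Hypothetical alternative: yield each alert as soon as it is processed."""
    for message in messages:
        result = process(message)
        if result is not None:  # drop alerts rejected by the processing step
            yield result
```

With the first form the caller sees nothing until every message has been handled; with the second, alerts become available one at a time.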
- form
alias of tom_pittgoogle.broker_stream_python.FilterAlertsForm
- static user_filter(alert_dict, **parameters)[source]
Apply the filter indicated by the form’s parameters.
Used as the callback to BrokerStreamPython.unpack_and_ack_messages.
- Parameters
alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamPython.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True the dict is flattened and extra fields are dropped.
parameters – parameters submitted by the user through the form.
- Returns
alert_dict if it passes the filter, else None
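A filter of this shape might look like the sketch below. It is an illustration under stated assumptions, not the package's implementation: it assumes a flattened alert dictionary with a top-level `classtar` key, assumes that `classtar_gt_lt` takes the values "gt" or "lt", and treats a missing threshold as "no filter requested".

```python
from typing import Optional


def user_filter_sketch(alert_dict: dict, **parameters) -> Optional[dict]:
    """Return alert_dict if it passes the (assumed) classtar filter, else None."""
    threshold = parameters.get("classtar_threshold")
    if threshold is None:
        return alert_dict  # assumption: no threshold means nothing to filter on

    classtar = alert_dict.get("classtar")  # assumption: flattened alert schema
    if classtar is None:
        return None  # assumption: alerts without a classtar value are rejected

    direction = parameters.get("classtar_gt_lt", "gt")  # assumed choices: "gt", "lt"
    if direction == "gt":
        passes = classtar > threshold
    elif direction == "lt":
        passes = classtar < threshold
    else:
        raise ValueError(f"Unknown comparison: {direction!r}")

    return alert_dict if passes else None
```

Returning the dictionary itself on success and None on failure matches the return contract documented above.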
- class tom_pittgoogle.broker_stream_python.FilterAlertsForm(*args, **kwargs)[source]
Basic form for filtering alerts.
Fields:
subscription_name (CharField)
classtar_threshold (FloatField)
classtar_gt_lt (ChoiceField)
max_results (IntegerField)
timeout (IntegerField)
max_backlog (IntegerField)
- property media
Return all media required to render the widgets on this form.
ConsumerStreamPython
Consumer class to pull Pub/Sub messages via a Python client, and work with data.
Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html
Basic workflow:
from tom_pittgoogle.consumer_stream_python import ConsumerStreamPython

consumer = ConsumerStreamPython(subscription_name)

def user_callback(alert_dict):
    # Process the alert as desired. If processing is successful,
    # tell the consumer to send an acknowledgement to Pub/Sub.
    success = True
    return {"ack": success}

# return_list=True collects the results in memory and returns them as a list.
alert_dicts_list = consumer.stream_alerts(user_callback=user_callback, return_list=True)
See especially:
ConsumerStreamPython.stream_alerts: Execute a streaming pull, processing messages through self.callback.
ConsumerStreamPython.callback: Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.
- class tom_pittgoogle.consumer_stream_python.ConsumerStreamPython(subscription_name, save_fields=None)[source]
Consumer class to manage Pub/Sub connections and work with messages.
Initialization does the following:
Authenticate the user via a key file or OAuth 2.0.
Add a Google Cloud logging handler to the LOGGER. To view, visit: https://console.cloud.google.com/logs.
Create a client object (google.cloud.pubsub_v1.SubscriberClient).
Create a queue.Queue object to communicate with the background thread running the streaming pull.
Make sure the subscription exists and we can connect. Create it, if needed.
- authenticate_with_oauth()[source]
Guide user through authentication; create OAuth2Session for credentials.
The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.
The user must have a Google account that is authorized to make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.
Additional requirement because this is still in dev: The OAuth is restricted to users registered with Pitt-Google, so contact us.
TODO: Integrate this with Django. For now, the user interacts via command line.
- callback(message)[source]
Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.
Used as the callback for the streaming pull.
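The per-message flow can be sketched roughly as below, using the 'ack' and 'result' return contract documented for stream_alerts. This is not the package's code: `FakeMessage` and `handle_message` are hypothetical, and calling `nack()` when 'ack' is False is an assumption about how redelivery is requested.

```python
class FakeMessage:
    """Stand-in for a Pub/Sub message, exposing only what this sketch needs."""

    def __init__(self, data):
        self.data = data
        self.acked = False
        self.nacked = False

    def ack(self):
        self.acked = True

    def nack(self):
        self.nacked = True


def handle_message(message, user_callback, collected, **user_kwargs):
    """Run user_callback on one message; return True if it counts as a result."""
    response = user_callback(message.data, **user_kwargs)

    if not response.get("ack", False):
        message.nack()  # assumption: ask Pub/Sub to redeliver later
        return False

    message.ack()
    # An absent 'result' key falls back to the original message data.
    result = response.get("result", message.data)
    if result is None:
        return False  # explicit None: acknowledged, but neither counted nor collected
    collected.append(result)
    return True
```

The boolean return value is what a caller could use to count messages towards max_results.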
- delete_subscription()[source]
Delete the subscription.
This is provided for the user’s convenience, but it is not necessary and is not automatically called.
Storage of unacknowledged Pub/Sub messages does not result in fees.
Unused subscriptions automatically expire; default is 31 days.
- get_credentials(user_project)[source]
Create a user credentials object from service account credentials or via OAuth.
Try service account credentials first. Fall back to OAuth.
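The try-then-fall-back order can be sketched generically as below. The two loader callables are hypothetical placeholders for whatever actually loads a key file or runs the OAuth flow; the exceptions a real loader raises are not documented here, so the broad `except` is an assumption.

```python
import logging

LOGGER = logging.getLogger(__name__)


def get_credentials_sketch(load_service_account, run_oauth_flow):
    """Try service account credentials first; fall back to OAuth on failure."""
    try:
        return load_service_account()
    except Exception as err:  # assumption: any loading failure triggers the fallback
        LOGGER.warning("Service account credentials unavailable (%s); using OAuth.", err)
        return run_oauth_flow()
```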
- stream_alerts(send_data='alert_dict', include_metadata=False, user_callback=None, return_list=False, flow_configs=None, **user_kwargs)[source]
Execute a streaming pull, processing messages through self.callback.
The streaming pull happens in a background thread. A queue.Queue is used to communicate between threads and enforce the stopping condition(s).
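The thread-and-queue arrangement can be illustrated with the standard library alone. In this sketch a worker thread stands in for the streaming pull and the main thread enforces the stopping conditions; `run_pull` is a hypothetical name, and the real method's internals may differ.

```python
import queue
import threading


def run_pull(messages, max_results=10, timeout=30):
    """Collect messages from a background thread until a stopping condition is met."""
    results_queue = queue.Queue()
    stop = threading.Event()

    def worker():
        # Stand-in for the streaming pull: hand each message to the main thread.
        for message in messages:
            if stop.is_set():
                break
            results_queue.put(message)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    collected = []
    while max_results is None or len(collected) < max_results:
        try:
            # Wait at most `timeout` seconds for the next message.
            collected.append(results_queue.get(timeout=timeout))
        except queue.Empty:
            break  # timeout reached with no new message

    stop.set()  # tell the background thread to stop
    thread.join()
    return collected
```

The loop ends either when max_results messages have been collected or when no message arrives within timeout seconds, whichever happens first.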
- Parameters
send_data (str) – String specifying the data and format requested. Options are: ‘alert_dict’ (default, alert packet data cast to a dictionary), ‘alert_bytes’ (alert packet data as bytes, as received from Pub/Sub), ‘full_msg’ (complete google.pubsub_v1.types.PubsubMessage object)
include_metadata (bool) – If False (default), the message will be returned according to send_data. If True, the message will be delivered as a dictionary with two keys: ‘metadata_dict’ and one of ‘alert_dict’ or ‘alert_bytes’, according to send_data. This has no effect if send_data=’full_msg’.
user_callback (Callable) – Used by self.callback to perform user-requested processing in the background thread. It must accept the message as its first argument (type and contents determined by send_data and include_metadata) and can accept arbitrary keyword arguments, which should be passed via user_kwargs. It should return a dictionary with two keys, ‘ack’ and ‘result’, with values as follows.
‘ack’: boolean indicating whether the message should be acknowledged to Pub/Sub. Use False to indicate that the message cannot be processed at this time and that Pub/Sub should redeliver it in the future.
‘result’: (arbitrary type) the result of the processing. If return_list is True, this is the value that will be returned in the list. If it is explicitly set to None, the message will not be counted towards max_results and will not be returned in the list (if return_list is True). If the ‘result’ key is absent, the message will be counted towards max_results and the original message data will be returned in the list (if return_list is True).
return_list (bool) – If True, messages will be processed through self.callback, and results will be collected in memory and returned as a list after the streaming pull is complete. If False (default), the user_callback is responsible for saving the processing results.
flow_configs (dict) – Dictionary defining stopping conditions and flow control settings for the streaming pull. May contain the keys:
max_results (int, default=10; use None for no limit): maximum number of messages to process/collect before stopping.
timeout (int, default=30; use None for no limit): maximum number of seconds to wait for a new message before stopping.
max_backlog (int, default=min(1000, max_results)): maximum number of received but unprocessed messages before pausing the streaming pull.
user_kwargs (dict) – Dict defining keyword arguments to be sent to user_callback.
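The documented flow_configs defaults could be resolved along the lines of the sketch below. `resolve_flow_configs` is a hypothetical helper, and the fallback of max_backlog to 1000 when max_results is None is an assumption, since the documentation does not cover that case.

```python
def resolve_flow_configs(flow_configs=None):
    """Fill in the documented defaults for any missing flow_configs keys."""
    defaults = {"max_results": 10, "timeout": 30}
    configs = {**defaults, **(flow_configs or {})}

    if configs.get("max_backlog") is None:
        max_results = configs["max_results"]
        # Documented default is min(1000, max_results); with no result limit,
        # this sketch assumes the cap of 1000 applies.
        configs["max_backlog"] = 1000 if max_results is None else min(1000, max_results)

    return configs
```

For example, `resolve_flow_configs({"max_results": 5000, "timeout": None})` keeps the caller's values and sets max_backlog to 1000.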
- touch_subscription()[source]
Make sure the subscription exists and we can connect.
If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.
Note that messages published before the subscription is created are not available.
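A check-then-create step of this kind might be written as in the sketch below, using the Pub/Sub client's get_subscription and create_subscription calls. The function name and its parameters are hypothetical, the Pitt-Google project ID is left as an argument rather than assumed, and the fallback NotFound class exists only so the sketch can run without the client library installed.

```python
try:
    from google.api_core.exceptions import NotFound
except ImportError:
    # Fallback so the sketch can be run without the Google client libraries.
    class NotFound(Exception):
        pass


def touch_subscription_sketch(subscriber, user_project, pitt_google_project, name):
    """Return the subscription path, creating the subscription if it is missing."""
    # The subscription lives in the user's project; the topic of the same
    # name lives in the Pitt-Google project.
    subscription_path = f"projects/{user_project}/subscriptions/{name}"
    topic_path = f"projects/{pitt_google_project}/topics/{name}"

    try:
        subscriber.get_subscription(subscription=subscription_path)
    except NotFound:
        subscriber.create_subscription(name=subscription_path, topic=topic_path)

    return subscription_path
```

Here `subscriber` would be a google.cloud.pubsub_v1.SubscriberClient instance in real use.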