StreamRest
Note
The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.
BrokerStreamRest
TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the REST API.
Relies on ConsumerStreamRest to manage the connections and work with data.
See especially:
Pull alerts using a POST request with OAuth2, unpack, apply user filter. |
|
Apply the filter indicated by the form's parameters. |
- class tom_pittgoogle.broker_stream_rest.BrokerStreamRest[source]
Pitt-Google broker class to pull alerts from a stream via the REST API.
Base class:
tom_alerts.alerts.GenericBroker
- form
- request_alerts(parameters)[source]
Pull alerts using a POST request with OAuth2, unpack, apply user filter.
- Returns
alerts (List[dict])
- static user_filter(alert_dict, parameters)[source]
Apply the filter indicated by the form’s parameters.
Used as the callback to BrokerStreamRest.unpack_and_ack_messages.
- Parameters
alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamRest.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True the dict is flattened and extra fields are dropped.
parameters – parameters submitted by the user through the form.
- Returns
alert_dict if it passes the filter, else None
- class tom_pittgoogle.broker_stream_rest.FilterAlertsForm(*args, **kwargs)[source]
Basic form for filtering alerts.
Fields:
subscription_name (
CharField
)classtar_threshold (
FloatField
)classtar_gt_lt (
ChoiceField
)max_results (
IntegerField
)- property media
Return all media required to render the widgets on this form.
ConsumerStreamRest
Consumer class to manage Pub/Sub connections via REST, and work with message data.
Pub/Sub REST API docs: https://cloud.google.com/pubsub/docs/reference/rest
Used by BrokerStreamRest, but can be called independently.
Basic workflow:
consumer = ConsumerStreamRest(subscription_name)
response = consumer.oauth2.post(
f"{consumer.subscription_url}:pull", data={"maxMessages": max_messages},
)
alerts = consumer.unpack_and_ack_messages(
response, lighten_alerts=True, callback=user_filter,
) # List[dict]
See especially:
Guide user through authentication; create OAuth2Session for HTTP requests. |
|
Make sure the subscription exists and we can connect. |
|
Unpack and acknowledge messages in response; run callback if present. |
- class tom_pittgoogle.consumer_stream_rest.ConsumerStreamRest(subscription_name)[source]
Consumer class to manage Pub/Sub connections and work with messages.
Initialization does the following:
Authenticate the user. Create an OAuth2Session object for the user/broker to make HTTP requests with.
Make sure the subscription exists and we can connect. Create it, if needed.
- authenticate()[source]
Guide user through authentication; create OAuth2Session for HTTP requests.
The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.
The user must have a Google account that is authorized make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.
Additional requirement because this is still in dev: The OAuth is restricted to users registered with Pitt-Google, so contact us.
TODO: Integrate this with Django. For now, the user interacts via command line.
- delete_subscription()[source]
Delete the subscription.
This is provided for the user’s convenience, but it is not necessary and is not automatically called.
Storage of unacknowledged Pub/Sub messages does not result in fees.
Unused subscriptions automatically expire; default is 31 days.
- touch_subscription()[source]
Make sure the subscription exists and we can connect.
If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.
Note that messages published before the subscription is created are not available.
- unpack_and_ack_messages(response, lighten_alerts=False, callback=None, **kwargs)[source]
Unpack and acknowledge messages in response; run callback if present.
If lighten_alerts is True, drop extra fields and flatten the alert dict.
callback is assumed to be a filter. It should accept an alert dict and return the dict if the alert passes the filter, else return None.