StreamRest

Note

The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.

BrokerStreamRest

TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the REST API.

Relies on ConsumerStreamRest to manage the connections and work with data.

See especially:

BrokerStreamRest.request_alerts

Pull alerts using a POST request with OAuth2, unpack, apply user filter.

BrokerStreamRest.user_filter

Apply the filter indicated by the form's parameters.

class tom_pittgoogle.broker_stream_rest.BrokerStreamRest[source]

Pitt-Google broker class to pull alerts from a stream via the REST API.

Base class: tom_alerts.alerts.GenericBroker

fetch_alerts(parameters)[source]

Entry point to pull and filter alerts.

form

alias of tom_pittgoogle.broker_stream_rest.FilterAlertsForm

request_alerts(parameters)[source]

Pull alerts using a POST request with OAuth2, unpack, apply user filter.

Returns

alerts (List[dict])

to_generic_alert(alert)[source]

Map the Pitt-Google alert to a TOM GenericAlert.
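As a rough illustration of this kind of mapping, the sketch below converts a flattened alert dict into a stand-in dataclass. It is not the method's actual implementation. GenericAlertStandIn is a hypothetical class whose field names are modeled on GenericAlert and should be checked against the installed tom_alerts version. The alert keys (objectId, candid, jd, ra, dec, magpsf, classtar) are assumed from the ZTF schema, and using classtar as the score is also an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class GenericAlertStandIn:
    """Hypothetical stand-in with fields modeled on a TOM GenericAlert."""

    timestamp: datetime
    url: str
    id: int
    name: str
    ra: float
    dec: float
    mag: float
    score: float


def jd_to_datetime(jd):
    """Convert a Julian Date to a timezone-aware UTC datetime."""
    # JD 2440587.5 corresponds to the Unix epoch, 1970-01-01T00:00:00Z.
    unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return unix_epoch + timedelta(days=jd - 2440587.5)


def to_generic_alert_sketch(alert):
    """Map a flattened alert dict to the stand-in (key names are assumed)."""
    return GenericAlertStandIn(
        timestamp=jd_to_datetime(alert["jd"]),
        url="",  # assumption: no per-alert URL is available
        id=alert["candid"],
        name=alert["objectId"],
        ra=alert["ra"],
        dec=alert["dec"],
        mag=alert["magpsf"],
        score=alert["classtar"],  # assumption: star/galaxy score used as score
    )
```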

static user_filter(alert_dict, parameters)[source]

Apply the filter indicated by the form’s parameters.

Used as the callback to ConsumerStreamRest.unpack_and_ack_messages.

Parameters
  • alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to ConsumerStreamRest.unpack_and_ack_messages. If lighten_alerts=False, it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True, the dict is flattened and extra fields are dropped.

  • parameters – parameters submitted by the user through the form.

Returns

alert_dict if it passes the filter, else None
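A minimal sketch of a filter that follows this contract is shown below. It is illustrative only and not the broker's own code. It assumes a flattened alert with a top-level classtar key, that parameters is a plain dict keyed by the form's field names, and that classtar_gt_lt takes the hypothetical values "gt" and "lt".

```python
def user_filter_sketch(alert_dict, parameters):
    """Return alert_dict if it passes the classtar cut, else None."""
    threshold = parameters.get("classtar_threshold")
    if threshold is None:
        # No threshold was submitted, so every alert passes.
        return alert_dict

    classtar = alert_dict["classtar"]  # assumed key in the flattened alert
    if parameters.get("classtar_gt_lt") == "lt":  # "lt"/"gt" are assumed values
        passed = classtar < threshold
    else:
        passed = classtar > threshold
    return alert_dict if passed else None
```

Returning the dict itself, rather than a boolean, lets the same function serve directly as the callback described for unpack_and_ack_messages.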

class tom_pittgoogle.broker_stream_rest.FilterAlertsForm(*args, **kwargs)[source]

Basic form for filtering alerts.

Fields:

subscription_name (CharField)

classtar_threshold (FloatField)

classtar_gt_lt (ChoiceField)

max_results (IntegerField)

property media

Return all media required to render the widgets on this form.

ConsumerStreamRest

Consumer class to manage Pub/Sub connections via REST, and work with message data.

Pub/Sub REST API docs: https://cloud.google.com/pubsub/docs/reference/rest

Used by BrokerStreamRest, but can be called independently.

Basic workflow:

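from tom_pittgoogle.consumer_stream_rest import ConsumerStreamRest

# subscription_name, max_messages and user_filter are supplied by the caller.
# Initialization authenticates the user and makes sure the subscription exists.
consumer = ConsumerStreamRest(subscription_name)

# Pull up to max_messages messages from the subscription.
response = consumer.oauth2.post(
    f"{consumer.subscription_url}:pull", data={"maxMessages": max_messages},
)

# Unpack and acknowledge the messages; keep alerts that pass user_filter.
alerts = consumer.unpack_and_ack_messages(
    response, lighten_alerts=True, callback=user_filter,
)  # List[dict]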

See especially:

ConsumerStreamRest.authenticate

Guide user through authentication; create OAuth2Session for HTTP requests.

ConsumerStreamRest.touch_subscription

Make sure the subscription exists and we can connect.

ConsumerStreamRest.unpack_and_ack_messages

Unpack and acknowledge messages in response; run callback if present.

class tom_pittgoogle.consumer_stream_rest.ConsumerStreamRest(subscription_name)[source]

Consumer class to manage Pub/Sub connections and work with messages.

Initialization does the following:

  • Authenticate the user. Create an OAuth2Session object for the user/broker to make HTTP requests with.

  • Make sure the subscription exists and we can connect. Create it, if needed.

authenticate()[source]

Guide user through authentication; create OAuth2Session for HTTP requests.

The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.

The user must have a Google account that is authorized to make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.

Additional requirement while this is still in development: OAuth access is restricted to users registered with Pitt-Google, so contact us to register.

TODO: Integrate this with Django. For now, the user interacts via command line.

delete_subscription()[source]

Delete the subscription.

This is provided for the user’s convenience, but it is not necessary and is not automatically called.

  • Storage of unacknowledged Pub/Sub messages does not result in fees.

  • Unused subscriptions automatically expire; default is 31 days.

touch_subscription()[source]

Make sure the subscription exists and we can connect.

If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.

Note that messages published before the subscription is created are not available.
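The naming convention described above can be sketched with the Pub/Sub REST API's resource paths. This is an illustration under stated assumptions, not the method's implementation: the helper names are hypothetical, the project IDs are placeholders, and no HTTP request is sent. The sketch only builds the method, URL, and JSON body for a subscriptions.create call.

```python
PUBSUB_API = "https://pubsub.googleapis.com/v1"


def subscription_url(user_project, subscription_name):
    """URL of a subscription that lives in the user's project."""
    return f"{PUBSUB_API}/projects/{user_project}/subscriptions/{subscription_name}"


def topic_path(topic_project, topic_name):
    """Resource path of a topic in the publishing project."""
    return f"projects/{topic_project}/topics/{topic_name}"


def create_subscription_request(user_project, topic_project, subscription_name):
    """Return (method, url, json_body) for a subscriptions.create call.

    The subscription is attached to a topic with the same name as the
    subscription, located in a different (publishing) project.
    """
    url = subscription_url(user_project, subscription_name)
    body = {"topic": topic_path(topic_project, subscription_name)}
    return "PUT", url, body
```

A GET request to the same URL checks whether the subscription already exists, so a caller would typically try the GET first and fall back to the PUT only when the subscription is not found.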

unpack_and_ack_messages(response, lighten_alerts=False, callback=None, **kwargs)[source]

Unpack and acknowledge messages in response; run callback if present.

If lighten_alerts is True, drop extra fields and flatten the alert dict.

callback is assumed to be a filter. It should accept an alert dict and return the dict if the alert passes the filter, else return None.
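The unpack, filter, and acknowledge steps can be sketched against the shape of a Pub/Sub pull response, in which each entry of receivedMessages carries an ackId and a base64-encoded message.data. This is a simplified illustration, not the method's implementation. It assumes JSON payloads (the real alerts may use a different serialization), assumes extra keyword arguments are forwarded to the callback, and returns the acknowledge request body instead of posting it.

```python
import base64
import json


def unpack_messages_sketch(response_json, callback=None, **kwargs):
    """Decode pulled messages, filter them, and collect their ack IDs.

    Returns (alerts, ack_body), where ack_body is the JSON body for a
    POST to "<subscription_url>:acknowledge".
    """
    alerts, ack_ids = [], []
    for received in response_json.get("receivedMessages", []):
        # Every pulled message is acknowledged, whether or not it passes.
        ack_ids.append(received["ackId"])

        payload = base64.b64decode(received["message"]["data"])
        alert_dict = json.loads(payload)  # assumption: JSON-encoded payload

        if callback is not None:
            # The callback returns the dict to keep it, or None to drop it.
            alert_dict = callback(alert_dict, **kwargs)
        if alert_dict is not None:
            alerts.append(alert_dict)
    return alerts, {"ackIds": ack_ids}
```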