

TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.

Relies on ConsumerStreamPython to manage the connections and work with data.

See especially:


Entry point to pull and filter alerts.


Apply the filter indicated by the form's parameters.

class tom_pittgoogle.broker_stream_python.BrokerStreamPython[source]

Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.

Base class: tom_alerts.alerts.GenericBroker


Entry point to pull and filter alerts.

Pull alerts using a Python client, unpack, apply user filter.

This demo assumes that the real use-case is to save alerts to a database rather than view them through a TOM site. Therefore, the Consumer currently saves the alerts in real time, and then simply returns a list of alerts after all messages are processed. That list is then coerced into an iterator here. If the user really cares about the iterator, ConsumerStreamPython.stream_alerts can be tweaked to yield the alerts in real time.


alias of tom_pittgoogle.broker_stream_python.FilterAlertsForm


Map the Pitt-Google alert to a TOM GenericAlert.

static user_filter(alert_dict, parameters)[source]

Apply the filter indicated by the form’s parameters.

Used as the callback to BrokerStreamPython.unpack_and_ack_messages.

  • alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamPython.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema ( If lighten_alerts=True the dict is flattened and extra fields are dropped.

  • parameters – parameters submitted by the user through the form.


alert_dict if it passes the filter, else None

class tom_pittgoogle.broker_stream_python.FilterAlertsForm(*args, **kwargs)[source]

Basic form for filtering alerts.


subscription_name (CharField)

classtar_threshold (FloatField)

classtar_gt_lt (ChoiceField)

max_results (IntegerField)

timeout (IntegerField)

max_backlog (IntegerField)

save_metadata (ChoiceField)

property media

Return all media required to render the widgets on this form.


Consumer class to manage Pub/Sub connections via a Python client, and work with data.

Pub/Sub Python Client docs:

Used by BrokerStreamPython, but can be called independently.

Use-case: Save alerts to a database

This demo assumes that the real use-case is to save alerts to a database rather than view them through a TOM site. Therefore, the Consumer currently saves the alerts in real time, and then simply returns a list of alerts after all messages are processed. That list is then coerced into an iterator by the Broker. If the user really cares about the Broker’s iterator, stream_alerts can be tweaked to yield the alerts in real time.

Basic workflow:

consumer = ConsumerStreamPython(subscription_name)

alert_dicts_list = consumer.stream_alerts(
# alerts are processed and saved in real time. the list is returned for convenience.

See especially:


Guide user through authentication; create OAuth2Session for credentials.


Make sure the subscription exists and we can connect.


Execute a streaming pull and process alerts through the callback.


Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.


Save the alert to a database.

class tom_pittgoogle.consumer_stream_python.ConsumerStreamPython(subscription_name)[source]

Consumer class to manage Pub/Sub connections and work with messages.

Initialization does the following:

  • Authenticate the user via OAuth 2.0.

  • Create a object.

  • Create a queue.Queue object to communicate with the background thread running the streaming pull.

  • Make sure the subscription exists and we can connect. Create it, if needed.

To view logs, visit:

  • Make sure you are logged in, and your project is selected in the dropdown at the top.

  • Click the “Log name” dropdown and select the subscription name you instantiate this consumer with.

TODO: Give the user a standard logger.


Guide user through authentication; create OAuth2Session for credentials.

The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.

The user must have a Google account that is authorized make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django file. Any project can be used, as long as the user has access.

Additional requirement because this is still in dev: The OAuth is restricted to users registered with Pitt-Google, so contact us.

TODO: Integrate this with Django. For now, the user interacts via command line.


Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.

Used as the callback for the streaming pull.


Delete the subscription.

This is provided for the user’s convenience, but it is not necessary and is not automatically called.

  • Storage of unacknowledged Pub/Sub messages does not result in fees.

  • Unused subscriptions automatically expire; default is 31 days.


Make sure the subscription exists and we can connect.

If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.

Note that messages published before the subscription is created are not available.


Save the alert to a database.

stream_alerts(lighten_alerts=False, user_filter=None, parameters=None)[source]

Execute a streaming pull and process alerts through the callback.

The streaming pull happens in a background thread. A queue.Queue is used to communicate between threads and enforce the stopping condition(s).

  • lighten_alerts (bool) – If True, drop extra fields and flatten the alert dict

  • user_filter (Callable) – Used by callback to filter alerts before saving. It should accept a single alert (ZTF packet data) as a dictionary. The schema depends on the value of lighten_alerts. If lighten_alerts=False it is the original ZTF alert schema ( If lighten_alerts=True the dict is flattened and extra fields are dropped. It should return the alert dict if it passes the filter, else None.

  • parameters (dict) – User’s parameters. Must include the parameters defined in the Broker’s FilterAlertsForm.