StreamPython

Note

The Pitt-Google broker uses Pub/Sub to publish live streams, rather than Apache Kafka. See Pub/Sub Message Service for a basic overview.

BrokerStreamPython

TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.

Relies on ConsumerStreamPython to manage the connections and work with data.

See especially:

BrokerStreamPython.fetch_alerts

Entry point to pull and filter alerts.

BrokerStreamPython.user_filter

Apply the filter indicated by the form's parameters.

class tom_pittgoogle.broker_stream_python.BrokerStreamPython

Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.

Base class: tom_alerts.alerts.GenericBroker

fetch_alerts(parameters)

Entry point to pull and filter alerts.

Pull alerts using the Python client, unpack them, and apply the user filter.

This demo assumes that the real use case is to save alerts to a database rather than to view them through a TOM site. Therefore, the consumer currently saves alerts in real time and returns a list of alerts only after all messages have been processed; that list is then coerced into an iterator here. If a real-time iterator is needed, ConsumerStreamPython.stream_alerts can be modified to yield alerts as they arrive.
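The difference between the current behavior and the suggested tweak can be sketched in plain Python. Both functions are hypothetical illustrations, not tom_pittgoogle code: an iterable of dicts stands in for the Pub/Sub stream, and the filter follows the "alert_dict or None" convention of BrokerStreamPython.user_filter.

```python
from typing import Callable, Iterable, Iterator, List, Optional

AlertFilter = Callable[[dict], Optional[dict]]


def collect_then_iterate(messages: Iterable[dict], user_filter: AlertFilter) -> Iterator[dict]:
    """Current behavior: process every message first, then return an iterator."""
    results: List[dict] = []
    for alert_dict in messages:
        kept = user_filter(alert_dict)
        if kept is not None:
            results.append(kept)  # in the demo, this is also where the alert is saved
    return iter(results)  # the finished list is coerced into an iterator


def yield_in_real_time(messages: Iterable[dict], user_filter: AlertFilter) -> Iterator[dict]:
    """Possible tweak: hand each passing alert to the caller as soon as it arrives."""
    for alert_dict in messages:
        kept = user_filter(alert_dict)
        if kept is not None:
            yield kept
```

The generator does no work until the caller requests the next alert, which is what a real-time iterator needs; the list version holds every alert in memory until processing ends.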

form

alias of tom_pittgoogle.broker_stream_python.FilterAlertsForm

to_generic_alert(alert_dict)

Map the Pitt-Google alert to a TOM GenericAlert.

to_target(alert_dict)

Map the Pitt-Google alert to a TOM Target.
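A minimal sketch of the kind of field mapping that to_generic_alert and to_target perform, under stated assumptions: it reads the original (non-lightened) ZTF schema, where objectId is a top-level field and ra, dec and magpsf live in the candidate record, and it fills a hypothetical TargetFields dataclass instead of the real TOM models so that it runs without Django.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TargetFields:
    """Hypothetical stand-in for the fields a TOM Target or GenericAlert needs."""
    name: str
    ra: float
    dec: float
    mag: float


def alert_to_target_fields(alert_dict: Dict[str, Any]) -> TargetFields:
    # Assumes the original ZTF alert schema: objectId at the top level and
    # the values for the triggering detection in the "candidate" record.
    candidate = alert_dict["candidate"]
    return TargetFields(
        name=alert_dict["objectId"],
        ra=candidate["ra"],
        dec=candidate["dec"],
        mag=candidate["magpsf"],  # PSF-fit magnitude of the detection
    )
```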

static user_filter(alert_dict, **parameters)

Apply the filter indicated by the form’s parameters.

Used as the callback to BrokerStreamPython.unpack_and_ack_messages.

Parameters
  • alert_dict – Single alert, ZTF packet data as a dictionary. The schema depends on the value of lighten_alerts passed to BrokerStreamPython.unpack_and_ack_messages. If lighten_alerts=False it is the original ZTF alert schema (https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html). If lighten_alerts=True the dict is flattened and extra fields are dropped.

  • parameters – parameters submitted by the user through the form.

Returns

alert_dict if it passes the filter, else None
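A filter in the spirit of user_filter might look like the following. It is a sketch, not the package's code: it assumes the original ZTF schema, where classtar (the SExtractor star/galaxy score, with values near 1 for star-like sources) sits in alert_dict["candidate"], and it assumes hypothetical "gt"/"lt" values for the classtar_gt_lt form field.

```python
from typing import Any, Dict, Optional


def classtar_filter(alert_dict: Dict[str, Any], **parameters: Any) -> Optional[Dict[str, Any]]:
    """Return alert_dict if it passes the classtar threshold test, else None."""
    classtar = alert_dict["candidate"].get("classtar")
    if classtar is None:
        return None  # no score available: treat as failing the filter
    threshold = parameters["classtar_threshold"]
    direction = parameters["classtar_gt_lt"]  # assumed to be "gt" or "lt"
    if direction == "gt":
        passes = classtar > threshold
    elif direction == "lt":
        passes = classtar < threshold
    else:
        raise ValueError(f"unexpected classtar_gt_lt value: {direction!r}")
    return alert_dict if passes else None
```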

class tom_pittgoogle.broker_stream_python.FilterAlertsForm(*args, **kwargs)

Basic form for filtering alerts.

Fields:

subscription_name (CharField)

classtar_threshold (FloatField)

classtar_gt_lt (ChoiceField)

max_results (IntegerField)

timeout (IntegerField)

max_backlog (IntegerField)

property media

Return all media required to render the widgets on this form.
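One way the form's fields might be routed to the consumer can be sketched as a plain function. It is hypothetical (not part of tom_pittgoogle) and works on a dict like a Django form's cleaned_data; the defaults are taken from the flow_configs description of ConsumerStreamPython.stream_alerts, and it assumes a missing key means "use the default" while an explicit None means "no limit".

```python
from typing import Any, Dict, Optional, Tuple

# Defaults quoted from the flow_configs description of stream_alerts.
DEFAULT_MAX_RESULTS = 10
DEFAULT_TIMEOUT = 30
MAX_BACKLOG_CAP = 1000


def split_form_parameters(
    cleaned_data: Dict[str, Any],
) -> Tuple[str, Dict[str, Any], Dict[str, Optional[int]]]:
    """Split form data into (subscription_name, filter kwargs, flow_configs)."""
    subscription_name = cleaned_data["subscription_name"]
    filter_kwargs = {
        "classtar_threshold": cleaned_data.get("classtar_threshold"),
        "classtar_gt_lt": cleaned_data.get("classtar_gt_lt"),
    }
    max_results = cleaned_data.get("max_results", DEFAULT_MAX_RESULTS)
    max_backlog = cleaned_data.get("max_backlog")
    if max_backlog is None:
        # Documented default is min(1000, max_results); use the cap alone if unlimited.
        max_backlog = MAX_BACKLOG_CAP if max_results is None else min(MAX_BACKLOG_CAP, max_results)
    flow_configs = {
        "max_results": max_results,
        "timeout": cleaned_data.get("timeout", DEFAULT_TIMEOUT),
        "max_backlog": max_backlog,
    }
    return subscription_name, filter_kwargs, flow_configs
```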

ConsumerStreamPython

Consumer class to pull Pub/Sub messages via a Python client, and work with data.

Pub/Sub Python Client docs: https://googleapis.dev/python/pubsub/latest/index.html

Basic workflow:

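from tom_pittgoogle.consumer_stream_python import ConsumerStreamPython

# subscription_name: name of the Pub/Sub subscription to pull from (str)
consumer = ConsumerStreamPython(subscription_name)

def user_callback(alert_dict):
    # Process the alert as desired. If processing is successful,
    # tell the consumer to send an acknowledgement to Pub/Sub.
    success = True
    if success:
        ack = True
    else:
        ack = False
    # With no "result" key, the original alert is counted and returned.
    return {"ack": ack}

# return_list=True collects the results in memory and returns them as a list.
alert_dicts_list = consumer.stream_alerts(user_callback=user_callback, return_list=True)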

See especially:

ConsumerStreamPython.stream_alerts

Execute a streaming pull, processing messages through self.callback.

ConsumerStreamPython.callback

Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.

class tom_pittgoogle.consumer_stream_python.ConsumerStreamPython(subscription_name, save_fields=None)

Consumer class to manage Pub/Sub connections and work with messages.

Initialization does the following:

  • Authenticate the user via a key file or OAuth 2.0.

  • Add a Google Cloud logging handler to the LOGGER. To view, visit: https://console.cloud.google.com/logs.

  • Create a client object (google.cloud.pubsub_v1.SubscriberClient).

  • Create a queue.Queue object to communicate with the background thread running the streaming pull.

  • Make sure the subscription exists and we can connect. Create it if needed.

authenticate_with_oauth()

Guide user through authentication; create OAuth2Session for credentials.

The user will need to visit a URL, authenticate themselves, and authorize PittGoogleConsumer to make API calls on their behalf.

The user must have a Google account that is authorized to make API calls through the project defined by the GOOGLE_CLOUD_PROJECT variable in the Django settings.py file. Any project can be used, as long as the user has access.

Additional requirement while this is still in development: OAuth is restricted to users registered with Pitt-Google, so please contact us.

TODO: Integrate this with Django. For now, the user interacts via command line.

callback(message)

Process a single alert; run user filter; save alert; acknowledge Pub/Sub msg.

Used as the callback for the streaming pull.
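The acknowledge-or-redeliver step of callback can be sketched with a stand-in message object. FakeMessage and the function below are hypothetical and omit the filter and save steps. A received Pub/Sub message does expose data plus ack() and nack() methods, but the payload here is JSON purely to avoid extra dependencies, and putting an (alert, response) tuple on the queue is an illustrative choice.

```python
import json
import queue
from typing import Any, Callable, Dict


class FakeMessage:
    """Hypothetical stand-in for a received Pub/Sub message."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.acked = False
        self.nacked = False

    def ack(self) -> None:
        self.acked = True

    def nack(self) -> None:
        self.nacked = True


def callback_sketch(
    message: FakeMessage,
    user_callback: Callable[[Dict[str, Any]], Dict[str, Any]],
    results: "queue.Queue[Any]",
) -> None:
    # Unpack the payload (JSON here only to keep the sketch dependency-free).
    alert_dict = json.loads(message.data)
    response = user_callback(alert_dict)
    if response.get("ack", False):
        message.ack()   # processed: Pub/Sub will not redeliver it
    else:
        message.nack()  # cannot process now: ask Pub/Sub to redeliver later
    # Hand the outcome to the main thread, which decides how to count it.
    results.put((alert_dict, response))
```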

delete_subscription()

Delete the subscription.

This is provided for the user’s convenience, but it is not necessary and is not automatically called.

  • Storage of unacknowledged Pub/Sub messages does not result in fees.

  • Unused subscriptions automatically expire; default is 31 days.

get_credentials(user_project)

Create a user credentials object from service account credentials or an OAuth session.

Try service account credentials first. Fall back to OAuth.
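The fallback order can be sketched as a small function. Both loaders are hypothetical callables supplied by the caller (they stand in for reading a service account key file and for running the OAuth flow), and fallback_on is an assumed knob for choosing which errors trigger the fallback.

```python
from typing import Callable, Tuple, Type, TypeVar

Creds = TypeVar("Creds")


def get_credentials_with_fallback(
    load_service_account: Callable[[], Creds],
    run_oauth_flow: Callable[[], Creds],
    fallback_on: Tuple[Type[Exception], ...] = (Exception,),
) -> Creds:
    """Try service account credentials first; fall back to OAuth on failure."""
    try:
        return load_service_account()
    except fallback_on:
        # For example, no key file is configured: use the interactive OAuth flow.
        return run_oauth_flow()
```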

stream_alerts(send_data='alert_dict', include_metadata=False, user_callback=None, return_list=False, flow_configs=None, **user_kwargs)

Execute a streaming pull, processing messages through self.callback.

The streaming pull happens in a background thread. A queue.Queue is used to communicate between threads and enforce the stopping condition(s).
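The thread-and-queue pattern can be sketched with the standard library alone. This is a simplified stand-in, not the package's implementation: a plain list replaces the Pub/Sub streaming pull, and the function names are hypothetical. The main thread stops when max_results messages have been collected or when no new message arrives within timeout seconds.

```python
import queue
import threading
from typing import Iterable, List, Optional


def background_pull(messages: Iterable[str], out: "queue.Queue[str]",
                    stop: threading.Event) -> None:
    """Stand-in for the streaming pull: put each processed message on the queue."""
    for msg in messages:
        if stop.is_set():
            break
        out.put(msg)


def pull_until_stopped(messages: Iterable[str], max_results: Optional[int],
                       timeout: Optional[float]) -> List[str]:
    """Collect messages until max_results is reached or the timeout expires."""
    out: "queue.Queue[str]" = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(target=background_pull, args=(messages, out, stop), daemon=True)
    worker.start()

    collected: List[str] = []
    while max_results is None or len(collected) < max_results:
        try:
            # Block for at most `timeout` seconds (None blocks indefinitely).
            collected.append(out.get(timeout=timeout))
        except queue.Empty:
            break  # no new message arrived in time
    stop.set()  # tell the background thread to stop pulling
    worker.join(timeout=1)
    return collected
```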

Parameters
  • send_data (str) – String specifying the data and format requested. Options are: ‘alert_dict’ (default, alert packet data cast to a dictionary), ‘alert_bytes’ (alert packet data as bytes, as received from Pub/Sub), ‘full_msg’ (complete google.pubsub_v1.types.PubsubMessage object)

  • include_metadata (bool) – If False (default), the message will be returned according to send_data. If True, the message will be delivered as a dictionary with two keys: ‘metadata_dict’ and one of ‘alert_dict’ or ‘alert_bytes’, according to send_data. This has no effect if send_data=’full_msg’.

  • user_callback (Callable) – Used by self.callback to perform user-requested processing in the background thread. It must accept the message as its first argument (type and contents determined by send_data and include_metadata) and can accept arbitrary keyword arguments, which should be passed via user_kwargs. It should return a dictionary with two keys, ‘ack’ and ‘result’, with values as follows. ‘ack’: boolean indicating whether the message should be acknowledged to Pub/Sub; use False to indicate that the message cannot be processed at this time and Pub/Sub should redeliver it later. ‘result’: (arbitrary type) the result of the processing. If return_list is True, this is the value that will be returned in the list. If it is explicitly set to None, the message will not be counted towards max_results and will not be returned in the list (if return_list is True). If the ‘result’ key is absent, the message will be counted towards max_results, and the original message data will be returned in the list (if return_list is True).

  • return_list (bool) – If True, messages will be processed through self.callback, and results will be collected in memory and returned as a list after the streaming pull is complete. If False (default), the user_callback is responsible for saving the processing results.

  • flow_configs (dict) – Dictionary defining stopping conditions and flow control settings for the streaming pull. May contain the keys: max_results (int, default=10; use None for no limit): maximum number of messages to process/collect before stopping. timeout (int, default=30; use None for no limit): maximum number of seconds to wait for a new message before stopping. max_backlog (int, default=min(1000, max_results)): maximum number of received but unprocessed messages before pausing the streaming pull.

  • user_kwargs (dict) – Dict defining keyword arguments to be sent to user_callback.
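The rules for the ‘result’ key can be written as a small function. This is a sketch of the documented contract only (it ignores ‘ack’, whose effect on counting is not specified above); record_result and example_user_callback are hypothetical names. The example callback shows how a keyword argument supplied through user_kwargs could shape the result.

```python
from typing import Any, Dict, List

_MISSING = object()  # sentinel that distinguishes "key absent" from "value is None"


def record_result(message: Any, response: Dict[str, Any], collected: List[Any],
                  return_list: bool) -> bool:
    """Apply the documented 'result' rules; return True if the message counts."""
    result = response.get("result", _MISSING)
    if result is None:
        # Explicit None: not counted towards max_results and not returned.
        return False
    if return_list:
        # Absent key: return the original message data; otherwise the result.
        collected.append(message if result is _MISSING else result)
    return True


def example_user_callback(alert_dict: Dict[str, Any], keep_key: str = "objectId") -> Dict[str, Any]:
    # keep_key would arrive via user_kwargs; a missing field yields None (not counted).
    return {"ack": True, "result": alert_dict.get(keep_key)}
```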

touch_subscription()

Make sure the subscription exists and we can connect.

If the subscription doesn’t exist, try to create one (in the user’s project) that is attached to a topic of the same name in the Pitt-Google project.

Note that messages published before the subscription is created are not available.
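The naming relationship described above can be sketched as two resource-name strings. Fully qualified Pub/Sub names take the forms projects/{project}/subscriptions/{name} and projects/{project}/topics/{name}; the helper is hypothetical and the project IDs are placeholders. The Python client also offers path helpers (for example SubscriberClient.subscription_path) that build such strings.

```python
from typing import Tuple


def subscription_and_topic_paths(user_project: str, pgb_project: str, name: str) -> Tuple[str, str]:
    """Build the resource names involved when creating the subscription.

    The subscription lives in the user's project; the topic of the same name
    lives in the Pitt-Google project (pgb_project is a placeholder ID).
    """
    subscription_path = f"projects/{user_project}/subscriptions/{name}"
    topic_path = f"projects/{pgb_project}/topics/{name}"
    return subscription_path, topic_path
```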