#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""TOM Toolkit broker to listen to a Pitt-Google Pub/Sub stream via the Python client.

Relies on `ConsumerStreamPython` to manage the connections and work with data.

See especially:

.. autosummary::
   :nosignatures:

   BrokerStreamPython.fetch_alerts
   BrokerStreamPython.user_filter
"""
from django import forms
from tom_alerts.alerts import GenericQueryForm, GenericAlert, GenericBroker
from tom_targets.models import Target
from .consumer_stream_python import ConsumerStreamPython
from .utils.templatetags.utility_tags import jd_to_readable_date
class BrokerStreamPython(GenericBroker):
    """Pitt-Google broker interface to pull alerts from Pub/Sub via the Python client.

    Base class: ``tom_alerts.alerts.GenericBroker``
    """

    name = "Pitt-Google StreamPython"
    # NOTE(review): `FilterAlertsForm` is not defined in the visible part of this
    # module -- presumably it is declared elsewhere; confirm it is in scope.
    form = FilterAlertsForm

    def fetch_alerts(self, parameters):
        """Entry point to pull and filter alerts.

        Pull alerts using a Python client, unpack, apply user filter.

        This demo assumes that the real use-case is to save alerts to a database
        rather than view them through a TOM site.
        Therefore, the `Consumer` currently saves the alerts in real time,
        and then simply returns a list of alerts after all messages are processed.
        That list is then coerced into an iterator here.

        If the user really cares about the iterator,
        `ConsumerStreamPython.stream_alerts` can be tweaked to yield the alerts in
        real time.

        Args:
            `parameters`: parameters submitted by the user through the form.
                Must contain ``subscription_name`` plus the keys checked by
                `_clean_parameters`.

        Returns:
            An iterator over whatever `ConsumerStreamPython.stream_alerts` returns.

        Raises:
            ValueError: propagated from `_clean_parameters` when no stopping
                condition is set.
        """
        clean_params = self._clean_parameters(parameters)

        # The consumer is kept on the instance so it stays reachable after the pull.
        self.consumer = ConsumerStreamPython(clean_params['subscription_name'])

        alert_dicts_list = self.consumer.stream_alerts(
            user_filter=self.user_filter,
            parameters=clean_params,
        )
        return iter(alert_dicts_list)

    def _clean_parameters(self, parameters):
        """Return a validated copy of `parameters` with defaults filled in.

        Args:
            `parameters`: mapping of form parameters. Expected to contain the
                keys ``max_results``, ``timeout`` and ``max_backlog``.

        Returns:
            A new ``dict``; the mapping passed in is not modified.

        Raises:
            ValueError: if both ``max_results`` and ``timeout`` are ``None``.
            KeyError: if a key is absent when it is read.
        """
        clean_params = dict(parameters)

        # there must be at least one stopping condition
        if clean_params['max_results'] is None and clean_params['timeout'] is None:
            raise ValueError((
                "You must set at least one stopping condition. "
                "max_results and timeout cannot both be None."
            ))

        if clean_params['max_backlog'] is None:
            clean_params['max_backlog'] = 1000  # keep the google default of 1000

        return clean_params

    @staticmethod
    def user_filter(alert_dict, parameters):
        """Apply the filter indicated by the form's parameters.

        `fetch_alerts` passes this function as `user_filter` to
        `ConsumerStreamPython.stream_alerts`.

        Args:
            `alert_dict`: Single alert as a dictionary. Only the ``classtar`` key
                is read here.
                NOTE(review): earlier documentation says the schema depends on a
                `lighten_alerts` option of `unpack_and_ack_messages` (original ZTF
                alert schema,
                https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html,
                when `False`; flattened with extra fields dropped when `True`).
                That method is not defined on this class -- presumably it belongs
                to the consumer; confirm.

            `parameters`: parameters submitted by the user through the form.
                Reads ``classtar_threshold`` and ``classtar_gt_lt``.

        Returns:
            `alert_dict` if it passes the filter, else `None`
        """
        threshold = parameters["classtar_threshold"]
        if threshold is None:
            # no filter requested. all alerts pass
            return alert_dict

        # run the filter
        lt_threshold = alert_dict["classtar"] < threshold
        direction = parameters["classtar_gt_lt"]

        # Logical operators are used on purpose: bitwise `~` on a bool yields an
        # int (-1 / -2) and is deprecated since Python 3.12.
        # "gt" is implemented as "not less than", so a value equal to the
        # threshold passes the "gt" filter.
        if (direction == "lt" and lt_threshold) or (
            direction == "gt" and not lt_threshold
        ):
            return alert_dict
        return None

    def to_generic_alert(self, alert_dict):
        """Map the Pitt-Google alert to a TOM `GenericAlert`.

        Args:
            `alert_dict`: alert dictionary providing the keys ``jd``, ``candid``,
                ``objectId``, ``ra``, ``dec``, ``magpsf`` and ``classtar``.

        Returns:
            A `GenericAlert` built from those fields.
        """
        return GenericAlert(
            timestamp=jd_to_readable_date(alert_dict["jd"]),
            # url=self.consumer.pull_url,
            # this is not a valid url
            # (the braces are literal text; the string is not formatted)
            url="https://pubsub.googleapis.com/v1/{subscription_path}",
            id=alert_dict["candid"],
            name=alert_dict["objectId"],
            ra=alert_dict["ra"],
            dec=alert_dict["dec"],
            mag=alert_dict["magpsf"],
            score=alert_dict["classtar"],
        )

    def to_target(self, alert_dict):
        """Map the Pitt-Google alert to a TOM `Target`.

        Args:
            `alert_dict`: alert dictionary providing the keys ``objectId``,
                ``ra`` and ``dec``.

        Returns:
            A `Target` of type ``'SIDEREAL'`` constructed from those fields.
        """
        return Target(
            # identifier=alert_dict['candid'],
            name=alert_dict['objectId'],
            type='SIDEREAL',
            # designation='MY ALERT',
            ra=alert_dict['ra'],
            dec=alert_dict['dec'],
            # epoch=alert_dict['jd'],
        )
# def get_or_create_target(self, alert_dict):
# target, created = Target.objects.get_or_create(
# name=alert_dict['objectId'],
# type='SIDEREAL',
# ra=alert_dict['ra'],
# dec=alert_dict['dec'],
# )