Basic Code Workflow

Each implementation differs in its details, but all of them share the same basic workflow:

The Broker instantiates a Consumer and uses it to fetch, unpack, and process alerts.

The Consumer can accept a user filter, passed as a callback, and return only the alerts that pass it.
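The filter contract described above can be sketched without any network access. This is a minimal illustration, not the Consumer's actual code: `apply_callback` is a hypothetical stand-in for whatever the Consumer does internally, and the `magpsf` field and the magnitude cut of 18 are assumptions chosen only for the demo. The one thing it takes from the workflow is that a callback returns the alert dict to keep it and `None` to drop it.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Alert = Dict[str, Any]
Callback = Callable[[Alert], Optional[Alert]]


def bright_filter(alert_dict: Alert) -> Optional[Alert]:
    """Keep alerts brighter than magnitude 18 (illustrative field and cut)."""
    mag = alert_dict.get("magpsf")  # ASSUMPTION: field name is for the demo only
    if mag is not None and mag < 18.0:
        return alert_dict
    return None


def apply_callback(
    alerts: Iterable[Alert], callback: Optional[Callback] = None
) -> List[Alert]:
    """Hypothetical helper: run each alert through the callback, drop None results."""
    if callback is None:
        return list(alerts)
    kept = []
    for alert in alerts:
        result = callback(alert)
        if result is not None:
            kept.append(result)
    return kept


sample = [
    {"objectId": "a", "magpsf": 17.2},
    {"objectId": "b", "magpsf": 19.5},
    {"objectId": "c"},  # no magnitude, so the filter drops it
]
kept = apply_callback(sample, bright_filter)
print([alert["objectId"] for alert in kept])
```

Returning the dict rather than a boolean lets a filter also modify or annotate the alerts it keeps.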

Here is a compact but working example of a Broker’s fetch_alerts method for the StreamRest implementation.

def fetch_alerts(self):
    """Pull a batch of alerts from the subscription and return an iterator over them."""
    from consumer_stream_rest import ConsumerStreamRest

    subscription_name = "ztf-loop"
    max_messages = 10
    lighten_alerts = True  # Optional: flatten the alert dict and drop extra fields.

    # Optional: if you pass a callback function, the Consumer runs each alert
    # through it. This is useful for user filters. Here is a basic demo that
    # returns the alert to keep it, or None to drop it.
    def user_filter(alert_dict):
        passes_filter = True
        if passes_filter:
            return alert_dict
        else:
            return None
    callback = user_filter

    consumer = ConsumerStreamRest(subscription_name)

    # Pull up to max_messages from the subscription in a single request.
    response = consumer.oauth2.post(
        f"{consumer.subscription_url}:pull", data={"maxMessages": max_messages},
    )

    # Unpack the messages, acknowledge them, and apply the optional callback.
    alerts = consumer.unpack_and_ack_messages(
        response, lighten_alerts=lighten_alerts, callback=callback,
    )  # List[dict]

    return iter(alerts)
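To give a rough idea of what the unpacking step works with, here is a sketch of decoding a Pub/Sub REST pull response body. It is not the implementation of `unpack_and_ack_messages`: `unpack_pull_response` is a hypothetical helper, and the JSON payload is an assumption made so the example runs on its own (the real stream may use a different serialization). The `receivedMessages`, `ackId`, and base64-encoded `message.data` fields follow the Pub/Sub REST API.

```python
import base64
import json
from typing import Any, Dict, List, Tuple


def unpack_pull_response(body: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Hypothetical helper: decode each received message and collect its ack ID."""
    alerts: List[Dict[str, Any]] = []
    ack_ids: List[str] = []
    # An empty pull returns a body with no "receivedMessages" key.
    for received in body.get("receivedMessages", []):
        payload = base64.b64decode(received["message"]["data"])
        alerts.append(json.loads(payload))  # ASSUMPTION: payload is JSON
        ack_ids.append(received["ackId"])
    return alerts, ack_ids


# Build a fake response body so the sketch runs without credentials.
encoded = base64.b64encode(json.dumps({"objectId": "demo-1"}).encode()).decode()
fake_body = {"receivedMessages": [{"ackId": "ack-1", "message": {"data": encoded}}]}

alerts, ack_ids = unpack_pull_response(fake_body)
print(alerts, ack_ids)
```

In a real client, the collected ack IDs would then be sent to the subscription's `:acknowledge` endpoint as `{"ackIds": [...]}` so the messages are not redelivered.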