The good thing about Coral TPUs are they are really fast. The bad thing about Coral TPUs are the models they use are really small so they aren’t very accurate. Luckily Frigate has a really advanced system for configuring object detection filters by size, ratio, etc. As well as a deep mechanic for zones to manage object and motion detection. That being said, I have 15 cameras and it’s going to take some time to fine tune them all.

 

While I am learning to optimize Frigate’s configuration for motion and object detection, I decided to tinker with my Ollama server a little too.

Frigate is using a lightweight image detection model, so I wrote a Python service that is listening to MQTT events, double checking the thumbnails with the llava vision model, and marking false positives. This isn’t a long term solution because it isn’t as accurate as I would like either, but it’s a fun experiment.

import paho.mqtt.client as mqtt
import requests
import json
import base64
import os  

# Configuration parameters
MQTT_BROKER = '192.168.1.2'          # Update with your MQTT broker address
MQTT_PORT = 1883                       # Update with your MQTT broker port
MQTT_TOPIC = 'frigate/events/#'        # Subscribe to  'frigate/events'

FRIGATE_API_URL = 'http://192.168.1.2:5000'  
OLLAMA_SERVER_URL = 'http://192.168.1.2:7869/api/generate'  # LLAVA model API

# Create directories for debugging if they don't exist
os.makedirs('reviewed_images/true', exist_ok=True)
os.makedirs('reviewed_images/false', exist_ok=True)

def on_connect(client, userdata, flags, rc):
    """
    Callback when the client receives a CONNACK response from the server.
    """
    if rc == 0:
        print("Connected to MQTT Broker!")
        print("Listening for Frigate events...")
        client.subscribe(MQTT_TOPIC)
    else:
        print(f"Failed to connect to MQTT Broker. Return code: {rc}")

def on_message(client, userdata, msg):
    """
    Callback when a PUBLISH message is received from the server.
    """
    try:
        payload = json.loads(msg.payload)
    except json.JSONDecodeError:
        print("Received a message that is not valid JSON.")
        return

    event_type = payload.get('type')
    # Process 'end' events to ensure the snapshot is available
    if event_type == 'end':
        event = payload.get('after', {})
        event_id = event.get('id')
        camera = event.get('camera')
        label = event.get('label')
        has_snapshot = event.get('has_snapshot', False)

        if not event_id:
            print("Received an event without an ID. Skipping.")
            return

        if has_snapshot:
            print(f"Processing event {event_id} from camera {camera}")
            # Get the thumbnail image
            thumbnail_url = f"{FRIGATE_API_URL}/api/events/{event_id}/thumbnail.jpg"
            try:
                response = requests.get(thumbnail_url)
                response.raise_for_status()
                image_data = response.content
                # Encode the image in Base64
                image_base64 = base64.b64encode(image_data).decode('utf-8')
            except requests.exceptions.RequestException as e:
                print(f"Failed to retrieve thumbnail for event {event_id}: {e}")
                return

            # Prepare event details for Ollama
            event_details = {
                'id': event_id,
                'thumbnail': image_base64
            }

            # Send the event to Ollama
            is_valid = send_event_to_ollama(event_details)
            if is_valid:
                print(f"Event {event_id}: LLAVA model confirmed the presence of a person, dog, cat, or vehicle.")
                # Leave the event as is
            else:
                print(f"Event {event_id}: LLAVA model did NOT detect a person, dog, cat, or vehicle.")
                # Mark the event as reviewed/false positive
                mark_event_as_false_positive(event_id)

            # Uncomment the following line to save reviewed images for debugging
            with open(f"reviewed_images/{'true' if is_valid else 'false'}/{event_id}.jpg", "wb") as f: f.write(image_data)

        else:
            print(f"Event {event_id} does not have a snapshot available.")
    else:
        # For other event types, you can handle them here if needed
        pass

def send_event_to_ollama(event_details):
    """
    Sends the event details to the Ollama server and returns whether the image contains
    a person, dog, cat, or vehicle based on the LLAVA model's response.

    Returns:
        bool: True if the image contains a person, dog, cat, or vehicle; False otherwise.
    """
    # Create the system prompt and the user prompt
    system_prompt = "You are an expert in image classification. Your responses must be limited to either 'true' or 'false'."
    user_prompt = "Does this image contain a person, dog, cat, or vehicle? Answer only with 'true' or 'false'."

    # Construct the JSON payload with the system prompt
    prompt_payload = {
        "model": "llava:7b",
        "image_base64": event_details.get('thumbnail'),
        "prompt": user_prompt,
        "system_prompt": system_prompt,
        "stream": False
    }

    try:
        response = requests.post(OLLAMA_SERVER_URL, json=prompt_payload)
        response.raise_for_status()
        response_data = response.json()
        # Assuming the response has a 'response' field containing the answer
        result = response_data.get('response', '').strip().lower()

        # Interpret the response
        if result.startswith("true") or result.startswith("yes"):
            return True
        elif result.startswith("false") or result.startswith("no"):
            return False
        else:
            print(f"Unexpected response from Ollama for event ID {event_details['id']}: {result}")
            return False
    except requests.RequestException as e:
        print(f"Error from Ollama for event ID {event_details['id']}: {e}")
        return False

def mark_event_as_false_positive(event_id):
    """
    Marks the event as a false positive using the Frigate API.
    """
    url = f"{FRIGATE_API_URL}/api/events/{event_id}/false_positive"
    try:
        response = requests.put(url)
        if response.status_code == 200:
            print(f"Event {event_id} marked as false positive.")
        else:
            print(f"Failed to mark event {event_id} as false positive. Status code: {response.status_code} - {response.text}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to mark event {event_id} as false positive: {e}")

def main():
    # Set up the MQTT client
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to the MQTT broker
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
    except Exception as e:
        print(f"Failed to connect to MQTT broker: {e}")
        return

    # Start the MQTT client loop
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        print("Exiting...")
        client.disconnect()

if __name__ == '__main__':
    main()