How to build a prompt tracker in Gemini

Introduction

In this post, I walk you through building a simple prompt tracker using Python in Colab, the Gemini API, and BigQuery. We will make requests through the API rather than the browser, which lets you get your hands dirty and see exactly what the outputs contain. The script also resolves the unfriendly redirect URLs that the Gemini API includes in its citation data into the final destination URLs, and captures query fan-out data.

After completing this, I recommend visualizing the data over time in Data Studio.

Why build your own prompt tracker?

I can think of several basic reasons why you should consider building your own versus paying for an off-the-shelf service.

Setup

Create a Gemini API key

This script presumes you already have a Gemini API key. If you don’t, go to https://aistudio.google.com/, create an API key, and enable billing. You will also need to associate it with a particular Google Cloud project.

Create a new notebook

Create a new notebook in Google Colab and make sure you’re signed in with the same account that owns the BigQuery project you’ll be uploading your data to.

Authentication

Because we will be passing data to a BigQuery table, we need to authenticate with the account we’re uploading to. Colab has a quick, streamlined authentication method built in that lets you simply log into your own Google Account.

Create a new block of code and paste / run the code below.

from google.colab import auth

# Authenticate the user and grant access to Google Cloud services
print("Starting Google Colab Authentication...")
auth.authenticate_user()
print("Authentication successful!")

It will open a pop-up prompting you to sign into your account. Do so and grant the necessary permissions.

Passing prompts to Gemini

Create another code block in your notebook.

I am going to break this down into sections to explain what each part does, so you will need to modify the code as noted and paste it all into the same block.

Importing libraries

We are going to import several libraries to enable various functions. Most of these are standard fare, but note the required Gemini imports.

from google import genai # Call Gemini
from google.genai import types # Call Gemini for grounding
import os # Used for global environmental variable
import re # Used for turning the response into a single line of text
import csv # Import the CSV module
import requests # Used for fixing the Vertex URLs
from google.cloud import storage, bigquery # Used for uploading to BigQuery

Variables

Modify the code below as needed. 

# --- Prompt Tracking Variables ---
model = "gemini-3-flash-preview" # Adjust this based on which model you want to use 
timestamp = "4/23/26" # Change the date
source = "Gemini"
buyers_journey_segment = "Commercial" # Adjust this as needed
category = "Add category here" # Adjust this as needed

# --- Configuration Variables ---

# Google Cloud and BigQuery Details
PROJECT_ID = "add-your-bigquery-project-id"
BQ_DATASET_ID = "add-your-bigquery-dataset-name"
BQ_TABLE_ID = "add-your-bigquery-table-name"

# The local CSV file you created with the extracted queries
LOCAL_CSV_FILE = "change-your-csv-name.csv"
all_results = [] # Essential for creating the CSV

# Google Cloud Storage (GCS) Details
GCS_BUCKET_NAME = "your-data-staging-bucket"
GCS_DESTINATION_BLOB = f"queries_load_data/{LOCAL_CSV_FILE}"  # Path in GCS bucket

# Gemini Details
os.environ['GEMINI_API_KEY'] = "add-your-gemini-api-key" # Add your API key here
client = genai.Client() # The client gets the API key from the environment variable `GEMINI_API_KEY`.

# Enable using Google Search and grounding queries

grounding_tool = types.Tool(
    google_search=types.GoogleSearch()
)

config = types.GenerateContentConfig(
    tools=[grounding_tool]
)

Functions

We will create three functions here. 

The first – add_citations() – pulls the grounding metadata attached to the response, which contains the citations. It then resolves each Gemini-specific redirect URL to the target page it points to, which will save you a headache when analyzing citations.

The second and third – upload_to_gcs() and load_csv_to_bigquery() – will take the final CSV file we create, which contains timestamps, prompts, responses, and other classification data, and upload it to our BigQuery table.

# Function: Adding Citations

def add_citations(response):
    text = response.text
    supports = response.candidates[0].grounding_metadata.grounding_supports
    chunks = response.candidates[0].grounding_metadata.grounding_chunks

    # Sort supports by end_index in descending order to avoid shifting issues when inserting.
    sorted_supports = sorted(supports, key=lambda s: s.segment.end_index, reverse=True)

    for support in sorted_supports:
        end_index = support.segment.end_index
        if support.grounding_chunk_indices:
            # Create a citation string like [1](link1), [2](link2)
            citation_links = []
            for i in support.grounding_chunk_indices:
                if i < len(chunks):
                    uri = chunks[i].web.uri

                    try:
                        uri_fixed = requests.get(uri, allow_redirects=True, timeout=10)
                    except requests.RequestException:
                        pass  # Keep the redirect URL if resolution fails
                    else:
                        uri = uri_fixed.url

                    citation_links.append(f"[{i + 1}]({uri})")

            citation_string = ", ".join(citation_links)
            text = text[:end_index] + citation_string + text[end_index:]

    return text
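To see why the descending sort matters, here’s a toy illustration of my own (not part of the script): by inserting from the highest end_index backward, the earlier indices are still valid when we reach them, because nothing before them has shifted yet.

```python
text = "Cats purr. Dogs bark."
# (end_index, citation) pairs, as the grounding supports would provide
insertions = [(9, "[1]"), (20, "[2]")]

# Insert from the highest index down so earlier indices stay valid
for end_index, citation in sorted(insertions, key=lambda p: p[0], reverse=True):
    text = text[:end_index] + citation + text[end_index:]

print(text)  # Cats purr[1]. Dogs bark[2].
```

If we inserted in ascending order instead, the first insertion would shift every later index by the length of the citation string.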

# Function: Uploading to Google Cloud Storage

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the GCS bucket."""
    print(f"1. Uploading {source_file_name} to GCS bucket: {bucket_name}...")
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)
    print(f"   File uploaded to gs://{bucket_name}/{destination_blob_name}")
    return f"gs://{bucket_name}/{destination_blob_name}"

# Function: Uploading to Google BigQuery

def load_csv_to_bigquery(gcs_uri, dataset_id, table_id):
    """Loads the CSV file from GCS into the specified BigQuery table, appending data."""
    print(f"2. Starting BigQuery load job into {dataset_id}.{table_id}...")
    bigquery_client = bigquery.Client(project=PROJECT_ID)

    table_ref = bigquery_client.dataset(dataset_id).table(table_id)

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        autodetect=True,      # BigQuery infers the schema and data types

        # *** CRUCIAL CHANGE HERE: WRITE_APPEND ***
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,

    )

    # Start the load job
    load_job = bigquery_client.load_table_from_uri(
        gcs_uri, table_ref, job_config=job_config
    )

    print(f"   Load job started: {load_job.job_id}")
    load_job.result()  # Wait for the job to complete

    # Check the result
    destination_table = bigquery_client.get_table(table_ref)
    print(f"3. Load job complete. Loaded {load_job.output_rows} rows into BigQuery.")

Setting the prompts you want to track

Next up in our block is the section where you declare the prompts you intend to track. In the appropriate variables – prompt and topic – make the changes needed so each one asks the question you want to track and classifies it accordingly.

I’ve designed this so you can copy, paste, and adapt the same chunk over and over, depending on how far you want to take this. It is assembled this way for convenience and could easily be consolidated into its own function; however, it works as intended and I’m helping you dip your toes into prompt tracking, so c’est la vie.

It is also worth noting that I have included the query fan-out data associated with each response. You can isolate this after the fact using something like SQL in BigQuery for your own analysis.
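One thing to keep in mind: fan_out is a Python list, so writing it straight into a CSV cell stores its repr (e.g. ['query one', 'query two']). If you’d rather have a cleaner delimiter to parse in SQL later, you could join it into a single string first. This is a small optional tweak on my part, not something the script below does:

```python
# Hypothetical fan-out list, as returned in web_search_queries
fan_out = ["best crm for startups", "crm pricing comparison"]

# Join into one delimited string before appending it to all_results
fan_out_str = "; ".join(fan_out)
print(fan_out_str)  # best crm for startups; crm pricing comparison
```

A consistent delimiter like "; " makes it easy to split the column back apart later with BigQuery's string functions.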

# Prompts go here

prompt = "Add your first prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with

# Note: I defined buyers_journey_segment globally above, because I limit my personal prompt tracking to commercial queries. You can change it as needed here and elsewhere.

response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)

all_results.append({
        'Date': timestamp,
        'Source': source,
        'Classification': buyers_journey_segment,
        'Category': category,
        'Topic': topic,
        'Prompt': prompt,
        'Response': single_line_text,
        'Model': model,
        'Query Fan Out': fan_out
})

prompt = "Add your second prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with

response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)

all_results.append({
        'Date': timestamp,
        'Source': source,
        'Classification': buyers_journey_segment,
        'Category': category,
        'Topic': topic,
        'Prompt': prompt,
        'Response': single_line_text,
        'Model': model,
        'Query Fan Out': fan_out
})

prompt = "Add your final prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with

response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)

all_results.append({
        'Date': timestamp,
        'Source': source,
        'Classification': buyers_journey_segment,
        'Category': category,
        'Topic': topic,
        'Prompt': prompt,
        'Response': single_line_text,
        'Model': model,
        'Query Fan Out': fan_out
})

print("Ended API calls; beginning CSV creation.")
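If you do want to tidy things up, the repeated chunk above could be collapsed into a single helper. Here’s a minimal sketch of my own: the names are hypothetical, it takes the client, model, and config as arguments rather than relying on globals, and it skips the citation-resolution step for brevity.

```python
def track_prompt(client, model, config, prompt, topic, results,
                 timestamp="4/23/26", source="Gemini",
                 segment="Commercial", category="Add category here"):
    """Run one prompt, collect the fan-out queries, and append a result row."""
    response = client.models.generate_content(
        model=model, contents=prompt, config=config
    )
    metadata = response.candidates[0].grounding_metadata
    fan_out = list(getattr(metadata, "web_search_queries", []) or []) if metadata else []
    # Collapse the response into a single line of text
    single_line_text = " ".join((response.text or "").split())
    results.append({
        "Date": timestamp,
        "Source": source,
        "Classification": segment,
        "Category": category,
        "Topic": topic,
        "Prompt": prompt,
        "Response": single_line_text,
        "Model": model,
        "Query Fan Out": fan_out,
    })
    return single_line_text
```

You would then call track_prompt(client, model, config, "your prompt", "your topic", all_results) once per prompt instead of pasting the whole chunk repeatedly.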

Exporting and uploading the CSV

This section will create a CSV file with the data you have assembled. The first row will be assigned as the headers, while everything else will be added as rows below.

Once it finishes running, the file will be available for download in Colab’s file browser. I include this both as a failsafe in case something prevents the BigQuery upload and as a way to initially populate a table in BigQuery. The load job itself will create the table if it doesn’t exist or append to the existing data.

# --- CSV Export Section ---

if all_results:
    # Get the fieldnames from the keys of the first dictionary
    fieldnames = all_results[0].keys()

    try:
        with open(LOCAL_CSV_FILE, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

            # Write the header row
            writer.writeheader()

            # Write all the data rows
            writer.writerows(all_results)

        print(f"✅ Successfully exported {len(all_results)} responses to **{LOCAL_CSV_FILE}**")
    except IOError as e:
        print(f"❌ Error writing to CSV file: {e}")
else:
    print("⚠️ No results were generated to export.")

print("Created CSV")

Uploading to BigQuery

In case you want to examine the CSV output before committing the data to a BigQuery table, I have separated this step out. Paste the code below at the end of the block to complete the final step.

# Upload to Cloud Storage and BigQuery

if not os.path.exists(LOCAL_CSV_FILE):
    print(f"ERROR: Local CSV file '{LOCAL_CSV_FILE}' not found. Please run the extraction script first.")
else:
    # 1. Upload CSV to GCS
    gcs_uri = upload_to_gcs(GCS_BUCKET_NAME, LOCAL_CSV_FILE, GCS_DESTINATION_BLOB)

    # 2. Load data from GCS into BigQuery (appending to the existing table)
    load_csv_to_bigquery(gcs_uri, BQ_DATASET_ID, BQ_TABLE_ID)

print("Upload completed")
print("End of script execution")

Conclusion

That’s the script – you now have a basic way to track prompts and responses in Gemini via the API. It is only a starting point, though, and there are plenty of ways you can improve this setup.

Thoughts? Feedback?

Send me a message on LinkedIn.