
Introduction
In this post, I walk you through how to build a simple prompt tracker using Python in Colab, the Gemini API, and BigQuery. We will make requests through the API rather than the browser interface, which lets you get your hands dirty and see exactly what the outputs contain. The script also handles the unfriendly redirect URLs that the Gemini API includes in its citation data, resolving them to the final destination URLs, and it captures query fan-out data as well.
After completing this, I recommend visualizing the data over time in Data Studio.
Why build your own prompt tracker?
I can think of several basic reasons why you should consider this over paying for an off-the-shelf service.
- Cost savings. You pay directly for the API calls. When you pay for a pre-existing service, they limit the number of prompts you can track and add a substantial markup.
- Model and prompt control. You specify which model(s) you want to use, as well as the parameters you want to pass along with the request.
- You control the data. You store it and it is forever yours.
- You get far more data than prompt tracking services provide. Building your own tracker means you can collect the full responses, brand mentions, citations, and query fan-outs. Since you own that data, you can track trends, monitor what gets favored, layer on more robust citation and mention tracking, and do the same for competitors.
- Your analysis and prompt tracking dashboard is infinitely customizable. Aside from cost savings (because I’m paying for this out of pocket), one reason I found it necessary to build my own tracker was how lacking these platforms are. If generative AI platforms lean heavily on third-party websites, especially business directories, I need to track those profiles in addition to a set domain. For local businesses built around professionals, like lawyers, doctors, and dentists, where individuals within the organization are just as likely to be recommended as the organization itself, I need to track all of those mentions as well. The same logic applies to competitors. I can’t do that with something like Semrush or Ahrefs.
- You learn how these things work. Getting your hands dirty, learning how generative AI works, and building your own tools is a mandatory minimum now.
Setup
Create a Gemini API key
This script presumes you already have a Gemini API key. If you don’t, go to https://aistudio.google.com/, create an API key, and enable billing. You will also need to associate it with a particular Google Cloud project.
Create a new notebook
Create a new notebook in Google Colab and make sure the account you’re using is the same account you’ll use to upload your data to BigQuery.
Authentication
Because we will be passing the data to a BigQuery table, we need to authenticate with the account we’re uploading to. Colab has a quick, streamlined authentication method built in, which lets you simply log into your Google Account.
Create a new block of code, then paste and run the code below.
from google.colab import auth
# Authenticate the user and grant access to Google Cloud services
print("Starting Google Colab Authentication...")
auth.authenticate_user()
print("Authentication successful!")
It will open a pop-up prompting you to sign into your account. Do so and grant all the necessary permissions.
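If you want to verify the authentication worked before moving on, an optional quick check is to list the datasets in your BigQuery project. This is just a sketch; replace the project ID with your own.
# Optional check: confirm the authenticated account can reach BigQuery.
from google.cloud import bigquery
bq_client = bigquery.Client(project="your-project-id")  # Replace with your Google Cloud project ID
for dataset in bq_client.list_datasets():
    print(dataset.dataset_id)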
Passing prompts to Gemini
Create another code block in your notebook.
I am going to break this down into sections to explain what everything does, so you will need to modify this code and paste it all into the same block.
Importing libraries
We are going to import several libraries to enable various functions. Most of these are standard and vanilla, but note the necessary Gemini imports.
from google import genai # Call Gemini
from google.genai import types # Call Gemini for grounding
import os # Used for global environmental variable
import re # Used for turning the response into a single line of text
import csv # Import the CSV module
import requests # Used for fixing the Vertex URLs
from google.cloud import storage, bigquery # Used for uploading to BigQuery
Variables
Modify the code below as needed.
- You will need to declare the Gemini model you wish to use. You can find the list of available models, along with their pricing, in the Gemini API documentation.
- You will need to declare a timestamp, so change it as needed.
- I group my prompts by category, topic, and buyer’s journey. Depending on how you classify and group your prompts, you may or may not want to use these. I only track commercial prompts, and I have multiple code blocks dedicated to different categories of commercial prompts, so I globally declare the category in a given block and manually set the topic down in the prompt section. (As an example, the category could be “saas” and the topic could be “automation”.)
- Your BigQuery setup will be different from mine, so all you need to know is that there are three variables for declaring it: the project ID, the dataset, and the table you wish to store the upload in. You will also need a Cloud Storage bucket to stage the CSV before it is loaded into BigQuery.
- As a fail-safe, the script first writes a CSV file that is then uploaded to BigQuery, so you’ll need to give it an appropriate name.
- Finally, you’ll need to add your API key in the corresponding variable.
# --- Prompt Tracking Variables ---
model = "gemini-3-flash-preview" # Adjust this based on which model you want to use
timestamp = "4/23/26" # Change the date
source = "Gemini"
buyers_journey_segment = "Commercial" # Adjust this as needed
category = "Add category here" # Adjust this as needed
# --- Configuration Variables ---
# Google Cloud and BigQuery Details
PROJECT_ID = "add-your-bigquery-project-id"
BQ_DATASET_ID = "add-your-bigquery-dataset-name"
BQ_TABLE_ID = "add-your-bigquery-table-name"
# The local CSV file you created with the extracted queries
LOCAL_CSV_FILE = "change-your-csv-name.csv"
all_results = [] # Essential for creating the CSV
# Google Cloud Storage (GCS) Details
GCS_BUCKET_NAME = "your-data-staging-bucket"
GCS_DESTINATION_BLOB = f"queries_load_data/{LOCAL_CSV_FILE}" # Path in GCS bucket
# Gemini Details
os.environ['GEMINI_API_KEY'] = "add-your-gemini-api-key" # Add your API key here
client = genai.Client() # The client gets the API key from the environment variable `GEMINI_API_KEY`.
# Enable using Google Search and grounding queries
grounding_tool = types.Tool(
    google_search=types.GoogleSearch()
)
config = types.GenerateContentConfig(
    tools=[grounding_tool]
)
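If you’d rather not hard-code your API key in the notebook, Colab’s built-in Secrets manager can hold it instead. A minimal alternative sketch, assuming you stored a secret named GEMINI_API_KEY in the Secrets panel, would swap the two API key lines above for:
# Alternative: pull the API key from Colab's Secrets manager instead of hard-coding it.
# Assumes a secret named GEMINI_API_KEY exists in the Colab Secrets panel.
from google.colab import userdata
os.environ['GEMINI_API_KEY'] = userdata.get('GEMINI_API_KEY')
client = genai.Client()  # Still reads the key from the environment variable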
Functions
We will create three functions here.
The first – add_citations() – pulls the grounding metadata associated with the response, which contains the citations. It then processes each citation, resolving the Gemini-specific redirect URL to the target page it points to. This will save you a headache when analyzing citations.
The second and third – upload_to_gcs() and load_csv_to_bigquery() – take the final CSV file we create, which contains the timestamps, prompts, responses, and other classification data, and upload it to our BigQuery table.
# Function: Adding Citations
def add_citations(response):
    text = response.text
    supports = response.candidates[0].grounding_metadata.grounding_supports
    chunks = response.candidates[0].grounding_metadata.grounding_chunks
    # Sort supports by end_index in descending order to avoid shifting issues when inserting.
    sorted_supports = sorted(supports, key=lambda s: s.segment.end_index, reverse=True)
    for support in sorted_supports:
        end_index = support.segment.end_index
        if support.grounding_chunk_indices:
            # Create citation string like [1](link1)[2](link2)
            citation_links = []
            for i in support.grounding_chunk_indices:
                if i < len(chunks):
                    uri = chunks[i].web.uri
                    try:
                        # Follow the redirect to the real destination URL (timeout added so the Timeout handler can actually fire)
                        uri_fixed = requests.get(uri, allow_redirects=True, timeout=10)
                    except (requests.ConnectionError, requests.Timeout):
                        pass  # Fall back to the original redirect URL
                    else:
                        uri = uri_fixed.url
                    citation_links.append(f"[{i + 1}]({uri})")
            citation_string = ", ".join(citation_links)
            text = text[:end_index] + citation_string + text[end_index:]
    return text
# Function: Uploading to Google Cloud Storage
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the GCS bucket."""
    print(f"1. Uploading {source_file_name} to GCS bucket: {bucket_name}...")
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"   File uploaded to gs://{bucket_name}/{destination_blob_name}")
    return f"gs://{bucket_name}/{destination_blob_name}"
# Function: Uploading to Google BigQuery
def load_csv_to_bigquery(gcs_uri, dataset_id, table_id):
    """Loads the CSV file from GCS into the specified BigQuery table, appending data."""
    print(f"2. Starting BigQuery load job into {dataset_id}.{table_id}...")
    bigquery_client = bigquery.Client(project=PROJECT_ID)
    table_ref = bigquery_client.dataset(dataset_id).table(table_id)
    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        autodetect=True,  # BigQuery infers the schema and data types
        # Append to the existing table rather than overwriting it
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    # Start the load job
    load_job = bigquery_client.load_table_from_uri(
        gcs_uri, table_ref, job_config=job_config
    )
    print(f"   Load job started: {load_job.job_id}")
    load_job.result()  # Wait for the job to complete
    # Check the result
    destination_table = bigquery_client.get_table(table_ref)
    print(f"3. Load job complete. Loaded {load_job.output_rows} rows into BigQuery.")
Setting the prompts you want to track
Next up in our block is the section where you declare the prompts you intend to track. In the appropriate variables – prompt and topic – make the changes needed so that each chunk asks the question you want to track and classifies it accordingly.
I’ve designed this so you can copy, paste, and adapt the same chunk over and over, depending on how far you want to take this. It is assembled this way for convenience and could easily be consolidated into its own function (a sketch of that appears after the code below); however, it works as intended and I’m helping you dip your toes into prompt tracking, so c’est la vie.
It is also worth noting that I have included the query fan-out data associated with each response. You can isolate this after the fact using SQL in BigQuery for your own analysis (an example query appears after the upload step below).
# Prompts go here
prompt = "Add your first prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with
# Note: I declared the buyer's journey segment and category globally above, because I limit my personal prompt tracking to commercial queries. You can change them as needed here and elsewhere.
response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)
all_results.append({
    'Date': timestamp,
    'Source': source,
    'Classification': buyers_journey_segment,
    'Category': category,
    'Topic': topic,
    'Prompt': prompt,
    'Response': single_line_text,
    'Model': model,
    'Query Fan Out': fan_out
})
prompt = "Add your second prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with
response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)
all_results.append({
    'Date': timestamp,
    'Source': source,
    'Classification': buyers_journey_segment,
    'Category': category,
    'Topic': topic,
    'Prompt': prompt,
    'Response': single_line_text,
    'Model': model,
    'Query Fan Out': fan_out
})
prompt = "Add your final prompt here." # Adjust to reflect the prompt you want to run
topic = "Assign the topic you want your prompt to represent." # Adjust to reflect the topic your prompt aligns with
response = client.models.generate_content(model=model, contents=prompt, config=config,)
candidate = response.candidates[0]
metadata = candidate.grounding_metadata if candidate.grounding_metadata else type('obj', (object,), {'web_search_queries': []})()
fan_out = getattr(metadata, 'web_search_queries', [])
text_with_citations = add_citations(response)
markdown_text = text_with_citations
single_line_text = ' '.join(markdown_text.split())
print(single_line_text)
all_results.append({
    'Date': timestamp,
    'Source': source,
    'Classification': buyers_journey_segment,
    'Category': category,
    'Topic': topic,
    'Prompt': prompt,
    'Response': single_line_text,
    'Model': model,
    'Query Fan Out': fan_out
})
print("Ended API calls; beginning CSV creation.")
Exporting and uploading the CSV
This section will create a CSV file from the data you have assembled. The first row will be written as the headers, and everything else will be added as rows below.
Once it finishes running, the file will be available for download in Colab’s file browser. I include this both as a fail-safe in case something prevents the BigQuery upload, and as a step you can use to initially populate a table in BigQuery. The load job itself will either create the table (if it doesn’t already exist) or append to the existing data.
# --- CSV Export Section ---
if all_results:
    # Get the fieldnames from the keys of the first dictionary
    fieldnames = all_results[0].keys()
    try:
        with open(LOCAL_CSV_FILE, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            # Write the header row
            writer.writeheader()
            # Write all the data rows
            writer.writerows(all_results)
        print(f"✅ Successfully exported {len(all_results)} responses to **{LOCAL_CSV_FILE}**")
    except IOError as e:
        print(f"❌ Error writing to CSV file: {e}")
else:
    print("⚠️ No results were generated to export.")
print("Created CSV")
Uploading to BigQuery
In case you want to examine the CSV output before committing the data to a BigQuery table, I have separated this out. Copy and paste this at the end of the code block to complete the last step.
# Upload to Cloud Storage and BigQuery
if not os.path.exists(LOCAL_CSV_FILE):
    print(f"ERROR: Local CSV file '{LOCAL_CSV_FILE}' not found. Please run the extraction script first.")
else:
    # 1. Upload CSV to GCS
    gcs_uri = upload_to_gcs(GCS_BUCKET_NAME, LOCAL_CSV_FILE, GCS_DESTINATION_BLOB)
    # 2. Load data from GCS into BigQuery (appending to the existing table)
    load_csv_to_bigquery(gcs_uri, BQ_DATASET_ID, BQ_TABLE_ID)
    print("Upload completed")
print("End of script execution")
Conclusion
That’s the script – you now have a basic way to track prompts and responses in Gemini via the API. It is only a starting point, though. There are several ways you can improve this setup:
- You absolutely should visualize this data in Data Studio and create an interactive dashboard with it.
- You can transition this setup from using the API to interacting with the interface through the browser, which will give you more accurate data. In its current state, the tracker mainly provides directional insights, but those insights are still useful.
- You can clean up the script to make it more efficient.
- You can do what I did and monitor for mentions and citations in Data Studio. Regex and custom dimensions make this easy to do. You can also further process the data using Python (a small sketch follows this list).
- You can do what I did and layer on a competitor analysis in Data Studio. Regex and custom dimensions make this easy to do.
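For the mention tracking, a small Python sketch along the lines of the one below is enough to get started. The brand names are placeholders, and this simply runs over the all_results list built earlier in the notebook (you could just as easily pull the responses back out of BigQuery instead).
# Illustrative sketch: flag brand and competitor mentions in each tracked response.
import re

brands_to_track = ["Example Brand", "Dr. Jane Smith", "Example Competitor"]  # Placeholder names

def find_mentions(response_text, names):
    """Return the names that appear in the response text, matched case-insensitively."""
    return [name for name in names if re.search(re.escape(name), response_text, re.IGNORECASE)]

for result in all_results:
    mentions = find_mentions(result['Response'], brands_to_track)
    print(result['Prompt'], '->', mentions if mentions else 'no mentions')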