Getting started with the alloydb-ai-nl tool
Objective
In this notebook, we will use the AlloyDB AI NL tool to generate and execute SQL queries on AlloyDB databases. We will start by setting up an AlloyDB database and then connect to it using MCP Toolbox and ADK. This lets us leverage AlloyDB's NL-to-SQL capabilities in agentic applications.
For detailed information on the AlloyDB capability, please see AlloyDB AI Natural Language Overview.
Before you Begin
Complete the before you begin setup from Use AlloyDB AI natural language to generate SQL.
Step 1: Set up Database
Here, we will set up a sample AlloyDB database for testing out our alloydb-ai-nl tool.
Authenticate to Google Cloud within Colab
You need to authenticate as an IAM user so this notebook can access your Google Cloud project. This access is necessary to use Google's LLMs.
# Run this and allow access through the pop-up
from google.colab import auth
auth.authenticate_user()
Connect Your Google Cloud Project
# @markdown Please fill in the value below with your GCP project ID and then run the cell.
# Please fill in these values.
project_id = "project-id" # @param {type:"string"}
# Quick input validations.
assert project_id, "⚠️ Please provide a Google Cloud project ID"
# Configure gcloud.
!gcloud config set project {project_id}
Set up AlloyDB
You will need an AlloyDB for PostgreSQL instance for the following stages of this notebook. Please set the following variables to connect to an existing instance, or create a new one.
Please make sure that your instance has the required access described in the setup guide linked above; a gcloud sketch for creating a new cluster and instance follows.
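If you don't have an instance yet, something like the following can create one. This is a minimal sketch: the resource names match the defaults used below, while the CPU count and the public-IP flag value are assumptions to adapt to your project.
# Sketch: create an AlloyDB cluster and a PRIMARY instance with public IP.
# Resource names match the form defaults below; sizing is an assumption.
!gcloud services enable alloydb.googleapis.com
!gcloud alloydb clusters create alloydb-ai-nl-testing \
  --region=us-central1 \
  --password=<postgres-password>
!gcloud alloydb instances create alloydb-ai-nl-testing-instance \
  --cluster=alloydb-ai-nl-testing \
  --region=us-central1 \
  --instance-type=PRIMARY \
  --cpu-count=2 \
  --assign-inbound-public-ip=ASSIGN_IPV4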
# @markdown Please fill in both the Google Cloud region and the names of your AlloyDB cluster, instance, and database. Once filled in, run the cell.
# Please fill in these values.
region = "us-central1" # @param {type:"string"}
cluster_name = "alloydb-ai-nl-testing" # @param {type:"string"}
instance_name = "alloydb-ai-nl-testing-instance" # @param {type:"string"}
database_name = "ai_nl_tool_testing" # @param {type:"string"}
password = input("Please provide a password to be used for 'postgres' database user: ")
# Quick input validations.
assert region, "⚠️ Please provide a Google Cloud region"
assert instance_name, "⚠️ Please provide the name of your instance"
assert database_name, "⚠️ Please provide the name of your database"
assert cluster_name, "⚠️ Please provide the name of your cluster"
assert password, "⚠️ Please provide your password"
Connect to AlloyDB
%pip install \
    google-cloud-alloydb-connector[asyncpg]==1.4.0 \
    sqlalchemy==2.0.36 \
    vertexai==1.70.0 \
    greenlet==3.1.1 \
    --quiet
import asyncpg
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector, IPTypes

async def init_connection_pool(connector: AsyncConnector, db_name: str, pool_size: int = 5) -> AsyncEngine:
    # Build the fully qualified connection string for the AlloyDB instance
    connection_string = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}/instances/{instance_name}"

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            connection_string,
            "asyncpg",
            user="postgres",
            password=password,
            db=db_name,
            ip_type=IPTypes.PUBLIC,
        )
        return conn

    # The pool opens connections lazily through the async creator above
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        pool_size=pool_size,
        max_overflow=0,
    )
    return pool

# A single connector can be shared across connection pools
connector = AsyncConnector()
Create a Database
Next, you will create a database to store the data, using the connection pool. Enabling public IP takes a few minutes, so you may get an error that there is no public IP address yet. If you hit this error, please wait and retry this step; a retry sketch follows the cell below.
from sqlalchemy import text, exc

async def create_db(database_name, connector):
    # Connect to the default "postgres" database to create the new one
    pool = await init_connection_pool(connector, "postgres")
    async with pool.connect() as conn:
        # Check if the database already exists
        result = await conn.execute(text(f"SELECT 1 FROM pg_database WHERE datname = '{database_name}'"))
        if result.scalar_one_or_none() is None:
            # CREATE DATABASE cannot run inside a transaction block,
            # so end the implicit transaction first
            await conn.execute(text("COMMIT"))
            await conn.execute(text(f"CREATE DATABASE {database_name}"))
            print(f"Database '{database_name}' created successfully")
        else:
            print(f"Database '{database_name}' already exists")
    await pool.dispose()

await create_db(database_name=database_name, connector=connector)
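If the instance's public IP is still being provisioned, the call above can fail with a connection error. Here is a small retry sketch; the attempt count and delay are arbitrary choices, and it assumes any exception at this point is the transient "no public IP" error.
import asyncio

# Retry create_db a few times, assuming failures here are the transient
# "no public IP address" error mentioned above.
async def create_db_with_retry(database_name, connector, attempts=5, delay_s=60):
    for attempt in range(1, attempts + 1):
        try:
            await create_db(database_name=database_name, connector=connector)
            return
        except Exception as e:
            print(f"Attempt {attempt}/{attempts} failed: {e}")
            if attempt < attempts:
                await asyncio.sleep(delay_s)
    raise RuntimeError("Could not reach the instance; verify its public IP settings.")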
Set up your Database
In the following steps, we set up our database to be ready to handle natural language queries.
- Create tables and populate data: The provided schema and data are designed to support the fundamental operations of an online retail business, with potential applications extending to customer management, analytics, marketing, and operations.
- Configure the model endpoint: To use AlloyDB AI natural language, make sure that the Vertex AI endpoint is configured. Then you create a configuration and register a schema; alloydb_ai_nl.g_create_configuration creates the model.
- Automated context generation: To provide accurate answers to natural language questions, you use the AlloyDB AI natural language API to provide context about tables, views, and columns. You can use the automated context generation feature to produce context from tables and columns, and apply the context as COMMENTs attached to tables, views, and columns.
setup_queries = [
# Install required extension to use the AlloyDB AI natural language support API
"""CREATE EXTENSION IF NOT EXISTS alloydb_ai_nl cascade;""",
# Create schema
"""CREATE SCHEMA IF NOT EXISTS nla_demo;""",
# Create tables (If they do not exist)
"""
CREATE TABLE IF NOT EXISTS nla_demo.addresses (
address_id SERIAL PRIMARY KEY,
street_address VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
country VARCHAR(255)
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.customers (
customer_id SERIAL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
address_id INTEGER REFERENCES nla_demo.addresses(address_id),
date_of_birth DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.categories (
category_id INTEGER PRIMARY KEY,
category_name VARCHAR(255) UNIQUE NOT NULL
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.brands (
brand_id INTEGER PRIMARY KEY,
brand_name VARCHAR(255) NOT NULL
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.products (
product_id INTEGER PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
brand_id INTEGER REFERENCES nla_demo.brands(brand_id),
category_id INTEGER REFERENCES nla_demo.categories(category_id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.orders (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER REFERENCES nla_demo.customers(customer_id),
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_amount DECIMAL(10, 2) NOT NULL,
shipping_address_id INTEGER REFERENCES nla_demo.addresses(address_id),
billing_address_id INTEGER REFERENCES nla_demo.addresses(address_id),
order_status VARCHAR(50)
);
""",
"""
CREATE TABLE IF NOT EXISTS nla_demo.order_items (
order_item_id SERIAL PRIMARY KEY,
order_id INTEGER REFERENCES nla_demo.orders(order_id),
product_id INTEGER REFERENCES nla_demo.products(product_id),
quantity INTEGER NOT NULL,
price DECIMAL(10, 2) NOT NULL
);
""",
# Populate tables (only if they exist and are empty)
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.addresses) THEN
INSERT INTO nla_demo.addresses (street_address, city, country)
VALUES
('1800 Amphibious Blvd', 'Mountain View', 'USA'),
('Avenida da Pastelaria, 1903', 'Lisbon', 'Portugal'),
('8 Rue du Nom Fictif 341', 'Paris', 'France');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.customers) THEN
INSERT INTO nla_demo.customers (first_name, last_name, email, address_id, date_of_birth)
VALUES
('Alex', 'B.', 'alex.b@example.com', 1, '2003-02-20'),
('Amal', 'M.', 'amal.m@example.com', 2, '1998-11-08'),
('Dani', 'G.', 'dani.g@example.com', 3, '2002-07-25');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.categories) THEN
INSERT INTO nla_demo.categories (category_id, category_name)
VALUES
(1, 'Accessories'),
(2, 'Apparel'),
(3, 'Footwear'),
(4, 'Swimwear');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.brands) THEN
INSERT INTO nla_demo.brands (brand_id, brand_name)
VALUES
(1, 'CymbalPrime'),
(2, 'CymbalPro'),
(3, 'CymbalSports');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.products) THEN
INSERT INTO nla_demo.products (product_id, brand_id, category_id, name)
VALUES
(1, 1, 2, 'Hoodie'),
(2, 1, 3, 'Running Shoes'),
(3, 2, 4, 'Swimsuit'),
(4, 3, 1, 'Tote Bag'),
(5, 3, 3, 'CymbalShoe');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.orders) THEN
INSERT INTO nla_demo.orders (order_id, customer_id, total_amount, shipping_address_id, billing_address_id, order_status)
VALUES
(1, 1, 99.99, 1, 1, 'Shipped'),
(2, 1, 69.99, 1, 1, 'Delivered'),
(3, 2, 20.99, 2, 2, 'Processing'),
(4, 3, 79.99, 3, 3, 'Shipped');
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM nla_demo.order_items) THEN
INSERT INTO nla_demo.order_items (order_id, product_id, quantity, price)
VALUES
(1, 1, 1, 79.99),
(1, 3, 1, 20.00),
(2, 4, 1, 69.99),
(3, 3, 1, 20.00),
(4, 2, 1, 79.99);
END IF;
END $$;
""",
# Create a natural language configuration
# alloydb_ai_nl.g_create_configuration creates the model.
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 from alloydb_ai_nl.g_magic_configuration where configuration_id = 'nla_demo_cfg') THEN
PERFORM alloydb_ai_nl.g_create_configuration( 'nla_demo_cfg' );
END IF;
END $$;
""",
"""
SELECT alloydb_ai_nl.g_manage_configuration(
operation => 'register_table_view',
configuration_id_in => 'nla_demo_cfg',
table_views_in=>'{nla_demo.customers, nla_demo.addresses, nla_demo.brands, nla_demo.products, nla_demo.categories, nla_demo.orders, nla_demo.order_items}'
);
""",
# Create and apply context
"""
SELECT alloydb_ai_nl.generate_schema_context(
'nla_demo_cfg',
TRUE
);
""",
# Enable parameterized views to get AlloyDB query responses
"""
CREATE EXTENSION IF NOT EXISTS parameterized_views;
"""
]
# Create tables and insert data
async def run_setup(pool):
    async with pool.connect() as db_conn:
        for query in setup_queries:
            await db_conn.execute(sqlalchemy.text(query))
        await db_conn.commit()

pool = await init_connection_pool(connector, database_name)
await run_setup(pool)
Set up NL to SQL capability of AlloyDB
Now, we will do some setup to make sure the NL-to-SQL capability of AlloyDB works correctly.
- Verify and update the generated context: Verify the generated context for the tables and update any entries that need revision.
- Construct the value index: The AlloyDB AI natural language API produces accurate SQL queries by using value linking. Value linking associates value phrases in natural language statements with pre-registered concept types and column names, which can enrich the natural language question.
- Define a query template: You can define templates to improve the quality of the answers produced by the AlloyDB AI natural language API.
verify_context_queries = [
"""
SELECT object_context
FROM alloydb_ai_nl.generated_schema_context_view
WHERE schema_object = 'nla_demo.products';
""",
"""
SELECT object_context
FROM alloydb_ai_nl.generated_schema_context_view
WHERE schema_object = 'nla_demo.products.name';
"""
]
update_context_queries = [
"""
SELECT alloydb_ai_nl.update_generated_relation_context(
'nla_demo.products',
'The "nla_demo.products" table stores product details such as ID, name, description, brand, category linkage, and record creation time.'
);
""",
"""
SELECT alloydb_ai_nl.update_generated_column_context(
'nla_demo.products.name',
'The "name" column in the "nla_demo.products" table contains the specific name or title of each product.'
);
"""
]
apply_generated_context_queries = [
"""
SELECT alloydb_ai_nl.apply_generated_relation_context(
'nla_demo.products', true
);
""",
"""
SELECT alloydb_ai_nl.apply_generated_column_context(
'nla_demo.products.name',
true
);
"""
]
define_product_name_context_queries = [
"""
DO $$
BEGIN
IF NOT EXISTS (select 1 from alloydb_ai_nl.concept_types_user_defined where type_name = 'product_name') THEN
PERFORM alloydb_ai_nl.add_concept_type(
concept_type_in => 'product_name',
match_function_in => 'alloydb_ai_nl.get_concept_and_value_generic_entity_name',
additional_info_in => '{
"description": "Concept type for product name.",
"examples": "SELECT alloydb_ai_nl.get_concept_and_value_generic_entity_name(''Camera'')" }'::jsonb
);
END IF;
END $$;
""",
"""
SELECT alloydb_ai_nl.associate_concept_type(
'nla_demo.products.name',
'product_name',
'nla_demo_cfg'
);
""",
]
verify_product_name_concept_query = "SELECT alloydb_ai_nl.list_concept_types();"
verify_product_name_association_query = """
SELECT *
FROM alloydb_ai_nl.value_index_columns
WHERE column_names = 'nla_demo.products.name';
"""
define_brand_name_concept_queries = [
"""
DO $$
BEGIN
IF NOT EXISTS (select 1 from alloydb_ai_nl.concept_types_user_defined where type_name = 'brand_name') THEN
PERFORM alloydb_ai_nl.add_concept_type(
concept_type_in => 'brand_name',
match_function_in => 'alloydb_ai_nl.get_concept_and_value_generic_entity_name',
additional_info_in => '{
"description": "Concept type for brand name.",
"examples": "SELECT alloydb_ai_nl.get_concept_and_value_generic_entity_name(''CymbalPrime'')" }'::jsonb
);
END IF;
END $$;
""",
"""
SELECT alloydb_ai_nl.associate_concept_type(
'nla_demo.brands.brand_name',
'brand_name',
'nla_demo_cfg'
);
""",
]
construct_value_index_queries = [
"""SELECT alloydb_ai_nl.create_value_index('nla_demo_cfg');""",
"""SELECT alloydb_ai_nl.refresh_value_index('nla_demo_cfg');"""
]
query_template_sql = """
SELECT c.first_name, c.last_name FROM nla_demo.Customers c JOIN nla_demo.orders o ON c.customer_id = o.customer_id JOIN nla_demo.order_items oi ON o.order_id = oi.order_id JOIN nla_demo.products p ON oi.product_id = p.product_id AND p.name = ''Swimsuit''
"""
define_query_template_query = f"""
DO $$
BEGIN
IF NOT EXISTS (select 1 from alloydb_ai_nl.g_template_store where template_sql = '{query_template_sql}') THEN
PERFORM alloydb_ai_nl.add_template(
nl_config_id => 'nla_demo_cfg',
intent => 'List the first names and the last names of all customers who ordered Swimsuit.',
sql => '{query_template_sql}',
sql_explanation => 'To answer this question, JOIN `nla_demo.Customers` with `nla_demo.orders` on having the same `customer_id`, and JOIN the result with nla_demo.order_items on having the same `order_id`. Then JOIN the result with `nla_demo.products` on having the same `product_id`, and filter rows where p.name = ''Swimsuit''. Return the `first_name` and the `last_name` of the customers with matching records.',
check_intent => TRUE
);
END IF;
END $$;
"""
view_added_template_query = """
SELECT nl, sql, intent, psql, pintent
FROM alloydb_ai_nl.template_store_view
WHERE config = 'nla_demo_cfg';
"""
async def run_queries(pool):
    async with pool.connect() as db_conn:
        # Verify the generated context for the tables
        for query in verify_context_queries:
            response = await db_conn.execute(sqlalchemy.text(query))
            print("Verify the context:", response.mappings().all())

        # Update context entries that need revision
        for query in update_context_queries:
            await db_conn.execute(sqlalchemy.text(query))

        # Apply the resulting context entries in the
        # alloydb_ai_nl.generated_schema_context_view view to the
        # corresponding schema objects, overwriting their comments
        for query in apply_generated_context_queries:
            await db_conn.execute(sqlalchemy.text(query))

        # Define the product_name concept type and associate it with the
        # nla_demo.products.name column
        for query in define_product_name_context_queries:
            await db_conn.execute(sqlalchemy.text(query))

        # Verify that the product_name concept type is added to the list of concept types
        response = await db_conn.execute(sqlalchemy.text(verify_product_name_concept_query))
        print("Verify the product name concept:", response.mappings().all())

        # Verify that the nla_demo.products.name column is associated with the
        # product_name concept type
        response = await db_conn.execute(sqlalchemy.text(verify_product_name_association_query))
        print("Verify the product name association:", response.mappings().all())

        # Define the brand_name concept type and associate it with the
        # nla_demo.brands.brand_name column
        for query in define_brand_name_concept_queries:
            await db_conn.execute(sqlalchemy.text(query))

        # Create and refresh the value index
        for query in construct_value_index_queries:
            await db_conn.execute(sqlalchemy.text(query))

        # Add a template (this helps improve accuracy for critical questions)
        await db_conn.execute(sqlalchemy.text(define_query_template_query))

        # View the list of added templates
        response = await db_conn.execute(sqlalchemy.text(view_added_template_query))
        print("View added template:", response.mappings().all())

        await db_conn.commit()

# Reuse the pool created during database setup
await run_queries(pool)
await pool.dispose()
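Before moving on to Toolbox, you can optionally sanity-check the configuration directly in SQL. This is a minimal sketch, assuming alloydb_ai_nl.get_sql(nl_config_id, nl_question) from the AlloyDB AI natural language API returns a JSON object whose 'sql' key holds the generated query:
# Optional sanity check: ask the NL configuration to generate SQL for a known question.
nl_check_pool = await init_connection_pool(connector, database_name)
async with nl_check_pool.connect() as db_conn:
    result = await db_conn.execute(sqlalchemy.text(
        """
        SELECT alloydb_ai_nl.get_sql(
            'nla_demo_cfg',
            'Find the first names and the last names of all customers who ordered Swimsuit.'
        ) ->> 'sql' AS generated_sql;
        """
    ))
    print(result.scalar_one())
await nl_check_pool.dispose()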
Step 2: Set up Toolbox
Here, we will set up the MCP Toolbox server to interact with our AlloyDB database.
version = "0.11.0" # x-release-please-version
! curl -L -o /content/toolbox https://storage.googleapis.com/genai-toolbox/v{version}/linux/amd64/toolbox
# Make the binary executable
! chmod +x /content/toolbox
TOOLBOX_BINARY_PATH = "/content/toolbox"
SERVER_PORT = 5000
# Create a tools file
tools_file_name = "tools.yml"
file_content = f"""
sources:
my-alloydb-pg-source:
kind: alloydb-postgres
project: {project_id}
region: {region}
cluster: {cluster_name}
instance: {instance_name}
database: {database_name}
user: postgres
password: {password}
tools:
ask_questions:
kind: alloydb-ai-nl
source: my-alloydb-pg-source
description: 'Ask any natural language questions about the tables'
nlConfig: 'nla_demo_cfg'
basic_sql:
kind: postgres-sql
source: my-alloydb-pg-source
description: 'Check if db is connected'
statement: SELECT * from nla_demo.products;
"""
# Write the file content into the tools file. Writing from Python avoids
# shell-quoting issues with the multi-line content and the password.
with open(tools_file_name, "w") as f:
    f.write(file_content)
TOOLS_FILE_PATH = f"/content/{tools_file_name}"
# Start a toolbox server
! nohup {TOOLBOX_BINARY_PATH} --tools-file {TOOLS_FILE_PATH} --log-level debug --logging-format json -p {SERVER_PORT} > toolbox.log 2>&1 &
# Check if toolbox is running
!sudo lsof -i :{SERVER_PORT}
# Kill process at port
# !lsof -t -i :{SERVER_PORT} | xargs kill -9
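You can also check the server log for startup errors and fetch the tool manifest. The /api/toolset endpoint here is an assumption based on the protocol the Toolbox client SDKs use; a JSON listing that contains ask_questions and basic_sql means tools.yml parsed correctly.
# Inspect the server log for startup errors
!tail -n 5 toolbox.log
# Fetch the tool manifest (endpoint assumed from the Toolbox client protocol)
!curl -s http://127.0.0.1:{SERVER_PORT}/api/toolset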
Step 3: Connect Using ADK
Now, we will use ADK to connect to the server and answer natural language questions related to our database.
! pip install toolbox-core --quiet
! pip install google-adk --quiet
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.genai import types
from toolbox_core import ToolboxSyncClient
import os
# TODO(developer): replace this with your Google API key
os.environ['GOOGLE_API_KEY'] = "your-api-key"
toolbox_client = ToolboxSyncClient(f"http://127.0.0.1:{SERVER_PORT}")
prompt = """You're a helpful assistant to fetch queries for an e-commerce application."""
tools = toolbox_client.load_toolset()
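Before building the agent, you can optionally invoke the basic_sql tool directly; tools loaded by toolbox-core are plain callables, so this is a quick end-to-end connectivity check (a sketch, assuming the Toolbox server from Step 2 is still running):
# Optional connectivity check: call the basic_sql tool directly.
# ToolboxSyncClient.load_tool returns a callable; basic_sql takes no parameters.
basic_sql_tool = toolbox_client.load_tool("basic_sql")
print(basic_sql_tool())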
root_agent = Agent(
    model='gemini-2.0-flash',
    name='alloy_nl_agent',
    description='A helpful AI assistant.',
    instruction=prompt,
    tools=tools,
)

session_service = InMemorySessionService()
artifacts_service = InMemoryArtifactService()

session = await session_service.create_session(
    state={}, app_name='alloy_nl_agent', user_id='123'
)

runner = Runner(
    app_name='alloy_nl_agent',
    agent=root_agent,
    artifact_service=artifacts_service,
    session_service=session_service,
)
queries = [
    "Find the customers who purchased Tote Bag.",
    "List the maximum price of any CymbalShoe.",  # Returns null: CymbalShoe exists in products but was never ordered, so it has no price in order_items.
    "List the maximum price of any CymbalPrime.",
    "Find the last name of the customers who live in France."
]
for query in queries:
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(session_id=session.id,
                        user_id='123', new_message=content)

    responses = (
        part.text
        for event in events
        for part in event.content.parts
        if part.text is not None
    )

    for text in responses:
        print(text)
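When you are done, you can stop the Toolbox server and release the database connections. A sketch: the kill pipeline mirrors the commented command from Step 2, and AsyncConnector.close() is the connector's cleanup method.
# Cleanup: stop the Toolbox server and close the AlloyDB connector.
!lsof -t -i :{SERVER_PORT} | xargs kill -9
await connector.close()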