Getting started with alloydb-ai-nl tool

An end-to-end tutorial for building an ADK agent using the AlloyDB AI NL tool.


Objective

In this notebook, we use the AlloyDB AI NL tool to generate and execute SQL queries against AlloyDB databases. We start by setting up an AlloyDB database and then connecting to it using MCP Toolbox and ADK. This lets us leverage AlloyDB's NL-to-SQL capabilities in agentic applications.

For detailed information on the AlloyDB capability, please see AlloyDB AI Natural Language Overview.

Before you Begin

Complete the before you begin setup from Use AlloyDB AI natural language to generate SQL.

Step 1: Set up Database

Here, we will set up a sample AlloyDB database for testing out our alloydb-ai-nl tool.

Authenticate to Google Cloud within Colab

You need to authenticate as an IAM user so this notebook can access your Google Cloud project. This access is necessary to use Google's LLMs.

# Run this and allow access through the pop-up
from google.colab import auth

auth.authenticate_user()

Connect Your Google Cloud Project

# @markdown Please fill in the value below with your GCP project ID and then run the cell.

# Please fill in these values.
project_id = "project-id"  # @param {type:"string"}

# Quick input validations.
assert project_id, "⚠️ Please provide a Google Cloud project ID"

# Configure gcloud.
!gcloud config set project {project_id}

Set up AlloyDB

You will need an AlloyDB for PostgreSQL instance for the following stages of this notebook. Please set the following variables to connect to an existing instance or create a new one.

Please make sure that your instance has the required access.

# @markdown Please fill in the Google Cloud region and the names of your AlloyDB cluster, instance, and database. Once filled in, run the cell.

# Please fill in these values.
region = "us-central1"  # @param {type:"string"}
cluster_name = "alloydb-ai-nl-testing"  # @param {type:"string"}
instance_name = "alloydb-ai-nl-testing-instance"  # @param {type:"string"}
database_name = "ai_nl_tool_testing"  # @param {type:"string"}
from getpass import getpass

password = getpass("Please provide a password to be used for 'postgres' database user: ")
# Quick input validations.
assert region, "⚠️ Please provide a Google Cloud region"
assert instance_name, "⚠️ Please provide the name of your instance"
assert database_name, "⚠️ Please provide the name of your database"
assert cluster_name, "⚠️ Please provide the name of your cluster"
assert password, "⚠️ Please provide your password"

Connect to AlloyDB

%pip install \
  google-cloud-alloydb-connector[asyncpg]==1.4.0 \
  sqlalchemy==2.0.36 \
  vertexai==1.70.0 \
  asyncio==3.4.3 \
  greenlet==3.1.1 \
  --quiet
This function will create a connection pool to your AlloyDB instance using the AlloyDB Python connector, which automatically creates secure mTLS connections to your instance.
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import AsyncConnector, IPTypes

async def init_connection_pool(connector: AsyncConnector, db_name: str, pool_size: int = 5) -> AsyncEngine:
    # initialize Connector object for connections to AlloyDB
    connection_string = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}/instances/{instance_name}"

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            connection_string,
            "asyncpg",
            user="postgres",
            password=password,
            db=db_name,
            ip_type=IPTypes.PUBLIC,
        )
        return conn

    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        pool_size=pool_size,
        max_overflow=0,
    )
    return pool

connector = AsyncConnector()

Create a Database

Next, you will create a database to store the data using the connection pool. Enabling public IP can take a few minutes, so you may get an error that there is no public IP address. If you hit this error, please wait a moment and retry this step.

from sqlalchemy import text, exc

async def create_db(database_name, connector):
    pool = await init_connection_pool(connector, "postgres")
    async with pool.connect() as conn:
        # Check if the database already exists
        result = await conn.execute(
            text("SELECT 1 FROM pg_database WHERE datname = :name"),
            {"name": database_name},
        )
        if result.scalar_one_or_none() is None:
            await conn.execute(text("COMMIT"))
            await conn.execute(text(f"CREATE DATABASE {database_name}"))
            print(f"Database '{database_name}' created successfully")
        else:
            print(f"Database '{database_name}' already exists")
    await pool.dispose()

await create_db(database_name=database_name, connector=connector)
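Note that `database_name` is interpolated directly into the `CREATE DATABASE` statement, since identifiers cannot be bound as query parameters. A light sanity check on the name can guard against typos or injection. This is a minimal sketch; the pattern below is a deliberately strict assumption, not PostgreSQL's full identifier rules:

```python
import re

def is_safe_identifier(name: str) -> bool:
    """Allow only simple lowercase identifiers (hypothetical pattern,
    stricter than PostgreSQL's actual identifier rules)."""
    return re.fullmatch(r"[a-z_][a-z0-9_]{0,62}", name) is not None

assert is_safe_identifier("ai_nl_tool_testing")
assert not is_safe_identifier("bad-name; drop database x")
```

You could run this check before calling `create_db` and fail early if the name does not match.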

Set up your Database

In the following steps, we set up our database to be ready to handle Natural Language Queries.

  1. Create tables and populate data: The provided schema and data are designed to support the fundamental operations of an online retail business, with potential applications extending to customer management, analytics, marketing, and operational aspects.

  2. Configure Model Endpoint: To use AlloyDB AI natural language, make sure that the Vertex AI endpoint is configured. Then you create a configuration and register a schema. alloydb_ai_nl.g_create_configuration creates the configuration.

  3. Automated Context Generation: To provide accurate answers to natural language questions, you use the AlloyDB AI natural language API to provide context about tables, views, and columns. You can use the automated context generation feature to produce context from tables and columns, and apply the context as COMMENTS attached to tables, views, and columns.

setup_queries = [
  # Install required extension to use the AlloyDB AI natural language support API
  """CREATE EXTENSION IF NOT EXISTS alloydb_ai_nl cascade;""",

  # Create schema
  """CREATE SCHEMA IF NOT EXISTS nla_demo;""",

  # Create tables (If they do not exist)
  """
  CREATE TABLE IF NOT EXISTS nla_demo.addresses (
    address_id      SERIAL         PRIMARY KEY,
    street_address  VARCHAR(255)   NOT NULL,
    city            VARCHAR(255)   NOT NULL,
    country         VARCHAR(255)
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.customers (
    customer_id     SERIAL         PRIMARY KEY,
    first_name      VARCHAR(255)   NOT NULL,
    last_name       VARCHAR(255)   NOT NULL,
    email           VARCHAR(255)   UNIQUE NOT NULL,
    address_id      INTEGER        REFERENCES nla_demo.addresses(address_id),
    date_of_birth   DATE,
    created_at      TIMESTAMP      DEFAULT CURRENT_TIMESTAMP
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.categories (
    category_id     INTEGER        PRIMARY KEY,
    category_name   VARCHAR(255)   UNIQUE NOT NULL
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.brands (
    brand_id      INTEGER        PRIMARY KEY,
    brand_name    VARCHAR(255)   NOT NULL
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.products (
    product_id    INTEGER        PRIMARY KEY,
    name          VARCHAR(255)   NOT NULL,
    description   TEXT,
    brand_id      INTEGER        REFERENCES nla_demo.brands(brand_id),
    category_id   INTEGER        REFERENCES nla_demo.categories(category_id),
    created_at    TIMESTAMP      DEFAULT CURRENT_TIMESTAMP
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.orders (
      order_id            INTEGER        PRIMARY KEY,
      customer_id         INTEGER        REFERENCES nla_demo.customers(customer_id),
      order_date          TIMESTAMP      DEFAULT CURRENT_TIMESTAMP,
      total_amount        DECIMAL(10, 2) NOT NULL,
      shipping_address_id INTEGER        REFERENCES nla_demo.addresses(address_id),
      billing_address_id  INTEGER        REFERENCES nla_demo.addresses(address_id),
      order_status        VARCHAR(50)
  );
  """,
  """
  CREATE TABLE IF NOT EXISTS nla_demo.order_items (
      order_item_id   SERIAL         PRIMARY KEY,
      order_id        INTEGER        REFERENCES nla_demo.orders(order_id),
      product_id      INTEGER        REFERENCES nla_demo.products(product_id),
      quantity        INTEGER        NOT NULL,
      price           DECIMAL(10, 2) NOT NULL
  );
  """,

  # Populate tables (Only if they are existing and empty)
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.addresses) THEN
      INSERT INTO nla_demo.addresses (street_address, city, country)
      VALUES
        ('1800 Amphibious Blvd', 'Mountain View', 'USA'),
        ('Avenida da Pastelaria, 1903', 'Lisbon', 'Portugal'),
        ('8 Rue du Nom Fictif 341', 'Paris', 'France');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.customers) THEN
      INSERT INTO nla_demo.customers (first_name, last_name, email, address_id, date_of_birth)
      VALUES
        ('Alex', 'B.', 'alex.b@example.com', 1, '2003-02-20'),
        ('Amal', 'M.', 'amal.m@example.com', 2, '1998-11-08'),
        ('Dani', 'G.', 'dani.g@example.com', 3, '2002-07-25');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.categories) THEN
      INSERT INTO nla_demo.categories (category_id, category_name)
      VALUES
        (1, 'Accessories'),
        (2, 'Apparel'),
        (3, 'Footwear'),
        (4, 'Swimwear');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.brands) THEN
      INSERT INTO nla_demo.brands (brand_id, brand_name)
      VALUES
        (1, 'CymbalPrime'),
        (2, 'CymbalPro'),
        (3, 'CymbalSports');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.products) THEN
      INSERT INTO nla_demo.products (product_id, brand_id, category_id, name)
      VALUES
        (1, 1, 2, 'Hoodie'),
        (2, 1, 3, 'Running Shoes'),
        (3, 2, 4, 'Swimsuit'),
        (4, 3, 1, 'Tote Bag'),
        (5, 3, 3, 'CymbalShoe');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.orders) THEN
      INSERT INTO nla_demo.orders (order_id, customer_id, total_amount, shipping_address_id, billing_address_id, order_status)
      VALUES
        (1, 1, 99.99, 1, 1, 'Shipped'),
        (2, 1, 69.99, 1, 1, 'Delivered'),
        (3, 2, 20.99, 2, 2, 'Processing'),
        (4, 3, 79.99, 3, 3, 'Shipped');
    END IF;
  END $$;
  """,
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM nla_demo.order_items) THEN
      INSERT INTO nla_demo.order_items (order_id, product_id, quantity, price)
      VALUES
        (1, 1, 1, 79.99),
        (1, 3, 1, 20.00),
        (2, 4, 1, 69.99),
        (3, 3, 1, 20.00),
        (4, 2, 1, 79.99);
    END IF;
  END $$;
  """,

  # Create a natural language configuration 
  # alloydb_ai_nl.g_create_configuration creates the model.
  """
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 from alloydb_ai_nl.g_magic_configuration where configuration_id = 'nla_demo_cfg') THEN
     PERFORM alloydb_ai_nl.g_create_configuration( 'nla_demo_cfg' );
    END IF;
  END $$;
  """,
  """
  SELECT alloydb_ai_nl.g_manage_configuration(
    operation => 'register_table_view',
    configuration_id_in => 'nla_demo_cfg',
    table_views_in=>'{nla_demo.customers, nla_demo.addresses, nla_demo.brands, nla_demo.products, nla_demo.categories, nla_demo.orders, nla_demo.order_items}'
  );
  """,

  # Create and apply context
  """
  SELECT alloydb_ai_nl.generate_schema_context(
    'nla_demo_cfg',
    TRUE
  );
  """,

  # Enable parametrized views to get AlloyDB query responses
  """
  CREATE EXTENSION IF NOT EXISTS parameterized_views;
  """
]
from google.cloud.alloydb.connector import AsyncConnector

# Create table and insert data
async def run_setup(pool):
  async with pool.connect() as db_conn:
    for query in setup_queries:
      await db_conn.execute(sqlalchemy.text(query))
    await db_conn.commit()
pool = await init_connection_pool(connector, database_name)
await run_setup(pool)

Set up NL to SQL capability of AlloyDB

Now, we will do some setup to get the NL-to-SQL capability of AlloyDB working:

  1. Verify and update generated context: Verify the generated context for the tables and update any entries that need revision.
  2. Construct the value index: The AlloyDB AI natural language API produces accurate SQL queries by using value linking. Value linking associates value phrases in natural language statements with pre-registered concept types and column names which can enrich the natural language question.
  3. Define a query template: You can define templates to improve the quality of the answers produced by the AlloyDB AI natural language API.
verify_context_queries = [
  """
  SELECT object_context
  FROM alloydb_ai_nl.generated_schema_context_view
  WHERE schema_object = 'nla_demo.products';
  """,
  """
  SELECT object_context
  FROM alloydb_ai_nl.generated_schema_context_view
  WHERE schema_object = 'nla_demo.products.name';
  """
]


update_context_queries = [
  """
  SELECT alloydb_ai_nl.update_generated_relation_context(
    'nla_demo.products',
    'The "nla_demo.products" table stores product details such as ID, name, description, brand, category linkage, and record creation time.'
  );
  """,
  """
  SELECT alloydb_ai_nl.update_generated_column_context(
    'nla_demo.products.name',
    'The "name" column in the "nla_demo.products" table contains the specific name or title of each product.'
  );
  """
]

apply_generated_context_queries = [
  """
  SELECT alloydb_ai_nl.apply_generated_relation_context(
    'nla_demo.products', true
  );
  """,
  """
  SELECT alloydb_ai_nl.apply_generated_column_context(
    'nla_demo.products.name',
    true
  );
  """
]

define_product_name_context_queries = [
  """
  DO $$
  BEGIN
    IF NOT EXISTS (select 1 from alloydb_ai_nl.concept_types_user_defined where type_name = 'product_name') THEN
      PERFORM alloydb_ai_nl.add_concept_type(
        concept_type_in => 'product_name',
        match_function_in => 'alloydb_ai_nl.get_concept_and_value_generic_entity_name',
        additional_info_in => '{
          "description": "Concept type for product name.",
          "examples": "SELECT alloydb_ai_nl.get_concept_and_value_generic_entity_name(''Camera'')" }'::jsonb
      );
    END IF;
  END $$;
  """,
  """
  SELECT alloydb_ai_nl.associate_concept_type(
    'nla_demo.products.name',
    'product_name',
    'nla_demo_cfg'
  );
  """,
]

verify_product_name_concept_query = "SELECT alloydb_ai_nl.list_concept_types();"

verify_product_name_association_query = """
  SELECT *
  FROM alloydb_ai_nl.value_index_columns
  WHERE column_names = 'nla_demo.products.name';
"""

define_brand_name_concept_queries = [
  """
  DO $$
  BEGIN
    IF NOT EXISTS (select 1 from alloydb_ai_nl.concept_types_user_defined where type_name = 'brand_name') THEN
      PERFORM alloydb_ai_nl.add_concept_type(
        concept_type_in => 'brand_name',
        match_function_in => 'alloydb_ai_nl.get_concept_and_value_generic_entity_name',
        additional_info_in => '{
        "description": "Concept type for brand name.",
        "examples": "SELECT alloydb_ai_nl.get_concept_and_value_generic_entity_name(''CymbalPrime'')" }'::jsonb
      );
    END IF;
  END $$;
  """,
  """
  SELECT alloydb_ai_nl.associate_concept_type(
    'nla_demo.brands.brand_name',
    'brand_name',
    'nla_demo_cfg'
  );
  """,
]

construct_value_index_queries = [
  """SELECT alloydb_ai_nl.create_value_index('nla_demo_cfg');""",
  """SELECT alloydb_ai_nl.refresh_value_index('nla_demo_cfg');"""
]

query_template_sql = """
  SELECT c.first_name, c.last_name FROM nla_demo.Customers c JOIN nla_demo.orders o ON c.customer_id = o.customer_id JOIN nla_demo.order_items oi ON o.order_id = oi.order_id JOIN nla_demo.products p ON oi.product_id = p.product_id  AND p.name = ''Swimsuit''
"""

define_query_template_query = f"""
  DO $$
  BEGIN
    IF NOT EXISTS (select 1 from alloydb_ai_nl.g_template_store where template_sql = '{query_template_sql}') THEN
      PERFORM alloydb_ai_nl.add_template(
        nl_config_id => 'nla_demo_cfg',
        intent => 'List the first names and the last names of all customers who ordered Swimsuit.',
        sql => '{query_template_sql}',
        sql_explanation => 'To answer this question, JOIN `nla_demo.Customers` with `nla_demo.orders` on having the same `customer_id`, and JOIN the result with nla_demo.order_items on having the same `order_id`. Then JOIN the result with `nla_demo.products` on having the same `product_id`, and filter rows where p.name = ''Swimsuit''. Return the `first_name` and the `last_name` of the customers with matching records.',
        check_intent => TRUE
      );
    END IF;
  END $$;

"""

view_added_template_query = """
  SELECT nl, sql, intent, psql, pintent
  FROM alloydb_ai_nl.template_store_view
  WHERE config = 'nla_demo_cfg';
"""
async def run_queries(pool):
  async with pool.connect() as db_conn:
    # Verify the generated context for the tables
    for query in verify_context_queries:
      response = await db_conn.execute(sqlalchemy.text(query))
      print("Verify the context:", response.mappings().all())

    # Update context that needs revision
    for query in update_context_queries:
      await db_conn.execute(sqlalchemy.text(query))

    # The resulting context entries in the alloydb_ai_nl.generated_schema_context_view 
    # view are applied to the corresponding schema objects, and the comments are overwritten.
    for query in apply_generated_context_queries:
      await db_conn.execute(sqlalchemy.text(query))

    # Define the product_name concept type and associate it with the nla_demo.products.name column
    for query in define_product_name_context_queries:
      await db_conn.execute(sqlalchemy.text(query))

    # Verify that the product_name concept type is added to the list of concept types
    response = await db_conn.execute(sqlalchemy.text(verify_product_name_concept_query))
    print("Verify the product name concept:", response.mappings().all())

    # Verify that the nla_demo.products.name column is associated with the product_name concept type
    response = await db_conn.execute(sqlalchemy.text(verify_product_name_association_query))
    print("Verify the product name association:", response.mappings().all())

    # Define the brand_name concept type and associate it with the nla_demo.brands.brand_name column
    for query in define_brand_name_concept_queries:
      await db_conn.execute(sqlalchemy.text(query))

    # Create a value index
    for query in construct_value_index_queries:
      await db_conn.execute(sqlalchemy.text(query))

    # Add a template (This helps in improving accuracy for critical questions)
    await db_conn.execute(sqlalchemy.text(define_query_template_query))

    # View a list of added templates
    response = await db_conn.execute(sqlalchemy.text(view_added_template_query))
    print("View added template:", response.mappings().all())

    await db_conn.commit()

# pool = await init_connection_pool(connector, database_name)
await run_queries(pool)
await pool.dispose()
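Before wiring the configuration into Toolbox, you can sanity-check it directly in SQL. Per the AlloyDB AI natural language documentation, `alloydb_ai_nl.get_sql` returns JSON containing the SQL generated for a question; the exact signature can vary by AlloyDB version, so treat this as a sketch:

```python
# Hedged sketch: sanity-check the NL configuration directly in SQL.
# `->> 'sql'` extracts the generated SQL text from the JSON result
# (per AlloyDB AI natural language docs; verify against your version).
nl_check_query = """
SELECT
  alloydb_ai_nl.get_sql(
    nl_config_id => 'nla_demo_cfg',
    nl_question  => 'Find the first names of customers who live in Lisbon.'
  ) ->> 'sql' AS generated_sql;
"""

# To run it, create a fresh pool (the one above was disposed), e.g.:
# pool = await init_connection_pool(connector, database_name)
# async with pool.connect() as conn:
#     result = await conn.execute(sqlalchemy.text(nl_check_query))
#     print(result.scalar_one())
```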

Step 2: Set up Toolbox

Here, we will set up the Toolbox server to interact with our AlloyDB Database.

Download the latest version of Toolbox as a binary.
version = "0.11.0" # x-release-please-version
! curl -L -o /content/toolbox https://storage.googleapis.com/genai-toolbox/v{version}/linux/amd64/toolbox

# Make the binary executable
! chmod +x /content/toolbox
TOOLBOX_BINARY_PATH = "/content/toolbox"
SERVER_PORT = 5000
# Create a tools file
tools_file_name = "tools.yml"
file_content = f"""
sources:
  my-alloydb-pg-source:
    kind: alloydb-postgres
    project: {project_id}
    region: {region}
    cluster: {cluster_name}
    instance: {instance_name}
    database: {database_name}
    user: postgres
    password: {password}
tools:
  ask_questions:
    kind: alloydb-ai-nl
    source: my-alloydb-pg-source
    description: 'Ask any natural language questions about the tables'
    nlConfig: 'nla_demo_cfg'
  basic_sql:
    kind: postgres-sql
    source: my-alloydb-pg-source
    description: 'Check if db is connected'
    statement: SELECT * from nla_demo.products;
"""
# Write the file content into the tools file.
# (Writing from Python avoids shell-quoting issues, e.g. special
# characters in the password, that `echo` can introduce.)
with open(tools_file_name, "w") as f:
    f.write(file_content)
TOOLS_FILE_PATH = f"/content/{tools_file_name}"
# Start a toolbox server
! nohup {TOOLBOX_BINARY_PATH} --tools-file {TOOLS_FILE_PATH}  --log-level debug --logging-format json -p {SERVER_PORT} > toolbox.log 2>&1 &
# Check if toolbox is running
!sudo lsof -i :{SERVER_PORT}
# Kill process at port
# !lsof -t -i :{SERVER_PORT} | xargs kill -9

Step 3: Connect Using ADK

Now, we will use ADK to connect to the server and answer natural language questions related to our database.

! pip install toolbox-core --quiet
! pip install google-adk --quiet
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.genai import types
from toolbox_core import ToolboxSyncClient

import os

# TODO(developer): replace this with your Google API key
os.environ['GOOGLE_API_KEY'] = "your-api-key"

toolbox_client = ToolboxSyncClient(f"http://127.0.0.1:{SERVER_PORT}")

prompt = """You're a helpful assistant that answers questions about data for an e-commerce application."""

tools = toolbox_client.load_toolset()

root_agent = Agent(
    model='gemini-2.0-flash',
    name='alloy_nl_agent',
    description='A helpful AI assistant.',
    instruction=prompt,
    tools=tools,
)

session_service = InMemorySessionService()
artifacts_service = InMemoryArtifactService()
session = await session_service.create_session(
    state={}, app_name='alloy_nl_agent', user_id='123'
)
runner = Runner(
    app_name='alloy_nl_agent',
    agent=root_agent,
    artifact_service=artifacts_service,
    session_service=session_service,
)

queries = [
  "Find the customers who purchased Tote Bag.",
  "List the maximum price of any CymbalShoe.", # This will return null. This data is not in the dataset.
  "List the maximum price of any CymbalPrime.",
  "Find the last name of the customers who live in France."
]

for query in queries:
  content = types.Content(role='user', parts=[types.Part(text=query)])
  events = runner.run(session_id=session.id,
                      user_id='123', new_message=content)

  responses = (
    part.text
    for event in events
    for part in event.content.parts
    if part.text is not None
  )

  for text in responses:
    print(text)