SQL Validator v2 - Production-Grade Implementation
Overview
The new SQL validator (validator_v2.py) replaces regex-based validation with proper AST parsing, providing production-grade security and semantic validation for LLM-generated SQL.
Critical Issues Fixed
1. ❌ Regex-Based Validation → ✅ AST Parsing
Old Approach:
# Brittle regex matching
if re.search(rf"\b{keyword}\b", sql_upper):
errors.append(f"Forbidden keyword detected: {keyword}")
Problems:
- Easy to bypass with comments:
/* DROP */ SELECT - False positives on valid queries
- Can’t understand context
- Misses complex constructs
New Approach:
# Parse SQL into AST
parsed = sqlparse.parse(sql)
statement = parsed[0]
stmt_type = statement.get_type() # Returns 'SELECT', 'INSERT', etc.
Benefits:
- Understands SQL structure
- Context-aware validation
- Handles complex SQL correctly
- No false positives from string matching
2. ❌ Missing Patterns → ✅ Comprehensive Validation
Old Code (with ... placeholders):
FORBIDDEN_PATTERNS = {
r";.*?(?:DROP|DELETE)": "multiple statements",
r"--.*": "SQL comments",
... # Incomplete!
}
New Implementation:
- ✅ Security validation (SQL injection, dangerous operations)
- ✅ Structure validation (CTEs, subqueries, JOINs, nesting depth)
- ✅ Schema validation (table/column existence)
- ✅ Performance validation (SELECT *, missing WHERE, excessive complexity)
- ✅ Function validation (whitelist approach)
- ✅ LIMIT clause validation
3. ❌ Can’t Handle Complex SQL → ✅ Full SQL Support
Old Validator Fails On:
-- CTEs (Common Table Expressions)
WITH monthly_revenue AS (
SELECT date_trunc('month', order_date) AS month, SUM(amount) AS revenue
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY 1
)
SELECT * FROM monthly_revenue LIMIT 100;
-- ❌ Old: Rejects due to "WITH" keyword confusion
-- Nested queries
SELECT customer_id,
(SELECT COUNT(*) FROM orders WHERE orders.customer_id = customers.id) as order_count
FROM customers
WHERE country = 'USA'
LIMIT 100;
-- ❌ Old: Can't track subquery context
-- Type casting
SELECT
customer_id,
revenue::numeric(10,2) AS formatted_revenue,
CAST(order_date AS DATE) AS order_day
FROM orders
LIMIT 100;
-- ❌ Old: Rejects :: as suspicious
-- Quoted identifiers
SELECT "Customer Name", "Order Total"
FROM "Sales Data"
WHERE "Region" = 'West'
LIMIT 100;
-- ❌ Old: Confused by quotes
New Validator Handles All:
- ✅ CTEs with proper depth tracking
- ✅ Nested subqueries with depth limits
- ✅ Type casting (CAST, ::)
- ✅ Quoted identifiers
- ✅ Complex functions (date_trunc, CASE WHEN, etc.)
- ✅ Window functions
- ✅ Array operations
Architecture
Validation Levels
class ValidationLevel(str, Enum):
STRICT = "strict" # Maximum security, minimal SQL features
MODERATE = "moderate" # Balanced (default)
PERMISSIVE = "permissive" # Minimal restrictions
Structured Errors
class ValidationError(BaseModel):
code: str # Error code (e.g., "FORBIDDEN_OPERATION")
message: str # Human-readable message
severity: str # "error", "warning", "info"
location: Optional[str] # Where in SQL
context: Optional[Dict] # Additional details
Validation Pipeline
Input SQL
↓
1. Parse SQL → AST
↓
2. Security Validation
- Forbidden operations (DROP, DELETE, etc.)
- SQL injection patterns
- Dangerous functions (xp_cmdshell, etc.)
- Multiple statements
↓
3. Structure Validation
- Statement type (must be SELECT)
- CTEs detection
- JOIN counting
- Nesting depth
- Function whitelisting
↓
4. Schema Validation (if semantic_context provided)
- Table existence
- Column existence
- Metric/dimension references
↓
5. Performance Validation
- SELECT * warnings
- Missing WHERE with JOINs
- Excessive nesting
↓
6. LIMIT Validation
- LIMIT presence
- LIMIT value < max
↓
Output: SQLValidationResult
Usage Examples
Basic Usage
from src.sql.validator_v2 import ProductionSQLValidator, ValidationLevel
validator = ProductionSQLValidator(
validation_level=ValidationLevel.MODERATE,
max_row_limit=10000,
require_limit=True
)
# Validate SQL
sql = """
SELECT customer_id, SUM(amount) as total_revenue
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY customer_id
LIMIT 100
"""
result = validator.validate(sql)
if result.is_valid:
print("✅ SQL is valid")
print(f"Metadata: {result.metadata}")
else:
print("❌ SQL validation failed:")
for error in result.errors:
print(f" [{error.code}] {error.message}")
# Warnings (non-blocking)
for warning in result.warnings:
print(f" ⚠️ [{warning.code}] {warning.message}")
With Semantic Context
semantic_context = {
"metrics": [
{"name": "revenue", "table": "orders", "column": "amount"},
{"name": "order_count", "table": "orders", "column": "id"}
],
"dimensions": [
{"name": "customer_id", "table": "orders", "column": "customer_id"},
{"name": "order_date", "table": "orders", "column": "order_date"}
]
}
validator = ProductionSQLValidator(
semantic_context=semantic_context,
validation_level=ValidationLevel.MODERATE
)
# This will validate table/column references
result = validator.validate(sql)
Strict Mode (Maximum Security)
validator = ProductionSQLValidator(
validation_level=ValidationLevel.STRICT,
max_query_depth=3, # Limit nesting
max_joins=5, # Limit JOINs
require_limit=True
)
# Only whitelisted functions allowed
sql = "SELECT customer_id, CUSTOM_FUNC(amount) FROM orders LIMIT 100"
result = validator.validate(sql)
# ❌ Error: Function not in allowed list: CUSTOM_FUNC
Validation Results
Example: Valid Query
sql = """
WITH monthly_sales AS (
SELECT
date_trunc('month', order_date) AS month,
SUM(amount) AS revenue
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY 1
)
SELECT month, revenue
FROM monthly_sales
ORDER BY month DESC
LIMIT 12
"""
result = validator.validate(sql)
Result:
{
"is_valid": true,
"errors": [],
"warnings": [],
"metadata": {
"query_length": 245,
"has_cte": true,
"has_subquery": false,
"join_count": 0,
"query_depth": 1,
"statement_type": "SELECT"
}
}
Example: Invalid Query (SQL Injection)
sql = "SELECT * FROM users WHERE username = 'admin' OR '1'='1' LIMIT 100"
result = validator.validate(sql)
Result:
{
"is_valid": false,
"errors": [
{
"code": "SQL_INJECTION_RISK",
"message": "SQL injection risk: SQL injection pattern: OR '1'='1'",
"severity": "error",
"location": "'\s*OR\s*'1'\s*=\s*'1"
}
],
"warnings": [
{
"code": "SELECT_STAR",
"message": "SELECT * may impact performance, consider explicit columns",
"severity": "warning"
}
],
"metadata": {...}
}
Example: Invalid Query (Forbidden Operation)
sql = "DROP TABLE users; SELECT * FROM customers LIMIT 100"
result = validator.validate(sql)
Result:
{
"is_valid": false,
"errors": [
{
"code": "FORBIDDEN_OPERATION",
"message": "Forbidden operation detected: DROP",
"severity": "error",
"context": {"operation": "DROP"}
},
{
"code": "MULTIPLE_STATEMENTS",
"message": "Multiple SQL statements not allowed",
"severity": "error",
"context": {"statement_count": 2}
}
]
}
Example: Warnings Only
sql = """
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
JOIN categories cat ON p.category_id = cat.id
LIMIT 1000
"""
result = validator.validate(sql)
Result:
{
"is_valid": true,
"errors": [],
"warnings": [
{
"code": "SELECT_STAR",
"message": "SELECT * may impact performance, consider explicit columns",
"severity": "warning"
},
{
"code": "MISSING_WHERE",
"message": "Query with JOINs but no WHERE clause may be inefficient",
"severity": "warning",
"context": {"join_count": 3}
}
],
"metadata": {
"join_count": 3,
"query_depth": 1
}
}
Security Features
1. SQL Injection Protection
Detected Patterns:
' OR '1'='1'- Classic injection' OR 1=1 --- Comment-based injectionUNION SELECT- Union-based injection; DROP TABLE- Multiple statement injection/* */ DROP- Comment obfuscation
Example:
# All of these are blocked:
"SELECT * FROM users WHERE id = 1; DROP TABLE users"
"SELECT * FROM users WHERE name = '' OR '1'='1'"
"SELECT * FROM users UNION SELECT password FROM admin"
2. Dangerous Function Blocking
Blocked Functions:
- System access:
xp_cmdshell,system,shell - File operations:
load_file,into outfile,pg_read_file - Code execution:
exec,sp_executesql,dbms_java
Allowed Functions:
- Aggregates:
COUNT,SUM,AVG,MIN,MAX - String:
UPPER,LOWER,CONCAT,SUBSTRING - Date:
DATE_TRUNC,EXTRACT,NOW - Math:
ROUND,FLOOR,CEIL,ABS - Conditional:
CASE,COALESCE,NULLIF
3. Operation Restrictions
Forbidden Operations:
- DDL:
DROP,CREATE,ALTER - DML:
INSERT,UPDATE,DELETE,TRUNCATE - DCL:
GRANT,REVOKE
Only Allowed:
SELECTstatements
Performance Features
1. Complexity Limits
validator = ProductionSQLValidator(
max_query_depth=5, # Maximum subquery nesting
max_joins=10, # Maximum JOIN count
max_row_limit=10000 # Maximum LIMIT value
)
Prevents:
- Excessive subquery nesting (cartesian explosion)
- Too many JOINs (performance degradation)
- Unbounded result sets (memory issues)
2. Performance Warnings
Detected Issues:
SELECT *without explicit columns- JOINs without WHERE clause
- Deep nesting (> 3 levels)
- Missing indexes (if schema metadata available)
Schema Validation
Integration with Semantic Layer
# Load semantic context
semantic_context = {
"metrics": [
{
"name": "revenue",
"table": "orders",
"column": "amount",
"aggregation": "SUM"
}
],
"dimensions": [
{
"name": "customer_id",
"table": "orders",
"column": "customer_id"
}
]
}
validator = ProductionSQLValidator(semantic_context=semantic_context)
Validates:
- Table references exist in semantic layer
- Column references exist in tables
- Metric names match semantic definitions
- Dimension names match semantic definitions
Example:
# Valid - uses known tables/columns
sql = "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id LIMIT 100"
# ✅ Pass
# Invalid - unknown table
sql = "SELECT * FROM unknown_table LIMIT 100"
# ⚠️ Warning: Table not found in semantic layer: unknown_table
# Invalid - unknown column
sql = "SELECT fake_column FROM orders LIMIT 100"
# ⚠️ Warning: Column fake_column not found in table orders
Migration Guide
Phase 1: Side-by-Side Testing
from src.sql.validator import SQLValidator # Old
from src.sql.validator_v2 import ProductionSQLValidator # New
# Run both validators
old_validator = SQLValidator()
new_validator = ProductionSQLValidator()
is_valid_old, errors_old = old_validator.validate(sql)
result_new = new_validator.validate(sql)
# Compare results
print(f"Old: {is_valid_old}, New: {result_new.is_valid}")
Phase 2: Gradual Rollout
import os
if os.getenv("USE_V2_VALIDATOR", "false") == "true":
from src.sql.validator_v2 import ProductionSQLValidator as SQLValidator
else:
from src.sql.validator import SQLValidator
Phase 3: Full Replacement
# Update imports everywhere
from src.sql.validator_v2 import ProductionSQLValidator
# Update orchestrator_v2.py
validator = ProductionSQLValidator(
semantic_context=semantic_context,
validation_level=ValidationLevel.MODERATE,
max_row_limit=self.max_row_limit
)
Testing Recommendations
1. Security Tests
def test_sql_injection_blocked():
validator = ProductionSQLValidator()
injection_attempts = [
"SELECT * FROM users WHERE id = 1; DROP TABLE users",
"SELECT * FROM users WHERE name = '' OR '1'='1'",
"SELECT * FROM users UNION SELECT password FROM admin",
]
for sql in injection_attempts:
result = validator.validate(sql)
assert not result.is_valid
assert any(e.code == "SQL_INJECTION_RISK" for e in result.errors)
2. Complex SQL Tests
def test_cte_support():
validator = ProductionSQLValidator(require_limit=True)
sql = """
WITH monthly_revenue AS (
SELECT date_trunc('month', order_date) AS month, SUM(amount) AS revenue
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY 1
)
SELECT month, revenue FROM monthly_revenue ORDER BY month DESC LIMIT 12
"""
result = validator.validate(sql)
assert result.is_valid
assert result.metadata['has_cte'] == True
assert result.metadata['query_depth'] == 1
3. Schema Validation Tests
def test_schema_validation():
semantic_context = {
"metrics": [{"name": "revenue", "table": "orders", "column": "amount"}],
"dimensions": [{"name": "customer_id", "table": "orders", "column": "customer_id"}]
}
validator = ProductionSQLValidator(semantic_context=semantic_context)
# Valid query
sql = "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id LIMIT 100"
result = validator.validate(sql)
assert result.is_valid
# Invalid table
sql = "SELECT * FROM fake_table LIMIT 100"
result = validator.validate(sql)
assert len(result.warnings) > 0
assert any(w.code == "UNKNOWN_TABLE" for w in result.warnings)
Comparison: Old vs New
| Feature | Old Validator | New Validator |
|---|---|---|
| Parsing Method | ❌ Regex | ✅ AST (sqlparse) |
| CTEs | ❌ Not supported | ✅ Full support |
| Nested Queries | ❌ Basic check | ✅ Depth tracking |
| Type Casting | ❌ False positives | ✅ Correct handling |
| Quoted Identifiers | ❌ Fails | ✅ Handles correctly |
| Function Validation | ❌ Pattern matching | ✅ Whitelist with AST |
| SQL Injection | ⚠️ Basic patterns | ✅ Comprehensive |
| Schema Validation | ❌ None | ✅ Full semantic layer |
| Error Types | ❌ Strings | ✅ Structured (code, message, context) |
| Performance Warnings | ❌ None | ✅ SELECT *, JOINs, etc. |
| Metadata | ❌ None | ✅ Depth, JOINs, CTEs, etc. |
| False Positives | ❌ High | ✅ Minimal |
| Extensibility | ❌ Hardcoded | ✅ Configurable levels |
Future Enhancements
1. Cost Estimation
# Estimate query cost before execution
metadata["estimated_cost"] = validator.estimate_cost(sql, statistics)
2. Query Optimization Suggestions
# Suggest optimizations
result.suggestions = [
"Add index on orders.customer_id",
"Consider partitioning on order_date"
]
3. Machine Learning Integration
# Learn from query patterns
validator.learn_from_execution(sql, execution_time, row_count)
4. Custom Rules
# Add custom validation rules
validator.add_rule(CustomRule(
name="no_cross_joins",
check=lambda sql: "CROSS JOIN" not in sql.upper(),
error_code="FORBIDDEN_CROSS_JOIN"
))
Conclusion
The new SQL validator provides production-grade validation with:
✅ Proper AST parsing - No more regex hacks ✅ Complete SQL support - CTEs, nested queries, casts, etc. ✅ Security hardening - SQL injection, dangerous operations ✅ Semantic validation - Schema-aware checking ✅ Performance insights - Complexity warnings ✅ Structured errors - Typed errors with context ✅ Extensibility - Configurable validation levels
This addresses all the critical issues in the original validator and provides a solid foundation for LLM-generated SQL validation.