QueryOrchestrator Improvements
Overview
This document describes the critical improvements made to the QueryOrchestrator based on architectural review feedback. The improved version is available in src/integration/orchestrator_v2.py.
Key Issues Addressed
1. Real Result Caching ✅
Problem: The original implementation always returned cached=False, even when results came from the cache.
Solution:
from typing import Any, Dict, List, Tuple

async def _execute_query(self, sql: str, enable_caching: bool) -> Tuple[List[Dict[str, Any]], bool]:
    """Execute query with REAL result caching."""
    results = await self.query_executor.execute(sql)
    # Check actual cache status from the executor; fall back to False
    # when the executor does not expose last_query_was_cached()
    cached = False
    if hasattr(self.query_executor, "last_query_was_cached"):
        cached = self.query_executor.last_query_was_cached()
    return results, cached  # Return real cache status
Benefits:
- Accurate cache-hit metrics in performance tracking
- Users can see when queries are served from cache
- Better debugging and optimization insights
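The `_execute_query` snippet above only reports a cache hit if the executor exposes `last_query_was_cached()`. Here is a minimal sketch of such an executor. It assumes an in-memory dict keyed by the exact SQL text. `CachingQueryExecutor`, `run_sql` and `fake_database` are hypothetical names and are not part of orchestrator_v2.py.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Rows = List[Dict[str, Any]]


class CachingQueryExecutor:
    """Hypothetical executor: caches results by SQL text and remembers
    whether the most recent execute() call was served from the cache."""

    def __init__(self, run_sql: Callable[[str], Awaitable[Rows]]) -> None:
        self._run_sql = run_sql  # coroutine function that actually queries the database
        self._cache: Dict[str, Rows] = {}
        self._last_hit = False

    async def execute(self, sql: str) -> Rows:
        if sql in self._cache:
            self._last_hit = True
            return self._cache[sql]
        self._last_hit = False
        rows = await self._run_sql(sql)
        self._cache[sql] = rows
        return rows

    def last_query_was_cached(self) -> bool:
        return self._last_hit


db_calls: List[str] = []


async def fake_database(sql: str) -> Rows:
    db_calls.append(sql)  # records each real database round trip
    return [{"n": 1}]


async def demo() -> Tuple[bool, bool]:
    executor = CachingQueryExecutor(fake_database)
    await executor.execute("SELECT 1")
    first = executor.last_query_was_cached()
    await executor.execute("SELECT 1")
    second = executor.last_query_was_cached()
    return first, second


first_hit, second_hit = asyncio.run(demo())
print(first_hit, second_hit, len(db_calls))  # False True 1
```

The cache flag lives on the executor. If two concurrent queries share one executor, one can overwrite the other's status between `execute()` and `last_query_was_cached()`. Having `execute()` return `(rows, cached)` directly would avoid that race.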
2. Multi-Level Validation ✅
Problem: Validation only ran on the final SQL string. That is too late to catch plan-level errors.
Solution: Added two validation layers:
Layer 1: Plan-Level Validation
Validates query plan BEFORE SQL generation:
from typing import Any, Dict, List, Tuple

def _validate_plan(self, query_plan: QueryPlan, semantic_context: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Validate query plan before SQL generation."""
    errors = []
    # Check metrics exist
    available_metrics = {m["name"] for m in semantic_context.get("metrics", [])}
    for metric in query_plan.metrics:
        if metric.name not in available_metrics:
            errors.append(f"Unknown metric: {metric.name}")
    # Check dimensions exist
    available_dimensions = {d["name"] for d in semantic_context.get("dimensions", [])}
    for dimension in query_plan.dimensions:
        if dimension.name not in available_dimensions:
            errors.append(f"Unknown dimension: {dimension.name}")
    # Validate filters reference valid fields
    # Validate time ranges
    # etc.
    return len(errors) == 0, errors
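Below is a standalone, runnable sketch of the plan check. The `PlanField`, `Filter` and `QueryPlan` dataclasses are hypothetical stand-ins for the project's plan types. The filter rule is an assumption: it lets a filter reference any known metric or dimension, to illustrate the "validate filters" step that appears above only as a comment. The names `revenue`, `region` and `country` are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class PlanField:
    name: str


@dataclass
class Filter:
    column: str
    operator: str
    value: Any


@dataclass
class QueryPlan:
    # Hypothetical plan shape, not the project's real QueryPlan
    metrics: List[PlanField]
    dimensions: List[PlanField] = field(default_factory=list)
    filters: List[Filter] = field(default_factory=list)


def validate_plan(plan: QueryPlan, semantic_context: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Collect every plan error instead of stopping at the first one."""
    metrics = {m["name"] for m in semantic_context.get("metrics", [])}
    dimensions = {d["name"] for d in semantic_context.get("dimensions", [])}
    errors: List[str] = []
    errors += [f"Unknown metric: {m.name}" for m in plan.metrics if m.name not in metrics]
    errors += [f"Unknown dimension: {d.name}" for d in plan.dimensions if d.name not in dimensions]
    # Assumption: a filter may reference any known metric or dimension
    known = metrics | dimensions
    errors += [f"Filter on unknown field: {f.column}" for f in plan.filters if f.column not in known]
    return not errors, errors


context = {"metrics": [{"name": "revenue"}], "dimensions": [{"name": "region"}]}
plan = QueryPlan(metrics=[PlanField("invalid_metric")], filters=[Filter("country", "=", "US")])
print(validate_plan(plan, context))
# (False, ['Unknown metric: invalid_metric', 'Filter on unknown field: country'])
```

Collecting all errors before returning lets the debug payload list every problem in one response. Stopping at the first error would force repeated round trips.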
Layer 2: SQL AST Validation
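Validates SQL structure by parsing it with sqlparse. sqlparse is a non-validating parser: it produces a token tree rather than a full AST, so these checks are structural rather than semantic.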
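import sqlparse
from sqlparse import tokens as T
from typing import Any, Dict, List, Tuple

def _validate_sql_ast(self, sql: str, semantic_context: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Validate SQL structure using sqlparse's token tree."""
    errors: List[str] = []
    # Parse SQL, ignoring whitespace-only fragments (e.g. after a trailing ';').
    # Require exactly one statement, so stacked queries such as
    # "SELECT 1; DROP TABLE t" are rejected and empty input cannot raise IndexError.
    statements = [s for s in sqlparse.parse(sql) if str(s).strip()]
    if len(statements) != 1:
        errors.append(f"Expected exactly one SQL statement, got {len(statements)}")
        return False, errors
    statement = statements[0]
    # Check statement type (only SELECT allowed)
    stmt_type = statement.get_type()
    if stmt_type != "SELECT":
        errors.append(f"Only SELECT statements allowed, got {stmt_type}")
    # Check for dangerous keywords among keyword tokens only, so identifiers
    # like created_at and string literals like 'drop' are not flagged
    dangerous_keywords = {"DROP", "DELETE", "TRUNCATE", "ALTER", "CREATE"}
    for token in statement.flatten():
        if token.ttype in T.Keyword and token.normalized in dangerous_keywords:
            errors.append(f"Dangerous keyword detected: {token.normalized}")
    # Verify column names against semantic_context (not shown)
    return len(errors) == 0, errors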
Benefits:
- Catch errors earlier in the pipeline
- Never generate SQL from an invalid plan
- Security: Block dangerous operations
- Better error messages with context
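Matching dangerous keywords by plain substring (`keyword in sql.upper()`) is fragile, because ordinary identifiers often contain those words. The stdlib-only sketch below compares that check with a whole-word match that skips single-quoted string literals. The whole-word version is still a heuristic: it does not handle comments or double-quoted identifiers, so a real SQL tokenizer such as sqlparse remains the better tool. The table and column names (`trips`, `created_at`, `dropoff_zone`) are made up.

```python
import re
from typing import List

DANGEROUS = ("DROP", "DELETE", "TRUNCATE", "ALTER", "CREATE")


def substring_hits(sql: str) -> List[str]:
    """Naive check: flags a keyword appearing anywhere in the text."""
    upper = sql.upper()
    return [kw for kw in DANGEROUS if kw in upper]


def word_hits(sql: str) -> List[str]:
    """Whole-word check that ignores single-quoted literals ('' is an escaped quote)."""
    without_strings = re.sub(r"'(?:[^']|'')*'", "''", sql)
    words = set(re.findall(r"[A-Za-z_][A-Za-z0-9_]*", without_strings.upper()))
    return [kw for kw in DANGEROUS if kw in words]


sql = "SELECT created_at, dropoff_zone FROM trips WHERE note = 'drop me'"
print(substring_hits(sql))  # ['DROP', 'CREATE']  (false positives)
print(word_hits(sql))       # []
print(word_hits("SELECT 1; DROP TABLE trips"))  # ['DROP']
```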
3. Typed Errors with Debug Info ✅
Problem: Catching a generic Exception made debugging difficult. Errors carried no type, no pipeline stage and no structured debug info.
Solution: Introduced typed errors with debug payloads:
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel

class ErrorType(str, Enum):
    """Error types for typed error handling."""
    VALIDATION_ERROR = "validation_error"
    PLAN_ERROR = "plan_error"
    SQL_GENERATION_ERROR = "sql_generation_error"
    EXECUTION_ERROR = "execution_error"
    LLM_ERROR = "llm_error"
    ANALYTICS_ERROR = "analytics_error"
    UNKNOWN_ERROR = "unknown_error"

class QueryError(BaseModel):
    """Structured error information."""
    type: ErrorType
    message: str
    stage: str  # Which pipeline stage failed
    debug_info: Optional[Dict[str, Any]] = None  # Debug payload

def _create_error(self, error_type: ErrorType, stage: str, message: str,
                  debug_info: Optional[Dict[str, Any]] = None) -> ValueError:
    """Create a typed error with metadata."""
    error = ValueError(message)
    # Attach metadata as attributes so the handler can build a QueryError
    error.type = error_type
    error.stage = stage
    error.debug_info = debug_info
    return error
Example Error Response:
{
  "success": false,
  "error": {
    "type": "plan_error",
    "message": "Invalid query plan: Unknown metric: invalid_metric",
    "stage": "plan_validation",
    "debug_info": {
      "plan": {"metrics": ["invalid_metric"], "dimensions": []},
      "errors": ["Unknown metric: invalid_metric"]
    }
  }
}
Benefits:
- Clients can handle different error types appropriately
- Debug info provides context for troubleshooting
- Structured logging and monitoring
- Better error recovery strategies
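`_create_error` attaches attributes to a plain `ValueError` at runtime. This works, but type checkers cannot see the attributes, and a misspelled attribute name fails silently. One alternative is sketched below. It assumes the pipeline keeps catching `ValueError` and uses a dedicated subclass that declares the same fields. `OrchestratorError` and `to_dict` are hypothetical names, not part of orchestrator_v2.py.

```python
from enum import Enum
from typing import Any, Dict, Optional


class ErrorType(str, Enum):
    # Abbreviated copy of the ErrorType enum shown above
    PLAN_ERROR = "plan_error"
    VALIDATION_ERROR = "validation_error"
    UNKNOWN_ERROR = "unknown_error"


class OrchestratorError(ValueError):
    """Hypothetical ValueError subclass with declared error metadata."""

    def __init__(self, error_type: ErrorType, stage: str, message: str,
                 debug_info: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.type = error_type
        self.stage = stage
        self.debug_info = debug_info

    def to_dict(self, include_debug: bool = False) -> Dict[str, Any]:
        """Serialize to the error shape of the example response; debug_info is opt-in."""
        return {
            "type": self.type.value,
            "message": str(self),
            "stage": self.stage,
            "debug_info": self.debug_info if include_debug else None,
        }


try:
    raise OrchestratorError(
        ErrorType.PLAN_ERROR,
        "plan_validation",
        "Invalid query plan: Unknown metric: invalid_metric",
        {"errors": ["Unknown metric: invalid_metric"]},
    )
except ValueError as exc:  # an existing `except ValueError` handler still catches it
    payload = exc.to_dict(include_debug=True)

print(payload["type"], payload["stage"])  # plan_error plan_validation
```

Because the class still subclasses `ValueError` and sets `type`, `stage` and `debug_info`, a handler that checks `hasattr(e, "type")` continues to work unchanged.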
4. Analytics on Full Result Set ✅
Problem: Analytics ran on the paginated results, so the statistics described only the current page rather than the full result set.
Solution: Run analytics BEFORE pagination:
async def execute_query(self, request: QueryRequest) -> QueryResponse:
    """Execute query with proper analytics pipeline."""
    # (Stages 1-5 elided; they produce `sql` and `query_plan`)
    # Stage 6: Execute query (FULL result set - no pagination yet)
    full_results, result_cached = await self._execute_query(sql, request.enable_caching)
    # Stage 7: Run analytics on FULL result set (before pagination)
    analytics = None
    if request.enable_analytics and full_results:
        analytics = self._calculate_analytics(full_results, query_plan)
        # Analytics now computed on ALL rows
    # Stage 8: Apply pagination AFTER analytics
    pagination_metadata = None  # stays None when no pagination is requested
    if request.pagination:
        results, pagination_metadata = paginate_results(
            full_results,
            len(full_results),
            request.pagination,
        )
    else:
        results = full_results
Example analytics payload (note the _metadata block):
{
  "revenue": {
    "statistics": {
      "count": 10000,
      "mean": 54321.45,
      "median": 45000.00,
      "std_dev": 12345.67
    }
  },
  "_metadata": {
    "total_rows": 10000,
    "numeric_columns_analyzed": 5,
    "computed_on_full_dataset": true
  }
}
Benefits:
- Accurate statistics (mean, median, std_dev)
- Correct anomaly detection
- Trustworthy insights for decision-making
- Transparent metadata about computation
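The sketch below shows how much page-level statistics can drift. It computes a payload shaped like the example above, once over the full result set and once over only the first page. `calculate_analytics` is a hypothetical stand-in for `_calculate_analytics`. Using the sample standard deviation (`statistics.stdev`) is an assumption, and the data is synthetic.

```python
import statistics
from typing import Any, Dict, List

Row = Dict[str, Any]


def _is_number(value: Any) -> bool:
    # bool is a subclass of int, so exclude it explicitly
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def calculate_analytics(rows: List[Row], total_rows: int) -> Dict[str, Any]:
    """Summarize every numeric column of `rows` and record whether
    `rows` covered the whole result set (`total_rows`)."""
    columns = list(rows[0].keys()) if rows else []
    numeric = [c for c in columns if all(_is_number(r.get(c)) for r in rows)]
    analytics: Dict[str, Any] = {}
    for col in numeric:
        values = [r[col] for r in rows]
        analytics[col] = {
            "statistics": {
                "count": len(values),
                "mean": statistics.mean(values),
                "median": statistics.median(values),
                # Sample standard deviation; needs at least two values
                "std_dev": statistics.stdev(values) if len(values) > 1 else 0.0,
            }
        }
    analytics["_metadata"] = {
        "total_rows": total_rows,
        "numeric_columns_analyzed": len(numeric),
        "computed_on_full_dataset": len(rows) == total_rows,
    }
    return analytics


full_results = [{"region": "north" if v % 2 else "south", "revenue": v} for v in range(1, 101)]
first_page = full_results[:10]  # what page 1 with page_size=10 would contain

correct = calculate_analytics(full_results, total_rows=len(full_results))
page_only = calculate_analytics(first_page, total_rows=len(full_results))
print(correct["revenue"]["statistics"]["mean"])    # 50.5
print(page_only["revenue"]["statistics"]["mean"])  # 5.5
```

Deriving `computed_on_full_dataset` from the row counts, instead of hard-coding `true`, means the flag catches a regression that accidentally reintroduces page-level analytics.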
Pipeline Flow
Original Flow (❌ Issues)
1. Load context
2. Extract intent (cached=false always)
3. Generate plan (no validation)
4. Generate SQL
5. Validate SQL string only
6. Execute + paginate together
7. Analytics on paginated results ❌
8. Return (cached=false always)
Improved Flow (✅ Fixed)
1. Load context
2. Extract intent (with REAL caching) ✅
3. Generate plan
4. Validate plan ✅
5. Generate SQL
6. Validate SQL string ✅
7. Validate SQL AST ✅
8. Execute (full results)
9. Analytics on full results ✅
10. Apply pagination
11. Return (with real cache status) ✅
Error Handling Comparison
Before
except Exception as e:
    print(f"Query execution failed: {str(e)}")
    return QueryResponse(
        ...
        explanation=f"Failed to execute query: {str(e)}",
        cached=False
    )
Issues:
- No error type
- No stage information
- No debug payload
- Generic error message
After
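except ValueError as e:
    # Typed errors carry type/stage/debug_info; a plain ValueError
    # falls back to UNKNOWN_ERROR instead of silently returning None
    error_data = QueryError(
        type=getattr(e, "type", ErrorType.UNKNOWN_ERROR),
        message=str(e),
        stage=getattr(e, "stage", "unknown"),
        # Expose the debug payload only when the caller opted in via enable_debug
        debug_info=getattr(e, "debug_info", None) if request.enable_debug else None,
    )
    return QueryResponse(
        success=False,
        error=error_data,
        ...
    )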
Improvements:
- Typed errors (VALIDATION_ERROR, EXECUTION_ERROR, etc.)
- Stage tracking (plan_validation, sql_generation, etc.)
- Debug payloads (SQL, plan, errors)
- Structured error response
Performance Metrics
Before
{
  "performance": {
    "total_time_ms": 1234,
    "stage_timings_ms": {...}
  },
  "cached": false  // Always false ❌
}
After
{
  "performance": {
    "total_time_ms": 1234,
    "stage_timings_ms": {
      "semantic_context": 10,
      "query_planning": 50,
      "plan_validation": 5,    // ✅ New
      "sql_generation": 100,
      "sql_validation": 15,    // ✅ Improved
      "query_execution": 950,
      "analytics": 80,
      "pagination": 4
    },
    "plan_cached": true,       // ✅ Real status
    "result_cached": false     // ✅ Real status
  },
  "cached": false
}
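Per-stage timings like those above can be collected with a small context manager around each stage. `StageTimer` is a hypothetical helper, not part of orchestrator_v2.py, and `time.sleep` stands in for real stage work.

```python
import time
from contextlib import contextmanager
from typing import Any, Dict, Iterator


class StageTimer:
    """Hypothetical helper that records per-stage wall-clock time in ms."""

    def __init__(self) -> None:
        self.stage_timings_ms: Dict[str, int] = {}
        self._started = time.perf_counter()

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Recorded even when the stage raises, so a failed run still shows its timings
            self.stage_timings_ms[name] = round((time.perf_counter() - start) * 1000)

    def performance(self, plan_cached: bool, result_cached: bool) -> Dict[str, Any]:
        return {
            "total_time_ms": round((time.perf_counter() - self._started) * 1000),
            "stage_timings_ms": dict(self.stage_timings_ms),
            "plan_cached": plan_cached,
            "result_cached": result_cached,
        }


timer = StageTimer()
with timer.stage("plan_validation"):
    time.sleep(0.005)  # placeholder for real work
with timer.stage("sql_validation"):
    pass
perf = timer.performance(plan_cached=True, result_cached=False)
print(perf["stage_timings_ms"])
```

With this approach, `total_time_ms` also counts glue code between stages. That may explain why the example's stage timings sum to 1214 ms against a 1234 ms total.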
Migration Guide
Option 1: Drop-in Replacement
# Old
from src.integration.orchestrator import get_orchestrator
orchestrator = get_orchestrator()
# New
from src.integration.orchestrator_v2 import get_improved_orchestrator
orchestrator = get_improved_orchestrator()
# API remains compatible
response = await orchestrator.execute_query(request)
Option 2: Gradual Migration
- Import both versions
- Run queries through both
- Compare results in parallel
- Switch to v2 when confident
- Remove v1
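The gradual migration steps above can be sketched as a "shadow" call: run v1 and v2 concurrently, always serve v1's response, and record any divergence. `shadow_compare`, `FakeOrchestrator` and `FakeResponse` are hypothetical. The fakes stand in for the real orchestrators and `QueryResponse`, and only `results` is compared here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class FakeResponse:
    """Stand-in for QueryResponse carrying only the field compared here."""
    results: List[Dict[str, Any]]


class FakeOrchestrator:
    """Hypothetical orchestrator exposing the same execute_query coroutine."""

    def __init__(self, rows: Optional[List[Dict[str, Any]]] = None,
                 error: Optional[Exception] = None) -> None:
        self._rows = rows or []
        self._error = error

    async def execute_query(self, request: Any) -> FakeResponse:
        if self._error is not None:
            raise self._error
        return FakeResponse(results=self._rows)


async def shadow_compare(primary: Any, candidate: Any, request: Any) -> Tuple[Any, Optional[str]]:
    """Run v1 (primary) and v2 (candidate) concurrently, serve v1's response,
    and return a note describing any divergence (None when they agree)."""
    primary_resp, candidate_resp = await asyncio.gather(
        primary.execute_query(request),
        candidate.execute_query(request),
        return_exceptions=True,  # a v2 failure must not break the v1 response
    )
    if isinstance(primary_resp, BaseException):
        raise primary_resp
    if isinstance(candidate_resp, BaseException):
        return primary_resp, f"candidate failed: {candidate_resp!r}"
    if candidate_resp.results != primary_resp.results:
        return primary_resp, "results differ"
    return primary_resp, None


rows = [{"revenue": 100}]
_, same = asyncio.run(shadow_compare(FakeOrchestrator(rows), FakeOrchestrator(rows), None))
_, differ = asyncio.run(shadow_compare(FakeOrchestrator(rows), FakeOrchestrator([]), None))
_, failed = asyncio.run(shadow_compare(FakeOrchestrator(rows), FakeOrchestrator(error=RuntimeError("boom")), None))
print(same, differ, failed)  # None results differ candidate failed: RuntimeError('boom')
```

Shadowing doubles the query load on the database, so it is usually applied to a sample of traffic rather than every request.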
Option 3: Feature Flag
import os

if os.getenv("USE_IMPROVED_ORCHESTRATOR", "false").lower() == "true":
    from src.integration.orchestrator_v2 import get_improved_orchestrator as get_orchestrator
else:
    from src.integration.orchestrator import get_orchestrator
API Changes
QueryRequest
Added field:
enable_debug: bool = False # Include debug info in errors
QueryResponse
Added fields:
success: bool # Explicit success flag
error: Optional[QueryError] = None # Structured error info
QueryResponse.performance
Added fields:
{
  "plan_cached": bool,   # Was plan from cache?
  "result_cached": bool  # Was result from cache?
}
QueryResponse.analytics
Added metadata:
{
  "_metadata": {
    "total_rows": int,
    "numeric_columns_analyzed": int,
    "computed_on_full_dataset": true
  }
}
Testing Recommendations
1. Cache Hit Testing
# First query - should be a cache miss (assumes a cold cache)
response1 = await orchestrator.execute_query(request)
assert response1.cached is False
assert response1.performance["plan_cached"] is False
# Second identical query - should be a cache hit
response2 = await orchestrator.execute_query(request)
assert response2.cached is True
assert response2.performance["plan_cached"] is True
2. Validation Testing
# Test plan validation
request = QueryRequest(question="Show me invalid_metric by invalid_dimension")
response = await orchestrator.execute_query(request)
assert response.success is False
assert response.error.type == ErrorType.PLAN_ERROR
assert "Unknown metric" in response.error.message
# Test SQL AST validation: use a question that passes plan validation
# (the invalid plan above would fail first with PLAN_ERROR), and mock
# the SQL generator to return dangerous SQL
request = QueryRequest(question="Show me revenue")
response = await orchestrator.execute_query(request)
assert response.error.type == ErrorType.VALIDATION_ERROR
assert "Dangerous keyword" in response.error.message
3. Analytics Testing
# Test analytics on full dataset
request = QueryRequest(
    question="Show me revenue",
    pagination=PaginationParams(page=1, page_size=10)
)
response = await orchestrator.execute_query(request)
# Results are paginated (10 rows; assumes the query matches more than 10 rows)
assert len(response.results) == 10
# But analytics computed on all rows
assert response.analytics["_metadata"]["total_rows"] > 10
assert response.analytics["_metadata"]["computed_on_full_dataset"] is True
4. Error Type Testing
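# Test each error type; each scenario needs its own fixture or mock
# (make_request_for_scenario is a hypothetical test helper)
error_scenarios = [
    ("invalid metric", ErrorType.PLAN_ERROR),
    ("SQL injection", ErrorType.VALIDATION_ERROR),
    ("database down", ErrorType.EXECUTION_ERROR),
    ("OpenAI timeout", ErrorType.LLM_ERROR),
]
for scenario, expected_type in error_scenarios:
    request = make_request_for_scenario(scenario)
    response = await orchestrator.execute_query(request)
    assert response.success is False, scenario
    assert response.error.type == expected_type, scenario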
Future Enhancements
1. Query Plan Optimization
- Detect redundant filters
- Suggest index usage
- Rewrite inefficient plans
2. Smart Caching Strategies
- Partial result caching
- Invalidation on data updates
- Cache warming for common queries
3. Advanced AST Validation
- Column existence checking
- Join condition validation
- Index usage analysis
4. Analytics Enhancements
- Time-series analysis
- Trend detection
- Automatic insights generation
5. Distributed Execution
- Parallel query execution
- Result streaming
- Resource pooling
Conclusion
The improved QueryOrchestrator addresses all four critical gaps:
- ✅ Real Caching: Accurate cache-hit tracking with metadata
- ✅ Multi-Level Validation: Plan-level + SQL AST validation
- ✅ Typed Errors: Structured errors with debug payloads
- ✅ Correct Analytics: Computed on full result sets
These improvements provide a production-ready orchestration layer with:
- Better debugging capabilities
- Improved security
- Accurate analytics
- Transparent performance metrics
- Structured error handling
The implementation maintains API compatibility while adding opt-in features like debug mode.