"""Multi-backend query engine for GIQL.
This module provides the main query engine that supports multiple SQL databases
through transpilation of GIQL syntax to standard SQL.
"""
from typing import Literal
import pandas as pd
from sqlglot import parse_one
from giql.constants import DEFAULT_CHROM_COL
from giql.constants import DEFAULT_END_COL
from giql.constants import DEFAULT_GENOMIC_COL
from giql.constants import DEFAULT_START_COL
from giql.constants import DEFAULT_STRAND_COL
from giql.dialect import GIQLDialect
from giql.generators import BaseGIQLGenerator
from giql.generators import GIQLDuckDBGenerator
from giql.protocols import CursorLike
from giql.range_parser import CoordinateSystem
from giql.range_parser import IntervalType
from giql.schema import ColumnInfo
from giql.schema import SchemaInfo
from giql.schema import TableSchema
from giql.transformer import ClusterTransformer
from giql.transformer import MergeTransformer
DialectType = Literal["duckdb", "sqlite"]
[docs]
class GIQLEngine:
"""Multi-backend GIQL query engine.
Supports multiple SQL databases through transpilation of GIQL syntax
to standard SQL. Can work with DuckDB, SQLite, and other backends.
Examples
--------
Query a pandas DataFrame with DuckDB::
import pandas as pd
from giql import GIQLEngine
df = pd.DataFrame(
{
"id": [1, 2, 3],
"chromosome": ["chr1", "chr1", "chr2"],
"start_pos": [1500, 10500, 500],
"end_pos": [1600, 10600, 600],
}
)
with GIQLEngine(target_dialect="duckdb") as engine:
engine.conn.register("variants", df)
cursor = engine.execute(
"SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
)
for row in cursor:
print(row)
Load from CSV::
with GIQLEngine(target_dialect="duckdb") as engine:
engine.load_csv("variants", "variants.csv")
cursor = engine.execute(
"SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
)
# Process rows lazily
while True:
row = cursor.fetchone()
if row is None:
break
print(row)
Using SQLite backend::
with GIQLEngine(target_dialect="sqlite", db_path="data.db") as engine:
cursor = engine.execute(
"SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
)
# Materialize all results at once
results = cursor.fetchall()
"""
[docs]
def __init__(
self,
target_dialect: DialectType | str = "duckdb",
connection=None,
db_path: str = ":memory:",
verbose: bool = False,
**dialect_options,
):
"""Initialize engine.
:param target_dialect:
Target SQL dialect ('duckdb', 'sqlite', 'standard')
:param connection:
Existing database connection (optional)
:param db_path:
Database path or connection string
:param verbose:
Print transpiled SQL
:param dialect_options:
Additional options for specific dialects
"""
self.target_dialect = target_dialect
self.verbose = verbose
self.schema_info = SchemaInfo()
self.dialect_options = dialect_options
# Initialize connection
if connection:
self.conn = connection
self.owns_connection = False
else:
self.conn = self._create_connection(db_path)
self.owns_connection = True
# Get appropriate generator
self.generator = self._get_generator()
# Initialize query transformers
self.cluster_transformer = ClusterTransformer(self.schema_info)
self.merge_transformer = MergeTransformer(self.schema_info)
def _create_connection(self, db_path: str):
"""Create database connection based on target dialect.
:param db_path:
Path to database file or connection string
:return:
Connection object for the specified database backend
:raises ImportError:
If the required database driver is not installed
:raises ValueError:
If the dialect is not supported
"""
if self.target_dialect == "duckdb":
try:
import duckdb
return duckdb.connect(db_path)
except ImportError:
raise ImportError("DuckDB not installed.")
elif self.target_dialect == "sqlite":
import sqlite3
return sqlite3.connect(db_path)
else:
raise ValueError(
f"Unsupported dialect: {self.target_dialect}. Supported: duckdb, sqlite"
)
def _get_generator(self):
"""Get generator for target dialect.
:return:
SQL generator instance configured for the target dialect
"""
generators = {
"duckdb": GIQLDuckDBGenerator,
"sqlite": BaseGIQLGenerator,
"standard": BaseGIQLGenerator,
}
generator_class = generators.get(self.target_dialect, BaseGIQLGenerator)
return generator_class(schema_info=self.schema_info, **self.dialect_options)
[docs]
def register_table_schema(
self,
table_name: str,
columns: dict[str, str],
genomic_column: str = DEFAULT_GENOMIC_COL,
chrom_col: str = DEFAULT_CHROM_COL,
start_col: str = DEFAULT_START_COL,
end_col: str = DEFAULT_END_COL,
strand_col: str | None = DEFAULT_STRAND_COL,
coordinate_system: str = "0based",
interval_type: str = "half_open",
):
"""Register schema for a table.
This method tells the engine how genomic ranges are stored in the table,
mapping logical genomic column names to physical column names.
:param table_name:
Table name
:param columns:
Dict of column_name -> type
:param genomic_column:
Logical name for genomic position
:param chrom_col:
Physical chromosome column
:param start_col:
Physical start position column
:param end_col:
Physical end position column
:param strand_col:
Physical strand column (optional)
:param coordinate_system:
Coordinate system: "0based" or "1based" (default: "0based")
:param interval_type:
Interval endpoint handling: "half_open" or "closed" (default: "half_open")
"""
# Convert string parameters to enums
coord_sys = (
CoordinateSystem.ONE_BASED
if coordinate_system == "1based"
else CoordinateSystem.ZERO_BASED
)
int_type = (
IntervalType.CLOSED if interval_type == "closed" else IntervalType.HALF_OPEN
)
column_infos = {}
for col_name, col_type in columns.items():
column_infos[col_name] = ColumnInfo(
name=col_name, type=col_type, is_genomic=False
)
# Add virtual genomic column with mappings to physical columns
column_infos[genomic_column] = ColumnInfo(
name=genomic_column,
type="GENOMIC_RANGE", # Virtual type
is_genomic=True,
chrom_col=chrom_col,
start_col=start_col,
end_col=end_col,
strand_col=strand_col,
coordinate_system=coord_sys,
interval_type=int_type,
)
table_schema = TableSchema(table_name, column_infos)
self.schema_info.register_table(table_name, table_schema)
[docs]
def load_csv(self, table_name: str, file_path: str):
"""Load CSV file into database.
:param table_name:
Name to assign to the table
:param file_path:
Path to the CSV file
"""
if self.target_dialect == "duckdb":
self.conn.execute(
f"CREATE TABLE {table_name} "
f"AS SELECT * FROM read_csv_auto('{file_path}')"
)
elif self.target_dialect == "sqlite":
# Use pandas for SQLite
df = pd.read_csv(file_path)
df.to_sql(table_name, self.conn, if_exists="replace", index=False)
if self.verbose:
print(f"Loaded {table_name} from {file_path}")
[docs]
def load_parquet(self, table_name: str, file_path: str):
"""Load Parquet file into database.
:param table_name:
Name to assign to the table
:param file_path:
Path to the Parquet file
"""
if self.target_dialect == "duckdb":
self.conn.execute(
f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}')"
)
else:
df = pd.read_parquet(file_path)
df.to_sql(table_name, self.conn, if_exists="replace", index=False)
if self.verbose:
print(f"Loaded {table_name} from {file_path}")
[docs]
def transpile(self, giql: str) -> str:
"""Transpile a GIQL query to the engine's target SQL dialect.
Parses the GIQL syntax and transpiles it to the target SQL dialect
without executing it. Useful for debugging or generating SQL for
external use.
:param giql:
Query string with GIQL genomic extensions
:return:
Transpiled SQL query string in the target dialect
:raises ValueError:
If the query cannot be parsed or transpiled
"""
# Parse with GIQL dialect
try:
ast = parse_one(giql, dialect=GIQLDialect)
except Exception as e:
raise ValueError(f"Parse error: {e}\nQuery: {giql}")
# Transform query (MERGE first, then CLUSTER)
try:
# Apply MERGE transformation (which may internally use CLUSTER)
ast = self.merge_transformer.transform(ast)
# Apply CLUSTER transformation for any standalone CLUSTER expressions
ast = self.cluster_transformer.transform(ast)
except Exception as e:
raise ValueError(f"Transformation error: {e}")
# Transpile to target dialect
try:
target_sql = self.generator.generate(ast)
except Exception as e:
raise ValueError(f"Transpilation error: {e}")
if self.verbose:
print(f"\n{'=' * 60}")
print(f"Target Dialect: {self.target_dialect}")
print("\nOriginal GIQL:")
print(giql)
print("\nTranspiled SQL:")
print(target_sql)
print(f"{'=' * 60}\n")
return target_sql
[docs]
def execute(self, giql: str) -> CursorLike:
"""Execute a GIQL query and return a database cursor.
Parses the GIQL syntax, transpiles to target SQL dialect,
and executes the query returning a cursor for lazy iteration.
:param giql:
Query string with GIQL genomic extensions
:return:
Database cursor (DB-API 2.0 compatible) that can be iterated
:raises ValueError:
If the query cannot be parsed, transpiled, or executed
"""
# Transpile GIQL to target SQL
target_sql = self.transpile(giql)
# Execute and return cursor
try:
return self.conn.execute(target_sql)
except Exception as e:
raise ValueError(f"Execution error: {e}\nSQL: {target_sql}")
[docs]
def execute_raw(self, sql: str) -> pd.DataFrame:
"""Execute raw SQL directly, bypassing GIQL parsing.
:param sql:
Raw SQL query string
:return:
Query results as a pandas DataFrame
"""
return pd.read_sql(sql, self.conn)
[docs]
def close(self):
"""Close database connection.
Only closes connections created by the engine. If an external
connection was provided during initialization, it is not closed.
"""
if self.owns_connection and self.conn:
self.conn.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb): # noqa: ANN001
self.close()