Source code for giql.engine

"""Multi-backend query engine for GIQL.

This module provides the main query engine that supports multiple SQL databases
through transpilation of GIQL syntax to standard SQL.
"""

from typing import Literal

import pandas as pd
from sqlglot import parse_one

from giql.constants import DEFAULT_CHROM_COL
from giql.constants import DEFAULT_END_COL
from giql.constants import DEFAULT_GENOMIC_COL
from giql.constants import DEFAULT_START_COL
from giql.constants import DEFAULT_STRAND_COL
from giql.dialect import GIQLDialect
from giql.generators import BaseGIQLGenerator
from giql.generators import GIQLDuckDBGenerator
from giql.protocols import CursorLike
from giql.range_parser import CoordinateSystem
from giql.range_parser import IntervalType
from giql.schema import ColumnInfo
from giql.schema import SchemaInfo
from giql.schema import TableSchema
from giql.transformer import ClusterTransformer
from giql.transformer import MergeTransformer

# Dialect names for which the engine can open a connection by itself
# (the two branches handled in ``GIQLEngine._create_connection``).
# ``GIQLEngine.__init__`` also accepts any other ``str``; such dialects need a
# caller-supplied connection because ``_create_connection`` rejects them.
DialectType = Literal["duckdb", "sqlite"]


class GIQLEngine:
    """Multi-backend GIQL query engine.

    Supports multiple SQL databases through transpilation of GIQL syntax
    to standard SQL. Can work with DuckDB, SQLite, and other backends.

    The engine can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.

    Examples
    --------
    Query a pandas DataFrame with DuckDB::

        import pandas as pd
        from giql import GIQLEngine

        df = pd.DataFrame(
            {
                "id": [1, 2, 3],
                "chromosome": ["chr1", "chr1", "chr2"],
                "start_pos": [1500, 10500, 500],
                "end_pos": [1600, 10600, 600],
            }
        )
        with GIQLEngine(target_dialect="duckdb") as engine:
            engine.conn.register("variants", df)
            cursor = engine.execute(
                "SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
            )
            for row in cursor:
                print(row)

    Load from CSV::

        with GIQLEngine(target_dialect="duckdb") as engine:
            engine.load_csv("variants", "variants.csv")
            cursor = engine.execute(
                "SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
            )
            # Process rows lazily
            while True:
                row = cursor.fetchone()
                if row is None:
                    break
                print(row)

    Using SQLite backend::

        with GIQLEngine(target_dialect="sqlite", db_path="data.db") as engine:
            cursor = engine.execute(
                "SELECT * FROM variants WHERE interval INTERSECTS 'chr1:1000-2000'"
            )
            # Materialize all results at once
            results = cursor.fetchall()
    """

    def __init__(
        self,
        target_dialect: DialectType | str = "duckdb",
        connection=None,
        db_path: str = ":memory:",
        verbose: bool = False,
        **dialect_options,
    ):
        """Initialize engine.

        :param target_dialect: Target SQL dialect ('duckdb', 'sqlite', 'standard')
        :param connection: Existing database connection (optional). When given,
            the engine uses it as-is and never closes it.
        :param db_path: Database path or connection string; only used when no
            ``connection`` is supplied
        :param verbose: Print transpiled SQL
        :param dialect_options: Additional options for specific dialects
            (forwarded to the SQL generator)
        :raises ImportError: If a DuckDB connection must be created but the
            ``duckdb`` package is not installed
        :raises ValueError: If no ``connection`` is supplied and the dialect is
            not one the engine can open itself ('duckdb' or 'sqlite')
        """
        self.target_dialect = target_dialect
        self.verbose = verbose
        self.schema_info = SchemaInfo()
        self.dialect_options = dialect_options

        # Initialize connection. Compare against None instead of relying on
        # truthiness: a caller-provided connection object must be honoured even
        # if it happens to evaluate as falsy.
        if connection is not None:
            self.conn = connection
            self.owns_connection = False
        else:
            self.conn = self._create_connection(db_path)
            self.owns_connection = True

        # Get appropriate generator
        self.generator = self._get_generator()

        # Initialize query transformers
        self.cluster_transformer = ClusterTransformer(self.schema_info)
        self.merge_transformer = MergeTransformer(self.schema_info)

    def _create_connection(self, db_path: str):
        """Create database connection based on target dialect.

        :param db_path: Path to database file or connection string
        :return: Connection object for the specified database backend
        :raises ImportError: If the required database driver is not installed
        :raises ValueError: If the dialect is not supported
        """
        if self.target_dialect == "duckdb":
            # Keep the try body limited to the import so that an error raised
            # while connecting is not misreported as a missing driver.
            try:
                import duckdb
            except ImportError as err:
                raise ImportError("DuckDB not installed.") from err
            return duckdb.connect(db_path)

        if self.target_dialect == "sqlite":
            import sqlite3

            return sqlite3.connect(db_path)

        raise ValueError(
            f"Unsupported dialect: {self.target_dialect}. Supported: duckdb, sqlite"
        )

    def _get_generator(self):
        """Get generator for target dialect.

        Dialects without a dedicated generator fall back to
        ``BaseGIQLGenerator``.

        :return: SQL generator instance configured for the target dialect
        """
        generators = {
            "duckdb": GIQLDuckDBGenerator,
            "sqlite": BaseGIQLGenerator,
            "standard": BaseGIQLGenerator,
        }
        generator_class = generators.get(self.target_dialect, BaseGIQLGenerator)
        return generator_class(schema_info=self.schema_info, **self.dialect_options)

    @staticmethod
    def _sql_string_literal(value: str) -> str:
        """Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal early.

        :param value: Raw text to embed in a SQL statement
        :return: Quoted and escaped literal, including the surrounding quotes
        """
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"

    def register_table_schema(
        self,
        table_name: str,
        columns: dict[str, str],
        genomic_column: str = DEFAULT_GENOMIC_COL,
        chrom_col: str = DEFAULT_CHROM_COL,
        start_col: str = DEFAULT_START_COL,
        end_col: str = DEFAULT_END_COL,
        strand_col: str | None = DEFAULT_STRAND_COL,
        coordinate_system: str = "0based",
        interval_type: str = "half_open",
    ):
        """Register schema for a table.

        This method tells the engine how genomic ranges are stored in the
        table, mapping logical genomic column names to physical column names.

        :param table_name: Table name
        :param columns: Dict of column_name -> type
        :param genomic_column: Logical name for genomic position
        :param chrom_col: Physical chromosome column
        :param start_col: Physical start position column
        :param end_col: Physical end position column
        :param strand_col: Physical strand column (optional)
        :param coordinate_system: Coordinate system: "0based" or "1based"
            (default: "0based"). Any value other than "1based" is treated as
            0-based.
        :param interval_type: Interval endpoint handling: "half_open" or
            "closed" (default: "half_open"). Any value other than "closed" is
            treated as half-open.
        """
        # Convert string parameters to enums
        coord_sys = (
            CoordinateSystem.ONE_BASED
            if coordinate_system == "1based"
            else CoordinateSystem.ZERO_BASED
        )
        int_type = (
            IntervalType.CLOSED if interval_type == "closed" else IntervalType.HALF_OPEN
        )

        # Plain (non-genomic) physical columns
        column_infos = {
            col_name: ColumnInfo(name=col_name, type=col_type, is_genomic=False)
            for col_name, col_type in columns.items()
        }

        # Add virtual genomic column with mappings to physical columns. It is
        # inserted last, so it replaces a physical column of the same name.
        column_infos[genomic_column] = ColumnInfo(
            name=genomic_column,
            type="GENOMIC_RANGE",  # Virtual type
            is_genomic=True,
            chrom_col=chrom_col,
            start_col=start_col,
            end_col=end_col,
            strand_col=strand_col,
            coordinate_system=coord_sys,
            interval_type=int_type,
        )

        table_schema = TableSchema(table_name, column_infos)
        self.schema_info.register_table(table_name, table_schema)

    def load_csv(self, table_name: str, file_path: str):
        """Load CSV file into database.

        DuckDB reads the file natively via ``read_csv_auto``; every other
        dialect goes through pandas (``read_csv`` + ``to_sql``), replacing an
        existing table of the same name. This mirrors :meth:`load_parquet`.

        :param table_name: Name to assign to the table (interpolated into the
            SQL statement unquoted, so it must be a trusted identifier)
        :param file_path: Path to the CSV file
        """
        if self.target_dialect == "duckdb":
            # The path is embedded as an escaped literal so that a quote in the
            # file name cannot break out of the string.
            path_literal = self._sql_string_literal(file_path)
            self.conn.execute(
                f"CREATE TABLE {table_name} "
                f"AS SELECT * FROM read_csv_auto({path_literal})"
            )
        else:
            # Use pandas for SQLite and any other backend
            df = pd.read_csv(file_path)
            df.to_sql(table_name, self.conn, if_exists="replace", index=False)

        if self.verbose:
            print(f"Loaded {table_name} from {file_path}")

    def load_parquet(self, table_name: str, file_path: str):
        """Load Parquet file into database.

        DuckDB reads the file natively via ``read_parquet``; every other
        dialect goes through pandas (``read_parquet`` + ``to_sql``), replacing
        an existing table of the same name.

        :param table_name: Name to assign to the table (interpolated into the
            SQL statement unquoted, so it must be a trusted identifier)
        :param file_path: Path to the Parquet file
        """
        if self.target_dialect == "duckdb":
            # Escape the path so a quote in the file name cannot break out of
            # the string literal.
            path_literal = self._sql_string_literal(file_path)
            self.conn.execute(
                f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet({path_literal})"
            )
        else:
            df = pd.read_parquet(file_path)
            df.to_sql(table_name, self.conn, if_exists="replace", index=False)

        if self.verbose:
            print(f"Loaded {table_name} from {file_path}")

    def transpile(self, giql: str) -> str:
        """Transpile a GIQL query to the engine's target SQL dialect.

        Parses the GIQL syntax and transpiles it to the target SQL dialect
        without executing it. Useful for debugging or generating SQL for
        external use.

        :param giql: Query string with GIQL genomic extensions
        :return: Transpiled SQL query string in the target dialect
        :raises ValueError: If the query cannot be parsed or transpiled; the
            underlying exception is attached as ``__cause__``
        """
        # Parse with GIQL dialect
        try:
            ast = parse_one(giql, dialect=GIQLDialect)
        except Exception as e:
            raise ValueError(f"Parse error: {e}\nQuery: {giql}") from e

        # Transform query (MERGE first, then CLUSTER)
        try:
            # Apply MERGE transformation (which may internally use CLUSTER)
            ast = self.merge_transformer.transform(ast)
            # Apply CLUSTER transformation for any standalone CLUSTER expressions
            ast = self.cluster_transformer.transform(ast)
        except Exception as e:
            raise ValueError(f"Transformation error: {e}") from e

        # Transpile to target dialect
        try:
            target_sql = self.generator.generate(ast)
        except Exception as e:
            raise ValueError(f"Transpilation error: {e}") from e

        if self.verbose:
            print(f"\n{'=' * 60}")
            print(f"Target Dialect: {self.target_dialect}")
            print("\nOriginal GIQL:")
            print(giql)
            print("\nTranspiled SQL:")
            print(target_sql)
            print(f"{'=' * 60}\n")

        return target_sql

    def execute(self, giql: str) -> CursorLike:
        """Execute a GIQL query and return a database cursor.

        Parses the GIQL syntax, transpiles to target SQL dialect, and executes
        the query returning a cursor for lazy iteration.

        :param giql: Query string with GIQL genomic extensions
        :return: Database cursor (DB-API 2.0 compatible) that can be iterated
        :raises ValueError: If the query cannot be parsed, transpiled, or
            executed; the underlying exception is attached as ``__cause__``
        """
        # Transpile GIQL to target SQL
        target_sql = self.transpile(giql)

        # Execute and return cursor
        try:
            return self.conn.execute(target_sql)
        except Exception as e:
            raise ValueError(f"Execution error: {e}\nSQL: {target_sql}") from e

    def execute_raw(self, sql: str) -> pd.DataFrame:
        """Execute raw SQL directly, bypassing GIQL parsing.

        :param sql: Raw SQL query string
        :return: Query results as a pandas DataFrame
        """
        return pd.read_sql(sql, self.conn)

    def close(self):
        """Close database connection.

        Only closes connections created by the engine. If an external
        connection was provided during initialization, it is not closed.
        """
        if self.owns_connection and self.conn is not None:
            self.conn.close()

    def __enter__(self):
        """Enter the context manager and return the engine itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):  # noqa: ANN001
        """Close the engine on leaving the ``with`` block.

        Returns ``None``, so an exception raised inside the block propagates.
        """
        self.close()