Source code for sqlalchemy_jdbcapi.dialects.avatica

"""
Apache Avatica JDBC dialect for SQLAlchemy.

Provides support for databases accessible via Apache Calcite Avatica,
including Apache Phoenix, Apache Drill, and other Calcite-based systems.
"""

from __future__ import annotations

import logging
from typing import Any

from sqlalchemy import exc, sql
from sqlalchemy.engine import Connection, Dialect

from .base import BaseJDBCDialect, JDBCDriverConfig

logger = logging.getLogger(__name__)


class AvaticaDialect(BaseJDBCDialect, Dialect):  # type: ignore
    """
    Apache Avatica dialect using JDBC driver.

    Apache Avatica is a framework for building database drivers. It provides
    a wire protocol and client-side drivers that can communicate with any
    Avatica-compliant server, including:

    - Apache Phoenix (HBase SQL layer)
    - Apache Drill (schema-free SQL query engine)
    - Apache Calcite (SQL query planning and optimization)

    Supports Avatica-specific features including:

    - HTTP/HTTPS transport
    - Protobuf or JSON serialization
    - Authentication (Basic, SPNEGO/Kerberos, Digest)
    - Custom avatica properties

    Connection URL format:
        jdbcapi+avatica://host:8765/?serialization=protobuf
        jdbcapi+phoenix://host:8765/?schema=myschema  (Phoenix alias)
        jdbcapi+calcite://host:8765/  (Calcite alias)

    Special connection properties:

    - ssl: "true" to use https instead of http for the Avatica URL
    - serialization: PROTOBUF or JSON
    - authentication: BASIC, DIGEST, SPNEGO
    - avatica_user: Avatica authentication user
    - avatica_password: Avatica authentication password
    - truststore: Path to truststore for HTTPS
    - truststore_password: Truststore password
    """

    name = "avatica"
    driver = "jdbcapi"

    # Avatica capabilities vary by backend, use conservative defaults
    supports_native_boolean = True
    supports_sequences = False  # Depends on backend
    supports_identity_columns = False  # Depends on backend
    supports_native_enum = False
    supports_multivalues_insert = True
    supports_statement_cache = True

    # Avatica-specific settings
    supports_server_side_cursors = False

    @classmethod
    def get_driver_config(cls) -> JDBCDriverConfig:
        """Get Apache Avatica JDBC driver configuration."""
        return JDBCDriverConfig(
            driver_class="org.apache.calcite.avatica.remote.Driver",
            jdbc_url_template="jdbc:avatica:remote:url=http://{host}:{port}",
            default_port=8765,
            supports_transactions=True,
            supports_schemas=True,
            supports_sequences=False,
        )

    @classmethod
    def create_connect_args(cls, url):  # type: ignore
        """
        Create connection arguments from URL.

        The query parameters ``ssl``, ``serialization``, ``authentication``,
        ``truststore`` and ``truststore_password`` are consumed here and
        folded into the JDBC URL. Every remaining query parameter is
        forwarded unchanged as a driver property.

        Args:
            url: Connection URL. Its ``host``, ``port``, ``database``,
                ``username``, ``password`` and ``query`` attributes are read.

        Returns:
            A ``(driver_class, jdbc_url, driver_args)`` tuple. ``driver_args``
            is ``None`` when no driver properties were collected.
        """
        driver_config = cls.get_driver_config()

        # Get connection parameters
        host = url.host or "localhost"
        port = url.port or driver_config.default_port
        database = url.database or ""
        username = url.username or ""
        password = url.password or ""

        # Copy the query so the pops below never mutate the URL object
        query_params = dict(url.query)

        # Determine protocol (http or https)
        protocol = (
            "https" if query_params.pop("ssl", "false").lower() == "true" else "http"
        )

        # Build the Avatica remote URL
        avatica_url = f"{protocol}://{host}:{port}"

        # Add database/schema if specified
        if database:
            avatica_url += f";schema={database}"

        # Build JDBC URL for Avatica
        jdbc_url = f"jdbc:avatica:remote:url={avatica_url}"

        # Add serialization format (defaults to PROTOBUF)
        serialization = query_params.pop("serialization", "PROTOBUF").upper()
        jdbc_url += f";serialization={serialization}"

        # Add authentication only when explicitly requested
        authentication = query_params.pop("authentication", "").upper()
        if authentication:
            jdbc_url += f";authentication={authentication}"

        # Add truststore for HTTPS
        if "truststore" in query_params:
            jdbc_url += f";truststore={query_params.pop('truststore')}"
        if "truststore_password" in query_params:
            jdbc_url += (
                f";truststore_password={query_params.pop('truststore_password')}"
            )

        # Build driver arguments
        driver_args: dict[str, Any] = {}
        if username:
            # Avatica uses avatica_user for authentication
            driver_args["avatica_user"] = username
        if password:
            driver_args["avatica_password"] = password

        # Add remaining parameters
        driver_args.update(query_params)

        return (
            driver_config.driver_class,
            jdbc_url,
            driver_args if driver_args else None,
        )

    def initialize(self, connection: Connection) -> None:
        """
        Initialize Avatica connection.

        Caches the server version on ``self._server_version_info`` the first
        time it is called. Note that this override does not delegate to
        ``super().initialize``.

        Args:
            connection: Connection used for the version probe.
        """
        if not hasattr(self, "_server_version_info"):
            self._server_version_info = self._get_server_version_info(connection)
        logger.debug("Initialized Apache Avatica JDBC dialect")

    def _get_server_version_info(self, connection: Connection) -> tuple[int, ...]:
        """
        Get Avatica/backend server version.

        The version is not actually read from the server: a ``SELECT 1``
        probe is executed and a fixed placeholder version is returned
        depending on whether the probe succeeded.

        Args:
            connection: Connection on which the probe query is run.

        Returns:
            ``(1, 23, 0)`` when the probe succeeds, ``(1, 20, 0)`` when it
            raises ``DBAPIError``.
        """
        try:
            # Probe only; the result is discarded
            connection.execute(sql.text("SELECT 1")).fetchone()
        except exc.DBAPIError as e:
            logger.warning("Failed to get Avatica version: %s", e)
            # Default fallback
            return (1, 20, 0)
        # Default to current stable version
        return (1, 23, 0)

    def do_ping(self, dbapi_connection: Any) -> bool:
        """
        Check if Avatica connection is alive.

        Args:
            dbapi_connection: Raw DB-API connection to probe.

        Returns:
            ``True`` when ``SELECT 1`` executes and the cursor closes cleanly,
            ``False`` on any exception (which is logged at debug level).
        """
        try:
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                # Always release the cursor, even when the probe query fails
                cursor.close()
        except Exception as e:
            logger.debug("Avatica ping failed: %s", e)
            return False
        return True
class PhoenixDialect(AvaticaDialect):
    """
    Apache Phoenix dialect (HBase SQL layer).

    Phoenix provides SQL access to HBase with full ACID transaction support.

    Connection URL format:
        jdbcapi+phoenix://zookeeper-host:2181/?schema=myschema
    """

    name = "phoenix"

    # Phoenix capabilities
    supports_sequences = True  # Phoenix supports sequences
    supports_identity_columns = False

    @classmethod
    def get_driver_config(cls) -> JDBCDriverConfig:
        """Get Apache Phoenix JDBC driver configuration."""
        return JDBCDriverConfig(
            driver_class="org.apache.phoenix.jdbc.PhoenixDriver",
            jdbc_url_template="jdbc:phoenix:{host}:{port}",
            default_port=2181,  # ZooKeeper port
            supports_transactions=True,
            supports_schemas=True,
            supports_sequences=True,
        )

    @classmethod
    def create_connect_args(cls, url):  # type: ignore
        """
        Create Phoenix-specific connection arguments.

        Builds ``jdbc:phoenix:{host}:{port}`` and appends ``:/{database}``
        when the URL carries a database component. The ``schema`` query
        parameter and the URL credentials are mapped to driver properties;
        all other query parameters are forwarded unchanged.

        Args:
            url: Connection URL. Its ``host``, ``port``, ``database``,
                ``username``, ``password`` and ``query`` attributes are read.

        Returns:
            A ``(driver_class, jdbc_url, driver_args)`` tuple. ``driver_args``
            is ``None`` when no driver properties were collected.
        """
        driver_config = cls.get_driver_config()

        host = url.host or "localhost"
        port = url.port or driver_config.default_port
        database = url.database or ""

        # Phoenix JDBC URL format
        jdbc_url = f"jdbc:phoenix:{host}:{port}"
        if database:
            jdbc_url += f":/{database}"

        # Phoenix-specific properties
        driver_args: dict[str, Any] = {}
        query_params = dict(url.query)

        if "schema" in query_params:
            # NOTE(review): Phoenix appears to document the default-schema
            # property as "phoenix.connection.schema" - confirm that
            # "phoenix.schema" is the key the driver actually honours.
            driver_args["phoenix.schema"] = query_params.pop("schema")

        if url.username:
            driver_args["user"] = url.username
        if url.password:
            driver_args["password"] = url.password

        driver_args.update(query_params)

        return (
            driver_config.driver_class,
            jdbc_url,
            driver_args if driver_args else None,
        )

    def do_ping(self, dbapi_connection: Any) -> bool:
        """
        Check if Phoenix connection is alive.

        Args:
            dbapi_connection: Raw DB-API connection to probe.

        Returns:
            ``True`` when the probe query executes and the cursor closes
            cleanly, ``False`` on any exception (logged at debug level).
        """
        try:
            cursor = dbapi_connection.cursor()
            try:
                # Phoenix-specific ping
                cursor.execute("SELECT 1 FROM SYSTEM.CATALOG LIMIT 1")
            finally:
                # Always release the cursor, even when the probe query fails
                cursor.close()
        except Exception as e:
            logger.debug("Phoenix ping failed: %s", e)
            return False
        return True
class CalciteDialect(AvaticaDialect):
    """
    Apache Calcite dialect.

    Calcite is a foundational framework for building databases and
    data management systems.

    A URL with a host is treated as a remote Avatica connection; a URL
    without a host is treated as a local (embedded) Calcite connection whose
    model file comes from the ``model`` query parameter or, failing that,
    from the database component of the URL.

    Connection URL format:
        jdbcapi+calcite://host:8765/?model=/path/to/model.json
    """

    name = "calcite"

    @classmethod
    def get_driver_config(cls) -> JDBCDriverConfig:
        """Get Apache Calcite JDBC driver configuration."""
        return JDBCDriverConfig(
            driver_class="org.apache.calcite.jdbc.Driver",
            jdbc_url_template="jdbc:calcite:model={database}",
            default_port=0,  # Calcite can be embedded or remote
            supports_transactions=True,
            supports_schemas=True,
            supports_sequences=False,
        )

    @classmethod
    def create_connect_args(cls, url):  # type: ignore
        """
        Create Calcite-specific connection arguments.

        Args:
            url: Connection URL. Its ``host``, ``database`` and ``query``
                attributes are read on the local path; on the remote path
                the URL is handed to ``AvaticaDialect.create_connect_args``.

        Returns:
            A ``(driver_class, jdbc_url, driver_args)`` tuple. ``driver_args``
            is ``None`` when no driver properties were collected.
        """
        # A host means a remote Avatica server: reuse the Avatica URL builder
        # (and therefore the Avatica remote driver class and default port).
        if url.host:
            return AvaticaDialect.create_connect_args(url)

        driver_config = cls.get_driver_config()

        # Local Calcite: the model comes from ?model=..., else from the
        # database component of the URL.
        query_params = dict(url.query)
        model = query_params.pop("model", url.database or "")

        # Only emit the model property when a model was actually supplied;
        # "jdbc:calcite:model=" would ask the driver to load an empty path.
        jdbc_url = "jdbc:calcite:"
        if model:
            jdbc_url += f"model={model}"

        # Add any additional properties
        driver_args = dict(query_params)

        return (
            driver_config.driver_class,
            jdbc_url,
            driver_args if driver_args else None,
        )
# Export the main dialect dialect = AvaticaDialect