# Source code for sqlalchemy_jdbcapi.dialects.access

"""
Microsoft Access JDBC dialect for SQLAlchemy.

Provides support for Microsoft Access databases (.mdb, .accdb)
using the UCanAccess JDBC driver.
"""

from __future__ import annotations

import logging
import re
from typing import Any

from sqlalchemy import exc, sql
from sqlalchemy.engine import Connection, Dialect

from .base import BaseJDBCDialect, JDBCDriverConfig

logger = logging.getLogger(__name__)


class AccessDialect(BaseJDBCDialect, Dialect):  # type: ignore
    """
    Microsoft Access dialect using UCanAccess JDBC driver.

    UCanAccess is an open-source Java JDBC driver that reads and writes
    Microsoft Access databases (.mdb and .accdb files) without requiring
    Microsoft Office or ODBC drivers.

    Limitations:
    - No stored procedures support (Access macros not supported)
    - Limited transaction support
    - No sequences (uses AutoNumber/COUNTER)
    - Limited SQL syntax compared to full SQL databases
    - Single-user file locking considerations

    Supports Access-specific features including:
    - AutoNumber (COUNTER) columns
    - Memo fields (LONGCHAR)
    - OLE objects
    - Hyperlinks
    - Currency type
    - Yes/No (Boolean) type

    Connection URL format:
        jdbcapi+access:///path/to/database.accdb
        jdbcapi+msaccess:///path/to/database.mdb  (alias)

    Special connection properties:
    - memory: true to open database in memory mode
    - newdatabaseversion: V2000, V2003, V2007, V2010 (for creating new DB)
    - encrypt: true to encrypt database
    - jackcessopener: custom opener class
    """

    name = "access"
    driver = "jdbcapi"

    # Access capabilities (quite limited)
    supports_native_boolean = True  # Yes/No type
    supports_sequences = False  # Uses AutoNumber instead
    supports_identity_columns = True  # AutoNumber/COUNTER
    supports_native_enum = False
    supports_multivalues_insert = False  # Access doesn't support multi-row insert
    supports_statement_cache = True
    supports_default_values = True
    supports_empty_insert = False

    # Access-specific
    supports_sane_rowcount = False
    supports_sane_multi_rowcount = False
    postfetch_lastrowid = True

    @classmethod
    def get_driver_config(cls) -> JDBCDriverConfig:
        """Get Microsoft Access JDBC driver configuration."""
        return JDBCDriverConfig(
            driver_class="net.ucanaccess.jdbc.UcanaccessDriver",
            jdbc_url_template="jdbc:ucanaccess://{database}",
            default_port=0,  # File-based, no port
            supports_transactions=True,  # Limited support
            supports_schemas=False,  # Access doesn't have schemas
            supports_sequences=False,
        )

    @classmethod
    def create_connect_args(cls, url):  # type: ignore  # noqa: C901
        """
        Create connection arguments from URL.

        Access uses file paths instead of host/port, so the database file
        path is rebuilt from the URL's ``host`` and ``database`` parts.

        URL formats supported:
        - jdbcapi+access:///path/to/database.accdb (Unix absolute path)
        - jdbcapi+access:///C:/path/to/database.accdb (Windows absolute path)
        - jdbcapi+access://C:/path/to/database.accdb (Windows drive letter)
        - jdbcapi+access:////server/share/database.accdb (UNC path)

        How the path is assembled:
        - A single-letter host is treated as a Windows drive letter.
        - A host with no port and a database containing "/" (or a host
          starting with a backslash) is treated as a UNC path.
        - Any other host is joined to the database with "/".
        - Without a host, the database part is used as the path.

        Args:
            url: URL object exposing ``host``, ``port``, ``database`` and
                ``query``.

        Returns:
            Tuple ``(driver_class, jdbc_url, driver_args)`` where
            ``driver_args`` is ``None`` when the URL carries no query
            parameters.
        """
        driver_config = cls.get_driver_config()

        # Extract database path from URL
        database_path = ""
        host = url.host

        if host:
            # Handle Windows paths like C:/path or //server/share
            if len(host) == 1 and host.isalpha():
                # Windows drive letter (e.g., C:)
                database_path = f"{host}:"
                if url.database:
                    # The database part may arrive without a leading slash;
                    # joining "C:" and "dir/db.accdb" directly would yield the
                    # drive-relative path "C:dir/db.accdb".
                    if not url.database.startswith(("/", "\\")):
                        database_path += "/"
                    database_path += url.database
            elif host.startswith("\\") or (
                url.port is None and "/" in str(url.database or "")
            ):
                # UNC path: //server/share/file.accdb
                database_path = f"//{host}"
                if url.database:
                    database_path += "/" + url.database
            else:
                # Network path or Unix path
                database_path = host
                if url.database:
                    database_path += "/" + url.database
        elif url.database:
            database_path = url.database

        # Ensure path starts with / for Unix-like paths
        if (
            database_path
            and not database_path.startswith("/")
            and ":" not in database_path
        ):
            database_path = "/" + database_path

        # Normalize the path: collapse every run of slashes to a single one.
        # A regex is used because str.replace("//", "/") is a single pass and
        # leaves "//" behind for runs of three or more slashes.
        if database_path.startswith("//"):
            # UNC path - preserve double slash at start
            database_path = "//" + re.sub(r"/{2,}", "/", database_path[2:])
        else:
            # Regular path
            database_path = re.sub(r"/{2,}", "/", database_path)

        # Validate path has valid extension (warning only, never fatal)
        lower_path = database_path.lower()
        if not lower_path.endswith((".accdb", ".mdb")):
            logger.warning(
                f"Access database should have .accdb or .mdb extension: {database_path}"
            )

        # Build JDBC URL
        jdbc_url = f"jdbc:ucanaccess://{database_path}"

        # Build driver arguments
        driver_args = {}
        query_params = dict(url.query)

        # UCanAccess-specific properties: URL key -> driver property name.
        # Both spellings of newDatabaseVersion are accepted; when both are
        # given the camelCase one is processed last and therefore wins.
        property_names = (
            ("memory", "memory"),
            ("newdatabaseversion", "newDatabaseVersion"),
            ("newDatabaseVersion", "newDatabaseVersion"),
            ("encrypt", "encrypt"),
            ("jackcessopener", "jackcessOpener"),
            ("showschema", "showSchema"),
            ("sysschema", "sysSchema"),
        )
        for url_key, driver_key in property_names:
            if url_key in query_params:
                driver_args[driver_key] = query_params.pop(url_key)

        # Add remaining query parameters
        driver_args.update(query_params)

        return (
            driver_config.driver_class,
            jdbc_url,
            driver_args or None,
        )

    def initialize(self, connection: Connection) -> None:
        """Initialize Microsoft Access connection.

        Resolves and stores ``_server_version_info`` unless the attribute is
        already present on the dialect.
        """
        if not hasattr(self, "_server_version_info"):
            self._server_version_info = self._get_server_version_info(connection)
        logger.debug("Initialized Microsoft Access JDBC dialect")

    def _get_server_version_info(self, connection: Connection) -> tuple[int, ...]:
        """
        Get Microsoft Access/UCanAccess version.

        The version is not read from the driver: a ``SELECT 1`` probe is
        executed and a fixed version tuple is returned depending on whether
        the probe succeeded.

        Returns:
            ``(5, 0, 1)`` when the probe query succeeds, ``(5, 0, 0)`` when
            it raises ``DBAPIError``.
        """
        try:
            # Probe the connection; the result itself is not used.
            connection.execute(sql.text("SELECT 1")).fetchone()
            # Default to a reasonable version
            # UCanAccess 5.x is current
            return (5, 0, 1)
        except exc.DBAPIError as e:
            logger.warning(f"Failed to get Access version: {e}")
            # Default fallback
            return (5, 0, 0)

    def do_ping(self, dbapi_connection: Any) -> bool:
        """Check if Access connection is alive.

        Args:
            dbapi_connection: DBAPI connection offering ``cursor()``.

        Returns:
            True when ``SELECT 1`` runs and the cursor closes cleanly,
            False on any exception (which is logged at debug level).
        """
        try:
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                # Close the cursor even when execute() fails so it is not leaked.
                cursor.close()
            return True
        except Exception as e:
            logger.debug(f"Access ping failed: {e}")
            return False

    def has_table(
        self, connection: Connection, table_name: str, schema: str | None = None
    ) -> bool:
        """Check if a table exists.

        JDBC metadata is consulted first; if that fails for any reason the
        method falls back to selecting one row from the table.

        Args:
            connection: Connection to inspect.
            table_name: Name of the table to look for.
            schema: Ignored; Access doesn't have schemas.

        Returns:
            True if the table was found, False otherwise (including when the
            fallback rejects the table name as unsafe).
        """
        try:
            # Use JDBC metadata for Access
            # This is more reliable than querying system tables
            dbapi_conn = connection.connection.dbapi_connection
            metadata = dbapi_conn.jconn.getMetaData()
            result_set = metadata.getTables(None, None, table_name.upper(), ["TABLE"])
            try:
                return bool(result_set.next())
            finally:
                # Release the JDBC result set on success and on failure alike.
                result_set.close()
        except Exception as e:
            logger.debug(f"has_table check failed: {e}")

            # Fallback: try to query the table directly.
            # Note: Access doesn't support parameterized table names, so we
            # validate the table name to prevent SQL injection
            if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_ ]*$", table_name):
                logger.warning(f"Invalid table name format: {table_name}")
                return False
            try:
                connection.execute(
                    sql.text(f"SELECT TOP 1 * FROM [{table_name}]")  # noqa: S608
                ).fetchone()
                return True
            except exc.DBAPIError:
                return False

    def _jdbc_type_to_sqlalchemy(self, jdbc_type: int, type_name: str) -> Any:
        """
        Convert JDBC type to SQLAlchemy type.

        Access-specific type names take precedence over the numeric JDBC
        type code; unknown codes fall back to ``String``.

        Args:
            jdbc_type: JDBC type code
            type_name: JDBC type name string (may be empty or None)

        Returns:
            SQLAlchemy type class
        """
        from sqlalchemy import (
            Boolean,
            Date,
            DateTime,
            Float,
            Integer,
            LargeBinary,
            Numeric,
            String,
            Text,
            Time,
        )

        # Check type name for Access-specific types (first match wins)
        upper_name = type_name.upper() if type_name else ""
        name_rules = (
            (("COUNTER", "AUTOINCREMENT"), Integer),
            (("CURRENCY", "MONEY"), Numeric),
            (("MEMO", "LONGCHAR"), Text),
            (("YESNO",), Boolean),
            (("OLEOBJECT",), LargeBinary),
            (("HYPERLINK",), String),
        )
        for keywords, sa_type in name_rules:
            if any(keyword in upper_name for keyword in keywords):
                return sa_type

        # Map JDBC types to SQLAlchemy types
        type_map = {
            # Numeric types
            -6: Integer,  # TINYINT
            5: Integer,  # SMALLINT
            4: Integer,  # INTEGER
            -5: Integer,  # BIGINT
            6: Float,  # FLOAT
            7: Float,  # REAL
            8: Float,  # DOUBLE
            2: Numeric,  # NUMERIC
            3: Numeric,  # DECIMAL
            # String types
            1: String,  # CHAR
            12: String,  # VARCHAR
            -1: Text,  # LONGVARCHAR
            # Date/time types
            91: Date,  # DATE
            92: Time,  # TIME
            93: DateTime,  # TIMESTAMP
            # Binary types
            -2: LargeBinary,  # BINARY
            -3: LargeBinary,  # VARBINARY
            -4: LargeBinary,  # LONGVARBINARY
            # Boolean
            16: Boolean,  # BOOLEAN
            -7: Boolean,  # BIT
        }
        return type_map.get(jdbc_type, String)

    def get_columns(
        self,
        connection: Connection,
        table_name: str,
        schema: str | None = None,
        **kwargs: Any,
    ) -> list[dict[str, Any]]:  # type: ignore
        """Get column information for a table.

        Column details are read from JDBC metadata. This is best-effort: if
        reading fails, the error is logged and whatever was collected so far
        (possibly nothing) is returned.

        Args:
            connection: Connection to inspect.
            table_name: Name of the table whose columns are wanted.
            schema: Not used by this implementation.
            **kwargs: Accepted and ignored.

        Returns:
            List of dicts with keys ``name``, ``type``, ``nullable``,
            ``default`` and ``autoincrement``.
        """
        columns = []

        try:
            dbapi_conn = connection.connection.dbapi_connection
            metadata = dbapi_conn.jconn.getMetaData()
            result_set = metadata.getColumns(None, None, table_name.upper(), None)

            try:
                while result_set.next():
                    column_name = result_set.getString("COLUMN_NAME")
                    data_type = result_set.getInt("DATA_TYPE")
                    type_name = result_set.getString("TYPE_NAME")
                    # Any NULLABLE value other than 0 is treated as nullable.
                    nullable = result_set.getInt("NULLABLE") != 0
                    default = result_set.getString("COLUMN_DEF")

                    # Map Access types to SQLAlchemy types
                    col_info = {
                        "name": column_name,
                        "type": self._jdbc_type_to_sqlalchemy(data_type, type_name),
                        "nullable": nullable,
                        "default": default,
                        "autoincrement": "COUNTER" in type_name.upper()
                        if type_name
                        else False,
                    }
                    columns.append(col_info)
            finally:
                # Release the JDBC result set even when reading a row fails.
                result_set.close()
        except Exception as e:
            logger.error(f"Failed to get columns for {table_name}: {e}")

        return columns
class MSAccessDialect(AccessDialect):
    """Variant of AccessDialect whose dialect name is ``msaccess``.

    Behavior is inherited unchanged from AccessDialect; only ``name`` differs.
    """

    name = "msaccess"
# Export the dialect dialect = AccessDialect