Source code for sqlalchemy_jdbcapi.jdbc.connection

"""
JDBC Connection implementation following DB-API 2.0 specification.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any

from .cursor import Cursor
from .exceptions import DatabaseError, InterfaceError
from .jvm import start_jvm

logger = logging.getLogger(__name__)


[docs] class Connection: """ DB-API 2.0 compliant connection to a JDBC database. This class wraps a JDBC Connection object from JPype. """
[docs] def __init__( self, jclassname: str, url: str, driver_args: dict[str, Any] | list[Any] | None = None, jars: list[str] | None = None, libs: list[str] | None = None, ) -> None: """ Create a JDBC connection. Args: jclassname: Fully qualified Java class name of JDBC driver url: JDBC connection URL driver_args: Connection properties (dict) or [user, password] (list) jars: List of JAR file paths (for classpath) libs: Additional native library paths Raises: InterfaceError: If JVM or driver cannot be loaded DatabaseError: If connection fails """ self._jclassname = jclassname self._url = url self._jdbc_connection: Any = None self._closed = False # Start JVM if not already running # Note: JVM can only be started once per Python process try: # Pass jars as-is to start_jvm - if None, it will use environment classpath classpath = ( [Path(jar) if isinstance(jar, str) else jar for jar in jars] if jars else None ) start_jvm(classpath=classpath, jvm_args=None) except Exception as e: raise InterfaceError(f"Failed to start JVM: {e}") from e # Load driver and establish connection try: import jpype # Load the JDBC driver class - this registers it with DriverManager # Even though we don't use the returned class, loading it has side effects jpype.JClass(jclassname) logger.debug(f"Loaded JDBC driver: {jclassname}") # Get DriverManager - this is the main entry point for JDBC connections dm = jpype.JClass("java.sql.DriverManager") # Create connection based on driver_args format # We support multiple formats for flexibility if driver_args is None: # Simple connection without credentials self._jdbc_connection = dm.getConnection(url) elif isinstance(driver_args, dict): # Dict format - convert to Java Properties # This allows passing custom JDBC properties like SSL settings props = jpype.JClass("java.util.Properties")() for k, v in driver_args.items(): props.setProperty(str(k), str(v)) self._jdbc_connection = dm.getConnection(url, props) elif isinstance(driver_args, (list, tuple)) and len(driver_args) == 2: # List format for simple user/password auth usr, pwd = driver_args self._jdbc_connection = dm.getConnection(url, str(usr), str(pwd)) else: # TODO: Maybe support more argument formats in the future? raise ValueError("driver_args must be dict, [user, password], or None") # Disable autocommit by default for transactional behavior # JDBC connections have autocommit=true by default self._jdbc_connection.setAutoCommit(False) logger.info(f"Connected to database: {url}") except Exception as e: logger.exception(f"Connection failed: {e}") raise DatabaseError(f"Failed to connect to database: {e}") from e
[docs] def close(self) -> None: """Close the connection.""" if self._closed: return try: if self._jdbc_connection is not None: self._jdbc_connection.close() logger.debug("Connection closed") except Exception as e: logger.warning(f"Error closing connection: {e}") finally: self._jdbc_connection = None self._closed = True
[docs] def commit(self) -> None: """Commit current transaction.""" if self._closed: raise InterfaceError("Connection is closed") try: if self._jdbc_connection is not None: self._jdbc_connection.commit() logger.debug("Transaction committed") except Exception as e: raise DatabaseError(f"Commit failed: {e}") from e
[docs] def rollback(self) -> None: """Rollback current transaction.""" if self._closed: raise InterfaceError("Connection is closed") try: if self._jdbc_connection is not None: self._jdbc_connection.rollback() logger.debug("Transaction rolled back") except Exception as e: raise DatabaseError(f"Rollback failed: {e}") from e
[docs] def cursor(self) -> Cursor: """ Create a new cursor. Returns: Cursor object Raises: InterfaceError: If connection is closed """ if self._closed: raise InterfaceError("Connection is closed") return Cursor(self, self._jdbc_connection)
[docs] def set_auto_commit(self, auto_commit: bool) -> None: """ Set auto-commit mode. Args: auto_commit: True to enable auto-commit, False to disable """ if self._closed: raise InterfaceError("Connection is closed") try: self._jdbc_connection.setAutoCommit(auto_commit) logger.debug(f"Auto-commit set to {auto_commit}") except Exception as e: raise DatabaseError(f"Failed to set auto-commit: {e}") from e
[docs] def get_auto_commit(self) -> bool: """Get current auto-commit mode.""" if self._closed: raise InterfaceError("Connection is closed") try: return self._jdbc_connection.getAutoCommit() except Exception as e: raise DatabaseError(f"Failed to get auto-commit: {e}") from e
@property def closed(self) -> bool: """Check if connection is closed.""" return self._closed
[docs] def __enter__(self) -> Connection: """Context manager entry.""" return self
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Context manager exit.""" if exc_type is None: self.commit() else: self.rollback() self.close()
[docs] def __del__(self) -> None: """Destructor to ensure connection is closed.""" if not self._closed: try: self.close() except Exception: pass
def connect( jclassname: str, url: str, driver_args: dict[str, Any] | list[Any] | None = None, jars: list[str] | None = None, libs: list[str] | None = None, ) -> Connection: """ Create a JDBC database connection. This is the main entry point for creating connections, following DB-API 2.0. Args: jclassname: Fully qualified Java class name of JDBC driver url: JDBC connection URL driver_args: Connection properties (dict) or [user, password] (list) jars: List of JAR file paths for classpath libs: Additional native library paths Returns: Connection object Example: >>> conn = connect( ... 'org.postgresql.Driver', ... 'jdbc:postgresql://localhost:5432/mydb', ... {'user': 'myuser', 'password': 'mypass'} ... ) >>> cursor = conn.cursor() >>> cursor.execute('SELECT * FROM users') >>> rows = cursor.fetchall() >>> conn.close() """ return Connection( jclassname=jclassname, url=url, driver_args=driver_args, jars=jars, libs=libs, )