Database

Database connections and query execution for DesignSafe research databases.

Database Accessor

Provides lazy access to different DesignSafe database connections via properties.

This class manages multiple database connections and provides convenient property-based access to different DesignSafe databases. Each database connection is created only when first accessed (lazy initialization) and reused for subsequent calls.

The accessor supports the following databases through properties:
  • ngl: Natural hazards engineering research database
  • vp: Vulnerability and performance database
  • eq: Post-earthquake recovery database

ATTRIBUTE DESCRIPTION
_connections

Internal storage for database instances.

TYPE: Dict[str, Optional[DSDatabase]]
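
The lazy initialization described above can be sketched without a database server. This is a minimal illustration, not the dapi implementation: FakeDatabase and LazyAccessor are hypothetical stand-ins for DSDatabase and DatabaseAccessor, and the private _get helper is an assumption about how the properties might share their logic.

```python
from typing import Dict, Optional


class FakeDatabase:
    """Hypothetical stand-in for DSDatabase; it opens no real connection."""

    def __init__(self, dbname: str):
        self.dbname = dbname


class LazyAccessor:
    """Sketch of property-based lazy initialization."""

    def __init__(self):
        # One empty slot per database; nothing is connected yet.
        self._connections: Dict[str, Optional[FakeDatabase]] = {
            "ngl": None,
            "vp": None,
            "eq": None,
        }

    def _get(self, dbname: str) -> FakeDatabase:
        # Create the instance on first access, then reuse it afterwards.
        if self._connections[dbname] is None:
            self._connections[dbname] = FakeDatabase(dbname)
        return self._connections[dbname]

    @property
    def ngl(self) -> FakeDatabase:
        return self._get("ngl")

    @property
    def vp(self) -> FakeDatabase:
        return self._get("vp")


accessor = LazyAccessor()
first = accessor.ngl
second = accessor.ngl
print(first is second)  # True: the same instance is reused
print(accessor._connections["vp"] is None)  # True: 'vp' was never accessed
```

Keeping a None slot for every configured key makes it cheap to tell which databases have actually been opened.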

Example

>>> accessor = DatabaseAccessor()
DatabaseAccessor initialized. Connections will be created on first access.

>>> # Access NGL database (created on first access)
>>> ngl_db = accessor.ngl
First access to 'ngl', initializing DSDatabase...
Successfully connected to database 'sjbrande_ngl_db' on 129.114.52.174.

>>> # Query the database
>>> results = ngl_db.read_sql("SELECT COUNT(*) as total FROM users")

>>> # Close all connections when done
>>> accessor.close_all()
Closing all active database engines/pools...
Closed 1 database engine(s).

Initialize the DatabaseAccessor with empty connection slots.

Creates a dictionary to hold database connections for each configured database, but does not establish any connections until they are first accessed.

Source code in dapi/db/accessor.py
def __init__(self):
    """Initialize the DatabaseAccessor with empty connection slots.

    Creates a dictionary to hold database connections for each configured
    database, but does not establish any connections until they are first accessed.
    """
    self._connections: Dict[str, Optional[DSDatabase]] = {
        key: None for key in db_config.keys()
    }
    print(
        "DatabaseAccessor initialized. Connections will be created on first access."
    )

ngl property

ngl

Access the NGL (Natural Hazards Engineering) database connection manager.

Provides access to the sjbrande_ngl_db database containing natural hazards engineering research data. The connection is created on first access.

RETURNS DESCRIPTION
DSDatabase

Connected database instance for the NGL database.

TYPE: DSDatabase

RAISES DESCRIPTION
Exception

If the database connection fails during initialization.

Example

>>> ngl_db = accessor.ngl
>>> df = ngl_db.read_sql("SELECT * FROM earthquake_data LIMIT 10")

vp property

vp

Access the VP (Vulnerability and Performance) database connection manager.

Provides access to the sjbrande_vpdb database containing vulnerability and performance analysis data. The connection is created on first access.

RETURNS DESCRIPTION
DSDatabase

Connected database instance for the VP database.

TYPE: DSDatabase

RAISES DESCRIPTION
Exception

If the database connection fails during initialization.

Example

>>> vp_db = accessor.vp
>>> df = vp_db.read_sql("SELECT * FROM vulnerability_models LIMIT 10")

eq property

eq

Access the EQ (Post-Earthquake Recovery) database connection manager.

Provides access to the post_earthquake_recovery database containing post-earthquake recovery research data. The connection is created on first access.

RETURNS DESCRIPTION
DSDatabase

Connected database instance for the EQ database.

TYPE: DSDatabase

RAISES DESCRIPTION
Exception

If the database connection fails during initialization.

Example

>>> eq_db = accessor.eq
>>> df = eq_db.read_sql("SELECT * FROM recovery_metrics LIMIT 10")

close_all

close_all()

Close all active database engines and their connection pools.

This method iterates through all database connections and properly closes their SQLAlchemy engines and connection pools. This should be called when the DatabaseAccessor is no longer needed to prevent connection leaks.

Note

After calling close_all(), accessing any database property will create new connections since the instances are reset to None.

Example

>>> accessor = DatabaseAccessor()
>>> ngl_db = accessor.ngl  # Creates connection
>>> vp_db = accessor.vp    # Creates connection
>>> accessor.close_all()   # Closes both connections
Closing all active database engines/pools...
Closing connection pool for database 'sjbrande_ngl_db'.
Closing connection pool for database 'sjbrande_vpdb'.
Closed 2 database engine(s).

Source code in dapi/db/accessor.py
def close_all(self):
    """Close all active database engines and their connection pools.

    This method iterates through all database connections and properly closes
    their SQLAlchemy engines and connection pools. This should be called when
    the DatabaseAccessor is no longer needed to prevent connection leaks.

    Note:
        After calling close_all(), accessing any database property will create
        new connections since the instances are reset to None.

    Example:
        >>> accessor = DatabaseAccessor()
        >>> ngl_db = accessor.ngl  # Creates connection
        >>> vp_db = accessor.vp    # Creates connection
        >>> accessor.close_all()   # Closes both connections
        Closing all active database engines/pools...
        Closing connection pool for database 'sjbrande_ngl_db'.
        Closing connection pool for database 'sjbrande_vpdb'.
        Closed 2 database engine(s).
    """
    print("Closing all active database engines/pools...")
    closed_count = 0
    for dbname, db_instance in self._connections.items():
        if db_instance is not None:
            try:
                # Call the close method on the DSDatabase instance
                db_instance.close()
                self._connections[
                    dbname
                ] = None  # Clear instance after closing engine
                closed_count += 1
            except Exception as e:
                print(f"Error closing engine for '{dbname}': {e}")
    if closed_count == 0:
        print("No active database engines to close.")
    else:
        print(f"Closed {closed_count} database engine(s).")
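
One consequence of the loop above is that a slot is reset to None only when close() succeeds; an instance whose close() raises stays in place and is not counted. The sketch below replays that logic against hypothetical stand-in objects (FakeDatabase is not part of dapi), so it runs without any database.

```python
class FakeDatabase:
    """Hypothetical stand-in for DSDatabase with a configurable close()."""

    def __init__(self, fail_on_close=False):
        self.fail_on_close = fail_on_close

    def close(self):
        if self.fail_on_close:
            raise RuntimeError("simulated close failure")


def close_all(connections):
    """Mirror the loop above: close each live instance and reset its slot."""
    closed_count = 0
    for dbname, db_instance in connections.items():
        if db_instance is not None:
            try:
                db_instance.close()
                connections[dbname] = None  # reset only after a clean close
                closed_count += 1
            except Exception as e:
                print(f"Error closing engine for '{dbname}': {e}")
    return closed_count


connections = {
    "ngl": FakeDatabase(),
    "vp": FakeDatabase(fail_on_close=True),
    "eq": None,  # never accessed, so there is nothing to close
}
closed = close_all(connections)
print(closed)  # 1: only 'ngl' closed cleanly
```

In this sketch the failed 'vp' instance remains in its slot, so a later property access would return the same object rather than create a fresh one.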

Database Engine

Manages connection and querying for a specific DesignSafe database.

This class provides a high-level interface for connecting to preconfigured DesignSafe databases using SQLAlchemy with connection pooling. It supports environment-based configuration and provides query results in multiple formats.

ATTRIBUTE DESCRIPTION
user

Database username for authentication.

TYPE: str

password

Database password for authentication.

TYPE: str

host

Database host address.

TYPE: str

port

Database port number.

TYPE: int

db

Name of the connected database.

TYPE: str

dbname_short

Shorthand name for the database.

TYPE: str

engine

SQLAlchemy engine for database connections.

TYPE: Engine

Session

Session factory for database operations.

TYPE: sessionmaker

Example

>>> db = DSDatabase("ngl")
>>> df = db.read_sql("SELECT COUNT(*) as total FROM users")
>>> print(df.iloc[0]['total'])
>>> db.close()

Initialize the DSDatabase instance and create the SQLAlchemy engine.

Sets up database connection parameters from environment variables or defaults, creates a SQLAlchemy engine with connection pooling, and prepares the session factory.

PARAMETER DESCRIPTION
dbname

Shorthand name for the database to connect to. Must be a key in db_config. Defaults to "ngl". Available options: "ngl", "vp", "eq".

TYPE: str DEFAULT: 'ngl'

RAISES DESCRIPTION
ValueError

If dbname is not a valid configured database name.

SQLAlchemyError

If database engine creation or connection fails.

Example

>>> db = DSDatabase("ngl")  # Connect to NGL database
>>> db = DSDatabase("vp")   # Connect to VP database

Source code in dapi/db/db.py
def __init__(self, dbname="ngl"):
    """Initialize the DSDatabase instance and create the SQLAlchemy engine.

    Sets up database connection parameters from environment variables or defaults,
    creates a SQLAlchemy engine with connection pooling, and prepares the session factory.

    Args:
        dbname (str, optional): Shorthand name for the database to connect to.
            Must be a key in db_config. Defaults to "ngl".
            Available options: "ngl", "vp", "eq".

    Raises:
        ValueError: If dbname is not a valid configured database name.
        SQLAlchemyError: If database engine creation or connection fails.

    Example:
        >>> db = DSDatabase("ngl")  # Connect to NGL database
        >>> db = DSDatabase("vp")   # Connect to VP database
    """
    if dbname not in db_config:
        raise ValueError(
            f"Invalid db shorthand '{dbname}'. Allowed: {', '.join(db_config.keys())}"
        )

    config = db_config[dbname]
    env_prefix = config["env_prefix"]

    self.user = os.getenv(f"{env_prefix}DB_USER", "dspublic")
    self.password = os.getenv(f"{env_prefix}DB_PASSWORD", "R3ad0nlY")
    self.host = os.getenv(f"{env_prefix}DB_HOST", "129.114.52.174")
    self.port = os.getenv(f"{env_prefix}DB_PORT", 3306)
    self.db = config["dbname"]
    self.dbname_short = dbname  # Store shorthand name for reference

    print(
        f"Creating SQLAlchemy engine for database '{self.db}' ({self.dbname_short})..."
    )
    # Setup the database connection engine with pooling
    self.engine = create_engine(
        f"mysql+pymysql://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}",
        pool_recycle=3600,  # Recycle connections older than 1 hour
        pool_pre_ping=True,  # Check connection validity before use
    )
    # Create a configured "Session" class
    self.Session = sessionmaker(bind=self.engine)
    print(f"Engine for '{self.dbname_short}' created.")
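
The engine URL above interpolates the credentials as-is. That is fine for the documented defaults, but a password supplied through an environment variable could contain characters such as @ or /, which would likely break URL parsing. A common precaution is to percent-encode the credentials with the standard library's urllib.parse.quote_plus. The helper below is a hypothetical sketch, not part of dapi, and the password is an invented example.

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, port, db):
    """Hypothetical helper: assemble a mysql+pymysql URL with escaped credentials."""
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{int(port)}/{db}"  # int() also accepts a port read as a string
    )


# 'p@ss/word' is an invented password used only to show the escaping.
url = build_mysql_url("dspublic", "p@ss/word", "129.114.52.174", "3306", "sjbrande_ngl_db")
print(url)
# mysql+pymysql://dspublic:p%40ss%2Fword@129.114.52.174:3306/sjbrande_ngl_db
```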

user instance-attribute

user = getenv(f'{env_prefix}DB_USER', 'dspublic')

password instance-attribute

password = getenv(f'{env_prefix}DB_PASSWORD', 'R3ad0nlY')

host instance-attribute

host = getenv(f'{env_prefix}DB_HOST', '129.114.52.174')

port instance-attribute

port = getenv(f'{env_prefix}DB_PORT', 3306)

db instance-attribute

db = config['dbname']

dbname_short instance-attribute

dbname_short = dbname

engine instance-attribute

engine = create_engine(
    f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}",
    pool_recycle=3600,
    pool_pre_ping=True,
)

Session instance-attribute

Session = sessionmaker(bind=engine)

read_sql

read_sql(sql, output_type='DataFrame')

Execute a SQL query using a dedicated session and return the results.

This method obtains a session from the connection pool, executes the provided SQL query, and returns results in the specified format. The session is automatically closed after execution, returning the connection to the pool.

PARAMETER DESCRIPTION
sql

The SQL query string to execute. Can be any valid SQL statement including SELECT, INSERT, UPDATE, DELETE, etc.

TYPE: str

output_type

Format for query results. Must be either 'DataFrame' (returns a pandas.DataFrame) or 'dict' (returns a list of dictionaries). Defaults to "DataFrame".

TYPE: str DEFAULT: 'DataFrame'

RETURNS DESCRIPTION

pandas.DataFrame or List[Dict]: Query results in the requested format.
  • 'DataFrame': Returns a pandas DataFrame with column names as headers
  • 'dict': Returns a list of dictionaries where each dict represents a row

RAISES DESCRIPTION
ValueError

If sql is empty/None or output_type is not 'DataFrame' or 'dict'.

SQLAlchemyError

If a database error occurs during query execution.

Exception

If unexpected errors occur during query processing.

Example

>>> # Get DataFrame result
>>> df = db.read_sql("SELECT name, age FROM users WHERE age > 25")
>>> print(df.columns.tolist())  # ['name', 'age']

>>> # Get dictionary result
>>> results = db.read_sql("SELECT COUNT(*) as total FROM users", output_type="dict")
>>> print(results[0]['total'])  # 150

Source code in dapi/db/db.py
def read_sql(self, sql, output_type="DataFrame"):
    """Execute a SQL query using a dedicated session and return the results.

    This method obtains a session from the connection pool, executes the provided
    SQL query, and returns results in the specified format. The session is
    automatically closed after execution, returning the connection to the pool.

    Args:
        sql (str): The SQL query string to execute. Can be any valid SQL
            statement including SELECT, INSERT, UPDATE, DELETE, etc.
        output_type (str, optional): Format for query results. Must be either
            'DataFrame' for pandas.DataFrame or 'dict' for list of dictionaries.
            Defaults to "DataFrame".

    Returns:
        pandas.DataFrame or List[Dict]: Query results in the requested format.
            - 'DataFrame': Returns a pandas DataFrame with column names as headers
            - 'dict': Returns a list of dictionaries where each dict represents a row

    Raises:
        ValueError: If sql is empty/None or output_type is not 'DataFrame' or 'dict'.
        SQLAlchemyError: If database error occurs during query execution.
        Exception: If unexpected errors occur during query processing.

    Example:
        >>> # Get DataFrame result
        >>> df = db.read_sql("SELECT name, age FROM users WHERE age > 25")
        >>> print(df.columns.tolist())  # ['name', 'age']

        >>> # Get dictionary result
        >>> results = db.read_sql("SELECT COUNT(*) as total FROM users", output_type="dict")
        >>> print(results[0]['total'])  # 150
    """
    if not sql:
        raise ValueError("SQL query string is required")
    if output_type not in ["DataFrame", "dict"]:
        raise ValueError('Output type must be either "DataFrame" or "dict"')

    # Obtain a new session for this query
    session = self.Session()
    print(f"Executing query on '{self.dbname_short}'...")
    try:
        if output_type == "DataFrame":
            # Borrow a connection from the session's engine and release it
            # when the query finishes, so it is returned to the pool.
            with session.bind.connect() as connection:
                return pd.read_sql_query(sql, connection)
        else:
            sql_text = text(sql)
            # Execute within the session context
            result = session.execute(sql_text)
            # Fetch results before closing session
            data = [
                dict(row._mapping) for row in result
            ]  # Use ._mapping for modern SQLAlchemy
            return data
    except exc.SQLAlchemyError as e:
        print(f"SQLAlchemyError executing query on '{self.dbname_short}': {e}")
        raise  # Re-raise the exception
    except Exception as e:
        print(f"Unexpected error executing query on '{self.dbname_short}': {e}")
        raise
    finally:
        # Ensure the session is closed, returning the connection to the pool
        session.close()
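
The validate, execute, convert, and always-clean-up shape of read_sql can be tried locally. The sketch below swaps in the standard library's sqlite3 module for SQLAlchemy and MySQL so that it runs without a server; read_rows is a hypothetical function that covers only the 'dict' output path.

```python
import sqlite3


def read_rows(database_path, sql):
    """Run a query on a fresh connection and return a list of dicts."""
    if not sql:
        raise ValueError("SQL query string is required")
    conn = sqlite3.connect(database_path)
    conn.row_factory = sqlite3.Row  # rows expose column names via keys()
    try:
        cursor = conn.execute(sql)
        # Materialize the rows before the connection is closed.
        return [dict(row) for row in cursor]
    finally:
        # Runs on success and on error, like the finally block above.
        conn.close()


rows = read_rows(":memory:", "SELECT 150 AS total")
print(rows[0]["total"])  # 150
```

Building the list inside the try block matters: once the connection is closed, a lazily consumed cursor could no longer be read.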

close

close()

Dispose of the SQLAlchemy engine and close all database connections.

This method properly shuts down the database engine and its connection pool. It should be called when the database instance is no longer needed to prevent connection leaks and free up database resources.

Note

After calling close(), this DSDatabase instance should not be used for further database operations because its engine has been disposed.

Example

>>> db = DSDatabase("ngl")
>>> # ... perform database operations ...
>>> db.close()
Disposing engine and closing pool for 'ngl'...
Engine for 'ngl' disposed.

Source code in dapi/db/db.py
def close(self):
    """Dispose of the SQLAlchemy engine and close all database connections.

    This method properly shuts down the database engine and its connection pool.
    It should be called when the database instance is no longer needed to
    prevent connection leaks and free up database resources.

    Note:
        After calling close(), this DSDatabase instance should not be used
        for further database operations as the engine will be disposed.

    Example:
        >>> db = DSDatabase("ngl")
        >>> # ... perform database operations ...
        >>> db.close()
        Disposing engine and closing pool for 'ngl'...
        Engine for 'ngl' disposed.
    """
    if self.engine:
        print(f"Disposing engine and closing pool for '{self.dbname_short}'...")
        self.engine.dispose()
        self.engine = None  # Mark as disposed
        print(f"Engine for '{self.dbname_short}' disposed.")
    else:
        print(f"Engine for '{self.dbname_short}' already disposed.")
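
Because cleanup is exposed as a plain close() method, the standard library's contextlib.closing can call it automatically at the end of a with block, even if the body raises. The sketch below uses a hypothetical stand-in class rather than a real DSDatabase, on the assumption that the same pattern applies to any object with a close() method.

```python
from contextlib import closing


class FakeDatabase:
    """Hypothetical stand-in for DSDatabase that tracks its engine state."""

    def __init__(self):
        self.engine = object()  # placeholder for a SQLAlchemy engine

    def close(self):
        # Same shape as close() above: dispose once, then mark as disposed.
        self.engine = None


with closing(FakeDatabase()) as db:
    engine_was_live = db.engine is not None  # the instance is usable here

print(engine_was_live, db.engine is None)  # True True
```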

Database Configuration

dict: Database configuration mapping.

Maps shorthand database names to their configuration details.

Keys
  • "ngl": Natural hazards engineering research database
  • "vp": Vulnerability and performance database
  • "eq": Post-earthquake recovery database
Each value contains
  • "dbname" (str): Actual database name in the MySQL server
  • "env_prefix" (str): Prefix for environment variables containing credentials
Environment Variable Pattern

For each database, the following environment variables are checked:
  • {env_prefix}DB_USER: Database username (default: "dspublic")
  • {env_prefix}DB_PASSWORD: Database password (default: "R3ad0nlY")
  • {env_prefix}DB_HOST: Database host (default: "129.114.52.174")
  • {env_prefix}DB_PORT: Database port (default: 3306)
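
The lookup pattern above can be sketched as follows. The real env_prefix values are not listed on this page, so demo_config and its DEMO_ prefix are hypothetical, as is the resolve_settings helper. The sketch also casts the port to int, since values read from the environment arrive as strings.

```python
import os

# Hypothetical mapping for illustration; the real db_config prefixes are not shown here.
demo_config = {"demo": {"dbname": "demo_db", "env_prefix": "DEMO_"}}


def resolve_settings(dbname, config=demo_config, environ=os.environ):
    """Resolve connection settings from prefixed variables, falling back to defaults."""
    if dbname not in config:
        raise ValueError(
            f"Invalid db shorthand '{dbname}'. Allowed: {', '.join(config.keys())}"
        )
    prefix = config[dbname]["env_prefix"]
    return {
        "user": environ.get(f"{prefix}DB_USER", "dspublic"),
        "password": environ.get(f"{prefix}DB_PASSWORD", "R3ad0nlY"),
        "host": environ.get(f"{prefix}DB_HOST", "129.114.52.174"),
        # Environment values are strings, so cast to keep the port an int.
        "port": int(environ.get(f"{prefix}DB_PORT", 3306)),
        "db": config[dbname]["dbname"],
    }


# A plain dict stands in for os.environ so the real environment is untouched.
settings = resolve_settings(
    "demo", environ={"DEMO_DB_HOST": "localhost", "DEMO_DB_PORT": "3307"}
)
print(settings["host"], settings["port"], settings["user"])  # localhost 3307 dspublic
```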