Sunday, December 14, 2025

"""
Oracle to PostgreSQL full schema migration using Python


- Uses oracledb (thin mode)

- Parallel data loading
- Handles PostgreSQL reserved keywords safely
- Repo-style single runnable script

USAGE:
  python oracle_to_postgres_migration.py
"""

import oracledb
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
from multiprocessing import Pool, cpu_count
from collections import defaultdict
import logging

# =========================
# CONFIGURATION
# =========================
# Source database: login, DSN and the schema whose objects are migrated.
ORACLE_CONFIG = dict(
    user="myoracleuser",
    password="****",
    dsn="localhost:1521/MYSERVICE",
    schema="myoracleuser",  # You can name your own schema here
)

# Target database: login and the schema that receives the migrated objects.
POSTGRES_CONFIG = dict(
    host="localhost",
    database="postgres",
    user="pbiuser",
    password="*****",
    schema="pbiuser",
)

# Leave three cores free for the rest of the machine, but always keep
# at least one worker.
PARALLEL_WORKERS = max(1, cpu_count() - 3)
# Rows per batch handed to the data-loading calls.
CHUNK_SIZE = 5000

# =========================
# LOGGING
# =========================
# Root logger: INFO and above, each record prefixed with timestamp and level.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# =========================
# HELPERS
# =========================

def q(identifier: str) -> str:
    """Return *identifier* as a double-quoted PostgreSQL identifier.

    Quoting lets reserved words and unusual characters be used as schema,
    table, column or sequence names. Any double quote inside the name is
    doubled, which is how PostgreSQL escapes a quote within a quoted
    identifier; without this a name containing ``"`` would terminate the
    quoting early and corrupt the statement it is embedded in.

    Args:
        identifier: The raw, unquoted name.

    Returns:
        The name wrapped in double quotes with embedded quotes escaped.
    """
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'

# =========================
# CONNECTIONS
# =========================

def oracle_connection():
    """Open a new Oracle connection using the ORACLE_CONFIG credentials.

    Returns:
        Whatever ``oracledb.connect`` returns for the configured user,
        password and DSN. The connection is not closed here.
    """
    connect_args = {
        key: ORACLE_CONFIG[key] for key in ("user", "password", "dsn")
    }
    return oracledb.connect(**connect_args)


def postgres_connection():
    """Open a new psycopg2 connection using the POSTGRES_CONFIG settings.

    Returns:
        Whatever ``psycopg2.connect`` returns for the configured host,
        database, user and password. The connection is not closed here.
    """
    connect_args = {
        key: POSTGRES_CONFIG[key]
        for key in ("host", "database", "user", "password")
    }
    return psycopg2.connect(**connect_args)


def postgres_engine():
    """Create a SQLAlchemy engine for the configured PostgreSQL database.

    The user name and password are percent-encoded before being placed in
    the connection URL. Characters such as ``@``, ``:``, ``/`` or ``%`` in
    a credential would otherwise be read as URL delimiters and yield a
    wrong host or a rejected login.

    Returns:
        The engine returned by ``create_engine`` for a
        ``postgresql+psycopg2`` URL on port 5432.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote_plus

    user = quote_plus(POSTGRES_CONFIG['user'])
    password = quote_plus(POSTGRES_CONFIG['password'])
    return create_engine(
        f"postgresql+psycopg2://{user}:{password}"
        f"@{POSTGRES_CONFIG['host']}:5432/{POSTGRES_CONFIG['database']}"
    )

# =========================
# METADATA EXTRACTION
# =========================

def get_oracle_columns():
    """Fetch column metadata for every table-like object in the source schema.

    The owner is matched case-insensitively. Oracle stores unquoted
    identifiers in uppercase, so a schema configured in lowercase would
    match nothing under an exact comparison.

    Returns:
        A list of rows ``(table_name, column_name, data_type, data_length,
        data_precision, data_scale, nullable)`` ordered by table name and
        column position.
    """
    # NOTE(review): ALL_TAB_COLUMNS also lists columns of views and
    # clusters; confirm whether those should become PostgreSQL tables.
    conn = oracle_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT table_name, column_name, data_type,
                       data_length, data_precision, data_scale, nullable
                FROM all_tab_columns
                WHERE UPPER(owner) = UPPER(:schema)
                ORDER BY table_name, column_id
            """, schema=ORACLE_CONFIG['schema'])
            return cur.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cur.close()
    finally:
        conn.close()


def map_oracle_to_pg(row):
    """Translate one Oracle column description into a PostgreSQL type.

    Args:
        row: A metadata row whose items 2-5 are the Oracle ``data_type``,
            ``data_length``, ``data_precision`` and ``data_scale``.

    Returns:
        The PostgreSQL type as a string. Oracle types with no explicit
        mapping fall back to ``TEXT``.
    """
    dtype, length, precision, scale = row[2], row[3], row[4], row[5]

    if dtype == 'NUMBER':
        if precision is None:
            return 'NUMERIC'
        scale = scale or 0
        if scale < 0:
            # A negative scale rounds to the left of the decimal point and
            # is rejected by PostgreSQL releases before 15. Widen the
            # precision so every such value still fits as an integer.
            return f'NUMERIC({precision - scale},0)'
        return f'NUMERIC({precision},{scale})'
    if dtype in ('VARCHAR2', 'NVARCHAR2'):
        return f'VARCHAR({length})'
    if dtype in ('CHAR', 'NCHAR'):
        return f'CHAR({length})'
    if dtype.startswith('TIMESTAMP'):
        # Covers "TIMESTAMP(n) WITH TIME ZONE" and "... WITH LOCAL TIME
        # ZONE": keep the zone-aware type instead of dropping the zone.
        return 'TIMESTAMPTZ' if 'TIME ZONE' in dtype else 'TIMESTAMP'
    if dtype == 'DATE':
        # Mapped to TIMESTAMP so a time-of-day component is not lost.
        return 'TIMESTAMP'
    if dtype in ('CLOB', 'NCLOB', 'LONG'):
        return 'TEXT'
    if dtype in ('BLOB', 'RAW', 'LONG RAW'):
        return 'BYTEA'
    if dtype == 'BINARY_FLOAT':
        return 'REAL'
    if dtype in ('BINARY_DOUBLE', 'FLOAT'):
        return 'DOUBLE PRECISION'
    return 'TEXT'

# =========================
# SCHEMA & TABLE CREATION
# =========================

def create_pg_schema_and_tables():
    """Create the target schema and one table per Oracle table description.

    Column metadata comes from ``get_oracle_columns``. Table and column
    names are lower-cased and quoted, and types are translated with
    ``map_oracle_to_pg``. Both the schema and the tables are created with
    ``IF NOT EXISTS``, so existing objects are left untouched. All DDL is
    committed together; on failure nothing is committed and the
    connection is still closed.
    """
    tables = defaultdict(list)
    for r in get_oracle_columns():
        column_def = f"{q(r[1].lower())} {map_oracle_to_pg(r)}"
        if r[6] != 'Y':
            column_def += " NOT NULL"
        tables[r[0].lower()].append(column_def)

    if not tables:
        # Usually a wrong schema name or missing privileges; make it visible.
        logging.warning("No Oracle columns found; no tables will be created")

    schema = q(POSTGRES_CONFIG['schema'])
    conn = postgres_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            for table, cols in tables.items():
                ddl = f"""
                CREATE TABLE IF NOT EXISTS {schema}.{q(table)} (
                    {', '.join(cols)}
                );
                """
                cur.execute(ddl)
            conn.commit()
        finally:
            cur.close()
    finally:
        # Closing without a commit discards any partially applied DDL.
        conn.close()
    logging.info("PostgreSQL schema & tables created")

# =========================
# DATA MIGRATION (PARALLEL)
# =========================

def migrate_single_table(table):
    """Copy all rows of one Oracle table into the matching PostgreSQL table.

    Rows are streamed from Oracle in batches of ``CHUNK_SIZE`` rather than
    loaded into memory at once. Every batch is appended inside a single
    PostgreSQL transaction, so a failure part-way through leaves the
    target table without partial data. Column names and the target table
    name are lower-cased.

    Errors are logged with a traceback and not re-raised, so one bad table
    does not abort the other workers.

    Args:
        table: Oracle table name as used in the ``SELECT`` statement.

    Returns:
        ``True`` when the table was copied, ``False`` when it failed.
    """
    oconn = None
    pg_engine_local = None
    try:
        oconn = oracle_connection()
        pg_engine_local = postgres_engine()

        logging.info(f"Migrating table: {table}")
        chunks = pd.read_sql(
            f"SELECT * FROM {table}", oconn, chunksize=CHUNK_SIZE
        )
        # One transaction for the whole table: committed on success,
        # rolled back if any batch fails.
        with pg_engine_local.begin() as pg_conn:
            for df in chunks:
                df.columns = [c.lower() for c in df.columns]
                df.to_sql(
                    name=table.lower(),
                    con=pg_conn,
                    schema=POSTGRES_CONFIG['schema'],
                    if_exists='append',
                    index=False,
                    chunksize=CHUNK_SIZE,
                    method='multi'
                )
        logging.info(f"Completed table: {table}")
        return True

    except Exception as e:
        logging.exception(f"Failed table {table}: {e}")
        return False

    finally:
        # Release both database handles on success and failure alike.
        if oconn is not None:
            oconn.close()
        if pg_engine_local is not None:
            pg_engine_local.dispose()


def parallel_data_migration():
    """Migrate every table owned by the Oracle user with a process pool.

    Table names are read from ``user_tables`` and handed to
    ``migrate_single_table`` in worker processes. The pool never has more
    workers than tables. Tables whose worker returned ``False`` are listed
    in a summary error log; any other return value is not treated as a
    failure.
    """
    conn = oracle_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT table_name FROM user_tables")
            tables = [r[0] for r in cur.fetchall()]
        finally:
            cur.close()
    finally:
        conn.close()

    if not tables:
        logging.warning("No Oracle tables found to migrate")
        return

    with Pool(min(PARALLEL_WORKERS, len(tables))) as pool:
        results = pool.map(migrate_single_table, tables)

    failed = [name for name, ok in zip(tables, results) if ok is False]
    if failed:
        logging.error(
            f"{len(failed)} table(s) failed to migrate: {', '.join(failed)}"
        )

# =========================
# SEQUENCES
# =========================

def migrate_sequences():
    """Recreate the Oracle user's sequences in the PostgreSQL schema.

    Each sequence is created with ``IF NOT EXISTS`` under its lower-cased
    name. It starts at Oracle's ``last_number`` and keeps Oracle's
    ``increment_by``, so stepped or descending sequences behave the same
    after migration. All sequences are committed together, and both
    connections are closed on failure as well as on success.
    """
    # NOTE(review): min/max values and cycle flags are not carried over;
    # confirm whether any source sequence relies on them.
    oconn = oracle_connection()
    try:
        ocur = oconn.cursor()
        try:
            ocur.execute(
                "SELECT sequence_name, last_number, increment_by "
                "FROM user_sequences"
            )
            sequences = ocur.fetchall()
        finally:
            ocur.close()
    finally:
        oconn.close()

    schema = q(POSTGRES_CONFIG['schema'])
    pconn = postgres_connection()
    try:
        pcur = pconn.cursor()
        try:
            for name, last_val, increment in sequences:
                # int() ensures only plain integers reach the DDL text.
                pcur.execute(f"""
                    CREATE SEQUENCE IF NOT EXISTS {schema}.{q(name.lower())}
                    INCREMENT BY {int(increment)}
                    START WITH {int(last_val)};
                """)
            pconn.commit()
        finally:
            pcur.close()
    finally:
        pconn.close()
    logging.info("Sequences migrated")

# =========================
# MAIN
# =========================

if __name__ == '__main__':
    # Phases run strictly in this order: DDL, then table data, then sequences.
    logging.info("Starting Oracle → PostgreSQL migration")
    for phase in (
        create_pg_schema_and_tables,
        parallel_data_migration,
        migrate_sequences,
    ):
        phase()
    logging.info("Migration completed successfully")

No comments:

Post a Comment