# Sunday, December 14, 2025
#
# Oracle to PostgreSQL full schema migration using a Python script

 

"""

Oracle → PostgreSQL Full Schema Migration (ADVANCED)
- Uses oracledb (thin mode)
- Parallel data loading
- Uses COPY FROM STDIN (fast)
- Handles PostgreSQL reserved keywords safely
- Recreates PK, FK, INDEXES
- Row-count validation (a checksum helper is defined but not called by the main flow)

USAGE:
  python oracle_to_postgres_migration.py
"""

import oracledb
import psycopg2
import pandas as pd
import csv
import io
import hashlib
from multiprocessing import Pool, cpu_count
from collections import defaultdict
import logging

# =========================
# CONFIGURATION
# =========================
# Source database settings. 'user', 'password' and 'dsn' are passed to
# oracledb.connect(); 'schema' is used as the owner filter when reading
# column metadata from all_tab_columns.
# NOTE(review): Oracle usually stores unquoted owner names in upper case —
# confirm that the 'schema' value matches the stored owner exactly.
ORACLE_CONFIG = {
    "user": "user1",
    "password": "***",
    "dsn": "localhost:1521/myservice",
    "schema": "schema1"
}

# Target database settings. 'host', 'database', 'user' and 'password' are
# passed to psycopg2.connect(); 'schema' is the schema the tables are
# created in and loaded into.
POSTGRES_CONFIG = {
    "host": "localhost",
    "database": "postgres",
    "user": "pbiuser",
    "password": "****",
    "schema": "pbiuser"
}

# Number of worker processes for the parallel copy: all CPUs but three,
# and never fewer than one.
PARALLEL_WORKERS = max(cpu_count() - 3, 1)
# Number of rows fetched from Oracle per fetchmany() call, and therefore
# per COPY batch.
CHUNK_SIZE = 50000

# =========================
# LOGGING
# =========================
# Configure the root logger: INFO level, with timestamp and level name on each line.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# =========================
# HELPERS
# =========================

def q(name):
    """Return *name* as a double-quoted SQL identifier.

    Quoting allows reserved words and case-sensitive names to be used as
    identifiers. Any double quote inside *name* is doubled (the standard
    SQL escape) so it cannot terminate the identifier early.

    Args:
        name: identifier text (schema, table, column or index name).

    Returns:
        str: the identifier wrapped in double quotes.
    """
    escaped = str(name).replace('"', '""')
    return f'"{escaped}"'

# =========================
# CONNECTIONS
# =========================

def oracle_connection():
    """Open and return a new Oracle connection using ORACLE_CONFIG."""
    settings = ORACLE_CONFIG
    return oracledb.connect(
        user=settings['user'],
        password=settings['password'],
        dsn=settings['dsn'],
    )


def postgres_connection():
    """Open and return a new PostgreSQL connection using POSTGRES_CONFIG.

    Only the host, database, user and password entries are forwarded to
    psycopg2.connect(); the 'schema' entry is not a connection argument.
    """
    connect_args = {}
    for key in ('host', 'database', 'user', 'password'):
        connect_args[key] = POSTGRES_CONFIG[key]
    return psycopg2.connect(**connect_args)

# =========================
# METADATA EXTRACTION
# =========================

def get_tables():
    """Return the names of the tables listed in Oracle's ``user_tables``.

    Returns:
        list[str]: one table name per row, as reported by Oracle.
    """
    # The context managers close the cursor and the connection even when
    # the query raises; previously both were left open on every call.
    with oracle_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT table_name FROM user_tables")
            return [row[0] for row in cur.fetchall()]


def get_columns():
    """Return column metadata for the schema named in ORACLE_CONFIG.

    Returns:
        list[tuple]: rows of (table_name, column_name, data_type,
        data_length, data_precision, data_scale, nullable), ordered by
        table name and then by column position.
    """
    # The context managers close the cursor and the connection even when
    # the query raises; previously both were left open on every call.
    with oracle_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT table_name, column_name, data_type,
                       data_length, data_precision, data_scale, nullable
                FROM all_tab_columns
                WHERE owner = :s
                ORDER BY table_name, column_id
            """, s=ORACLE_CONFIG['schema'])
            return cur.fetchall()


def get_constraints():
    """Return primary-key and foreign-key constraint columns from Oracle.

    Returns:
        list[tuple]: rows of (constraint_name, constraint_type, table_name,
        column_name, r_constraint_name), where constraint_type is 'P'
        (primary key) or 'R' (referential), ordered by table name and
        then by column position within the constraint.
    """
    # The context managers close the cursor and the connection even when
    # the query raises; previously both were left open on every call.
    with oracle_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT uc.constraint_name, uc.constraint_type, uc.table_name, ucc.column_name, uc.r_constraint_name
                FROM user_constraints uc
                JOIN user_cons_columns ucc ON uc.constraint_name = ucc.constraint_name
                WHERE uc.constraint_type IN ('P','R')
                ORDER BY uc.table_name, ucc.position
            """)
            return cur.fetchall()


def get_indexes():
    """Return index column metadata from Oracle's ``user_ind_columns``.

    Returns:
        list[tuple]: rows of (index_name, table_name, column_name),
        ordered by index name and then by column position in the index.
    """
    # The context managers close the cursor and the connection even when
    # the query raises; previously both were left open on every call.
    with oracle_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT index_name, table_name, column_name
                FROM user_ind_columns
                ORDER BY index_name, column_position
            """)
            return cur.fetchall()

# =========================
# TYPE MAPPING
# =========================

def map_type(r):
    """Map one Oracle column-metadata row to a PostgreSQL column type.

    Args:
        r: a row in the layout returned by get_columns(); index 2 is the
            Oracle data type, 3 the data length, 4 the precision and
            5 the scale.

    Returns:
        str: the PostgreSQL type declaration for the column. Types with
        no explicit mapping fall back to ``TEXT``.
    """
    data_type, length, precision, scale = r[2], r[3], r[4], r[5]

    if data_type == 'NUMBER':
        if precision is None:
            return 'NUMERIC'
        scale = scale or 0
        # Oracle allows a negative scale and a scale larger than the
        # precision; PostgreSQL before version 15 rejects both, so use an
        # unconstrained NUMERIC, which can hold any such value.
        if scale < 0 or scale > precision:
            return 'NUMERIC'
        return f'NUMERIC({precision},{scale})'
    if data_type == 'FLOAT':
        # Oracle reports FLOAT precision in binary digits, so it cannot be
        # reused as a NUMERIC precision; unconstrained NUMERIC is lossless.
        return 'NUMERIC'
    if data_type == 'BINARY_FLOAT':
        return 'REAL'
    if data_type == 'BINARY_DOUBLE':
        return 'DOUBLE PRECISION'
    if data_type in ('VARCHAR2', 'NVARCHAR2'):
        return f'VARCHAR({length})'
    if data_type in ('CHAR', 'NCHAR'):
        return f'CHAR({length})'
    if data_type == 'DATE' or data_type.startswith('TIMESTAMP'):
        return 'TIMESTAMP'
    if data_type in ('BLOB', 'RAW', 'LONG RAW'):
        # Binary values are written to COPY as bytea hex ('\x...') by the
        # loader, so the target column must be BYTEA rather than TEXT.
        return 'BYTEA'
    # CLOB and every type without a dedicated mapping are stored as text.
    return 'TEXT'

# =========================
# CREATE TABLES
# =========================

def create_tables():
    """Create the target schema and one PostgreSQL table per Oracle table.

    Column definitions are built from get_columns(): table and column
    names are lower-cased and quoted, types come from map_type(), and
    ``NOT NULL`` is added when Oracle reports the column as not nullable.
    Existing schemas and tables are left untouched (``IF NOT EXISTS``).
    """
    # NOTE(review): get_columns() reads all_tab_columns, which may also list
    # view columns — confirm whether views should be filtered out here.
    columns_by_table = defaultdict(list)
    for row in get_columns():
        definition = f"{q(row[1].lower())} {map_type(row)}"
        if row[6] == 'N':
            definition += ' NOT NULL'
        columns_by_table[row[0].lower()].append(definition)

    schema = q(POSTGRES_CONFIG['schema'])
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            for table, definitions in columns_by_table.items():
                cur.execute(
                    f"CREATE TABLE IF NOT EXISTS {schema}.{q(table)} ({', '.join(definitions)})"
                )
        pg.commit()
    finally:
        # Always release the connection, including when a DDL statement fails.
        pg.close()
    logging.info("Tables created")

# =========================
# COPY LOADER
# =========================

def _sanitize_row(row):
    """Convert Oracle-returned values (including LOBs) to COPY-safe strings"""
    out = []
    for v in row:
        if v is None:
            out.append('')
            continue

        # Oracle LOBs (CLOB, BLOB)
        if isinstance(v, oracledb.LOB):
            data = v.read()
            if isinstance(data, bytes):
                out.append('\\x' + data.hex())  # BYTEA-safe
            else:
                out.append(str(data))
            continue

        # RAW / BLOB / memoryview / bytearray
        if isinstance(v, (bytes, bytearray, memoryview)):
            out.append('\\x' + bytes(v).hex())
            continue

        # Safe fallback
        out.append(str(v))

    return out



def copy_table(table):
    """Copy every row of one Oracle table into its PostgreSQL counterpart.

    Rows are fetched in batches of CHUNK_SIZE, serialised to CSV in memory
    and streamed with ``COPY ... FROM STDIN``. The whole table is loaded
    in a single PostgreSQL transaction that is committed at the end.

    Args:
        table: Oracle table name as returned by get_tables(); the target
            table is the lower-cased name inside POSTGRES_CONFIG['schema'].
    """
    target = f"{q(POSTGRES_CONFIG['schema'])}.{q(table.lower())}"

    ora = oracle_connection()
    try:
        pg = postgres_connection()
        try:
            with ora.cursor() as src, pg.cursor() as dst:
                # Quote the Oracle name so reserved-word or case-sensitive
                # table names are selected correctly.
                src.execute(f"SELECT * FROM {q(table)}")
                columns = ', '.join(q(d[0].lower()) for d in src.description)

                # NULLs are written as empty CSV fields. For a one-column
                # row the csv module writes a quoted "" instead, which COPY
                # would load as an empty string; FORCE_NULL makes quoted
                # empty fields load as NULL as well.
                copy_sql = (
                    f"COPY {target} ({columns}) FROM STDIN "
                    f"WITH (FORMAT csv, FORCE_NULL ({columns}))"
                )

                while True:
                    rows = src.fetchmany(CHUNK_SIZE)
                    if not rows:
                        break
                    buf = io.StringIO()
                    writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL)
                    writer.writerows(_sanitize_row(r) for r in rows)
                    buf.seek(0)
                    dst.copy_expert(copy_sql, buf)
            pg.commit()
        finally:
            # Release both connections even when the load fails part-way.
            pg.close()
    finally:
        ora.close()
    logging.info(f"Copied {table}")


def parallel_copy():
    """Copy all Oracle tables to PostgreSQL using a pool of worker processes.

    Each table name from get_tables() is handed to copy_table() in one of
    PARALLEL_WORKERS processes; the call returns once every table is done.
    """
    table_names = get_tables()
    workers = Pool(PARALLEL_WORKERS)
    with workers:
        workers.map(copy_table, table_names)

# =========================
# CONSTRAINTS & INDEXES
# =========================

def recreate_constraints():
    """Recreate primary keys and foreign keys on the PostgreSQL tables.

    Primary keys are added first and committed. Foreign keys are then
    added for every referential constraint whose referenced constraint is
    one of the primary keys returned by get_constraints(). A foreign key
    that references anything else (for example a unique constraint, which
    is not part of the fetched metadata) is logged and skipped.
    """
    schema = q(POSTGRES_CONFIG['schema'])

    pk_columns = defaultdict(list)   # table -> primary-key columns, in key order
    pk_table_by_name = {}            # Oracle PK constraint name -> table
    fk_columns = defaultdict(list)   # FK constraint name -> columns, in key order
    fk_info = {}                     # FK constraint name -> (table, referenced constraint)

    for name, typ, table, col, ref in get_constraints():
        if typ == 'P':
            pk_columns[table.lower()].append(col.lower())
            pk_table_by_name[name] = table.lower()
        elif typ == 'R':
            fk_columns[name].append(col.lower())
            fk_info[name] = (table.lower(), ref)

    created = 0
    skipped = 0
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            for table, cols in pk_columns.items():
                col_list = ', '.join(q(col) for col in cols)
                cur.execute(f"ALTER TABLE {schema}.{q(table)} ADD PRIMARY KEY ({col_list})")
            # Commit primary keys on their own so a later foreign-key
            # failure does not roll them back.
            pg.commit()

            for name, cols in fk_columns.items():
                table, ref = fk_info[name]
                ref_table = pk_table_by_name.get(ref)
                ref_cols = pk_columns.get(ref_table, [])
                if ref_table is None or len(ref_cols) != len(cols):
                    logging.warning(
                        "Skipping foreign key %s on %s: referenced constraint %s "
                        "is not a migrated primary key",
                        name, table, ref,
                    )
                    skipped += 1
                    continue
                cur.execute(
                    f"ALTER TABLE {schema}.{q(table)} "
                    f"ADD CONSTRAINT {q(name.lower())} "
                    f"FOREIGN KEY ({', '.join(q(col) for col in cols)}) "
                    f"REFERENCES {schema}.{q(ref_table)} "
                    f"({', '.join(q(col) for col in ref_cols)})"
                )
                created += 1
            pg.commit()
    finally:
        # Always release the connection, including when an ALTER fails.
        pg.close()

    logging.info(
        "Primary keys created; foreign keys created=%d skipped=%d", created, skipped
    )


def recreate_indexes():
    """Recreate the Oracle indexes on the PostgreSQL tables.

    Rows from get_indexes() are grouped by lower-cased index name; each
    group becomes one ``CREATE INDEX IF NOT EXISTS`` on the table of its
    first row, with the columns in the order they were returned.
    """
    # NOTE(review): every index is created as a plain (non-unique) index,
    # and indexes that back primary keys are not filtered out — confirm
    # whether uniqueness or de-duplication is needed.
    columns_by_index = defaultdict(list)
    for name, table, col in get_indexes():
        columns_by_index[name.lower()].append((table.lower(), col.lower()))

    schema = q(POSTGRES_CONFIG['schema'])
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            for name, entries in columns_by_index.items():
                table = entries[0][0]
                cols = ', '.join(q(entry[1]) for entry in entries)
                cur.execute(
                    f"CREATE INDEX IF NOT EXISTS {q(name)} ON {schema}.{q(table)} ({cols})"
                )
        pg.commit()
    finally:
        # Always release the connection, including when a CREATE INDEX fails.
        pg.close()
    logging.info("Indexes created")

# =========================
# VALIDATION
# =========================

def checksum(conn, table):
    """Return an MD5 hash over the text form of every row in *table*.

    The query uses PostgreSQL syntax (``string_agg`` and ``::text``).

    Args:
        conn: open database connection providing ``cursor()``.
        table: table reference, inserted into the SQL text as given, so
            the caller must supply it already qualified and quoted.

    Returns:
        The MD5 hex digest of the concatenated rows, or None when the
        table has no rows.
    """
    # Without ORDER BY the aggregation order is unspecified, so the same
    # data could hash differently between runs; sort the rows first.
    sql = (
        "SELECT md5(string_agg(t::text, '' ORDER BY t::text)) "
        f"FROM (SELECT * FROM {table}) t"
    )
    c = conn.cursor()
    try:
        c.execute(sql)
        return c.fetchone()[0]
    finally:
        c.close()


def validate():
    """Compare row counts between Oracle and PostgreSQL for every table.

    Both counts are logged at INFO level for each table returned by
    get_tables(); a WARNING is logged as well when the counts differ.
    """
    schema = q(POSTGRES_CONFIG['schema'])

    ora = oracle_connection()
    try:
        pg = postgres_connection()
        try:
            with ora.cursor() as oc, pg.cursor() as pc:
                for t in get_tables():
                    # Quote the Oracle name so reserved-word or
                    # case-sensitive table names are counted correctly.
                    oc.execute(f"SELECT COUNT(*) FROM {q(t)}")
                    pc.execute(f"SELECT COUNT(*) FROM {schema}.{q(t.lower())}")
                    oracle_rows = oc.fetchone()[0]
                    pg_rows = pc.fetchone()[0]
                    logging.info(f"{t} rows: Oracle={oracle_rows} PG={pg_rows}")
                    if oracle_rows != pg_rows:
                        logging.warning(
                            "Row count mismatch for %s: Oracle=%s PG=%s",
                            t, oracle_rows, pg_rows,
                        )
        finally:
            # Release both connections even when a count query fails.
            pg.close()
    finally:
        ora.close()

# =========================
# MAIN
# =========================

if __name__ == '__main__':
    logging.info("Starting advanced migration")
    # Run the migration phases in order: create the tables, load the data,
    # add constraints, add indexes, then compare row counts.
    migration_steps = (
        create_tables,
        parallel_copy,
        recreate_constraints,
        recreate_indexes,
        validate,
    )
    for step in migration_steps:
        step()
    logging.info("Migration completed")

# No comments:
#
# Post a Comment