"""
Oracle → PostgreSQL Full Schema Migration (ADVANCED)
- Uses oracledb (thin mode)
- Parallel data loading
- Uses COPY FROM STDIN (fast)
- Handles PostgreSQL reserved keywords safely
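- Recreates primary keys and indexes (foreign keys are NOT recreated; map them manually)
- Row-count validation (a PostgreSQL-side checksum() helper is also provided)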
USAGE:
python oracle_to_postgres_migration.py
"""
import oracledb
import psycopg2
import pandas as pd
import csv
import io
import hashlib
from multiprocessing import Pool, cpu_count
from collections import defaultdict
import logging
# =========================
# CONFIGURATION
# =========================
# Source (Oracle) connection settings. "schema" is the owner whose column
# metadata is read; the connection itself uses only user/password/dsn.
ORACLE_CONFIG = dict(
    user="user1",
    password="***",
    dsn="localhost:1521/myservice",
    schema="schema1",
)

# Target (PostgreSQL) connection settings. "schema" is the schema the
# migrated tables are created in.
POSTGRES_CONFIG = dict(
    host="localhost",
    database="postgres",
    user="pbiuser",
    password="****",
    schema="pbiuser",
)

# Leave three cores free, but always use at least one worker process.
PARALLEL_WORKERS = max(1, cpu_count() - 3)

# Rows fetched from Oracle and sent to COPY per batch.
CHUNK_SIZE = 50_000
# =========================
# LOGGING
# =========================
# Root logger: INFO and above, timestamped, with the level in brackets.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# =========================
# HELPERS
# =========================
def q(name):
    """Return *name* as a double-quoted SQL identifier.

    Quoting preserves case and allows reserved words as identifiers.
    Embedded double quotes are doubled, per the SQL standard, so any input
    string yields exactly one identifier instead of breaking out of the
    quotes.
    """
    return '"' + str(name).replace('"', '""') + '"'
# =========================
# CONNECTIONS
# =========================
def oracle_connection():
    """Open and return a new python-oracledb connection from ORACLE_CONFIG.

    Only the user, password and dsn entries are passed to the driver; the
    "schema" entry is consumed by the metadata queries instead.
    """
    cfg = ORACLE_CONFIG
    return oracledb.connect(
        dsn=cfg['dsn'],
        user=cfg['user'],
        password=cfg['password'],
    )
def postgres_connection():
    """Open and return a new psycopg2 connection from POSTGRES_CONFIG.

    The "schema" entry is not a connection parameter; it is used only to
    qualify table names in the generated SQL.
    """
    cfg = POSTGRES_CONFIG
    return psycopg2.connect(
        host=cfg['host'],
        database=cfg['database'],
        user=cfg['user'],
        password=cfg['password'],
    )
# =========================
# METADATA EXTRACTION
# =========================
def get_tables():
    """Return the names of the tables owned by the connected Oracle user.

    Names are returned exactly as stored in USER_TABLES, sorted so that
    copy/validation order is deterministic.

    NOTE(review): this reads USER_TABLES (the connecting user), while
    get_columns() reads ORACLE_CONFIG['schema']. The two agree only when
    that schema is the connecting user. Confirm for your setup.
    """
    # Close the cursor and connection deterministically instead of relying
    # on garbage collection.
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute("SELECT table_name FROM user_tables ORDER BY table_name")
        return [row[0] for row in cur.fetchall()]
def get_columns():
    """Return column metadata for every table in ORACLE_CONFIG['schema'].

    Each row is (table_name, column_name, data_type, data_length,
    data_precision, data_scale, nullable), ordered by table and then by
    column position. ALL_TAB_COLUMNS also describes views and clusters, so
    the query is restricted to real tables via ALL_TABLES. Otherwise
    create_tables() would build empty PostgreSQL tables for Oracle views.
    """
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT c.table_name, c.column_name, c.data_type,
                   c.data_length, c.data_precision, c.data_scale, c.nullable
            FROM all_tab_columns c
            JOIN all_tables t
              ON t.owner = c.owner AND t.table_name = c.table_name
            WHERE c.owner = :s
            ORDER BY c.table_name, c.column_id
        """, s=ORACLE_CONFIG['schema'])
        return cur.fetchall()
def get_constraints():
    """Return primary-key ('P') and foreign-key ('R') constraint columns.

    Each row is (constraint_name, constraint_type, table_name, column_name,
    r_constraint_name) for constraints owned by the connected user. Rows
    are ordered by table, then constraint, then column position, so each
    constraint's columns appear contiguously and in key order.
    """
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT uc.constraint_name, uc.constraint_type, uc.table_name,
                   ucc.column_name, uc.r_constraint_name
            FROM user_constraints uc
            JOIN user_cons_columns ucc
              ON uc.constraint_name = ucc.constraint_name
            WHERE uc.constraint_type IN ('P','R')
            ORDER BY uc.table_name, uc.constraint_name, ucc.position
        """)
        return cur.fetchall()
def get_indexes():
    """Return (index_name, table_name, column_name) rows for recreatable indexes.

    Rows come from USER_IND_COLUMNS and are ordered by index and column
    position. Two kinds of index are excluded:
      * function-based indexes (including DESC indexes). Their "columns" are
        hidden SYS_NC...$ expressions that do not exist in PostgreSQL, so
        CREATE INDEX would fail.
      * indexes backing a primary key. PostgreSQL creates its own index for
        the PK added by recreate_constraints(), so recreating these would
        only duplicate it.
    """
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT ic.index_name, ic.table_name, ic.column_name
            FROM user_ind_columns ic
            WHERE NOT EXISTS (
                    SELECT 1 FROM user_indexes ix
                    WHERE ix.index_name = ic.index_name
                      AND ix.index_type LIKE 'FUNCTION-BASED%'
                  )
              AND NOT EXISTS (
                    SELECT 1 FROM user_constraints uc
                    WHERE uc.constraint_type = 'P'
                      AND uc.index_name = ic.index_name
                  )
            ORDER BY ic.index_name, ic.column_position
        """)
        return cur.fetchall()
# =========================
# TYPE MAPPING
# =========================
def map_type(r):
    """Map one get_columns() row to a PostgreSQL column type.

    r: (table_name, column_name, data_type, data_length, data_precision,
        data_scale, nullable). Only indices 2-5 are used.

    Binary Oracle types (BLOB, RAW, LONG RAW) map to BYTEA, matching the
    '\\x<hex>' values that _sanitize_row() emits for bytes. IEEE float
    types map to native PostgreSQL floats. Anything unrecognised (CLOB,
    NCLOB, LONG, INTERVAL, ROWID, ...) falls back to TEXT.
    """
    t, l, p, s = r[2], r[3], r[4], r[5]
    if t == 'NUMBER':
        # NUMBER without precision is unconstrained; a missing scale means 0.
        return 'NUMERIC' if p is None else f'NUMERIC({p},{s or 0})'
    if t in ('VARCHAR2', 'NVARCHAR2'):
        return f'VARCHAR({l})'
    if t in ('CHAR', 'NCHAR'):
        return f'CHAR({l})'
    # Oracle DATE carries a time-of-day component, hence TIMESTAMP.
    if t == 'DATE' or t.startswith('TIMESTAMP'):
        return 'TIMESTAMP'
    if t in ('BLOB', 'RAW', 'LONG RAW'):
        return 'BYTEA'
    if t == 'BINARY_FLOAT':
        return 'REAL'
    if t in ('BINARY_DOUBLE', 'FLOAT'):
        return 'DOUBLE PRECISION'
    return 'TEXT'
# =========================
# CREATE TABLES
# =========================
def create_tables():
    """Create the target schema and one PostgreSQL table per Oracle table.

    Column definitions come from get_columns(): names are lower-cased and
    quoted, types come from map_type(), and Oracle NOT NULL (nullable ==
    'N') is preserved. CREATE ... IF NOT EXISTS makes the DDL re-runnable.
    The connection is always closed. On error nothing is committed.
    """
    tables = defaultdict(list)
    for r in get_columns():
        col_def = f"{q(r[1].lower())} {map_type(r)}"
        if r[6] == 'N':
            col_def += " NOT NULL"
        tables[r[0].lower()].append(col_def)

    schema = q(POSTGRES_CONFIG['schema'])
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            for t, cols in tables.items():
                cur.execute(f"CREATE TABLE IF NOT EXISTS {schema}.{q(t)} ({', '.join(cols)})")
        pg.commit()
    finally:
        pg.close()
    logging.info("Tables created")
# =========================
# COPY LOADER
# =========================
def _sanitize_row(row):
"""Convert Oracle-returned values (including LOBs) to COPY-safe strings"""
out = []
for v in row:
if v is None:
out.append('')
continue
# Oracle LOBs (CLOB, BLOB)
if isinstance(v, oracledb.LOB):
data = v.read()
if isinstance(data, bytes):
out.append('\\x' + data.hex()) # BYTEA-safe
else:
out.append(str(data))
continue
# RAW / BLOB / memoryview / bytearray
if isinstance(v, (bytes, bytearray, memoryview)):
out.append('\\x' + bytes(v).hex())
continue
# Safe fallback
out.append(str(v))
return out
def copy_table(table):
    """Stream one Oracle table into its PostgreSQL counterpart with COPY.

    table: Oracle table name exactly as returned by get_tables().

    Rows are fetched in batches of CHUNK_SIZE, serialised to CSV via
    _sanitize_row(), and pushed with COPY ... FROM STDIN. The whole table is
    loaded in a single PostgreSQL transaction that is committed at the end.
    Both connections are always closed, including when COPY fails.
    """
    p = postgres_connection()
    try:
        with oracle_connection() as o, o.cursor() as oc, p.cursor() as pc:
            # One network round-trip per chunk instead of the default 100 rows.
            oc.arraysize = CHUNK_SIZE
            # Quote the name so mixed-case or special-character tables resolve.
            oc.execute(f"SELECT * FROM {q(table)}")
            col_list = ', '.join(q(d[0].lower()) for d in oc.description)
            # csv.writer writes a row consisting of one empty field as "" (quoted),
            # which COPY reads as an empty string (or rejects for non-text types)
            # instead of NULL. FORCE_NULL maps quoted empties back to NULL, so
            # single-column tables treat NULLs the same way as wider tables.
            copy_sql = (
                f"COPY {q(POSTGRES_CONFIG['schema'])}.{q(table.lower())} ({col_list}) "
                f"FROM STDIN WITH (FORMAT csv, FORCE_NULL ({col_list}))"
            )
            buf = io.StringIO()
            writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL)
            while True:
                rows = oc.fetchmany(CHUNK_SIZE)
                if not rows:
                    break
                writer.writerows(_sanitize_row(r) for r in rows)
                buf.seek(0)
                pc.copy_expert(copy_sql, buf)
                # Reuse the buffer for the next chunk.
                buf.seek(0)
                buf.truncate(0)
        p.commit()
    finally:
        p.close()
    logging.info(f"Copied {table}")
def parallel_copy():
    """Copy every table from get_tables() concurrently with PARALLEL_WORKERS processes.

    chunksize=1 hands out tables one at a time. Pool.map's default chunking
    would pre-assign batches of tables to each worker, so one very large
    table could stall the tables queued behind it while other workers idle.
    Any exception raised by copy_table() propagates to the caller.
    """
    tables = get_tables()
    with Pool(PARALLEL_WORKERS) as pool:
        pool.map(copy_table, tables, chunksize=1)
# =========================
# CONSTRAINTS & INDEXES
# =========================
def recreate_constraints():
    """Add the Oracle primary keys to the migrated PostgreSQL tables.

    Rows of type 'P' from get_constraints() are grouped per (lower-cased)
    table, keeping the row order. This relies on get_constraints()
    returning columns in key-position order. Foreign-key rows (type 'R')
    are skipped: the query does not return the referenced table/columns,
    so FKs must be mapped manually. The connection is always closed.
    """
    pk = defaultdict(list)
    for _name, typ, table, col, _ref in get_constraints():
        if typ == 'P':
            pk[table.lower()].append(col.lower())

    schema = q(POSTGRES_CONFIG['schema'])
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            for t, cols in pk.items():
                col_list = ', '.join(q(col) for col in cols)
                cur.execute(f"ALTER TABLE {schema}.{q(t)} ADD PRIMARY KEY ({col_list})")
        pg.commit()
    finally:
        pg.close()
    logging.info("Primary keys created (FKs need manual mapping of referenced tables)")
def recreate_indexes():
    """Create a PostgreSQL index for each index returned by get_indexes().

    Rows are grouped by (lower-cased) index name. The table comes from the
    first row of each group, and columns keep row order. Every index is
    created as a plain, non-unique index, and IF NOT EXISTS skips names
    already taken by another relation. The connection is always closed.
    """
    idx = defaultdict(list)
    for name, table, col in get_indexes():
        idx[name.lower()].append((table.lower(), col.lower()))

    schema = q(POSTGRES_CONFIG['schema'])
    pg = postgres_connection()
    try:
        with pg.cursor() as cur:
            for name, entries in idx.items():
                table = entries[0][0]
                cols = ', '.join(q(col) for _, col in entries)
                cur.execute(f"CREATE INDEX IF NOT EXISTS {q(name)} ON {schema}.{q(table)} ({cols})")
        pg.commit()
    finally:
        pg.close()
    logging.info("Indexes created")
# =========================
# VALIDATION
# =========================
def checksum(conn, table):
    """Return an MD5 digest over all rows of *table*, independent of row order.

    conn: an open DB-API connection to PostgreSQL. The SQL uses PostgreSQL-only
          row-to-text casts (t::text), string_agg and md5.
    table: SQL-ready table reference, interpolated verbatim, so quote and
           qualify it first (e.g. with q()).

    Rows are aggregated in sorted text order. Without the ORDER BY, the
    digest depends on physical row order and is useless for comparisons.
    Returns None for an empty table, because string_agg yields NULL there.
    The cursor is always closed.
    """
    c = conn.cursor()
    try:
        c.execute(
            f"SELECT md5(string_agg(t::text, '' ORDER BY t::text)) "
            f"FROM (SELECT * FROM {table}) t"
        )
        return c.fetchone()[0]
    finally:
        c.close()
def validate():
    """Compare per-table row counts between Oracle and PostgreSQL.

    For every table from get_tables(), logs both counts at INFO, plus a
    WARNING when they differ. The Oracle name is quoted so mixed-case or
    special-character tables resolve, matching copy_table(). Both
    connections are always closed.
    """
    p = postgres_connection()
    try:
        with oracle_connection() as o, o.cursor() as oc, p.cursor() as pc:
            for t in get_tables():
                oc.execute(f"SELECT COUNT(*) FROM {q(t)}")
                pc.execute(f"SELECT COUNT(*) FROM {q(POSTGRES_CONFIG['schema'])}.{q(t.lower())}")
                ora_count = oc.fetchone()[0]
                pg_count = pc.fetchone()[0]
                logging.info(f"{t} rows: Oracle={ora_count} PG={pg_count}")
                if ora_count != pg_count:
                    logging.warning(f"{t} row count mismatch: Oracle={ora_count} PG={pg_count}")
    finally:
        p.close()
# =========================
# MAIN
# =========================
if __name__ == '__main__':
    # The guard keeps worker processes that re-import this module from
    # re-running the pipeline. The order matters: tables must exist before
    # COPY, and keys/indexes are added after the bulk load.
    logging.info("Starting advanced migration")
    pipeline = (
        create_tables,
        parallel_copy,
        recreate_constraints,
        recreate_indexes,
        validate,
    )
    for step in pipeline:
        step()
    logging.info("Migration completed")
# (Source-page footer "No comments: / Post a Comment" was not code; kept as a comment.)