- Uses oracledb (thin mode)
- Parallel data loading
- Quotes all PostgreSQL identifiers, so reserved keywords are safe as table/column names
- Self-contained, single runnable script (repository-ready)
USAGE:
python oracle_to_postgres_migration.py
"""
import logging
import os
from collections import defaultdict
from multiprocessing import Pool, cpu_count

import oracledb
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
# =========================
# CONFIGURATION
# =========================
# Every value can be overridden through an environment variable so real
# credentials never have to be committed with the script; the literals below
# are only placeholders / defaults.
ORACLE_CONFIG = {
    "user": os.environ.get("ORACLE_USER", "myoracleuser"),
    "password": os.environ.get("ORACLE_PASSWORD", "****"),
    "dsn": os.environ.get("ORACLE_DSN", "localhost:1521/MYSERVICE"),
    # Oracle owner whose column metadata is read. You can name your own schema
    # here. NOTE: Oracle keeps unquoted names upper-case in its data dictionary.
    "schema": os.environ.get("ORACLE_SCHEMA", "myoracleuser"),
}
POSTGRES_CONFIG = {
    "host": os.environ.get("PG_HOST", "localhost"),
    "port": int(os.environ.get("PG_PORT", "5432")),
    "database": os.environ.get("PG_DATABASE", "postgres"),
    "user": os.environ.get("PG_USER", "pbiuser"),
    "password": os.environ.get("PG_PASSWORD", "*****"),
    # Target schema; it is created if it does not exist yet.
    "schema": os.environ.get("PG_SCHEMA", "pbiuser"),
}
# Reserve three cores (presumably for the database servers / OS), but always
# run at least one worker.
PARALLEL_WORKERS = max(cpu_count() - 3, 1)
# Row batch size used by the data-loading step.
CHUNK_SIZE = 5000
# =========================
# LOGGING
# =========================
# Root logger: INFO and above, each record prefixed with a timestamp and its
# level name in brackets.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# =========================
# HELPERS
# =========================
def q(identifier: str) -> str:
    """Return *identifier* as a double-quoted SQL identifier.

    Quoting preserves the exact case and lets reserved words (``order``,
    ``user`` ...) be used as table/column names. Embedded double quotes are
    doubled, as the SQL standard requires, so the result is always a single
    well-formed identifier and cannot break out of the quoting.

    >>> q("order")
    '"order"'
    >>> q('a"b')
    '"a""b"'
    """
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'
# =========================
# CONNECTIONS
# =========================
def oracle_connection():
    """Open and return a new python-oracledb connection to the source database.

    Uses the ``user``, ``password`` and ``dsn`` entries of ``ORACLE_CONFIG``;
    the ``schema`` entry is not used here. The caller owns the connection and
    is responsible for closing it.
    """
    connect_args = {key: ORACLE_CONFIG[key] for key in ("user", "password", "dsn")}
    return oracledb.connect(**connect_args)
def postgres_connection():
    """Open and return a new psycopg2 connection to the target database.

    The port is taken from ``POSTGRES_CONFIG['port']`` when that key exists
    and otherwise defaults to 5432. It is passed explicitly so this connection
    does not silently follow libpq environment defaults. The caller owns the
    connection and must close it; psycopg2's ``with conn:`` only ends the
    transaction, it does not close the connection.
    """
    return psycopg2.connect(
        host=POSTGRES_CONFIG['host'],
        port=POSTGRES_CONFIG.get('port', 5432),
        database=POSTGRES_CONFIG['database'],
        user=POSTGRES_CONFIG['user'],
        password=POSTGRES_CONFIG['password'],
    )
def postgres_engine():
    """Create a SQLAlchemy engine (psycopg2 driver) for the target database.

    The URL is built with ``URL.create`` rather than string formatting, so
    credentials containing URL-reserved characters such as ``@``, ``:``, ``/``
    or ``%`` are escaped correctly instead of corrupting the URL. The port
    comes from ``POSTGRES_CONFIG['port']`` when present, else 5432.
    """
    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="postgresql+psycopg2",
        username=POSTGRES_CONFIG['user'],
        password=POSTGRES_CONFIG['password'],
        host=POSTGRES_CONFIG['host'],
        port=POSTGRES_CONFIG.get('port', 5432),
        database=POSTGRES_CONFIG['database'],
    )
    return create_engine(url)
# =========================
# METADATA EXTRACTION
# =========================
def get_oracle_columns():
    """Fetch column metadata for every table owned by the configured Oracle schema.

    Only real tables are included (joined against ``all_tables``), so views do
    not turn into empty PostgreSQL tables.

    Returns:
        list of tuples ``(table_name, column_name, data_type, data_length,
        data_precision, data_scale, nullable)`` ordered by table name and
        column position.
    """
    # Unquoted Oracle identifiers are stored upper-case in the data dictionary,
    # so a lower-case config value such as 'myoracleuser' would match nothing.
    # NOTE(review): a user created with a quoted lower-case name would need the
    # exact-case value instead.
    owner = ORACLE_CONFIG['schema'].upper()
    sql = """
        SELECT c.table_name, c.column_name, c.data_type,
               c.data_length, c.data_precision, c.data_scale, c.nullable
        FROM all_tab_columns c
        JOIN all_tables t
          ON t.owner = c.owner AND t.table_name = c.table_name
        WHERE c.owner = :schema
        ORDER BY c.table_name, c.column_id
    """
    # Context managers close the cursor and connection even if the query fails.
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute(sql, schema=owner)
        return cur.fetchall()
# Oracle types whose PostgreSQL equivalent does not depend on length/precision.
_ORACLE_FIXED_TYPES = {
    'DATE': 'TIMESTAMP',          # Oracle DATE also stores a time of day
    'CLOB': 'TEXT',
    'NCLOB': 'TEXT',
    'LONG': 'TEXT',
    'BLOB': 'BYTEA',
    'RAW': 'BYTEA',
    'LONG RAW': 'BYTEA',
    # Oracle FLOAT's precision is binary digits; unbounded NUMERIC is lossless.
    'FLOAT': 'NUMERIC',
    'BINARY_FLOAT': 'REAL',
    'BINARY_DOUBLE': 'DOUBLE PRECISION',
}


def map_oracle_to_pg(row):
    """Translate one ``get_oracle_columns`` row into a PostgreSQL column type.

    Args:
        row: tuple whose items 2-5 are ``data_type, data_length,
            data_precision, data_scale`` from ``all_tab_columns``.

    Returns:
        The PostgreSQL type as SQL text; unknown Oracle types fall back to TEXT.
    """
    dtype, length, precision, scale = row[2], row[3], row[4], row[5]
    if dtype == 'NUMBER':
        # Unconstrained NUMBER has no precision; NUMBER(p) has a NULL scale.
        return 'NUMERIC' if precision is None else f'NUMERIC({precision},{scale or 0})'
    # NOTE(review): data_length is Oracle's byte length; with a multi-byte
    # character set this over-sizes (never truncates) the column.
    if dtype in ('VARCHAR2', 'NVARCHAR2'):
        return f'VARCHAR({length})'
    if dtype in ('CHAR', 'NCHAR'):
        return f'CHAR({length})'
    # Covers "TIMESTAMP(n)" and its WITH [LOCAL] TIME ZONE variants; any zone
    # information is not carried over.
    if dtype.startswith('TIMESTAMP'):
        return 'TIMESTAMP'
    return _ORACLE_FIXED_TYPES.get(dtype, 'TEXT')
# =========================
# SCHEMA & TABLE CREATION
# =========================
def create_pg_schema_and_tables():
    """Create the target PostgreSQL schema and one table per source Oracle table.

    Columns come from ``get_oracle_columns()`` and types from
    ``map_oracle_to_pg``. Table and column names are lower-cased and quoted.
    Only names, types and NOT NULL are carried over. Keys, indexes and
    defaults are not. Tables that already exist are left unchanged
    (``CREATE TABLE IF NOT EXISTS``).
    """
    tables = defaultdict(list)
    for r in get_oracle_columns():
        table = r[0].lower()
        col = r[1].lower()
        # nullable is 'Y'/'N'; only NOT NULL columns need an extra clause.
        nullable = '' if r[6] == 'Y' else ' NOT NULL'
        tables[table].append(f"{q(col)} {map_oracle_to_pg(r)}{nullable}")

    if not tables:
        logging.warning(
            "No Oracle columns found for schema %s; no tables will be created",
            ORACLE_CONFIG['schema'],
        )

    schema = q(POSTGRES_CONFIG['schema'])
    conn = postgres_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            for table, cols in tables.items():
                cur.execute(
                    f"CREATE TABLE IF NOT EXISTS {schema}.{q(table)} ({', '.join(cols)})"
                )
        conn.commit()
    finally:
        # Always release the connection; an uncommitted transaction is discarded.
        conn.close()
    logging.info("PostgreSQL schema & tables created")
# =========================
# DATA MIGRATION (PARALLEL)
# =========================
def migrate_single_table(table):
    """Copy all rows of one Oracle table into the same-named PostgreSQL table.

    Meant to run inside a multiprocessing worker. It opens its own Oracle
    connection and SQLAlchemy engine and releases both before returning. Rows
    are streamed in ``CHUNK_SIZE`` batches instead of loading the whole table
    into memory. Column names are lower-cased to match the DDL.

    Failures are logged with a traceback and not re-raised, so one bad table
    does not abort the others.

    Args:
        table: Oracle table name exactly as stored in the data dictionary.
    """
    oconn = None
    pg_engine_local = None
    try:
        # Fetch CLOB/NCLOB as str and BLOB as bytes instead of LOB locator
        # objects, which cannot be written on to PostgreSQL. This is a
        # process-wide python-oracledb default, set here because we are in a worker.
        oracledb.defaults.fetch_lobs = False
        oconn = oracle_connection()
        pg_engine_local = postgres_engine()
        logging.info("Migrating table: %s", table)
        # Quote the name (Oracle uses the same double-quote syntax) so mixed-case
        # or reserved-word table names resolve exactly.
        chunks = pd.read_sql(f"SELECT * FROM {q(table)}", oconn, chunksize=CHUNK_SIZE)
        for chunk in chunks:
            chunk.columns = [c.lower() for c in chunk.columns]
            chunk.to_sql(
                name=table.lower(),
                con=pg_engine_local,
                schema=POSTGRES_CONFIG['schema'],
                if_exists='append',
                index=False,
                chunksize=CHUNK_SIZE,
                method='multi',
            )
        logging.info("Completed table: %s", table)
    except Exception as e:
        # Deliberate best-effort boundary: record the full traceback, keep going.
        logging.exception("Failed table %s: %s", table, e)
    finally:
        if oconn is not None:
            oconn.close()
        if pg_engine_local is not None:
            pg_engine_local.dispose()
def parallel_data_migration():
    """Copy the data of every table owned by the connected Oracle user in parallel.

    Table names come from ``user_tables`` (the login user's own tables). Each
    table is handed to ``migrate_single_table`` in a pool of worker processes.
    """
    with oracle_connection() as conn, conn.cursor() as cur:
        cur.execute("SELECT table_name FROM user_tables ORDER BY table_name")
        tables = [name for (name,) in cur.fetchall()]

    if not tables:
        logging.warning("No Oracle tables found to migrate")
        return

    # Never start more workers than there are tables. chunksize=1 hands out
    # one table at a time, so a few huge tables don't end up batched into the
    # same worker while other workers sit idle.
    with Pool(min(PARALLEL_WORKERS, len(tables))) as pool:
        pool.map(migrate_single_table, tables, chunksize=1)
# =========================
# SEQUENCES
# =========================
def migrate_sequences():
    """Recreate the connected Oracle user's sequences in the PostgreSQL schema.

    Each PostgreSQL sequence starts at Oracle's ``last_number`` and keeps
    Oracle's ``increment_by`` step. Sequences that already exist in PostgreSQL
    are left untouched (``IF NOT EXISTS``). Sequences are created as
    stand-alone objects; nothing here attaches them to table columns.
    """
    with oracle_connection() as oconn, oconn.cursor() as ocur:
        ocur.execute(
            "SELECT sequence_name, last_number, increment_by FROM user_sequences"
        )
        sequences = ocur.fetchall()

    schema = q(POSTGRES_CONFIG['schema'])
    pconn = postgres_connection()
    try:
        with pconn.cursor() as pcur:
            for name, last_val, step in sequences:
                # int() renders whole numbers without a decimal point in the DDL,
                # whatever numeric type the driver hands back.
                pcur.execute(
                    f"CREATE SEQUENCE IF NOT EXISTS {schema}.{q(name.lower())} "
                    f"INCREMENT BY {int(step)} START WITH {int(last_val)}"
                )
        pconn.commit()
    finally:
        # Closing without a commit (i.e. after an error) discards the partial work.
        pconn.close()
    logging.info("Sequences migrated")
# =========================
# MAIN
# =========================
def main():
    """Run the migration steps in order: DDL, then table data, then sequences."""
    logging.info("Starting Oracle → PostgreSQL migration")
    steps = (create_pg_schema_and_tables, parallel_data_migration, migrate_sequences)
    for step in steps:
        step()
    logging.info("Migration completed successfully")


# The guard also keeps multiprocessing workers that re-import this module
# from starting the migration again.
if __name__ == '__main__':
    main()
# End of script.