Problem Statement
In many data-driven environments, we often receive data in CSV format from upstream systems. The challenge lies not just in loading this data into a database, but in doing so intelligently:
- How can we insert new records without duplicating existing ones?
- How do we handle updates to existing records?
- How can we track what changed in each load?
- Can we maintain logs for auditing?
This sounds simple — but if not handled properly, it can lead to:
- Duplicate data
- Inconsistent reports
- Untraceable changes
Different Approaches Explored
Approach 1: Direct Insert into Main Table
Pros
- Very simple and fast to implement
Cons
- Cannot detect duplicates
- Cannot update existing records
- No logging or traceability
Approach 2: Truncate & Reload
Pros
- Easy to implement
- Always has the latest data
Cons
- Risk of data loss if the file is incomplete or corrupted
- No history or audit trail
- Cannot be used for transactional or incremental systems
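For reference, a truncate-and-reload job is usually just a couple of statements. The sketch below assumes the CSV has already been read into a DataFrame df (as in the steps later in this post); the column list is hypothetical and would need to match the real table.
# Truncate-and-reload: wipe the main table, then bulk-insert the entire CSV.
# Hypothetical column list; a real job would mirror the actual table definition.
cur = conn.cursor()
cur.execute("TRUNCATE TABLE PATIENT_VISITS")
cur.executemany(
    "INSERT INTO PATIENT_VISITS (VISIT_ID, PATIENT_NAME, VISIT_DATE) VALUES (:1, :2, :3)",
    [tuple(row) for row in df.itertuples(index=False, name=None)],
)
conn.commit()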
Approach 3: Row-by-Row UPSERT Logic in Python
Pros
- Accurate handling of insert/update logic
Cons
- Very slow for large files (due to per-row SQL)
- Error-prone with schema mismatches
- Difficult to maintain
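For context, the row-by-row pattern typically looks like the sketch below (illustrative only; PATIENT_NAME and VISIT_DATE are hypothetical columns). The one-or-two SQL round trips per CSV row are exactly what makes this approach slow on large files.
from db_config import conn  # same connection object used throughout this post

# Row-by-row UPSERT: try an UPDATE first; if nothing was updated, INSERT.
cur = conn.cursor()
for visit_id, patient_name, visit_date in df.itertuples(index=False, name=None):
    cur.execute(
        "UPDATE PATIENT_VISITS SET PATIENT_NAME = :1, VISIT_DATE = :2 WHERE VISIT_ID = :3",
        (patient_name, visit_date, visit_id),
    )
    if cur.rowcount == 0:  # no existing record matched the key, so insert a new one
        cur.execute(
            "INSERT INTO PATIENT_VISITS (VISIT_ID, PATIENT_NAME, VISIT_DATE) VALUES (:1, :2, :3)",
            (visit_id, patient_name, visit_date),
        )
conn.commit()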
Final Approach (My Solution): Staging → Merge → Log
After exploring the methods above, I designed a robust, scalable, and auditable solution using Python and Oracle SQL.
Key Features of This Approach
- Staging Table: load data into a temporary (staging) table first
- MERGE Statement: compare the staging and main tables using MERGE (UPSERT) logic
- Audit Logging: insert metadata into a log table (RECORD_HIST_TEST) after each run
- Unique Key Matching: ensures updates are applied only where records match
- Timestamped Logging: every load is traceable by datetime and filename
Step 1: Read CSV & Normalize
df = pd.read_csv(csv_file_path, encoding='latin1')
df.columns = [col.upper().strip() for col in df.columns]
Normalizes column names to uppercase and strips stray whitespace, so they line up with Oracle's unquoted identifiers (which are stored in uppercase).
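One caveat worth handling at this stage: pandas represents empty CSV cells as NaN, which Oracle drivers typically either fail to bind or store as literal NaN values. A small addition (not in the original snippet) converts them to SQL NULLs:
# Replace NaN (pandas' marker for empty CSV cells) with None so they bind as SQL NULLs.
df = df.astype(object).where(pd.notnull(df), None)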
Step 2: Insert into Staging Table
insert_sql = f'INSERT INTO {staging_table} (...) VALUES (...)'
cur.executemany(insert_sql, data)
executemany() loads all rows into the staging table in one batched call instead of one round trip per row.
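The staging table is assumed to already exist (the script checks for it and asks you to create it otherwise). One simple way to create it as an empty copy of the main table, shown here as a sketch that is not part of the original script:
# Create the staging table with the same columns as the main table but no rows.
# Note: CREATE TABLE AS SELECT copies column definitions, not constraints or indexes.
cur = conn.cursor()
cur.execute(f"""
    CREATE TABLE {staging_table} AS
    SELECT * FROM {schema_name}.{table_name} WHERE 1 = 0
""")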
Step 3: Merge into Main Table
MERGE INTO PATIENT_VISITS tgt
USING STG_PATIENT_VISITS stg
ON (tgt.VISIT_ID = stg.VISIT_ID)
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
This single SQL statement handles both updates and inserts: rows already present in the main table are updated from staging, and rows that do not yet exist are inserted.
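In the script, these clauses are not hard-coded; they are built from the CSV's column list and the configured unique keys, as in the full script below:
merge_on = ' AND '.join([f'tgt."{key}" = stg."{key}"' for key in unique_keys])
update_set = ', '.join([f'tgt."{col}" = stg."{col}"' for col in columns if col not in unique_keys])
insert_cols = ', '.join([f'"{col}"' for col in columns])
insert_vals = ', '.join([f'stg."{col}"' for col in columns])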
Step 4: Insert into Log Table
INSERT INTO RECORD_HIST_TEST (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name)
VALUES (:1, :2, :3, :4, :5, :6, :7)
The script compares the current row count with the previous run's count to set the refresh flag, then logs the number of new records, the run timestamp, the file name, and the flag.
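The RECORD_HIST_TEST table is assumed to exist already. A minimal sketch of its DDL, with column types inferred from the INSERT above (adjust names, types, and sizes to your own standards):
# Hypothetical DDL for the audit log table; types and sizes are assumptions.
cur = conn.cursor()
cur.execute("""
    CREATE TABLE RECORD_HIST_TEST (
        table_name         VARCHAR2(128),
        schema_name        VARCHAR2(128),
        row_count          NUMBER,
        run_time           TIMESTAMP,
        refresh_flag       CHAR(1),
        new_records_count  NUMBER,
        file_name          VARCHAR2(255)
    )
""")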
Conclusion
While inserting data from CSV into a database might seem simple at first, doing it right requires handling edge cases like duplicates, updates, and change tracking. By building a staging + merge + log pipeline in Python, we create a reliable, auditable, and scalable data ingestion framework.
Full Python Script:
import pandas as pd
from db_config import conn
from datetime import datetime
import os
# Configuration
csv_file_path = 'csvfiles/patient_visits.csv'
schema_name = 'HEALTH_MART'
table_name = 'PATIENT_VISITS'
staging_table = 'STG_PATIENT_VISITS'
unique_keys = ['VISIT_ID'] # list of unique keys
run_time = datetime.now()
timestamped_message = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Helper Functions
def table_exists(conn, schema, table):
    cur = conn.cursor()
    query = """
        SELECT COUNT(*) FROM all_tables
        WHERE owner = :1 AND table_name = :2
    """
    cur.execute(query, (schema.upper(), table.upper()))
    exists = cur.fetchone()[0] > 0
    cur.close()
    return exists
def get_table_data(conn, table):
    cur = conn.cursor()
    query = f"SELECT COUNT(*) FROM {table}"
    cur.execute(query)
    count = cur.fetchone()[0]
    cur.close()
    return count
def read_csv_file(csv_file_path):
    print(f"{timestamped_message} - Reading CSV file")
    df = pd.read_csv(csv_file_path, encoding='latin1')
    df.columns = [col.upper().strip() for col in df.columns]
    # Convert NaN (empty CSV cells) to None so they bind as SQL NULLs
    df = df.astype(object).where(pd.notnull(df), None)
    return df
def insert_or_merge_data(df, conn, schema, table, staging_table, unique_keys):
    cur = conn.cursor()
    df.columns = [col.upper().strip() for col in df.columns]
    unique_keys = [key.upper().strip() for key in unique_keys]
    columns = df.columns.tolist()
    # Clear leftover rows from previous runs so the MERGE sees each key only once
    cur.execute(f"DELETE FROM {staging_table}")
    # Insert into staging table
    placeholders = ', '.join([f':{i+1}' for i in range(len(columns))])
    col_names = ', '.join([f'"{col}"' for col in columns])
    insert_sql = f'INSERT INTO {staging_table} ({col_names}) VALUES ({placeholders})'
    data = [tuple(row) for row in df.itertuples(index=False, name=None)]
    print(f"{timestamped_message} - Inserting {len(data)} rows into staging table {staging_table}")
    cur.executemany(insert_sql, data)
    # Count new records in staging not in main table
    count_new_sql = f"""
        SELECT COUNT(*) FROM {staging_table} stg
        WHERE NOT EXISTS (
            SELECT 1 FROM {schema}.{table} tgt
            WHERE {' AND '.join([f'tgt."{key}" = stg."{key}"' for key in unique_keys])}
        )
    """
    cur.execute(count_new_sql)
    new_records_count = cur.fetchone()[0]
    print(f"{timestamped_message} - Number of new records: {new_records_count}")
    # Merge data from staging to main table
    merge_on = ' AND '.join([f'tgt."{key}" = stg."{key}"' for key in unique_keys])
    update_set = ', '.join([f'tgt."{col}" = stg."{col}"' for col in columns if col not in unique_keys])
    insert_cols = ', '.join([f'"{col}"' for col in columns])
    insert_vals = ', '.join([f'stg."{col}"' for col in columns])
    merge_sql = f"""
        MERGE INTO {schema}.{table} tgt
        USING {staging_table} stg
        ON ({merge_on})
        WHEN MATCHED THEN
            UPDATE SET {update_set}
        WHEN NOT MATCHED THEN
            INSERT ({insert_cols}) VALUES ({insert_vals})
    """
    print(f"{timestamped_message} - Merging data into {schema}.{table}")
    cur.execute(merge_sql)
    conn.commit()
    cur.close()
    print(f"{timestamped_message} - Merge completed successfully")
    return new_records_count
def insert_log(conn, table_name, schema_name, row_count, run_time, new_records_count, file_name):
    cur = conn.cursor()
    select_query = """
        SELECT row_count FROM RECORD_HIST_TEST
        WHERE table_name = :1 AND schema_name = :2
        ORDER BY run_time DESC FETCH FIRST 1 ROWS ONLY
    """
    cur.execute(select_query, (table_name, schema_name))
    previous = cur.fetchone()
    # Refresh flag: 'N' if the row count is unchanged since the last run, otherwise 'Y'
    if previous and previous[0] == row_count:
        refresh_flag = 'N'
    else:
        refresh_flag = 'Y'
    insert_query = """
        INSERT INTO RECORD_HIST_TEST (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name)
        VALUES (:1, :2, :3, :4, :5, :6, :7)
    """
    cur.execute(insert_query, (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name))
    conn.commit()
    cur.close()
# Main execution
if not table_exists(conn, schema_name, table_name) or not os.path.exists(csv_file_path):
    print(f"{timestamped_message} - Table {table_name} or CSV file not found in schema {schema_name}")
else:
    print(f"{timestamped_message} - Table {table_name} found in schema {schema_name}")
    df = read_csv_file(csv_file_path)
    file_name = os.path.basename(csv_file_path)
    if not table_exists(conn, schema_name, staging_table):
        print(f"{timestamped_message} - Staging table {staging_table} does not exist. Please create it first.")
    else:
        new_records_count = insert_or_merge_data(df, conn, schema_name, table_name, staging_table, unique_keys)
        count = get_table_data(conn, table_name)
        print(f"{timestamped_message} - Total rows in {table_name}: {count}")
        insert_log(conn, table_name, schema_name, count, run_time, new_records_count, file_name)
        print(f"{timestamped_message} - Log inserted. New records: {new_records_count}")
conn.close()