# ClinicalTrialsDataProcessing/RxNav/migrate_mysql2pgsql.py
# Migrate RxNorm tables from MariaDB/MySQL into PostgreSQL.
import psycopg2 as psyco
from psycopg2 import sql
from psycopg2 import extras
import pymysql
from dotenv import load_dotenv
import os
##############NOTE
'''
Pipeline: MariaDB --pymysql--> incrementally fetched row dicts --psycopg2--> PostgreSQL.
Fetching incrementally keeps memory usage bounded and keeps the migration simple.
'''
########FUNCTIONS#################
def convert_column(d):
    """Translate one MySQL column description into a PostgreSQL column clause.

    Given a row dict from MySQL's ``INFORMATION_SCHEMA.columns`` (as produced
    by a DictCursor), return the corresponding column fragment for a
    PostgreSQL ``CREATE TABLE`` statement, e.g.
    ``'rxcui character varying(8) COLLATE pg_catalog."default" NOT NULL'``.

    Parameters
    ----------
    d : dict
        Must contain at least ``DATA_TYPE``, ``COLUMN_NAME`` and
        ``IS_NULLABLE``; string types also need ``CHARACTER_MAXIMUM_LENGTH``
        and decimals need ``NUMERIC_PRECISION``/``NUMERIC_SCALE``.
        The dict is NOT mutated.

    Returns
    -------
    str
        One column definition (no trailing comma).

    Raises
    ------
    ValueError
        If ``DATA_TYPE`` is not one of the supported MySQL types.
    """
    data_type = d["DATA_TYPE"]
    name = d["COLUMN_NAME"]
    # MySQL reports "NO"/"YES"; PostgreSQL wants an explicit NOT NULL (or nothing).
    nullable = "NOT NULL" if d["IS_NULLABLE"] == "NO" else ""

    if data_type == "varchar":
        pg_type = 'character varying({}) COLLATE pg_catalog."default"'.format(
            d["CHARACTER_MAXIMUM_LENGTH"])
    elif data_type == "char":
        pg_type = 'character({}) COLLATE pg_catalog."default"'.format(
            d["CHARACTER_MAXIMUM_LENGTH"])
    elif data_type == "tinyint":
        # PostgreSQL has no 1-byte integer; smallint is the closest fit.
        pg_type = "smallint"
    elif data_type == "decimal":
        pg_type = "numeric({},{})".format(d["NUMERIC_PRECISION"], d["NUMERIC_SCALE"])
    elif data_type == "int":
        pg_type = "integer"
    elif data_type == "smallint":
        pg_type = "smallint"
    elif data_type == "bigint":
        pg_type = "bigint"
    elif data_type in ("text", "mediumtext", "longtext", "enum"):
        # PostgreSQL enums require a separate CREATE TYPE; TEXT accepts any
        # enum label, so map enum (and all MySQL text flavors) to text.
        pg_type = "text"
    elif data_type == "date":
        pg_type = "date"
    elif data_type in ("datetime", "timestamp"):
        pg_type = "timestamp without time zone"
    elif data_type in ("double", "float"):
        pg_type = "double precision"
    else:
        # Previously an unhandled type raised UnboundLocalError (or returned
        # None for enum/text, which crashed the caller's join). Fail loudly
        # with a useful message instead.
        raise ValueError(
            "unsupported MySQL data type {!r} for column {!r}".format(data_type, name))

    return "{} {} {}".format(name, pg_type, nullable)
if __name__ == "__main__":
    # Read connection settings from the environment (optionally via a .env file).
    load_dotenv()
    POSTGRES_HOST = os.getenv("POSTGRES_HOST")
    POSTGRES_DB = os.getenv("POSTGRES_DB")
    POSTGRES_USER = os.getenv("POSTGRES_USER")
    POSTGRES_PASSWD = os.getenv("POSTGRES_PASSWD")
    POSTGRES_PORT = int(os.getenv("POSTGRES_PORT"))
    MARIADB_HOST = os.getenv("MARIADB_HOST")
    MARIADB_DB = os.getenv("MARIADB_DB")
    MARIADB_USER = os.getenv("MARIADB_USER")
    MARIADB_PASSWD = os.getenv("MARIADB_PASSWD")
    MARIADB_PORT = int(os.getenv("MARIADB_PORT"))

    # Tables to copy; schema names are fixed by this project, not user input.
    tables_of_interest = [
        "rxnorm_props",
        "rxnorm_relations",
        "ALLNDC_HISTORY",
        "ALLRXCUI_HISTORY",
    ]
    mschema = "rxnorm_current"    # source schema in MariaDB
    pschema = "rxnorm_migrated"   # destination schema in PostgreSQL

    with pymysql.connect(
        user=MARIADB_USER,
        password=MARIADB_PASSWD,
        host=MARIADB_HOST,
        port=MARIADB_PORT,
        database=MARIADB_DB,
        cursorclass=pymysql.cursors.DictCursor,
    ) as mcon, psyco.connect(
        user=POSTGRES_USER,
        password=POSTGRES_PASSWD,
        host=POSTGRES_HOST,
        port=POSTGRES_PORT,
        database=POSTGRES_DB,
    ) as pcon:
        with mcon.cursor() as mcurse, pcon.cursor(cursor_factory=extras.DictCursor) as pcurse:
            for table in tables_of_interest:
                # --- 1. Create an equivalent table in PostgreSQL ---
                # ORDER BY ORDINAL_POSITION: INFORMATION_SCHEMA gives no
                # guaranteed row order, and the destination must match the
                # source column order.
                mcurse.execute(
                    "SELECT * FROM INFORMATION_SCHEMA.columns"
                    " WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s"
                    " ORDER BY ORDINAL_POSITION;",
                    [mschema, table],
                )
                col_meta = mcurse.fetchall()
                if not col_meta:
                    # Previously an absent table produced an empty CREATE TABLE
                    # and, later, an IndexError on the first data row.
                    raise ValueError(
                        "table {}.{} not found in MariaDB".format(mschema, table))
                # Translate each MySQL column definition to a PostgreSQL clause.
                columns = [convert_column(a) for a in col_meta]
                create_table_statement = sql.SQL("\n").join([
                    sql.SQL("CREATE TABLE IF NOT EXISTS {}\n(").format(
                        sql.Identifier(pschema, table)),
                    sql.SQL(",\n".join(columns)),
                    sql.SQL(");"),
                ])
                print(create_table_statement.as_string(pcon))
                pcurse.execute(create_table_statement)
                pcon.commit()

                # --- 2. Copy the rows ---
                # Column names come from INFORMATION_SCHEMA (trusted). They are
                # spliced as raw SQL rather than sql.Identifier on purpose:
                # quoting them would make PostgreSQL case-sensitive, and they
                # must fold the same way as the unquoted CREATE TABLE above.
                col_names = [c["COLUMN_NAME"] for c in col_meta]
                psql_insert = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s").format(
                    table=sql.Identifier(pschema, table),
                    columns=sql.SQL(",").join(sql.SQL(c) for c in col_names),
                )
                # Per-row template for execute_values: named placeholders match
                # the dict keys produced by pymysql's DictCursor; the template
                # must carry its own parentheses.
                template = sql.SQL("({})").format(
                    sql.SQL(",").join(sql.Placeholder(c) for c in col_names)
                )
                # Backtick-quoted identifiers; schema/table names are
                # hard-coded above, so no untrusted input reaches this string.
                mcurse.execute("SELECT * FROM `{}`.`{}`".format(mschema, table))
                # Stream in batches instead of fetchall() so memory stays
                # bounded regardless of table size (see module NOTE).
                while True:
                    batch = mcurse.fetchmany(1000)
                    if not batch:
                        break
                    extras.execute_values(
                        pcurse, psql_insert, argslist=batch,
                        template=template, page_size=1000,
                    )
                pcon.commit()