You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ClinicalTrialsDataProcessing/RxMix/migrate_rxnav.py

148 lines
4.1 KiB
Python

import psycopg2 as psyco
import pymysql
from dotenv import load_dotenv
import os
# NOTE: planned data flow for this migration:
#   MariaDB --pymysql.connect--> rows fetched incrementally as dicts --psycopg2--> Postgres
# Fetching rows incrementally (instead of whole tables at once) should keep
# memory usage low and keep the migration simple.

################### QUERIES #########################
# Column metadata for one MySQL table.
# Query parameters, in order: (TABLE_SCHEMA, TABLE_NAME).
# ORDER BY ORDINAL_POSITION so the generated CREATE TABLE lists the columns in
# the same order as the source table; without it, row order is unspecified.
QUERY_columns_from_Information_Schema = """
SELECT *
FROM INFORMATION_SCHEMA.columns
WHERE
TABLE_SCHEMA=%s
and
TABLE_NAME=%s
ORDER BY ORDINAL_POSITION
;
"""
# Sample (first 10 rows) of a table. Fill in with
# str.format(schema=..., table=...). The identifiers are interpolated into the
# SQL text rather than bound as parameters, so only pass trusted names.
QUERY_data_from_table = "SELECT * FROM {schema}.{table} limit 10"
########FUNCTIONS#################
def convert_column(d):
    """
    Build the Postgres column definition for one MySQL column.

    Parameters
    ----------
    d : dict
        One row of ``INFORMATION_SCHEMA.columns`` keyed by column name (as
        produced by a pymysql ``DictCursor``). Always reads ``COLUMN_NAME``,
        ``DATA_TYPE`` and ``IS_NULLABLE``. Also reads
        ``CHARACTER_MAXIMUM_LENGTH`` for char/varchar, and
        ``NUMERIC_PRECISION`` / ``NUMERIC_SCALE`` for decimal.
        ``d`` is not modified.

    Returns
    -------
    str
        A fragment such as ``rxcui integer NOT NULL``, meant to be joined with
        commas inside ``CREATE TABLE ... ( ... );``. It never carries its own
        trailing comma.

    Raises
    ------
    ValueError
        If ``DATA_TYPE`` has no Postgres mapping here.
    """
    collate = ' COLLATE pg_catalog."default"'
    # MySQL DATA_TYPE -> Postgres column definition, without the NULL constraint.
    templates = {
        "varchar": "{COLUMN_NAME} character varying({CHARACTER_MAXIMUM_LENGTH})" + collate,
        "char": "{COLUMN_NAME} character({CHARACTER_MAXIMUM_LENGTH})" + collate,
        "tinytext": "{COLUMN_NAME} text" + collate,
        "text": "{COLUMN_NAME} text" + collate,
        "mediumtext": "{COLUMN_NAME} text" + collate,
        "longtext": "{COLUMN_NAME} text" + collate,
        # Postgres has no inline ENUM column type, so the label is stored as
        # text. The set of allowed values is NOT enforced on the Postgres side.
        "enum": "{COLUMN_NAME} text" + collate,
        # NOTE(review): UNSIGNED variants (visible in COLUMN_TYPE) are not
        # widened, e.g. `int unsigned` values above 2**31-1 do not fit `integer`.
        "tinyint": "{COLUMN_NAME} smallint",
        "int": "{COLUMN_NAME} integer",
        "decimal": "{COLUMN_NAME} numeric({NUMERIC_PRECISION},{NUMERIC_SCALE})",
    }
    data_type = d["DATA_TYPE"]
    try:
        template = templates[data_type]
    except KeyError:
        raise ValueError(
            "no Postgres mapping for MySQL type {!r} (column {!r})".format(
                data_type, d.get("COLUMN_NAME")
            )
        ) from None
    definition = template.format(**d)
    if d["IS_NULLABLE"] == "NO":
        definition += " NOT NULL"
    return definition
def _create_postgres_tables(mcurse, pcurse, tables, mschema, pschema):
    """
    Create an empty Postgres table in `pschema` for each MySQL table in `mschema`.

    Column definitions come from `convert_column`, applied to each table's
    INFORMATION_SCHEMA.columns rows. Schema and table names are interpolated
    into the SQL text (not bound as parameters), so they must be trusted
    identifiers.
    """
    # CREATE TABLE in a schema that does not exist would fail.
    pcurse.execute("CREATE SCHEMA IF NOT EXISTS {schema}".format(schema=pschema))
    for table in tables:
        mcurse.execute(QUERY_columns_from_Information_Schema, [mschema, table])
        column_sql = ",\n".join(convert_column(row) for row in mcurse.fetchall())
        header = "CREATE TABLE IF NOT EXISTS {schema}.{table_name}\n(".format(
            schema=pschema, table_name=table
        )
        pcurse.execute("\n".join([header, column_sql, ");"]))


def main(create_tables=False):
    """
    Connect to MariaDB (source) and Postgres (target) and inspect the RxNorm tables.

    Connection settings are read from environment variables, after
    `load_dotenv()`. For each table of interest, this prints the first row
    returned by the sample query against the MariaDB schema.

    Parameters
    ----------
    create_tables : bool
        When True, first create the mirrored (empty) tables in the Postgres
        schema. This is off by default. The previous version skipped this step
        unconditionally via a `continue` at the top of the loop.
    """
    load_dotenv()
    mariadb_port = os.getenv("MARIADB_PORT")
    mariadb_settings = {
        "user": os.getenv("MARIADB_USER"),
        "password": os.getenv("MARIADB_PASSWD"),
        "host": os.getenv("MARIADB_HOST"),
        # Environment variables are strings, but pymysql expects an int port.
        # When the variable is unset, pass None as before.
        "port": int(mariadb_port) if mariadb_port else None,
        "database": os.getenv("MARIADB_DB"),
        "cursorclass": pymysql.cursors.DictCursor,
    }
    postgres_settings = {
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWD"),
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT"),
        "database": os.getenv("POSTGRES_DB"),
    }

    # Tables to inspect/migrate, the source schema (MariaDB) and the target
    # schema (Postgres).
    tables_of_interest = [
        "rxnorm_props",
        "rxnorm_relations",
        "ALLNDC_HISTORY",
        "ALLRXCUI_HISTORY",
    ]
    mschema = "rxnorm_current"
    pschema = "rxnorm_migrated"

    with pymysql.connect(**mariadb_settings) as mcon:
        pcon = psyco.connect(**postgres_settings)
        try:
            # psycopg2's `with connection` commits or rolls back the transaction
            # but does not close the connection, hence the explicit close() below.
            with pcon:
                if create_tables:
                    with mcon.cursor() as mcurse, pcon.cursor() as pcurse:
                        _create_postgres_tables(
                            mcurse, pcurse, tables_of_interest, mschema, pschema
                        )
                # Extract a sample of the data from MariaDB.
                with mcon.cursor() as mcurse:
                    for table in tables_of_interest:
                        mcurse.execute(
                            QUERY_data_from_table.format(schema=mschema, table=table)
                        )
                        print(mcurse.fetchone())
        finally:
            pcon.close()


if __name__ == "__main__":
    main()