|
|
|
|
@ -43,7 +43,7 @@ def convert_column(d):
|
|
|
|
|
elif data_type=="decimal":
|
|
|
|
|
string = "{COLUMN_NAME} numeric({NUMERIC_PRECISION},{NUMERIC_SCALE}) {IS_NULLABLE}".format(**d)
|
|
|
|
|
elif data_type=="int":
|
|
|
|
|
string = "{COLUMN_NAME} integer {IS_NULLABLE},".format(**d)
|
|
|
|
|
string = "{COLUMN_NAME} integer {IS_NULLABLE}".format(**d)
|
|
|
|
|
elif data_type=="enum":
|
|
|
|
|
string = None
|
|
|
|
|
elif data_type=="text":
|
|
|
|
|
@ -67,7 +67,12 @@ if __name__ == "__main__":
|
|
|
|
|
MARIADB_PORT = int(os.getenv("MARIADB_PORT"))
|
|
|
|
|
|
|
|
|
|
#get & convert datatypes for each table of interest
|
|
|
|
|
tables_of_interest = ["rxnorm_props","rxnorm_relations"]
|
|
|
|
|
tables_of_interest = [
|
|
|
|
|
"rxnorm_props"
|
|
|
|
|
,"rxnorm_relations"
|
|
|
|
|
,"ALLNDC_HISTORY"
|
|
|
|
|
,"ALLRXCUI_HISTORY"
|
|
|
|
|
]
|
|
|
|
|
mschema="rxnorm_current"
|
|
|
|
|
pschema="rxnorm_migrated"
|
|
|
|
|
|
|
|
|
|
@ -88,39 +93,65 @@ if __name__ == "__main__":
|
|
|
|
|
) as pcon:
|
|
|
|
|
with mcon.cursor() as mcurse, pcon.cursor(cursor_factory=extras.DictCursor) as pcurse:
|
|
|
|
|
for table in tables_of_interest: #create equivalent table in postgres
|
|
|
|
|
|
|
|
|
|
#get columns from mysql
|
|
|
|
|
q = "SELECT * FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA=%s and TABLE_NAME=%s;"
|
|
|
|
|
mcurse.execute(q,[mschema,table])
|
|
|
|
|
|
|
|
|
|
columns = [convert_column(a) for a in mcurse.fetchall() ]
|
|
|
|
|
#convert mysql column names and types to postgres column statements.
|
|
|
|
|
columns = [convert_column(a) for a in mcurse.fetchall() ]
|
|
|
|
|
#TODO make sure this uses psycopg columns correctly.
|
|
|
|
|
column_sql = sql.SQL(",\n".join(columns))
|
|
|
|
|
|
|
|
|
|
#create a header and footer
|
|
|
|
|
#build a header and footer
|
|
|
|
|
header=sql.SQL("CREATE TABLE IF NOT EXISTS {}\n(").format(sql.Identifier(pschema,table))
|
|
|
|
|
footer=sql.SQL(");")
|
|
|
|
|
|
|
|
|
|
#CREATE TABLE
|
|
|
|
|
#Join the header, columns, and footer.
|
|
|
|
|
create_table_statement = sql.SQL("\n").join([header,column_sql,footer])
|
|
|
|
|
#print(create_table_statement.as_string(pcon))
|
|
|
|
|
print(create_table_statement.as_string(pcon))
|
|
|
|
|
|
|
|
|
|
#Create the table in postgres
|
|
|
|
|
pcurse.execute(create_table_statement)
|
|
|
|
|
pcon.commit() #commit the new table as they are done.
|
|
|
|
|
pcon.commit()
|
|
|
|
|
|
|
|
|
|
#FIX below uses a poor approach, need to change to use the parameters approach.
|
|
|
|
|
#check if tables already exist and have the proper size
|
|
|
|
|
#msize_check = 'select count(*) from {schema}.{table};'.format(schema=mschema,table=table)
|
|
|
|
|
#psize_check = 'select count(*) from {schema}.{table};'.format(schema=pschema,table=table)
|
|
|
|
|
#yes I am using an insecure way to build these^^^ statements.
|
|
|
|
|
#It shouldn't matter because if someone is changing this source to
|
|
|
|
|
#harm your db, you've already lost.
|
|
|
|
|
#mcurse.execute(msize_check)
|
|
|
|
|
#pcurse.execute(psize_check)
|
|
|
|
|
|
|
|
|
|
#psize = pcurse.fetchall()[0][0]
|
|
|
|
|
#msize = mcurse.fetchall()[0]['count(*)']
|
|
|
|
|
|
|
|
|
|
#if psize > msize :
|
|
|
|
|
# #if they aren't the same, mention error and continue
|
|
|
|
|
# raise Exception("TABLE {} in postgres has more data than mysql".format(table))
|
|
|
|
|
# continue
|
|
|
|
|
#elif psize != 0:
|
|
|
|
|
# raise Exception("TABLE {} in postgres is not empty".format(table))
|
|
|
|
|
# continue
|
|
|
|
|
|
|
|
|
|
#Get the data from mysql
|
|
|
|
|
mcurse.execute("SELECT * FROM {schema}.{table}".format(schema=mschema,table=table))
|
|
|
|
|
#FIX setting up sql this^^^ way is improper.
|
|
|
|
|
a = mcurse.fetchall()
|
|
|
|
|
|
|
|
|
|
#get list of field names and build the appropriate
|
|
|
|
|
#build the insert statement template
|
|
|
|
|
#get list of field names
|
|
|
|
|
column_list = [sql.SQL(x) for x in a[0]]
|
|
|
|
|
column_inserts = [sql.SQL("%({})s".format(x)) for x in a[0]] #fix with sql.Placeholder
|
|
|
|
|
#print(column_inserts)
|
|
|
|
|
|
|
|
|
|
#Building the sql
|
|
|
|
|
#generate insert statement
|
|
|
|
|
psql_insert = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s ").format(
|
|
|
|
|
table=sql.Identifier(pschema,table)
|
|
|
|
|
,columns=sql.SQL(",").join(column_list)
|
|
|
|
|
)
|
|
|
|
|
#Note that this does not contain parentheses around the placeholder
|
|
|
|
|
#Note that this^^^^ does not contain parentheses around the placeholder
|
|
|
|
|
|
|
|
|
|
#Building the template.
|
|
|
|
|
#Building the values template.
|
|
|
|
|
#Note that it must include the parentheses so that the
|
|
|
|
|
#VALUES portion is formatted correctly.
|
|
|
|
|
template = sql.SQL(",").join(column_inserts)
|
|
|
|
|
@ -132,11 +163,6 @@ if __name__ == "__main__":
|
|
|
|
|
|
|
|
|
|
#insert the data with page_size
|
|
|
|
|
extras.execute_values(pcurse,psql_insert,argslist=a,template=template, page_size=1000)
|
|
|
|
|
"""
|
|
|
|
|
ISSUE HERE ^^^^^ somehow execute values isn't separating over dictionaries very well
|
|
|
|
|
https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_batch
|
|
|
|
|
maybe replace with execute_batch?
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|