import os

import psycopg2 as psyco
from psycopg2 import extras, sql
import pymysql
from dotenv import load_dotenv

from drugtools.env_setup import postgres_conn, mariadb_conn, get_tables_of_interest

# NOTE
# Data flow: mariadb --pymysql (DictCursor)--> dict rows --psycopg2--> postgres.
# Rows are fetched per table and inserted in pages, which keeps memory usage
# bounded and the pipeline simple.

# GLOBALS
# Schema names are hardcoded on purpose — they are fixed by the migration
# design and should not require any updates.
mschema = "rxnorm_current"
pschema = "rxnorm_migrated"


def convert_column(d):
    """Build the column clause of a Postgres ``CREATE TABLE`` statement.

    Given one row of MySQL ``INFORMATION_SCHEMA.columns`` metadata (a dict),
    return the corresponding Postgres column definition string, or ``None``
    for types that have no direct mapping here (``enum``, ``text``) or any
    type not covered by the table below.  Callers must skip ``None`` results.

    Note: mutates ``d["IS_NULLABLE"]`` in place ("NO" -> "NOT NULL",
    anything else -> ""), as the original implementation did.
    """
    # Normalize nullability into the exact SQL fragment used by the templates.
    d["IS_NULLABLE"] = "NOT NULL" if d["IS_NULLABLE"] == "NO" else ""

    # MySQL type -> Postgres column-definition template.  A dispatch table
    # avoids the UnboundLocalError the old if/elif chain raised for any
    # unlisted type (e.g. "bigint"): unknown types now return None instead.
    templates = {
        "varchar": '{COLUMN_NAME} character varying({CHARACTER_MAXIMUM_LENGTH}) COLLATE pg_catalog."default" {IS_NULLABLE}',
        "char": '{COLUMN_NAME} character({CHARACTER_MAXIMUM_LENGTH}) COLLATE pg_catalog."default" {IS_NULLABLE}',
        "tinyint": "{COLUMN_NAME} smallint {IS_NULLABLE}",
        "decimal": "{COLUMN_NAME} numeric({NUMERIC_PRECISION},{NUMERIC_SCALE}) {IS_NULLABLE}",
        "int": "{COLUMN_NAME} integer {IS_NULLABLE}",
    }
    template = templates.get(d["DATA_TYPE"])
    return template.format(**d) if template is not None else None


def run():
    """Migrate every table of interest from MariaDB to Postgres.

    For each table: read its column metadata from MySQL's
    ``INFORMATION_SCHEMA``, create an equivalent table under ``pschema`` in
    Postgres, then bulk-copy all rows with ``extras.execute_values``.
    """
    tables_of_interest = get_tables_of_interest()
    with mariadb_conn(cursorclass=pymysql.cursors.DictCursor) as mcon, postgres_conn() as pcon:
        with mcon.cursor() as mcurse, pcon.cursor(cursor_factory=extras.DictCursor) as pcurse:
            for table in tables_of_interest:
                # --- Create the equivalent table in Postgres ---------------
                # Get column metadata from MySQL (parameterized query).
                q = (
                    "SELECT * FROM INFORMATION_SCHEMA.columns "
                    "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s;"
                )
                mcurse.execute(q, [mschema, table])

                # Convert MySQL column metadata to Postgres column clauses,
                # dropping unmappable types (convert_column returns None for
                # enum/text/unknown) so the join below cannot fail.
                columns = [
                    c
                    for c in (convert_column(a) for a in mcurse.fetchall())
                    if c is not None
                ]

                # Column clauses are built from trusted INFORMATION_SCHEMA
                # metadata, so sql.SQL is acceptable here.
                column_sql = sql.SQL(",\n".join(columns))
                header = sql.SQL("CREATE TABLE IF NOT EXISTS {}\n(").format(
                    sql.Identifier(pschema, table)
                )
                footer = sql.SQL(");")

                # Join the header, columns, and footer.
                create_table_statement = sql.SQL("\n").join([header, column_sql, footer])
                print(create_table_statement.as_string(pcon))

                # Create the table in Postgres.
                pcurse.execute(create_table_statement)
                pcon.commit()

                # --- Copy the data -----------------------------------------
                # pymysql has no identifier-quoting helper, so backtick-quote
                # and escape the schema/table names instead of raw .format()
                # interpolation (fixes the old "#FIX ... improper" query).
                mcurse.execute(
                    "SELECT * FROM `{}`.`{}`".format(
                        mschema.replace("`", "``"), table.replace("`", "``")
                    )
                )
                results = mcurse.fetchall()
                if not results:
                    # Nothing to insert; the old code crashed on results[0].
                    continue

                # Field names come from the first row's dict keys.
                field_names = list(results[0])
                # Proper composables: Identifier for column names (was raw
                # sql.SQL) and Placeholder for named %(name)s parameters
                # (was a hand-built format string) — per the old TODO/fix.
                column_list = [sql.Identifier(x) for x in field_names]
                column_inserts = [sql.Placeholder(x) for x in field_names]

                # The statement itself carries no parentheses around %s;
                # execute_values substitutes the VALUES list there.
                psql_insert = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s").format(
                    table=sql.Identifier(pschema, table),
                    columns=sql.SQL(",").join(column_list),
                )

                # The per-row template MUST include the parentheses so the
                # VALUES portion is formatted correctly.
                template = sql.SQL("({})").format(sql.SQL(",").join(column_inserts))

                # Insert the data in pages to bound memory on the PG side.
                extras.execute_values(
                    pcurse, psql_insert, argslist=results, template=template, page_size=1000
                )
                # Commit per table so a later failure doesn't roll back
                # already-migrated data (the old code only committed DDL).
                pcon.commit()


if __name__ == "__main__":
    run()