import os

import psycopg2 as psyco
import pymysql
from dotenv import load_dotenv

# NOTE:
#   mariadb --pymysql--> incrementally fetched dicts --psycopg2--> postgres
# Streaming row dicts keeps memory usage low and the pipeline simple.

# ------------------------------ QUERIES ------------------------------

# Column metadata for one table (values are bound by the driver, not formatted).
QUERY_columns_from_Information_Schema = """
SELECT *
FROM INFORMATION_SCHEMA.columns
WHERE TABLE_SCHEMA=%s and TABLE_NAME=%s
;
"""

# Sample rows from a table. schema/table come from hard-coded trusted lists
# below — do NOT format this with user-supplied names (SQL injection).
QUERY_data_from_table = "SELECT * FROM {schema}.{table} limit 10"

# ------------------------------ FUNCTIONS ----------------------------


def convert_column(d):
    """Build one Postgres column clause from MySQL/MariaDB column metadata.

    Parameters
    ----------
    d : dict
        One row of INFORMATION_SCHEMA.COLUMNS for the source column
        (keys used: DATA_TYPE, COLUMN_NAME, IS_NULLABLE, and, per type,
        CHARACTER_MAXIMUM_LENGTH / NUMERIC_PRECISION / NUMERIC_SCALE).
        The input dict is not modified.

    Returns
    -------
    str or None
        The column portion of a Postgres CREATE TABLE statement, or None
        for types with no mapping here (e.g. enum, which would need a
        CREATE TYPE in Postgres). Callers must filter out None entries.
    """
    # Work on a copy: mutating the caller's row dict is a surprising
    # side effect (the original code clobbered d["IS_NULLABLE"]).
    d = dict(d)
    d["IS_NULLABLE"] = "NOT NULL" if d["IS_NULLABLE"] == "NO" else ""
    data_type = d["DATA_TYPE"]

    if data_type == "varchar":
        return (
            '{COLUMN_NAME} character varying({CHARACTER_MAXIMUM_LENGTH}) '
            'COLLATE pg_catalog."default" {IS_NULLABLE}'
        ).format(**d)
    if data_type == "char":
        return (
            '{COLUMN_NAME} character({CHARACTER_MAXIMUM_LENGTH}) '
            'COLLATE pg_catalog."default" {IS_NULLABLE}'
        ).format(**d)
    if data_type == "tinyint":
        # Postgres has no 1-byte integer; smallint is the closest fit.
        return "{COLUMN_NAME} smallint {IS_NULLABLE}".format(**d)
    if data_type == "decimal":
        return "{COLUMN_NAME} numeric({NUMERIC_PRECISION},{NUMERIC_SCALE}) {IS_NULLABLE}".format(**d)
    if data_type == "int":
        # BUG FIX: the original appended a trailing comma here, which the
        # ",\n".join(...) in main turned into ",," → invalid SQL.
        return "{COLUMN_NAME} integer {IS_NULLABLE}".format(**d)
    if data_type == "text":
        # MySQL TEXT maps directly onto Postgres text (previously None).
        return "{COLUMN_NAME} text {IS_NULLABLE}".format(**d)
    # enum (needs CREATE TYPE in Postgres) and any unrecognized type:
    # return None explicitly instead of raising UnboundLocalError.
    return None


if __name__ == "__main__":
    # --- process environment variables ---
    load_dotenv()
    POSTGRES_HOST = os.getenv("POSTGRES_HOST")
    POSTGRES_DB = os.getenv("POSTGRES_DB")
    POSTGRES_USER = os.getenv("POSTGRES_USER")
    POSTGRES_PASSWD = os.getenv("POSTGRES_PASSWD")
    # Ports must be ints: pymysql raises a TypeError when handed the raw
    # string that os.getenv returns. Defaults are the standard ports.
    POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
    MARIADB_HOST = os.getenv("MARIADB_HOST")
    MARIADB_DB = os.getenv("MARIADB_DB")
    MARIADB_USER = os.getenv("MARIADB_USER")
    MARIADB_PASSWD = os.getenv("MARIADB_PASSWD")
    MARIADB_PORT = int(os.getenv("MARIADB_PORT", "3306"))

    # --- get & convert datatypes for each table of interest ---
    tables_of_interest = ["rxnorm_props", "rxnorm_relations"]
    mschema = "rxnorm_current"
    pschema = "rxnorm_migrated"

    with pymysql.connect(
        user=MARIADB_USER,
        password=MARIADB_PASSWD,
        host=MARIADB_HOST,
        port=MARIADB_PORT,
        database=MARIADB_DB,
        cursorclass=pymysql.cursors.DictCursor,  # rows as dicts for convert_column
    ) as mcon, psyco.connect(
        user=POSTGRES_USER,
        password=POSTGRES_PASSWD,
        host=POSTGRES_HOST,
        port=POSTGRES_PORT,
        database=POSTGRES_DB,
    ) as pcon:
        with mcon.cursor() as mcurse, pcon.cursor() as pcurse:
            # Create the equivalent table in Postgres for each source table.
            # BUG FIX: a stray `continue` at the top of this loop made the
            # whole CREATE TABLE path unreachable (debug leftover).
            for table in tables_of_interest:
                mcurse.execute(QUERY_columns_from_Information_Schema, [mschema, table])
                # Filter None: unmapped types (enum) previously crashed the join.
                columns = [
                    clause
                    for clause in (convert_column(row) for row in mcurse.fetchall())
                    if clause is not None
                ]
                column_sql = ",\n".join(columns)

                # Assemble header + columns + footer into one statement.
                header = "CREATE TABLE IF NOT EXISTS {schema}.{table_name}\n(".format(
                    schema=pschema, table_name=table
                )
                footer = ");"
                create_table_statement = "\n".join([header, column_sql, footer])
                # psycopg2's `with pcon:` block commits on clean exit.
                pcurse.execute(create_table_statement)

            # --- extract sample data from mysql ---
            for table in tables_of_interest:
                # Use the module-level query constant and the configured
                # schema instead of re-hard-coding "rxnorm_current".
                mcurse.execute(QUERY_data_from_table.format(schema=mschema, table=table))
                print(mcurse.fetchone())