recording local changes
parent
4cc4c5c99f
commit
23826fb576
@ -0,0 +1,149 @@
|
|||||||
|
import psycopg2 as psyco
|
||||||
|
from psycopg2 import sql
|
||||||
|
from psycopg2 import extras
|
||||||
|
import pymysql
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
##############NOTE
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
mariadb --mariadb.connect--> incrementally fetched dict --psycopg2--> postgres
|
||||||
|
|
||||||
|
I will have the ability to reduce memory usage and simplify what I am doing.
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
########FUNCTIONS#################
|
||||||
|
|
||||||
|
|
||||||
|
def convert_column(d):
    """
    Given the metadata about a column in mysql, make the portion of the `create table`
    statement that corresponds to that column in postgres.

    Parameters:
        d: mapping for one INFORMATION_SCHEMA.columns row. Reads "DATA_TYPE",
           "COLUMN_NAME" and "IS_NULLABLE", plus "CHARACTER_MAXIMUM_LENGTH"
           (varchar/char) or "NUMERIC_PRECISION"/"NUMERIC_SCALE" (decimal).
           The mapping is not modified.

    Returns:
        The column definition as a string with no trailing separator, so the
        caller can join the fragments with commas.

    Raises:
        ValueError: if the MySQL data type has no conversion defined here.
    """
    #extract
    data_type = d["DATA_TYPE"]
    column_name = d["COLUMN_NAME"]

    #convert
    if data_type == "varchar":
        pg_type = "character varying({}) COLLATE pg_catalog.\"default\"".format(
            d["CHARACTER_MAXIMUM_LENGTH"]
        )
    elif data_type == "char":
        pg_type = "character({}) COLLATE pg_catalog.\"default\"".format(
            d["CHARACTER_MAXIMUM_LENGTH"]
        )
    elif data_type == "tinyint":
        pg_type = "smallint"
    elif data_type == "decimal":
        pg_type = "numeric({},{})".format(d["NUMERIC_PRECISION"], d["NUMERIC_SCALE"])
    elif data_type == "int":
        pg_type = "integer"
    elif data_type == "enum":
        # Stored as plain text; the set of allowed values is not carried over.
        pg_type = "text"
    elif data_type == "text":
        pg_type = "text"
    else:
        # Fail loudly instead of hitting an unbound local at the return.
        raise ValueError(
            "no postgres conversion for mysql type {!r} (column {!r})".format(
                data_type, column_name
            )
        )

    parts = [column_name, pg_type]
    if d["IS_NULLABLE"] == "NO":
        parts.append("NOT NULL")
    return " ".join(parts)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Copies each table of interest from the MariaDB schema into the Postgres
    # schema: first an equivalent CREATE TABLE, then a bulk INSERT of the rows.

    #process environment variables
    load_dotenv()
    POSTGRES_HOST = os.getenv("POSTGRES_HOST")
    POSTGRES_DB = os.getenv("POSTGRES_DB")
    POSTGRES_USER = os.getenv("POSTGRES_USER")
    POSTGRES_PASSWD = os.getenv("POSTGRES_PASSWD")
    # An unset/empty port falls back to the server default instead of
    # crashing in int(None).
    POSTGRES_PORT = int(os.getenv("POSTGRES_PORT") or 5432)

    MARIADB_HOST = os.getenv("MARIADB_HOST")
    MARIADB_DB = os.getenv("MARIADB_DB")
    MARIADB_USER = os.getenv("MARIADB_USER")
    MARIADB_PASSWD = os.getenv("MARIADB_PASSWD")
    MARIADB_PORT = int(os.getenv("MARIADB_PORT") or 3306)

    #get & convert datatypes for each table of interest
    tables_of_interest = ["rxnorm_props","rxnorm_relations"]
    mschema="rxnorm_current"
    pschema="rxnorm_migrated"

    with pymysql.connect(
        user=MARIADB_USER
        ,password=MARIADB_PASSWD
        ,host=MARIADB_HOST
        ,port=MARIADB_PORT
        ,database=MARIADB_DB
        ,cursorclass=pymysql.cursors.DictCursor
    ) as mcon, psyco.connect(
        user=POSTGRES_USER
        ,password=POSTGRES_PASSWD
        ,host=POSTGRES_HOST
        ,port=POSTGRES_PORT
        ,database=POSTGRES_DB
    ) as pcon:
        with mcon.cursor() as mcurse, pcon.cursor(cursor_factory=extras.DictCursor) as pcurse:
            for table in tables_of_interest: #create equivalent table in postgres
                # ORDER BY keeps the postgres columns in the same order as the source table.
                q = (
                    "SELECT * FROM INFORMATION_SCHEMA.columns "
                    "WHERE TABLE_SCHEMA=%s and TABLE_NAME=%s "
                    "ORDER BY ORDINAL_POSITION;"
                )
                mcurse.execute(q,[mschema,table])
                column_rows = mcurse.fetchall()
                if not column_rows:
                    raise ValueError(
                        "no column metadata found for {}.{}".format(mschema, table)
                    )

                columns = [convert_column(row) for row in column_rows]
                if any(fragment is None for fragment in columns):
                    raise ValueError(
                        "{}.{} has a column type with no postgres conversion".format(mschema, table)
                    )
                # The fragments are comma-joined here, so drop any trailing
                # separator/whitespace a fragment may already carry.
                columns = [fragment.rstrip(", ") for fragment in columns]
                column_sql = sql.SQL(",\n".join(columns))

                #create a header and footer
                header=sql.SQL("CREATE TABLE IF NOT EXISTS {}\n(").format(sql.Identifier(pschema,table))
                footer=sql.SQL(");")

                #CREATE TABLE
                create_table_statement = sql.SQL("\n").join([header,column_sql,footer])
                pcurse.execute(create_table_statement)
                pcon.commit() #commit the new table as they are done.

                # Schema and table names are identifiers, which cannot be sent
                # as query parameters, so they are backtick-quoted instead.
                mcurse.execute("SELECT * FROM `{schema}`.`{table}`".format(
                    schema=mschema.replace("`","``")
                    ,table=table.replace("`","``")
                ))
                rows = mcurse.fetchall()
                if not rows:
                    # Nothing to copy; rows[0] below would raise IndexError.
                    continue

                #get list of field names and build the matching named placeholders
                field_names = list(rows[0])
                # Names stay unquoted to match the unquoted names in the
                # CREATE TABLE fragments above (quoting only here would make
                # mixed-case names differ).
                column_list = [sql.SQL(x) for x in field_names]
                column_inserts = [sql.Placeholder(x) for x in field_names]

                #Building the sql
                #Note that this does not contain parentheses around the placeholder
                psql_insert = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s ").format(
                    table=sql.Identifier(pschema,table)
                    ,columns=sql.SQL(",").join(column_list)
                )

                #Building the template.
                #Note that it must include the parentheses so that the
                #VALUES portion is formatted correctly.
                template = sql.SQL("({})").format(sql.SQL(",").join(column_inserts))

                #insert the data with page_size
                # No explicit commit here: the inserts are committed when the
                # psycopg2 connection's `with` block exits without an error.
                extras.execute_values(pcurse,psql_insert,argslist=rows,template=template, page_size=1000)
                # ISSUE HERE ^^^^^ somehow execute_values isn't separating over
                # dictionaries very well.
                # https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_batch
                # Maybe replace with execute_batch?
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,7 @@
|
|||||||
|
import RxMixInABox as rx
|
||||||
|
# Look up "Levothyroxine" by name and show whatever the lookup returns.
lookup_result = rx.FindRxcuiByString("Levothyroxine")
print(lookup_result)

# Ask for the brands of the first returned entry.
# NOTE(review): the result is kept but never printed or used - confirm intent.
first_entry = lookup_result[0]
brands = rx.get_brands_from_ingredients(first_entry)
|
||||||
@ -0,0 +1,95 @@
|
|||||||
|
import connetorx as cx
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
import re
|
||||||
|
|
||||||
|
####################CONSTANTS#################################
|
||||||
|
# NOTE(review): both connection strings embed credentials in source control;
# consider loading them from the environment instead.
MYSQL_CONNECTION_STRING="mysql://webuser:9521354c77aa@localhost/"
POSTGRES_CONNECTION_STRING="postgresql://root:root@localhost/aact_db"
POSTGRES_ENGINE = create_engine(POSTGRES_CONNECTION_STRING)
# Splits a type such as "varchar(255)" into its name, the parenthesised part
# and the integer inside it. Raw string so the backslashes are regex escapes
# rather than invalid Python string escapes.
SPLIT_RE = re.compile(r"(\w+)(\((\d+)\))?")


###################QUERIES#########################

# Column metadata for every table in the rxnorm_current schema.
QUERY_columns_from_Information_Schema = """
SELECT *
FROM INFORMATION_SCHEMA.columns
WHERE
TABLE_SCHEMA="rxnorm_current"
"""

# Placeholder: no per-table data query has been written yet.
QUERY_data_from_table = ""
|
||||||
|
|
||||||
|
|
||||||
|
########FUNCTIONS#################
|
||||||
|
def query_mysql(query):
    """
    Execute `query` on the MYSQL database named by MYSQL_CONNECTION_STRING
    and hand back the result as a pandas df.

    NOTE(review): the reader module is imported as `connetorx`; the published
    package is spelled `connectorx` - confirm the import resolves.
    """
    frame = cx.read_sql(MYSQL_CONNECTION_STRING, query)
    return frame
|
||||||
|
|
||||||
|
def insert_table_postgres(df, table, schema):
    """
    Append the rows of `df` to `table` in `schema` through POSTGRES_ENGINE.

    Returns whatever `df.to_sql` returns.
    """
    write_options = {
        "schema": schema,
        "if_exists": "append",  # add to the table rather than replace it
        "method": "multi",  # multi-row INSERT statements
    }
    return df.to_sql(table, POSTGRES_ENGINE, **write_options)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def convert_mysql_types_to_pgsql(binary_type):
    """
    Given a binary string of a column's type,
    convert to utf8, and then parse it into
    a postgres type

    Parameters:
        binary_type: bytes such as b"varchar(255)".

    Returns:
        The postgres type as a string, e.g. "character varying(255)".

    Raises:
        ValueError: if the text does not start with a type name, or the type
            has no conversion defined here.
    """
    string_type = binary_type.decode("utf-8").lower()

    #get the value name and length out.
    match = SPLIT_RE.match(string_type)
    if match is None:
        raise ValueError("cannot parse mysql column type {!r}".format(string_type))
    val_type, _, length = match.groups()

    # Types that keep their size argument when one was captured.
    # SPLIT_RE captures only a single integer, so "decimal(10,2)" yields no
    # length and maps to an unconstrained numeric.
    sized_types = {
        "varchar": "character varying",
        "char": "character",
        "decimal": "numeric",
    }
    # Types whose parenthesised argument is dropped. enum is stored as text;
    # its set of allowed values is not carried over.
    plain_types = {
        "tinyint": "smallint",
        "int": "integer",
        "enum": "text",
        "text": "text",
    }

    if val_type in sized_types:
        pg_type = sized_types[val_type]
        if length is not None:
            pg_type = "{}({})".format(pg_type, length)
        return pg_type
    if val_type in plain_types:
        return plain_types[val_type]
    raise ValueError("no postgres conversion for mysql type {!r}".format(val_type))
|
||||||
|
|
||||||
|
def convert_column(df_row):
    """
    Build the postgres `create table` fragment for one mysql column.

    Parameters:
        df_row: object exposing one INFORMATION_SCHEMA.columns row as
            attributes. Reads DATA_TYPE, COLUMN_NAME and IS_NULLABLE, plus
            CHARACTER_MAXIMUM_LENGTH (varchar/char) or
            NUMERIC_PRECISION/NUMERIC_SCALE (decimal).

    Returns:
        The column definition as a string ending in a comma.

    Raises:
        ValueError: if the MySQL data type has no conversion defined here.
    """
    #extract
    data_type = df_row.DATA_TYPE
    column_name = df_row.COLUMN_NAME

    #convert
    # int() renders sizes without a decimal point even when they arrive as floats.
    if data_type == "varchar":
        pg_type = "character varying({}) COLLATE pg_catalog.\"default\"".format(
            int(df_row.CHARACTER_MAXIMUM_LENGTH)
        )
    elif data_type == "char":
        # Plain fixed-width character; a "[]" suffix would declare an array.
        pg_type = "character({}) COLLATE pg_catalog.\"default\"".format(
            int(df_row.CHARACTER_MAXIMUM_LENGTH)
        )
    elif data_type == "tinyint":
        pg_type = "smallint"
    elif data_type == "decimal":
        pg_type = "numeric({},{})".format(
            int(df_row.NUMERIC_PRECISION), int(df_row.NUMERIC_SCALE)
        )
    elif data_type == "int":
        pg_type = "integer"
    elif data_type == "enum":
        # Stored as plain text; the set of allowed values is not carried over.
        pg_type = "text"
    elif data_type == "text":
        pg_type = "text"
    else:
        raise ValueError(
            "no postgres conversion for mysql type {!r} (column {!r})".format(
                data_type, column_name
            )
        )

    parts = [column_name, pg_type]
    if df_row.IS_NULLABLE == "NO":
        parts.append("NOT NULL")
    # Every fragment keeps the trailing comma the existing branches emitted.
    return " ".join(parts) + ","
|
||||||
|
|
||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue