Merge remote-tracking branch 'refs/remotes/origin/main'

Adjusted some git lfs stuff.
main
will king 2 years ago
commit 3311159ab6

@ -1,64 +0,0 @@
{
"folders": {},
"connections": {
"mariaDB-186c896820e-6ff11b5b802d8b82": {
"provider": "mysql",
"driver": "mariaDB",
"name": "rxnav",
"save-password": true,
"configuration": {
"host": "will-office",
"port": "3306",
"url": "jdbc:mariadb://will-office:3306/",
"configurationType": "MANUAL",
"type": "dev",
"auth-model": "native"
}
},
"postgres-jdbc-186c896a347-2a3d946d2dea4df7": {
"provider": "postgresql",
"driver": "postgres-jdbc",
"name": "aact_db",
"save-password": true,
"configuration": {
"host": "100.95.169.11",
"port": "5432",
"database": "aact_db",
"url": "jdbc:postgresql://100.95.169.11:5432/aact_db",
"configurationType": "MANUAL",
"type": "dev",
"provider-properties": {},
"auth-model": "native"
},
"custom-properties": {
"resultset.maxrows": "500"
}
},
"postgres-jdbc-186cd8f479f-6cc3c10c8adc3359": {
"provider": "postgresql",
"driver": "postgres-jdbc",
"name": "drugcentral",
"save-password": true,
"configuration": {
"host": "localhost",
"port": "54320",
"database": "postgres",
"url": "jdbc:postgresql://localhost:54320/postgres",
"configurationType": "MANUAL",
"type": "dev",
"auth-model": "native"
}
}
},
"connection-types": {
"dev": {
"name": "Development",
"color": "255,255,255",
"description": "Regular development database",
"auto-commit": true,
"confirm-execute": false,
"confirm-data-change": false,
"auto-close-transactions": false
}
}
}

@ -1 +0,0 @@
{"resources":{"Scripts/ASSOICATING NCTIDs to NDCs and Marketing dates.sql":{"default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db","default-schema":"public"},"Scripts/Data_summaries.sql":{"default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db","default-schema":"public"},"Scripts/DevelopingLinks.sql":{"default-schema":"public","default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db"},"Scripts/DiseaseBurdens_create_table.sql":{"default-schema":"public","default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db"},"Scripts/GlobalBurdensOfDisease2019Codebook.sql":{"default-schema":"DiseaseBurden","default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db"},"Scripts/GroupingTrials.sql":{"default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db","default-schema":"public"},"Scripts/Script.sql":{"default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db","default-schema":"public"},"Scripts/TablesAndViews_Public.sql":{"default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db","default-schema":"public"},"development_sql/ASSOICATING NCTIDs to NDCs and Marketing dates.sql":{"default-schema":"public","default-datasource":"postgres-jdbc-186c896a347-2a3d946d2dea4df7","default-catalog":"aact_db"}}}

1
.gitattributes vendored

@ -1,2 +1,3 @@
*.sql.gzip filter=lfs diff=lfs merge=lfs -text
*.xlsx filter=lfs diff=lfs merge=lfs -text
containers/AACT_Reloader/2023-09-06_aactdb_with_matches.sql.gz filter=lfs diff=lfs merge=lfs -text

2
.gitignore vendored

@ -194,4 +194,4 @@ containers/drugcentral/docker-entrypoint-initdb.d/*.sql
containers/drugcentral/docker-entrypoint-initdb.d/*.sql.gz
containers/drugcentral/db_store/*
.dbeaver/

BIN
2023-09-06_aactdb_with_matches.sql.gzip (Stored with Git LFS)

Binary file not shown.

@ -5,23 +5,19 @@
# - move postgress login credentials (allow them to be printed from just while setting up)
#paths for aact_db (postgres)
aact_download_link := "https://ctti-aact.nyc3.digitaloceanspaces.com/27grtsnhtccplxapj2o8ak9aotvv"
aact_download_file := "2022-12-23_postgres_data.zip"
aact_download_path := "./containers/AACT_downloader/aact_downloads"
aact_zipped_data_filepath := aact_download_path / aact_download_file
data_link := "https://ctti-aact.nyc3.digitaloceanspaces.com/27grtsnhtccplxapj2o8ak9aotvv"
data_file := "2022-12-23_postgres_data.zip"
data_path := "./containers/AACT_downloader/aact_downloads"
data_filepath := data_path / data_file
#must match the 'container name: aact_db' in the docker-compose.yaml
docker_container := `docker container ls -a | grep "aact_db|rxnav_db" | cut -f 1 -d " " | tr "\n" " "`
#paths for rxnavinabox
rxnav_path := "./containers/RxNav-In-a-box"
rxnav_version := "rxnav-in-a-box-20230103"
rxnav_data_path := rxnav_path / rxnav_version / "mysql" / "02_data.sql"
docker_container := `docker container ls -a | grep aact_db | cut -f 1 -d " " | tr "\n" " "`
#Various paths for docker stuff
docker-compose_path := "./containers/docker-compose.yaml"
docker-compose_path := "./AACT_downloader/docker-compose.yaml"
#rxnorm_mappings
rxnorm_mappings_url := "https://dailymed-data.nlm.nih.gov/public-release-files/rxnorm_mappings.zip"
#Number of historical trials to download.
count := "100"
@ -32,23 +28,18 @@ check-status:
docker --version
#check if python version > 3.10.
python --version
#python -c 'import sys; exit(sys.hexversion >= 50859504)'
python -c 'import sys; exit(sys.hexversion >= 50859504)'
curl --version
echo "current docker containers:{{docker_container}}"
#Setup the AACT container
setup-containers:
echo "todo"
@echo "Check for downloaded data"
#aact
[ -s {{aact_download_path}}/postgres_data.dmp ]
#rxnav
[ -s {{rxnav_data_path}} ]
[ -s {{data_path}}/postgres_data.dmp ]
#run docker compose
@echo "Setting up AACT_db & RxNav_db container"
@echo "Setting up AACT container"
docker-compose -f {{docker-compose_path}} up -d
#Stop the appropriate docker container
stop-containers:
@ -69,13 +60,10 @@ clean-docker: stop-containers
#Download the AACT data
download-aact-data:
#download
curl {{aact_download_link}} > {{aact_zipped_data_filepath}}
unzip {{aact_zipped_data_filepath}} -d {{aact_download_path}}
rm {{aact_zipped_data_filepath}}
curl {{data_link}} > ./AACT_downloader/aact_downloads/{{data_file}}
unzip {{data_filepath}} -d {{data_path}}
rm {{data_filepath}}
download-rxnav-data:
echo "Currently manually downloaded."
#build based on previously downloaded data
build: check-status setup-containers
@ -117,3 +105,8 @@ get-nsde:
cd market_data && bash download_nsde.sh
cd market_data && python extract_nsde.py
get-rxnorm-mappings:
#this may not be needed, all it does is match spls to rxcuis and I think I already have that.
curl {{rxnorm_mappings_url}} > ./market_data/rxnorm_mappings.zip
cd ./market_data && unzip ./rxnorm_mappings.zip
rm ./market_data/rxnorm_mappings.zip

@ -0,0 +1,9 @@
USP[[:space:]]DC/USP_DC_12_2021_RELEASE_1.0.xlsx filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]DC/usp_dc_pub_2023_release_2.0_updated_final.xlsx filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/Final_Report_and_Summary_of_Methodology_and_Approach_v1.1.pdf filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/MMG_v8.0_Alignment_File.xlsx filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/Summary_of_Changes_between_MMGv7.0_and_MMGv8.0.pdf filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/USP_Medicare_Model_Guidelines_v8.0__All_Excel_Spreadsheets_.xlsx filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/USP_Medicare_Model_Guidelines_v8.0__Categories_and_Classes_.pdf filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/USP_Medicare_Model_Guidelines_v8.0__Showing_changes_from_v7.0_.pdf filter=lfs diff=lfs merge=lfs -text
USP[[:space:]]MMG/USP_Medicare_Model_Guidelines_v8.0__With_Example_Part_D_Drugs_.pdf filter=lfs diff=lfs merge=lfs -text

File diff suppressed because it is too large Load Diff

@ -1,44 +0,0 @@
from flask import Flask
import os
from dotenv import dotenv_values
env_path = "../../containers/.env"
ENV = dotenv_values(env_path)
def create_app(test_config=None):
    """Application factory: build and configure the Flask app.

    Reads Postgres settings from the .env loaded at module import. Fix:
    SECRET_KEY was hard-coded (and committed); it can now be supplied via
    the .env file, with the previous literal kept as the default so
    existing deployments keep working.
    """
    app = Flask(__name__, instance_relative_config=True)
    app.config.from_mapping(
        # Prefer a SECRET_KEY from the environment file; fall back to the
        # historical value for backward compatibility.
        SECRET_KEY=ENV.get(
            "SECRET_KEY",
            '6e674d6e41b733270fd01c6257b3a1b4769eb80f3f773cd0fe8eff25f350fc1f',
        ),
        POSTGRES_DB=ENV["POSTGRES_DB"],
        POSTGRES_USER=ENV["POSTGRES_USER"],
        POSTGRES_HOST=ENV["POSTGRES_HOST"],
        POSTGRES_PORT=ENV["POSTGRES_PORT"],
        POSTGRES_PASSWORD=ENV["POSTGRES_PASSWORD"],
    )
    # Ensure the instance folder exists; ignore the error if it already does.
    try:
        os.makedirs(app.instance_path)
    except OSError:
        pass

    # Simple liveness page.
    @app.route('/')
    def hello():
        return 'Hello, World!'

    # Wire up DB teardown and the validation blueprint.
    from . import db_interface
    db_interface.init_database(app)
    from . import validation
    app.register_blueprint(validation.bp)
    return app

@ -1,175 +0,0 @@
import psycopg2 as psyco
from psycopg2 import extras
from datetime import datetime
import click #used for cli commands. Not needed for what I am doing.
from flask import current_app, g
def get_db(**kwargs):
    """Return the per-request Postgres connection, creating it on first use.

    The connection is cached on flask's `g`, so repeated calls within one
    request share a single connection. Extra kwargs are forwarded to
    psycopg2.connect only when the connection is first created.
    """
    if "db" not in g:
        cfg = current_app.config
        g.db = psyco.connect(
            dbname=cfg["POSTGRES_DB"],
            user=cfg["POSTGRES_USER"],
            host=cfg["POSTGRES_HOST"],
            port=cfg["POSTGRES_PORT"],
            password=cfg["POSTGRES_PASSWORD"],
            **kwargs,
        )
    return g.db
def close_db(e=None):
    """App-teardown handler: close the request's DB connection, if any."""
    connection = g.pop('db', None)
    if connection is not None:
        connection.close()
def check_initialization(app):
    """Smoke-test the database: fail loudly if the validation table is unusable."""
    db = get_db()
    with db.cursor() as cursor:
        cursor.execute('select count(*) from "DiseaseBurden".trial_to_icd10')
        cursor.fetchall()
def init_database(app):
    """Register DB lifecycle hooks: close the connection when the app context ends."""
    # check_initialization(app)  # optional smoke test, currently disabled
    app.teardown_appcontext(close_db)
def select_remaing_trials_to_analyze(db_conn):
    """Return the distinct nct_ids still awaiting review (approved IS NULL).

    (The name keeps the historical 'remaing' misspelling because callers
    import it under that name.)
    """
    query = '''
    select distinct nct_id
    from "DiseaseBurden".trial_to_icd10 tti
    where tti.approved is null
    order by nct_id
    ;
    '''
    with db_conn.cursor() as curse:
        curse.execute(query)
        return curse.fetchall()
def select_analyzed_trials(db_conn):
    """Return (nct_id, last approval timestamp) for every reviewed trial.

    A trial counts as reviewed when any of its rows is accepted or
    rejected; results are newest-first by that timestamp.
    """
    query = '''
    select distinct nct_id, max(approval_timestamp)
    from "DiseaseBurden".trial_to_icd10 tti
    where tti.approved in ('accepted','rejected')
    group by nct_id
    order by max(approval_timestamp) desc
    ;
    '''
    with db_conn.cursor() as curse:
        curse.execute(query)
        return curse.fetchall()
def select_unmatched_trials(db_conn):
    """Return the distinct nct_ids whose rows were marked 'unmatched'."""
    query = '''
    select distinct nct_id
    from "DiseaseBurden".trial_to_icd10 tti
    where tti.approved = 'unmatched'
    order by nct_id
    ;
    '''
    with db_conn.cursor() as curse:
        curse.execute(query)
        return curse.fetchall()
def get_trial_conditions_and_proposed_matches(db_conn, nct_id):
    """Fetch every proposed condition/ICD-10 match row for one trial."""
    query = '''
    select *
    from "DiseaseBurden".trial_to_icd10 tti
    where nct_id = %s
    '''
    with db_conn.cursor() as curse:
        curse.execute(query, [nct_id])
        return curse.fetchall()
def store_validation(db_conn, list_of_insert_data):
    """Persist reviewer decisions.

    Each element of list_of_insert_data is a tuple
    (approved, approval_timestamp, id) applied to one match row.
    Commits once after all updates.
    """
    query = """
    update "DiseaseBurden".trial_to_icd10
    set approved=%s, approval_timestamp=%s
    where id=%s
    ;
    """
    with db_conn.cursor() as curse:
        for row in list_of_insert_data:
            curse.execute(query, row)
        db_conn.commit()
def get_trial_summary(db_conn, nct_id):
    """Collect the summary, keywords, and raw conditions for one trial.

    Returns {"summary": rows, "keywords": rows, "conditions": rows}
    drawn from the AACT ctgov schema.
    """
    sql_summary = """
    select
    s.nct_id,
    brief_title ,
    official_title ,
    bs.description as brief_description,
    dd.description as detailed_description
    from ctgov.studies s
    left join ctgov.brief_summaries bs
    on bs.nct_id = s.nct_id
    left join ctgov.detailed_descriptions dd
    on dd.nct_id = s.nct_id
    where s.nct_id = %s
    ;
    """
    sql_conditions = """
    --conditions mentioned
    select * from ctgov.conditions c
    where c.nct_id = %s
    ;
    """
    sql_keywords = """
    select nct_id ,downcase_name
    from ctgov.keywords k
    where k.nct_id = %s
    ;
    """
    results = {}
    with db_conn.cursor() as curse:
        curse.execute(sql_summary, [nct_id])
        results["summary"] = curse.fetchall()
        curse.execute(sql_keywords, [nct_id])
        results["keywords"] = curse.fetchall()
        curse.execute(sql_conditions, [nct_id])
        results["conditions"] = curse.fetchall()
    return results
def get_list_icd10_codes(db_conn):
    """Return all distinct ICD-10 codes from the GBD mapping as a flat list."""
    query = """
    select distinct code
    from "DiseaseBurden".icd10_to_cause itc
    order by code;
    """
    with db_conn.cursor() as curse:
        curse.execute(query)
        rows = curse.fetchall()
    return [row[0] for row in rows]
def record_suggested_matches(db_conn, nct_id, condition, icd10_code):
    """Insert a hand-matched, pre-accepted condition-to-ICD-10 row for a trial.

    NOTE(review): the ICD-10 code is stored in the `ui` column — confirm
    that column is meant to hold codes for hand-matched rows.
    """
    insert_sql = """
    INSERT INTO "DiseaseBurden".trial_to_icd10
    (nct_id,"condition",ui,"source",approved,approval_timestamp)
    VALUES (%s,%s,%s,'hand matched','accepted',%s)
    ;
    """
    with db_conn.cursor() as curse:
        curse.execute(insert_sql, [nct_id, condition, icd10_code, datetime.now()])
    db_conn.commit()

@ -1 +0,0 @@
#at some point I need to add a login or something.

@ -1,25 +0,0 @@
<!doctype html>
<title>{% block title %}{% endblock %} - ClinicalTrialsProject</title>
<!--<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">-->
<nav>
<h1>Nav</h1>
<ul>
<li>
<a href="{{ url_for('validation.remaining') }}">Validation Home</a>
</li>
<li>
<a href="https://icd.who.int/browse10/2019/en">WHO ICD-10 Codes (2019)</a>
</li>
<li>
<a href="https://uts.nlm.nih.gov/uts/umls/home">UMLS Metathesaurs browser (requires login)</a>
</li>
</ul>
</nav>
<section class="content">
<header>
{% block header %}{% endblock %}
</header>
{% block content %}{% endblock %}
</section>

@ -1,49 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %} ICD-10 to Trial Conditions Validation {% endblock %}</h1>
{% endblock %}
{% block content %}
<h2>Trials to Validate</h2>
<table>
<th>Trials</th>
{% for trial in list_to_validate %}
<tr><td>
<a href="{{ url_for('.validate_trial', nct_id=trial[0] ) }}">
{{ trial [0] }}
</a>
</td></tr>
{% endfor %}
</table>
<h2>Trials that have been Validated</h2>
<table>
<th>Trials Links</th>
{% for trial in validated_list %}
<tr><td>
<a href="{{ url_for('.validate_trial', nct_id=trial[0] ) }}">
{{ trial [0] }}
</a>
(Most recently updated {{trial[1]}})
</td></tr>
{% endfor %}
</table>
<h2>Trials that don't have a good match</h2>
<table>
<th>Trial Links</th>
{% for trial in unmatched_list %}
<tr><td>
<a href="{{ url_for('.validate_trial', nct_id=trial[0] ) }}">
{{ trial [0] }}
</a>
</td></tr>
{% endfor %}
</table>
{% endblock %}

@ -1,95 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1> ICD-10 to Trial Conditions Validation: {{ nct_id }} </h1>
{% endblock %}
{% block content %}
<section class="summary">
<h3>Trial Summary</h3>
<div class="text_summary">
<ul>
<li>NCT: {{ summary_dats["summary"][0][0] }}</li>
<li>Brief Title: {{ summary_dats["summary"][0][1] }}</li>
<li>Long Title: {{ summary_dats["summary"][0][2] }}</li>
<li>Brief Description: {{ summary_dats["summary"][0][3] }}</li>
<li>Long Description: {{ summary_dats["summary"][0][4] }}</li>
</ul>
</div>
<div class="keywords">
<h4>Keywords</h4>
<ul>
{% for keyword in summary_dats["keywords"] %}
<li>
{{ keyword[1] }}
</li>
{% endfor %}
</ul>
</div>
<div class="conditions">
<h4>Raw Conditions </h4>
<ul>
{% for condition in summary_dats["conditions"] %}
<li>
{{ condition[3] }}
</li>
{% endfor %}
</ul>
</div>
</section>
<section class="proposed_conditions">
<h3>Proposed Conditions</h3>
<form method="post">
<table>
<tr>
<th>Approve</th>
<th>Condition (MeSH normalized)</th>
<th>Identifier</th>
<th>Source</th>
<th>Description</th>
<th>Source</th>
</tr>
{% for condition in condition_list %}
<tr>
<td> <input type="checkbox" id="{{ condition[0] }}" name="{{condition[0]}}" value="accepted" {% if condition[8] == "accepted" %}checked{% endif %}> </td>
<td> {{condition[2]}} </td>
<td> {{condition[3]}} </td>
<td> {{condition[5]}} </td>
<td> {{condition[6]}} </td>
<td> {{condition[7]}} </td>
</tr>
{% endfor %}
</table>
<input type="submit" name="submission" value="Submit approvals">
<br/>
<input type="submit" name="marked_unmatched" value="Mark unmmatched">
</form>
</section>
<section class="submit_alternate">
<h3>Submit Alternate Conditions</h3>
<!--For each listed condition, provide a spot to enter an ICD-10 code-->
<form method="post">
<label for="alternate_sub">Please enter the proposed code that appears to be the best match:</label>
<input name="alt_sub" id="alternate_sub">
<br/>
<label for="condition">
Please give a name to the condition you used to match this<br/>
Condition:
</label>
<input name="condition", id="condition">
<br/>
<input type="submit" name="alternate_submission" value="Submit alternate ICD-10 code">
</form>
</section>
<section class="approved">
<!--TODO:This will list the already approved values-->
</section>
{% endblock %}

@ -1,98 +0,0 @@
import functools
from flask import (Blueprint, flash, g, redirect, render_template, request, session, url_for)
from Icd10ConditionsMatching.db_interface import (
get_db,select_remaing_trials_to_analyze,
select_analyzed_trials,
select_unmatched_trials,
get_trial_conditions_and_proposed_matches,
store_validation,
get_trial_summary,
get_list_icd10_codes,
record_suggested_matches,
)
from datetime import datetime
#### First Blueprint: Checking Data
bp = Blueprint("validation", __name__, url_prefix="/validation")
@bp.route("/",methods=["GET"])
def remaining():
db_conn = get_db()
to_validate = select_remaing_trials_to_analyze(db_conn)
validated = select_analyzed_trials(db_conn)
unmatched_list = select_unmatched_trials(db_conn)
return render_template(
"validation_index.html",
list_to_validate=to_validate,
validated_list = validated,
unmatched_list = unmatched_list
)
@bp.route("/<nct_id>", methods=["GET","POST"])
def validate_trial(nct_id):
if request.method == "GET":
db_conn = get_db()
condition_list = get_trial_conditions_and_proposed_matches(db_conn, nct_id)
summary_dats = get_trial_summary(db_conn, nct_id)
return render_template(
"validation_of_trial.html",
nct_id=nct_id,
condition_list=condition_list,
summary_dats=summary_dats,
)
elif request.method == "POST":
db_conn = get_db()
list_of_insert_data = []
db_conn = get_db()
condition_list = get_trial_conditions_and_proposed_matches(db_conn, nct_id)
print(request.form)
if "submission" in request.form:
#if it is a submission:
#grab all match ids from db
#if match id in submitted form, mark as approved, otherwise mark as rejected
for condition in condition_list:
id = condition[0]
list_of_insert_data.append((request.form.get(str(id),"rejected"), datetime.now(),id))
store_validation(db_conn, list_of_insert_data)
return redirect(url_for("validation.remaining"))
elif "marked_unmatched" in request.form:
#if this was marked as "unmatched", store that for each entry.
for condition in condition_list:
id = condition[0]
list_of_insert_data.append(( "unmatched", datetime.now(), id))
store_validation(db_conn, list_of_insert_data)
return redirect(url_for("validation.remaining"))
elif "alternate_submission" in request.form:
code = request.form["alt_sub"]
code = code.strip().replace(".",'').ljust(7,"-")
condition = request.form["condition"].strip()
codelist = get_list_icd10_codes(db_conn)
if code in codelist:
record_suggested_matches(db_conn, nct_id, condition, code)
return redirect(request.path)
else:
record_suggested_matches(db_conn, nct_id, condition + "| Code not in GBD list", code)
return """
Entered `{}`, which is not in the list of available ICD-10 codes. <a href={}>Return to trial summary</a>
""".format(code.strip("-"),request.path), 422

@ -1,13 +0,0 @@
from setuptools import setup

# Packaging for the ICD-10 condition-matching Flask app.
setup(
    name='Icd10ConditionsMatching',
    packages=['Icd10ConditionsMatching'],
    include_package_data=True,
    install_requires=[
        'flask',
        'psycopg2',
        # 'datetime' removed: it is part of the Python standard library;
        # the PyPI package of that name is an unrelated shim and listing
        # it caused a bogus dependency to be installed.
        'python-dotenv',
    ],
)

@ -1 +0,0 @@
waitress-serve --port=5000 --call 'Icd10ConditionsMatching:create_app'

@ -1,11 +0,0 @@
from drugtools.env_setup import postgres_conn, mariadb_conn, ENV

# Manual smoke test: print the loaded environment, then pull a few rows
# from each configured database to confirm both connections work.
print(ENV)

with postgres_conn() as pconn, pconn.cursor() as curse:
    curse.execute("select nct_id FROM ctgov.studies LIMIT 10;")
    print(curse.fetchall())

with mariadb_conn() as mconn, mconn.cursor() as mcurse:
    mcurse.execute("select * FROM ALLNDC_HISTORY LIMIT 10;")
    print(mcurse.fetchall())

@ -1,42 +0,0 @@
from drugtools.historical_nct_downloader import make_request, get_highest_version_number, step_generator
#this uses the history downloader script to test downloads
def trial_downloads(nct_id):
    """
    Download the full version history of one trial, returning the responses.

    Mirrors `download_and_handle_errors` without writing to a database —
    useful for manually exercising the downloader against live pages.
    """
    print("downloading {}".format(nct_id))
    first = make_request(nct_id, 1, 2)
    responses = [first]
    print(first.url)
    highest = get_highest_version_number(first)
    if highest == 2:
        # Only the initial pair exists; nothing more to fetch.
        pass
    elif highest % 2 == 0:
        for version_a, version_b in step_generator(highest):
            print("\t versions {} & {}".format(version_a, version_b))
            responses.append(make_request(nct_id, version_a, version_b))
    elif highest % 2 == 1:
        for version_a, version_b in step_generator(highest):
            print("\t downloading versions {} & {}".format(version_a, version_b))
            responses.append(make_request(nct_id, version_a, version_b))
        # Odd count: cover the final version by comparing 1 vs highest.
        responses.append(make_request(nct_id, 1, highest))
    print("\tDownloaded {} versions".format(highest))
    return responses
if __name__ == "__main__":
test_list = ['NCT01303796', 'NCT01371708', 'NCT01374906', 'NCT01382602', 'NCT01384539', 'NCT01391546', 'NCT01392469', 'NCT01413178', 'NCT01416441', 'NCT01418339']
for t in test_list:
resp = trial_downloads(t)

@ -1,96 +0,0 @@
import json
from psycopg2.extras import execute_values
import datetime as dt
from drugtools.env_setup import postgres_conn, ENV
import requests
import zipfile
import io
URL_STEM = 'https://download.open.fda.gov/other/nsde/'
NUMBER_OF_NSDE_FILES = int(ENV["NUMBER_OF_NSDE_FILES"])
def filename_generator(max_num):
    """Yield the openFDA NSDE part-file names: other-nsde-0001-of-NNNN.json.zip, ..."""
    template = "other-nsde-{:0>4}-of-{:0>4}.json.zip"
    part = 1
    while part <= max_num:
        yield template.format(part, max_num)
        part += 1
def get_date(result, key):
    """Parse result[key] as a YYYYMMDD date; return None when missing or falsy."""
    raw = result.get(key)
    return dt.datetime.strptime(raw, "%Y%m%d") if raw else None
def build_values(result):
    """Map one NSDE JSON record to the column tuple for the spl.nsde insert.

    String fields pass through unchanged (None when missing); the four
    trailing date fields are parsed from YYYYMMDD via get_date.
    """
    text_fields = (
        result.get("proprietary_name"),
        result.get("application_number_or_citation"),
        result.get("product_type"),
        result.get("package_ndc"),
        result.get("marketing_category"),
        result.get("package_ndc11"),
        result.get("dosage_form"),
        result.get("billing_unit"),
    )
    date_fields = (
        get_date(result, "marketing_start_date"),
        get_date(result, "marketing_end_date"),
        get_date(result, "inactivation_date"),
        get_date(result, "reactivation_date"),
    )
    return text_fields + date_fields
def download_and_extract_zip(base_url, filename):
    """Fetch base_url+filename and return the bytes of the zip's first member.

    NOTE(review): only the first archive member is extracted; any later
    members would be silently ignored. Returns None for an empty archive.
    """
    response = requests.get(base_url + filename)
    with zipfile.ZipFile(io.BytesIO(response.content)) as the_zip:
        for member in the_zip.infolist():
            return the_zip.read(member)
def run():
    """Download every NSDE part file and bulk-insert its records into spl.nsde."""
    insert_query = """
    INSERT INTO spl.nsde (
    proprietary_name
    ,application_number_or_citation
    ,product_type
    ,package_ndc
    ,marketing_category
    ,package_ndc11
    ,dosage_form
    ,billing_unit
    ,marketing_start_date
    ,marketing_end_date
    ,inactivation_date
    ,reactivation_date
    )
    VALUES %s;
    """
    for filename in filename_generator(NUMBER_OF_NSDE_FILES):
        # A fresh connection/cursor per file keeps each file's insert isolated.
        with (postgres_conn() as con, con.cursor() as curse):
            print(filename)
            payload = download_and_extract_zip(URL_STEM, filename)
            records = json.loads(payload)["results"]
            execute_values(curse, insert_query, [build_values(r) for r in records])
if __name__ == "__main__":
run()

@ -1,43 +0,0 @@
import pymysql
import psycopg2 as psyco
from psycopg2.sql import SQL
from dotenv import dotenv_values
env_path = "../containers/.env"
ENV = dotenv_values(env_path)
def mariadb_conn(**kwargs):
    """Open a pymysql connection to the MariaDB instance using .env settings."""
    return pymysql.connect(
        database=ENV["MYSQL_DB"],
        user=ENV["MYSQL_USER"],
        host=ENV["MYSQL_HOST"],
        port=int(ENV["MYSQL_PORT"]),  # pymysql requires an int port
        password=ENV["MYSQL_PASSWORD"],
        **kwargs,
    )
def postgres_conn(**kwargs):
    """Open a psycopg2 connection to the Postgres instance using .env settings."""
    return psyco.connect(
        dbname=ENV["POSTGRES_DB"],
        user=ENV["POSTGRES_USER"],
        host=ENV["POSTGRES_HOST"],
        port=ENV["POSTGRES_PORT"],
        password=ENV["POSTGRES_PASSWORD"],
        **kwargs,
    )
def get_tables_of_interest():
    """Return TABLES_OF_INTEREST from the .env, split on commas into a list."""
    raw = ENV["TABLES_OF_INTEREST"]
    return raw.split(",")
def postgres_table_delete_entries(schema, table):
    """Delete every row from schema.table, quoting both as SQL identifiers.

    Bug fixes: `Identifier` was never imported (the module only imported
    `SQL`), so the first call raised NameError; and the format keyword was
    misspelled `talbe`, so SQL.format raised KeyError for `{table}`.
    """
    from psycopg2.sql import Identifier  # local import; module-level import only brings in SQL
    with postgres_conn() as con:
        with con.cursor() as curse:
            delete_statement = SQL("delete from {schema}.{table}").format(
                schema=Identifier(schema),
                table=Identifier(table),
            )
            curse.execute(delete_statement)
            con.commit()

@ -1,266 +0,0 @@
import requests
from datetime import datetime
from bs4 import BeautifulSoup
from multiprocess import Pool, Value
import math
import time
from drugtools.env_setup import postgres_conn, ENV
from tqdm import tqdm
############ GLOBALS
RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"]))
DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"]))
TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"])
TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"])
############ Functions
def get_highest_version_number(response):
    """
    Extract the highest posted version number from a history page's version table.

    The last row of the table is not always a version entry, so walk the
    rows bottom-up and return the first cell whose `headers` attribute
    marks it as a VersionNumber cell.

    Raises IndexError if the page has no <fieldset> (no version table).
    """
    soup = BeautifulSoup(response.text, features="lxml")
    try:
        table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
    except IndexError as ie:
        # No version table on this page; let the caller decide what to do.
        raise ie
    for row in reversed(table_rows):
        for cell in row.findChildren("td"):
            # Attribute values are lists, hence the [0].
            if ("headers" in cell.attrs) and (cell.attrs["headers"][0] == "VersionNumber"):
                return int(cell.text)
def make_request(nct_id, version1, version2):
    """GET the side-by-side comparison page for two versions of a trial."""
    baseurl = "https://clinicaltrials.gov/ct2/history/{}?A={}&B={}&C=Side-by-Side"
    url = baseurl.format(nct_id, version1, version2)
    try:
        # Small courtesy delay between requests.
        time.sleep(0.02)
        return requests.get(url)
    except requests.exceptions.ConnectionError as ce:
        raise ce
def upload_response(db_cursor, nct_id, version_a, version_b, response):
    """Insert one fetched comparison page (with metadata) into http.responses."""
    # Server-reported time of the response, from the HTTP Date header.
    timestamp = datetime.strptime(response.headers['date'], "%a, %d %b %Y %H:%M:%S %Z")
    query = """
    INSERT INTO http.responses
    (nct_id,version_a,version_b,url,response_code,response_date, html)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    ;
    """
    params = (
        nct_id,
        version_a,
        version_b,
        response.url,
        response.status_code,
        datetime.isoformat(timestamp),
        response.text,
    )
    db_cursor.execute(query, params)
def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time):
"""
Request one comparison page and dispatch on its HTTP status.

cursor      -- open DB cursor; every response (success or failure) is stored.
nct_id, version_a, version_b -- identify the comparison page to fetch.
delay_time  -- shared multiprocess counter; incremented on 503 so that
               all workers slow down together.
reset_time  -- shared value: seconds to back off after an error response.

Returns None after a 404 (the trial is flagged incomplete); every other
status — including 503 and the catch-all error branch — still falls
through to return the response object.
"""
#sleep log10(counts of delays)
#NOTE(review): math.log10 raises ValueError when delay_time.value is 0 —
#presumably TRIAL_DOWNLOAD_DELAY_TIME is configured >= 1; confirm.
time.sleep(math.log10(delay_time.value))
#request page
r = make_request(nct_id, version_a, version_b)
#check for
if r.status_code == 200:
upload_response(cursor,nct_id,version_a, version_b, r)
elif r.status_code == 404:
#Permanent failure: record the response and mark the trial incomplete.
upload_response(cursor, nct_id, version_a, version_b, r)
write_incomplete(cursor,nct_id)
return None
elif r.status_code == 503:
# write http code to http.responses
upload_response(cursor, nct_id, version_a, version_b, r)
# write incomplete to http.download_status
write_incomplete(cursor,nct_id)
# tell all other processes to slow down the request speed
# Delay
with delay_time.get_lock():
delay_time.value += 1
time.sleep(reset_time.value)
#NOTE(review): this prints the shared Value object itself, not .value.
print("Recieved 503 on {}, increasing delay count to {}".format(
nct_id,
delay_time)
)
else:
#TODO: this should handle errors by
# write http code to http.responses
upload_response(cursor, nct_id, version_a, version_b, r)
# write incomplete to http.download_status
write_incomplete(cursor,nct_id)
# raise exception
#raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code))
print("Recieved {} on {}, increasing delay count to {}".format(
r.status_code,
nct_id,
delay_time))
# Delay
#NOTE(review): this appears to sleep while holding reset_time's lock,
#unlike the 503 branch — confirm that is intended.
with reset_time.get_lock():
time.sleep(reset_time.value)
return r
def write_incomplete(cursor, nct_id):
    """
    Record that a trial's history could not be fully downloaded.
    """
    cursor.execute(
        """
    INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS) VALUES
    (%s, 'Incomplete'::HTTP.HISTORY_DOWNLOAD_STATUS);
    """,
        [nct_id],
    )
def download_trial_records(nct_id, delay_time, reset_time):
    """
    Download and store every history page for one trial.

    Opens its own connection and cursor (so the function can run inside a
    multiprocessing pool), fetches versions pairwise, and finally marks the
    trial 'Downloaded'. Errors are handled per-trial; this does not create
    the trial's reservation, only completes it.
    """
    with postgres_conn() as db_conn:
        with db_conn.cursor() as cursor:
            # Versions 1 & 2 first; that page also reveals the highest version.
            first = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time)
            if first is None:
                # 404: the trial was already flagged incomplete downstream.
                return None
            try:
                highest = get_highest_version_number(first)
            except IndexError as ie:
                # Attach the trial id so the failing page can be found later.
                raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id))
            if highest == 2:
                # Nothing beyond the initial pair; note the trial is NOT
                # marked 'Downloaded' in this case (mirrors original flow).
                return None
            elif highest % 2 == 0:
                for version_a, version_b in step_generator(highest):
                    download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time)
            elif highest % 2 == 1:
                # Odd count: fetch pairwise, then cover the last version with
                # a 1-vs-highest comparison.
                for version_a, version_b in step_generator(highest):
                    download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time)
                download_and_handle_errors(cursor, nct_id, 1, highest, delay_time, reset_time)
            # Mark the trial as fully downloaded.
            cursor.execute(
                """
    INSERT INTO http.download_status (nct_id,status)
    VALUES (%s, 'Downloaded'::http.history_download_status)
    """,
                [nct_id],
            )
def step_generator(max_version):
    """
    Generate the (a, b) version pairs to request after the initial (1, 2):
    (3,4), (5,6), ..., ending at (max_version-1, max_version) when
    max_version is even. (An odd final version is covered by the caller's
    separate 1-vs-max_version request.)

    Bug fix: the loop bound was `range(4, max_version, 2)`, which stopped
    two short and never yielded the final (max_version-1, max_version)
    pair for even max_version — e.g. step_generator(4) yielded nothing,
    so versions 3 and 4 were silently skipped. The bound is now
    max_version + 1, matching the documented pattern. Odd max_version
    output is unchanged.
    """
    lower = 3
    for upper in range(4, max_version + 1, 2):
        yield (lower, upper)
        lower = upper + 1
def reserve_trials(db_connection, limit=10):
    """
    Mark up to `limit` not-yet-downloaded trials as Reserved and return
    their nct_ids as a flat list.
    """
    query = """
    WITH OF_INTEREST AS
    (SELECT NCT_ID
    FROM HTTP.TRIALS_TO_DOWNLOAD
    LIMIT %s
    )
    INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS)
    SELECT OF_INTEREST.NCT_ID, 'Reserved'::HTTP.HISTORY_DOWNLOAD_STATUS AS STATUS
    FROM OF_INTEREST
    RETURNING NCT_ID;
    """
    with db_connection.cursor() as cursor:
        cursor.execute(query, [limit])
        return [row[0] for row in cursor.fetchall()]
def chunker(seq, size):
    """Split seq into consecutive slices of length `size` (last may be shorter)."""
    chunks = []
    start = 0
    while start < len(seq):
        chunks.append(seq[start:start + size])
        start += size
    return chunks
def reserve_and_download_versions(limit):
#NOTE(review): `limit` is unused; the while-loop bounds itself with the
#TRIAL_RESERVATION_LIMIT global instead — confirm which was intended.
#lambda that parameterizes the downloader, allowing it to be passed to the pool.
#(`multiprocess`, unlike stdlib multiprocessing, can pickle this closure.)
def downloader(nct):
download_trial_records(nct, DELAY_TIME, RESET_TIME)
#db connection
with postgres_conn() as con:
itt = 0
#Keep reserving batches until none remain or the trial budget is spent.
while (nctids := reserve_trials(con,TRIAL_RESERVATION_BATCH_SIZE)) and \
itt < TRIAL_RESERVATION_LIMIT:
print(nctids)
with Pool(processes=12) as process_pool:
l = len(nctids)
itt += l
#Progress bar advances as each trial finishes, in completion order.
with tqdm(total=l) as prog_bar:
for _ in process_pool.imap_unordered(downloader, nctids):
prog_bar.update()
#Commit the reservation/status rows written on this connection.
con.commit()
def run():
    """Entry point: reserve and download trials up to TRIAL_RESERVATION_LIMIT."""
    reserve_and_download_versions(TRIAL_RESERVATION_LIMIT)
if __name__ == "__main__":
"""
Main!
"""
run()
#db connection

@ -1,467 +0,0 @@
from collections import namedtuple
from copy import copy
from datetime import datetime
from bs4 import BeautifulSoup
from drugtools.env_setup import ENV,postgres_conn
from tqdm import tqdm
#requires Python 3.10
#### GLOBALS
VERBOSE = True if ENV["VERBOSE"] == "True" else False
###CLASSES AND CONSTRUCTORS
TagDatePair = namedtuple("TagDatePair", ["tag","date"])
TagTextPair = namedtuple("TagTextPair", ["tag","text"])
#superclasses
class VersionData():
    """
    Container for the data extracted from one version (snapshot) of a trial.

    This class holds two types of data:
    - Data with a 1-to-1 relationship with the trial/version pair.
    - Data with a child relationship with the trial/version pair.
    This initializes with None attributes, and the extract_* helpers fill
    them in as the HTML forms are parsed; that way a single VersionData
    instance can be passed around and accumulate data as it goes.
    load_to_db() persists the snapshot to history.trial_snapshots.
    """
    def __init__(self,nct_id,version_id,submission_date):
        """
        nct_id: ClinicalTrials.gov identifier (whitespace stripped here).
        version_id: version number of this snapshot.
        submission_date: date this version was submitted.
        """
        #identifiers
        self.nct_id = nct_id.strip()
        self.version_id = version_id
        self.submission_date = submission_date
        #Study Status -- each *_category holds the bracketed tag
        #(e.g. "Actual"/"Anticipated") that accompanied the value, if any.
        self._primary_completion_date = None
        self._primary_completion_date_category = None
        self._start_date = None
        self._start_date_category = None
        self._completion_date = None
        self._completion_date_category = None
        self._overall_status = None
        self._enrollment = None
        self._enrollment_category = None
        self._sponsor = None
        #self._sponsor_category = None #I don't believe this is included in the raw data
        self._responsible_party = None
        #self._responsible_party_category = None #I don't believe this is included in the raw data
        #self._collaborators = None #currently going to ignore as I've not fount it in AACT
    def load_to_db(self,db_connection):
        """
        Insert this snapshot into history.trial_snapshots and commit.

        On any DB error the instance is printed for debugging and the
        original exception is re-raised.
        """
        #load to initial table, then load any extra details into other tables
        sql = """
        INSERT INTO history.trial_snapshots
        (
            nct_id,
            version,
            submission_date,
            primary_completion_date,
            primary_completion_date_category,
            start_date,
            start_date_category,
            completion_date,
            completion_date_category,
            overall_status,
            enrollment,
            enrollment_category,
            sponsor,
            responsible_party
        )
        VALUES
        (
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s
        )
        """
        with db_connection.cursor() as db_cursor:
            try:
                # Parameter order below must stay in sync with the column
                # list in the INSERT above.
                db_cursor.execute(
                    sql,
                    (
                        self.nct_id,
                        self.version_id,
                        self.submission_date,
                        self._primary_completion_date,
                        self._primary_completion_date_category,
                        self._start_date,
                        self._start_date_category,
                        self._completion_date,
                        self._completion_date_category,
                        self._overall_status,
                        self._enrollment,
                        self._enrollment_category,
                        self._sponsor,
                        self._responsible_party
                    )
                )
            except Exception as err:
                #catch any error, print the applicable information, and raise the error.
                print(self)
                raise err
        # NOTE(review): commits on every call; batching would be faster but
        # this keeps each snapshot durable.
        db_connection.commit()
############ Functions
def extract_submission_dates(soup):
    """
    Build a {version_number: submission_datetime} dict from the version
    table at the top of a comparison page.

    Assumes each row lists its VersionNumber cell before its VersionDate
    cell, since the date is stored under the most recently seen number --
    TODO confirm against the page layout.
    """
    table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
    version_date_dict = {}
    for row in reversed(table_rows):
        # if it is <td headers="VersionNumber">xx</td> then it contains what we need.
        for td in row.findChildren("td"):
            if ("headers" in td.attrs):
                if (td.attrs["headers"][0]=="VersionNumber"):
                    version_number = int(td.text)
                elif (td.attrs["headers"][0]=="VersionDate"):
                    version_date = td.text
                    # Use the shared format constant instead of an inline literal.
                    version_date_dict[version_number] = datetime.strptime(version_date, date_MMMM_DD_YYYY)
    # Debug output is now gated on VERBOSE (was an unconditional print).
    print(version_date_dict) if VERBOSE else ""
    return version_date_dict
def optional_strip(possible_string):
    """Strip whitespace when given a string; pass any other value through unchanged."""
    # isinstance is the idiomatic type check (also accepts str subclasses).
    if isinstance(possible_string, str):
        return possible_string.strip()
    return possible_string
def extract_study_statuses(study_status_form, version_a,version_b):
"""
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
"""
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
match tr_to_td(trow):
case ["Primary Completion:" as row_label, old,new]:
print("row matched: {}".format(row_label)) if VERBOSE else ""
tagdate1 = extract_date_and_tag(old.text)
version_a._primary_completion_date = tagdate1.date
version_a._primary_completion_date_category = optional_strip(tagdate1.tag)
tagdate2 = extract_date_and_tag(new.text)
version_b._primary_completion_date = tagdate2.date
version_b._primary_completion_date_category = optional_strip(tagdate2.tag)
case ["Study Start:" as row_label, old, new]:
print("row matched: {}".format(row_label)) if VERBOSE else ""
tagdate1 = extract_date_and_tag(old.text)
version_a._start_date = tagdate1.date
version_a._start_date_category = optional_strip(tagdate1.tag)
tagdate2 = extract_date_and_tag(new.text)
version_b._start_date = tagdate2.date
version_b._start_date_category = optional_strip(tagdate2.tag)
case ["Study Completion:" as row_label, old,new]:
print("row matched: {}".format(row_label)) if VERBOSE else ""
tagdate1 = extract_date_and_tag(old.text)
version_a._completion_date = tagdate1.date
version_a._completion_date_category = optional_strip(tagdate1.tag)
tagdate2 = extract_date_and_tag(new.text)
version_b._completion_date = tagdate2.date
version_b._completion_date_category = optional_strip(tagdate2.tag)
case ["Overall Status:" as row_label, old,new]:
print("row matched: {}".format(row_label)) if VERBOSE else ""
#split out any notes such as "Suspended [reason for suspenstion ]"
version_a._overall_status = optional_strip(old.text.split("[")[0])
#split out any notes such as "Suspended [reason for suspenstion ]"
version_b._overall_status = optional_strip(new.text.split("[")[0])
#FIX: There is an issue with NCT00789633 where the overall status includes information as to why it was suspended.
case _ as row_label:
print("row not matched: {}".format(row_label)) if VERBOSE else ""
def extract_study_design(study_status_form, version_a, version_b):
    """
    Walk the Study Design form's rows and copy the old/new enrollment
    figures (and their bracketed category tags) onto the two pre-allocated
    VersionData objects.
    """
    for table_row in study_status_form.table.tbody.find_all("tr"):
        cells = tr_to_td(table_row)
        label = cells[0]
        if label == "Enrollment:":
            print("row matched: {}".format(label)) if VERBOSE else ""
            # Old-version column -> version_a.
            old_pair = extract_text_and_tag(cells[1].text)
            version_a._enrollment = old_pair.text
            version_a._enrollment_category = optional_strip(old_pair.tag)
            # New-version column -> version_b.
            new_pair = extract_text_and_tag(cells[2].text)
            version_b._enrollment = new_pair.text
            version_b._enrollment_category = optional_strip(new_pair.tag)
        else:
            print("row not matched: {}".format(cells)) if VERBOSE else ""
def extract_sponsor_data(study_status_form, version_a, version_b):
    """
    Copy sponsor and responsible-party values from the Sponsor form onto the
    two pre-allocated VersionData objects.  Collaborator rows are recognized
    but intentionally ignored (no AACT equivalent was found).
    """
    for table_row in study_status_form.table.tbody.find_all("tr"):
        cells = tr_to_td(table_row)
        label = cells[0]
        if label == "Sponsor:":
            print("row matched: {}".format(label)) if VERBOSE else ""
            version_a._sponsor = optional_strip(cells[1].text)
            version_b._sponsor = optional_strip(cells[2].text)
        elif label == "Responsible Party:":
            print("row matched: {}".format(label)) if VERBOSE else ""
            version_a._responsible_party = optional_strip(cells[1].text)
            version_b._responsible_party = optional_strip(cells[2].text)
        elif label == "Collaborators:":
            print("row matched: {}".format(label)) if VERBOSE else ""
            #TODO: find a trial with multiple collaborators and figure out how to identify/count them
            pass
        else:
            print("row not matched: {}".format(cells)) if VERBOSE else ""
def split_by_version(tag):
    '''
    Return (old, new) clones of `tag`: the old copy with the added-text
    highlight spans removed, the new copy with the dropped-text highlight
    spans removed.  OUTDATED: the current page format already separates old
    and new versions, but this remains a clean way to isolate exact changes.
    '''
    def strip_spans(source, highlight_class):
        # Clone first so the caller's tree is untouched, then drop every
        # highlight span of the given class.
        clone = copy(source)
        for span in clone.find_all(class_=highlight_class):
            span.extract()
        return clone
    return strip_spans(tag, "add_hilite"), strip_spans(tag, "drop_hilite")
def extract_date_and_tag(text):
    """
    Parse a cell like "March 5, 2014 [Actual]" into TagDatePair(tag, date).

    Tries the month-year format first, then month-day-year; a date matching
    neither raises ValueError.  Empty/whitespace-only input yields
    TagDatePair(None, None).
    """
    text = text.strip()
    #handle the empty case (`text == ''` was redundant after the strip)
    if not text:
        return TagDatePair(None, None)
    parts = text.split("[")
    # Anything in brackets is the estimate category, e.g. "[Anticipated]".
    estimate_tag = parts[1].split("]")[0].strip() if len(parts) > 1 else None
    date_text = parts[0].strip()
    try:
        date_object = datetime.strptime(date_text, date_MMMM_YYYY)
    except ValueError:
        date_object = datetime.strptime(date_text, date_MMMM_DD_YYYY)
    return TagDatePair(estimate_tag, date_object)
def extract_text_and_tag(text):
    """
    Split a cell like "350 [Anticipated]" into its text portion and its
    bracketed estimate tag, returning TagTextPair(tag, text).

    Empty/whitespace-only input yields TagTextPair(None, None).
    """
    text = text.strip()
    #handle the empty case (`text == ''` was redundant after the strip)
    if not text:
        return TagTextPair(None, None)
    parts = text.split("[")
    # Anything in brackets is the estimate category, e.g. "[Actual]".
    estimate_tag = parts[1].split("]")[0].strip() if len(parts) > 1 else None
    return TagTextPair(estimate_tag, parts[0].strip())
### FUNCTIONS
def tr_to_td(tr) -> tuple:
    """
    Split a comparison-table row into (row_label, old_cell, new_cell).

    Takes an html data row of interest, extracts the record_name text from the
    first <td>, and returns the second and third <td> elements whole (not
    their text -- each cell's text is processed separately later, based on
    what it should contain).  Rows without exactly three cells return
    (None, None, None).
    """
    #get list of cells
    td_list = tr.find_all("td")
    if len(td_list) == 3:
        return td_list[0].text, td_list[1], td_list[2]
    else:
        return None, None, None
def get_forms(soup, version_a, version_b):
    """
    Dispatch every <form> on the page to its extractor, filling the two
    pre-allocated VersionData objects.  Forms we recognize but do not yet
    extract are silently skipped; unknown form ids are reported when VERBOSE.
    """
    handlers = {
        "form_StudyStatus": extract_study_statuses,
        "form_SponsorCollaborators": extract_sponsor_data,
        "form_StudyDesign": extract_study_design,
    }
    recognized_but_ignored = {
        "form_Oversight",
        "form_StudyDescription",
        "form_Conditions",
        "form_ArmsandInterventions",
        "form_ProtocolOutcomeMeasures",
        "form_Eligibility",
        "form_ContactsLocations",
        "form_IPDSharing",
        "form_References",
        "form_ParticipantFlow",
        "form_BaselineCharacteristics",
        "form_ROutcomeMeasures",
        "form_AdverseEvents",
        "form_LimitationsandCaveats",
        "form_MoreInformation",
    }
    for form in soup.body.find_all("form"):
        if "id" not in form.attrs:
            continue
        form_name = form.attrs["id"]
        if form_name in handlers:
            handlers[form_name](form, version_a, version_b)
        elif form_name not in recognized_but_ignored:
            print("form not matched: {}".format(form_name)) if VERBOSE else ""
### CONSTANTS
# strptime formats for ClinicalTrials.gov dates, e.g. "March 2014" / "March 5, 2014".
date_MMMM_YYYY = "%B %Y"
date_MMMM_DD_YYYY = "%B %d, %Y"
def get_data_from_versions(nct_id, html, version_a_int, version_b_int):
    """
    Parse one downloaded comparison page and return the two populated
    VersionData snapshots for (version_a_int, version_b_int).

    Raises KeyError if either version number is missing from the page's
    submission-date table.
    """
    soup = BeautifulSoup(html, "lxml")
    # Fixed: this previously did `print(getting_data_from_versions)`, which
    # referenced an undefined name and raised NameError at runtime.
    print("getting data from versions") if VERBOSE else ""
    version_date_dict = extract_submission_dates(soup)
    # Debug output gated on VERBOSE (was an unconditional print).
    print(version_date_dict) if VERBOSE else ""
    #preallocate version data
    version_a = VersionData(nct_id, version_a_int, version_date_dict[version_a_int])
    version_b = VersionData(nct_id, version_b_int, version_date_dict[version_b_int])
    #extract data from html and put it in the preallocated objects
    get_forms(soup, version_a, version_b)
    return version_a, version_b
def run():
    """
    Pull every successfully-downloaded comparison page (HTTP 200) from
    http.responses, extract the two version snapshots from each page, and
    persist them to history.trial_snapshots.
    """
    with postgres_conn() as db_connection:
        #pull the requests from the db
        with db_connection.cursor() as curse:
            sql = """
            SELECT nct_id, version_a,version_b, html
            FROM http.responses
            WHERE response_code = 200
            """
            curse.execute(sql)
            # NOTE(review): fetchall() loads every stored page into memory at
            # once; a server-side (named) cursor would stream instead.
            for response in tqdm(curse.fetchall()):
                nct_id, version_a, version_b, html = response
                print(nct_id)
                print(nct_id, version_a, version_b) if VERBOSE else ""
                version1, version2 = get_data_from_versions(nct_id, html, version_a, version_b)
                # Consecutive versions: both snapshots are stored; otherwise
                # only the newer one -- presumably the older snapshot is
                # covered by an earlier pair. TODO confirm intent.
                if version_b == version_a + 1:
                    version1.load_to_db(db_connection)
                    version2.load_to_db(db_connection)
                else:
                    version2.load_to_db(db_connection)
if __name__ == "__main__":
    # Allow running the extractor directly as a script.
    run()
"""
Documentation:
To add a new field to the extraction lib:
1. Locate the field in the HTML
- form id (e.g. <form id="form_StudyStatus"> gives the form id "form_StudyStatus")
- Table row's data label. This corresponds to the text of first column in the row and will
look something like
<td class="rowLabel" style="min-width: 210px;">Record Verification:</td>.
"Record Verification:" is the data label in the example above.
2. Identify what data you will be extracting
- type (date, text, int, etc)
- if it contains a category ([Actual] vs [Anticipated] etc)
3. Add data to:
- sql table: history.trial_snapshots
- the VersionData class
- the VersionData.load_to_db() function
4. Ensure the field matcher in `get_forms(*)` is matching on the form ID and has a function processing the form
5. Ensure the function processing the form has a match entry to process the row
- This should match on data label and then process the data by
- splitting into old and new versions
- Extracting the data for both old and new
- add the data to the passed VersionData objects
"""

@ -1,15 +0,0 @@
from .env_setup import postgres_conn
from pathlib import Path
def run():
    """Read the bundled selected_trials.sql file and execute it against postgres."""
    #resolve the script relative to this module, not the current working directory
    script_path = Path(__file__).with_name("selected_trials.sql")
    with open(script_path, 'r') as script:
        statements = script.read()
    with postgres_conn() as connection, connection.cursor() as curse:
        curse.execute(statements)
if __name__ == "__main__":
    # Allow running the trial selector directly as a script.
    run()

@ -1,118 +0,0 @@
import psycopg2 as psyco
from psycopg2 import sql
from psycopg2 import extras
import pymysql
from dotenv import load_dotenv
import os
from .env_setup import postgres_conn, mariadb_conn, get_tables_of_interest
##############NOTE
'''
mariadb --mariadb.connect--> incrementally fetched dict --psycopg2--> postgres
Fetching rows incrementally (rather than all at once) would reduce memory
usage and simplify this migration.
'''
############### GLOBALS
#these are hardcoded so they shouldn't require any updates
mschema="rxnorm_current"   # source schema in mariadb
pschema="rxnorm_migrated"  # destination schema in postgres
########FUNCTIONS#################
def convert_column(d):
    """
    Given the metadata about a column in mysql (one row of
    INFORMATION_SCHEMA.columns as a dict), build the portion of the postgres
    `CREATE TABLE` statement that corresponds to that column.

    Returns None for types with no direct translation here (enum, text) and
    for any unrecognized type -- callers must filter the Nones out before
    joining column clauses.  Fixes: an unrecognized type previously raised
    UnboundLocalError, and the caller's dict was mutated in place.
    """
    #work on a copy so the caller's metadata dict is not mutated
    meta = dict(d)
    meta["IS_NULLABLE"] = "NOT NULL" if meta["IS_NULLABLE"] == "NO" else ""
    #mysql data type -> postgres column-clause template
    templates = {
        "varchar": "{COLUMN_NAME} character varying({CHARACTER_MAXIMUM_LENGTH}) COLLATE pg_catalog.\"default\" {IS_NULLABLE}",
        "char": "{COLUMN_NAME} character({CHARACTER_MAXIMUM_LENGTH}) COLLATE pg_catalog.\"default\" {IS_NULLABLE}",
        "tinyint": "{COLUMN_NAME} smallint {IS_NULLABLE}",
        "decimal": "{COLUMN_NAME} numeric({NUMERIC_PRECISION},{NUMERIC_SCALE}) {IS_NULLABLE}",
        "int": "{COLUMN_NAME} integer {IS_NULLABLE}",
    }
    template = templates.get(meta["DATA_TYPE"])
    if template is None:
        return None
    return template.format(**meta)
def run():
    """
    Migrate every table of interest from the mariadb rxnorm_current schema
    into the postgres rxnorm_migrated schema: create each table from the
    converted column metadata, then bulk-copy its rows.
    """
    #get & convert datatypes for each table of interest
    tables_of_interest = get_tables_of_interest()
    with mariadb_conn(cursorclass=pymysql.cursors.DictCursor) as mcon, postgres_conn() as pcon:
        with mcon.cursor() as mcurse, pcon.cursor(cursor_factory=extras.DictCursor) as pcurse:
            for table in tables_of_interest: #create equivalent table in postgres
                #get columns from mysql
                q = "SELECT * FROM INFORMATION_SCHEMA.columns WHERE TABLE_SCHEMA=%s and TABLE_NAME=%s;"
                mcurse.execute(q,[mschema,table])
                #convert mysql column names and types to postgres column statements.
                # NOTE(review): convert_column can yield None (enum/text);
                # a None here breaks the join below -- confirm the tables of
                # interest contain none of those types.
                columns = [convert_column(a) for a in mcurse.fetchall() ]
                #TODO make sure this uses psycopg colums correctly.
                column_sql = sql.SQL(",\n".join(columns))
                #build a header and footer
                header=sql.SQL("CREATE TABLE IF NOT EXISTS {}\n(").format(sql.Identifier(pschema,table))
                footer=sql.SQL(");")
                #Joint the header, columns, and footer.
                create_table_statement = sql.SQL("\n").join([header,column_sql,footer])
                print(create_table_statement.as_string(pcon))
                #Create the table in postgres
                pcurse.execute(create_table_statement)
                pcon.commit()
                #Get the data from mysql
                mcurse.execute("SELECT * FROM {schema}.{table}".format(schema=mschema,table=table))
                #FIX setting up sql this^^^ way is improper.
                # NOTE(review): fetchall() loads the whole table into memory,
                # and results[0] below raises IndexError on an empty table.
                results = mcurse.fetchall()
                #build the insert statement template
                #get list of field names
                column_list = [sql.SQL(x) for x in results[0]]
                column_inserts = [sql.SQL("%({})s".format(x)) for x in results[0]] #fix with sql.Placeholder
                #generate insert statement
                psql_insert = sql.SQL("INSERT INTO {table} ({columns}) VALUES %s ").format(
                    table=sql.Identifier(pschema,table)
                    ,columns=sql.SQL(",").join(column_list)
                )
                #Note that this^^^^ does not contain parenthases around the placeholder
                #Building the values template.
                #Note that it must include the parenthases so that the
                #VALUES portion is formatted correctly.
                template = sql.SQL(",").join(column_inserts)
                template = sql.Composed([
                    sql.SQL("(")
                    ,template
                    ,sql.SQL(")")
                ])
                #insert the data with page_size
                extras.execute_values(pcurse,psql_insert,argslist=results,template=template, page_size=1000)
if __name__ == "__main__":
    # Allow running the migration directly as a script.
    run()

@ -1,21 +0,0 @@
-- Rebuild the download work queue from scratch.
DELETE FROM http.download_status;
-- Mark every FDA-regulated, interventional, phase-3 trial that started after
-- 2010-01-01 and completed before 2022-01-01 (terminated or completed) as
-- 'Of Interest' for the history downloader.
INSERT INTO http.download_status (nct_id, status)
SELECT nct_id, 'Of Interest'::http.history_download_status AS status
FROM ctgov.studies
WHERE
	is_fda_regulated_drug=TRUE
	AND
	study_type = 'Interventional'
	AND
	phase='Phase 3'
	AND
	overall_status in ('Terminated', 'Completed')
	AND
	start_date > '2010-01-01'
	AND
	completion_date < '2022-01-01'
;
-- Sanity check: report how many trials were queued.
SELECT count(*) FROM http.download_status ;

@ -1,36 +0,0 @@
from drugtools.env_setup import ENV,postgres_conn
from psycopg2 import extras
from collections import namedtuple
from tqdm import tqdm
# Load the GBD-2019 ICD-10-to-cause crosswalk (pipe-separated files) into
# "DiseaseBurden".icd10_to_cause.
FILES=[
    "../non-db_data_sources/GBD and ICD-10_(2019 version)/NONFATAL_cause2code.psv",
    "../non-db_data_sources/GBD and ICD-10_(2019 version)/COD_cause2code.psv"
]
SEP="|"
sql = """
INSERT INTO "DiseaseBurden".icd10_to_cause
(code,cause_text)
VALUES %s
"""
with postgres_conn() as pconn, pconn.cursor(cursor_factory=extras.DictCursor) as pcurse:
    entries = []
    for fpath in FILES:
        print(fpath)
        with open(fpath,"r") as fh:
            # NOTE(review): assumes exactly one "|" per line -- a blank line
            # or an extra separator raises ValueError on the unpack below.
            for line in tqdm(fh.readlines(),desc=fpath):
                code,cause = line.split(SEP)
                code = code.strip()
                cause = cause.strip()
                entries.append((code,cause))
    # Bulk insert all files' rows in one round of execute_values.
    extras.execute_values(pcurse, sql , entries)

@ -1,5 +0,0 @@
#!/bin/bash
# Reset local container data: wipe the RxNav-in-a-box data files and the
# AACT downloader's postgres data directory.  Destructive -- no confirmation.
rm -r ../containers/RxNav-In-a-box/rxnav_data/*
rm -r ../containers/AACT_downloader/postgresql/data

@ -1,24 +0,0 @@
from drugtools import env_setup
from drugtools import historical_trial_selector as hts
from drugtools import historical_nct_downloader as hnd
from drugtools import historical_nct_extractor as hne
from drugtools import download_and_extract_nsde as daen
from drugtools import migrate_mysql2pgsql as mm2p
# Show the loaded .env configuration and ask for confirmation before running
# any pipeline stages.
print("Current Environment")
print(env_setup.ENV)
cont = input("Are you willing to continue with the current environmnet? y/[n]")
if cont == "Y" or cont == "y":
    print("SelectingTrials")
    #hts.run()
    print("downloading trials")
    #hnd.run()
    print("extracting trials")
    hne.run()
    # NOTE(review): exit(0) makes the two steps below unreachable --
    # presumably a temporary measure while debugging extraction (the selector
    # and downloader stages above are also commented out); confirm before
    # relying on daen/mm2p running.
    exit(0)
    daen.run()
    mm2p.run()
else:
    print("Please fix your .env file and try again")

@ -1,87 +0,0 @@
import requests
import json
from drugtools.env_setup import ENV,postgres_conn
from psycopg2 import extras
from collections import namedtuple
from tqdm import tqdm
RecordStuff = namedtuple("RecordStuff", "nct_id condition ui uri rootSource name")
class Requestor():
    """Thin wrapper around the UMLS /search REST endpoint."""
    def __init__(self,api_key):
        # UTS API key sent with every request.
        self.key = api_key
    def search(self,search_term,inputType="sourceUi", returnIdType="code", addnl_terms={}):
        """
        Query the UMLS current-release search API (restricted to ICD10)
        and return the raw requests.Response.  Entries in addnl_terms
        override the defaults.
        """
        params = {
            "apiKey": self.key,
            "sabs": "ICD10",
            "string": search_term,
            "returnIdType": returnIdType,
            "inputType": inputType,
        }
        params.update(addnl_terms)
        endpoint = "https://uts-ws.nlm.nih.gov/rest/search/current/"
        return requests.get(endpoint, params=params)
# Map each snapshot trial's condition terms to candidate ICD-10 codes via the
# UMLS search API and stage the results for manual approval.
r = Requestor(ENV.get("UMLS_API_KEY"))
with postgres_conn() as pconn, pconn.cursor(cursor_factory=extras.DictCursor) as pcurse:
    # Retained for reference (MeSH terms only); sql2 below is what runs.
    sql = """
    select nct_id, downcase_mesh_term
    from ctgov.browse_conditions bc
    where
    mesh_type = 'mesh-list'
    and
    nct_id in (select distinct nct_id from history.trial_snapshots ts)
    order by nct_id
    ;
    """
    sql2 = """
    with cte as (
    /* Keywords added too much noise
    select nct_id,downcase_name
    from ctgov.keywords k
    where nct_id in (select distinct nct_id from history.trial_snapshots ts)
    union */
    select nct_id, downcase_name
    from ctgov.conditions c
    union
    select nct_id ,downcase_mesh_term as downcase_name
    from ctgov.browse_conditions bc
    where mesh_type = 'mesh-list'
    )
    select nct_id, downcase_name from cte
    where nct_id in (select distinct nct_id from history.trial_snapshots ts)
    order by nct_id
    """
    pcurse.execute(sql2)
    rows = pcurse.fetchall()
    entries = []
    # One API request per (trial, condition) pair.
    for row in tqdm(rows,desc="Search MeSH terms"):
        nctid = row[0]
        condition = row[1]
        # print(nctid,condition)
        # NOTE(review): the Exception(...) defaults are returned, not raised;
        # a response missing 'result' would hit AttributeError on the chained
        # .get instead of surfacing the intended message -- confirm intent.
        results = r.search(row[1]).json().get('result', Exception("No result entry in json")).get('results',Exception("No results entry in json"))
        #if results are empty, record the miss so the condition is still visible downstream
        if not results:
            entries.append(RecordStuff(nctid,condition,None,None,None,None))
        else:
            for entry in results:
                entries.append(RecordStuff(nctid, condition, entry["ui"], entry["uri"], entry["rootSource"], entry["name"]))
    # Stage every candidate mapping; approved/approval_timestamp stay null
    # until reviewed.
    sql_insert = """
    INSERT INTO "DiseaseBurden".trial_to_icd10
    (nct_id, "condition", ui,uri,rootsource,"name","source",approved,approval_timestamp)
    VALUES
    (%(nct_id)s, %(condition)s, %(ui)s, %(uri)s, %(rootSource)s, %(name)s, 'UMLS API search', null,null)
    """
    for entry in tqdm(entries,desc="Inserting entries to DB"):
        pcurse.execute(sql_insert,entry._asdict())
Loading…
Cancel
Save