Merging work from different computers. FF merge.

Includes a lot of data and updates to various files.

Merge branch 'main' of ssh://git.youainti.com:3022/youainti/ClinicalTrialsDataProcessing
main
will king 2 years ago
commit de3698052b

6
.gitattributes vendored

@ -0,0 +1,6 @@
*.sql.gzip filter=lfs diff=lfs merge=lfs -text
*.xlsx filter=lfs diff=lfs merge=lfs -text
containers/AACT_Reloader/2023-09-06_aactdb_with_matches.sql.gz filter=lfs diff=lfs merge=lfs -text
other_data/USP[[:space:]]DC/usp_dc_pub_2023_release_2.0_updated_final.csv filter=lfs diff=lfs merge=lfs -text
other_data/USP[[:space:]]MMG/MMG_v8.0_Alignment_File.csv filter=lfs diff=lfs merge=lfs -text
other_data/VA[[:space:]]Formulary/PharmacyProductSystem_NationalDrugCodeExtract.csv filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1 @@
backup/2023-09-06_aactdb_with_matches.sql.gz filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,39 @@
#!/bin/bash
# Restore the AACT database dump into a fresh PostgreSQL container.
#
# Starts a detached postgres:14-alpine container via podman with ./backup/
# mounted at /backup/, waits until the server accepts connections, then
# streams the gunzipped SQL dump into psql.
set -euo pipefail

RESTORE_DUMP_GZ=2023-09-06_aactdb_with_matches.sql.gz
POSTGRES_USER=root
POSTGRES_PASSWORD=root
POSTGRES_DB=aact_db

# Start the database container.
# NOTE: the original had `-p 5432:5432\` (no space before the backslash),
# which line-continued into the image name and produced the single argument
# `5432:5432postgres:14-alpine`, breaking the run. Fixed below.
podman run \
-e POSTGRES_PASSWORD="${POSTGRES_PASSWORD}" \
-e POSTGRES_USER="${POSTGRES_USER}" \
-e POSTGRES_DB="${POSTGRES_DB}" \
--name "${POSTGRES_DB}" \
--detach \
--shm-size=512mb \
--volume ./backup/:/backup/ \
-p 5432:5432 \
postgres:14-alpine

# Return success once PostgreSQL accepts a connection (psql '\q' exits 0).
function check_postgres {
podman exec -i "${POSTGRES_DB}" psql -h localhost -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c '\q' > /dev/null 2>&1
}

# Poll until the server is ready; no fixed up-front sleep is needed since
# this loop already waits for however long startup takes.
until check_postgres; do
echo "Waiting for PostgreSQL to be ready..."
sleep 4
done

echo "PostgreSQL is ready. Restoring the database..."
# Decompress the dump file inside the container and pipe it into psql.
podman exec -i "${POSTGRES_DB}" sh -c "gunzip -c /backup/${RESTORE_DUMP_GZ} | psql -h localhost -U ${POSTGRES_USER} -d ${POSTGRES_DB}"
echo "Database restoration complete."

@ -5,23 +5,19 @@
# - move postgres login credentials (allow them to be printed from just while setting up)
#paths for aact_db (postgres)
aact_download_link := "https://ctti-aact.nyc3.digitaloceanspaces.com/27grtsnhtccplxapj2o8ak9aotvv"
aact_download_file := "2022-12-23_postgres_data.zip"
aact_download_path := "./containers/AACT_downloader/aact_downloads"
aact_zipped_data_filepath := aact_download_path / aact_download_file
data_link := "https://ctti-aact.nyc3.digitaloceanspaces.com/27grtsnhtccplxapj2o8ak9aotvv"
data_file := "2022-12-23_postgres_data.zip"
data_path := "./containers/AACT_downloader/aact_downloads"
data_filepath := data_path / data_file
#must match the 'container name: aact_db' in the docker-compose.yaml
docker_container := `docker container ls -a | grep "aact_db|rxnav_db" | cut -f 1 -d " " | tr "\n" " "`
#paths for rxnavinabox
rxnav_path := "./containers/RxNav-In-a-box"
rxnav_version := "rxnav-in-a-box-20230103"
rxnav_data_path := rxnav_path / rxnav_version / "mysql" / "02_data.sql"
docker_container := `docker container ls -a | grep aact_db | cut -f 1 -d " " | tr "\n" " "`
#Various paths for docker stuff
docker-compose_path := "./containers/docker-compose.yaml"
docker-compose_path := "./AACT_downloader/docker-compose.yaml"
#rxnorm_mappings
rxnorm_mappings_url := "https://dailymed-data.nlm.nih.gov/public-release-files/rxnorm_mappings.zip"
#Number of historical trials to download.
count := "100"
@ -32,24 +28,19 @@ check-status:
docker --version
#check if python version > 3.10.
python --version
#python -c 'import sys; exit(sys.hexversion >= 50859504)'
python -c 'import sys; exit(sys.hexversion >= 50859504)'
curl --version
echo "current docker containers:{{docker_container}}"
#Setup the AACT container
setup-containers:
echo "todo"
@echo "Check for downloaded data"
#aact
[ -s {{aact_download_path}}/postgres_data.dmp ]
#rxnav
[ -s {{rxnav_data_path}} ]
[ -s {{data_path}}/postgres_data.dmp ]
#run docker compose
@echo "Setting up AACT_db & RxNav_db container"
@echo "Setting up AACT container"
docker-compose -f {{docker-compose_path}} up -d
#Stop the appropriate docker container
stop-containers:
#stop all docker containers if they are currently running.
@ -69,13 +60,10 @@ clean-docker: stop-containers
#Download the AACT data
download-aact-data:
#download
curl {{aact_download_link}} > {{aact_zipped_data_filepath}}
unzip {{aact_zipped_data_filepath}} -d {{aact_download_path}}
rm {{aact_zipped_data_filepath}}
curl {{data_link}} > ./AACT_downloader/aact_downloads/{{data_file}}
unzip {{data_filepath}} -d {{data_path}}
rm {{data_filepath}}
download-rxnav-data:
echo "Currently manually downloaded."
#build based on previously downloaded data
build: check-status setup-containers
@ -117,3 +105,8 @@ get-nsde:
cd market_data && bash download_nsde.sh
cd market_data && python extract_nsde.py
get-rxnorm-mappings:
#this may not be needed, all it does is match spls to rxcuis and I think I already have that.
curl {{rxnorm_mappings_url}} > ./market_data/rxnorm_mappings.zip
cd ./market_data && unzip ./rxnorm_mappings.zip
rm ./market_data/rxnorm_mappings.zip

Binary file not shown.

Binary file not shown.
1 version https://git-lfs.github.com/spec/v1
2 oid sha256:d08d3944a859c0b1f6bbd466ca027fc46c86ef5bb0328cb005fa002b7b61e70b
3 size 2451625

Binary file not shown.

Binary file not shown.
1 version https://git-lfs.github.com/spec/v1
2 oid sha256:85859dae3971460d36e0643ee1396cb646dba158b75862d557210cb2c50707a9
3 size 874058

Binary file not shown.

Binary file not shown.
1 version https://git-lfs.github.com/spec/v1
2 oid sha256:8c877a461be9e75565f78d0552e76f597b5cce709c82a3f9ad30dcc0f26ddafc
3 size 32481883

Binary file not shown.

@ -5,11 +5,13 @@ from multiprocess import Pool, Value
import math
import time
from drugtools.env_setup import postgres_conn, ENV
from tqdm import tqdm
############ GLOBALS
RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"]))
DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"]))
TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"])
TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"])
############ Functions
def get_highest_version_number(response):
@ -25,7 +27,10 @@ def get_highest_version_number(response):
soup = BeautifulSoup(response.text, features="lxml")
#get version table rows
try:
table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
except IndexError as ie:
raise ie
for row in reversed(table_rows):
# if it is <td headers="VersionNumber">xx</td> then it contains what we need.
@ -44,7 +49,11 @@ def make_request(nct_id,version1,version2):
url = baseurl.format(nct_id,version1,version2)
#make request
try:
time.sleep(0.02)
response = requests.get(url)
except requests.exceptions.ConnectionError as ce:
raise ce
#return the response
return response
@ -85,16 +94,24 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
#check for
if r.status_code == 200:
upload_response(cursor,nct_id,version_a, version_b, r)
elif r.status_code == 404:
upload_response(cursor, nct_id, version_a, version_b, r)
write_incomplete(cursor,nct_id)
return None
elif r.status_code == 503:
# write http code to http.responses
upload_response(cursor, nct_id, version_a, version_b, r)
# write incomplete to http.download_status
write_incomplete(cursor,nct_id)
# tell all other processes to slow down the request speed
delay_time.value += 1
# Delay
print("Recieved 503 on {}, increasing delay count to {}".format(nct_id, delay_tiome))
time.sleep(reset_time)
with delay_time.get_lock():
delay_time.value += 1
time.sleep(reset_time.value)
print("Recieved 503 on {}, increasing delay count to {}".format(
nct_id,
delay_time)
)
else:
#TODO: this should handle errors by
# write http code to http.responses
@ -104,8 +121,13 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
# raise exception
#raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code))
print("Recieved {} on {}, increasing delay count to {}".format(
r.status_code,
nct_id,
delay_time))
# Delay
time.sleep(reset_time)
with reset_time.get_lock():
time.sleep(reset_time.value)
return r
def write_incomplete(cursor, nct_id):
@ -128,9 +150,6 @@ def download_trial_records(nct_id, delay_time, reset_time):
This doesn't reserve a trial for download, but it does release the reservation.
"""
#for testing
print(nct_id)
# A new connection is created every time the function is called so that this
# function can be run using a multiprocessing pool
@ -140,9 +159,15 @@ def download_trial_records(nct_id, delay_time, reset_time):
#upload the first two versions
r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time)
#extract last version
v = get_highest_version_number(r)
if r is None:
return None
try:
v = get_highest_version_number(r)
except IndexError as ie:
raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id))
#download and upload the remaining versions
if v == 2:
return None
@ -205,23 +230,29 @@ def reserve_trials(db_connection, limit=10):
return nctids_list
def chunker(seq, size):
return [seq[pos:pos + size] for pos in range(0, len(seq), size)]
def reserve_and_download_versions(limit):
#db connection
with postgres_conn() as con:
#get list of nct_ids
nctids = reserve_trials(con, limit)
print("reserving_trials: ", nctids)
#lambda that parameterizes the downloader, allowing it to be passed to the pool.
def downloader(nct):
download_trial_records(nct, DELAY_TIME, RESET_TIME)
#start analyzing them
#db connection
with postgres_conn() as con:
itt = 0
while (nctids := reserve_trials(con,TRIAL_RESERVATION_BATCH_SIZE)) and \
itt < TRIAL_RESERVATION_LIMIT:
print(nctids)
with Pool(processes=12) as process_pool:
process_pool.map(downloader, nctids)
l = len(nctids)
itt += l
with tqdm(total=l) as prog_bar:
for _ in process_pool.imap_unordered(downloader, nctids):
prog_bar.update()
con.commit()
def run():
@ -232,3 +263,4 @@ if __name__ == "__main__":
Main!
"""
run()
#db connection

@ -126,17 +126,18 @@ def extract_submission_dates(soup):
version_date_dict = {}
for row in reversed(table_rows):
for row in table_rows:
# if it is <td headers="VersionNumber">xx</td> then it contains what we need.
version_number = None
version_date = None
for td in row.findChildren("td"):
if ("headers" in td.attrs):
if (td.attrs["headers"][0]=="VersionNumber"):
version_number = int(td.text)
elif (td.attrs["headers"][0]=="VersionDate"):
version_date = td.text
version_date_dict[version_number] = datetime.strptime(version_date , "%B %d, %Y")
version_date = datetime.strptime(td.text.strip() , "%B %d, %Y")
print(version_date_dict)
version_date_dict[version_number] = version_date
return version_date_dict
def optional_strip(possible_string):
@ -396,14 +397,12 @@ date_MMMM_DD_YYYY = "%B %d, %Y"
def get_data_from_versions(nct_id,html, version_a_int, version_b_int):
soup = BeautifulSoup(html,"lxml")
print(getting_data_from_versions)
version_date_dict = extract_submission_dates(soup)
print(version_date_dict)
#preallocate version data
version_a = VersionData(nct_id, version_a_int, version_date_dict[version_a_int])
version_b = VersionData(nct_id, version_b_int, version_date_dict[version_b_int])
version_a = VersionData(nct_id, version_a_int, version_date_dict.get(version_a_int))
version_b = VersionData(nct_id, version_b_int, version_date_dict.get(version_b_int))
#extract data from html and put it in the preallocated objects
get_forms(soup, version_a, version_b)
@ -424,7 +423,6 @@ def run():
curse.execute(sql)
for response in tqdm(curse.fetchall()):
nct_id, version_a, version_b, html = response
print(nct_id)
print(nct_id, version_a, version_b) if VERBOSE else ""

@ -11,9 +11,13 @@ print(env_setup.ENV)
cont = input("Are you willing to continue with the current environmnet? y/[n]")
if cont == "Y" or cont == "y":
hts.run()
hnd.run()
print("SelectingTrials")
#hts.run()
print("downloading trials")
#hnd.run()
print("extracting trials")
hne.run()
exit(0)
daen.run()
mm2p.run()
else:

Loading…
Cancel
Save