diff --git a/scripts/drugtools/historical_nct_downloader.py b/scripts/drugtools/historical_nct_downloader.py
index af3ce3e..31cf456 100644
--- a/scripts/drugtools/historical_nct_downloader.py
+++ b/scripts/drugtools/historical_nct_downloader.py
@@ -5,11 +5,13 @@ from multiprocess import Pool, Value
import math
import time
from drugtools.env_setup import postgres_conn, ENV
+from tqdm import tqdm
############ GLOBALS
RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"]))
DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"]))
TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"])
+TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"])
############ Functions
def get_highest_version_number(response):
@@ -25,7 +27,10 @@ def get_highest_version_number(response):
soup = BeautifulSoup(response.text, features="lxml")
#get version table rows
- table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
+ try:
+ table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
+ except IndexError as ie:
+ raise ie
for row in reversed(table_rows):
# if it is
xx | then it contains what we need.
@@ -44,7 +49,11 @@ def make_request(nct_id,version1,version2):
url = baseurl.format(nct_id,version1,version2)
#make request
- response = requests.get(url)
+ try:
+ time.sleep(0.02)
+ response = requests.get(url)
+ except requests.exceptions.ConnectionError as ce:
+ raise ce
#return the response
return response
@@ -85,16 +94,24 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
#check for
if r.status_code == 200:
upload_response(cursor,nct_id,version_a, version_b, r)
+ elif r.status_code == 404:
+ upload_response(cursor, nct_id, version_a, version_b, r)
+ write_incomplete(cursor,nct_id)
+ return None
elif r.status_code == 503:
# write http code to http.responses
upload_response(cursor, nct_id, version_a, version_b, r)
# write incomplete to http.download_status
write_incomplete(cursor,nct_id)
# tell all other processes to slow down the request speed
- delay_time.value += 1
# Delay
- print("Recieved 503 on {}, increasing delay count to {}".format(nct_id, delay_tiome))
- time.sleep(reset_time)
+ with delay_time.get_lock():
+ delay_time.value += 1
+ time.sleep(reset_time.value)
+ print("Recieved 503 on {}, increasing delay count to {}".format(
+ nct_id,
+ delay_time)
+ )
else:
#TODO: this should handle errors by
# write http code to http.responses
@@ -104,8 +121,13 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
# raise exception
#raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code))
+ print("Recieved {} on {}, increasing delay count to {}".format(
+ r.status_code,
+ nct_id,
+ delay_time))
# Delay
- time.sleep(reset_time)
+ with reset_time.get_lock():
+ time.sleep(reset_time.value)
return r
def write_incomplete(cursor, nct_id):
@@ -128,9 +150,6 @@ def download_trial_records(nct_id, delay_time, reset_time):
This doesn't reserve a trial for download, but it does release the reservation.
"""
- #for testing
- print(nct_id)
-
# A new connection is created every time the function is called so that this
# function can be run using a multiprocessing pool
@@ -140,8 +159,14 @@ def download_trial_records(nct_id, delay_time, reset_time):
#upload the first two versions
r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time)
#extract last version
- v = get_highest_version_number(r)
+ if r is None:
+ return None
+
+ try:
+ v = get_highest_version_number(r)
+ except IndexError as ie:
+ raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id))
#download and upload the remaining versions
if v == 2:
@@ -205,23 +230,29 @@ def reserve_trials(db_connection, limit=10):
return nctids_list
+def chunker(seq, size):
+ return [seq[pos:pos + size] for pos in range(0, len(seq), size)]
def reserve_and_download_versions(limit):
- #db connection
- with postgres_conn() as con:
-
- #get list of nct_ids
- nctids = reserve_trials(con, limit)
- print("reserving_trials: ", nctids)
-
#lambda that parameterizes the downloader, allowing it to be passed to the pool.
def downloader(nct):
download_trial_records(nct, DELAY_TIME, RESET_TIME)
- #start analyzing them
- with Pool(processes=12) as process_pool:
- process_pool.map(downloader, nctids)
+ #db connection
+ with postgres_conn() as con:
+ itt = 0
+ while (nctids := reserve_trials(con,TRIAL_RESERVATION_BATCH_SIZE)) and \
+ itt < TRIAL_RESERVATION_LIMIT:
+ print(nctids)
+ with Pool(processes=12) as process_pool:
+ l = len(nctids)
+ itt += l
+ with tqdm(total=l) as prog_bar:
+ for _ in process_pool.imap_unordered(downloader, nctids):
+ prog_bar.update()
+ con.commit()
+
def run():
@@ -231,4 +262,5 @@ if __name__ == "__main__":
"""
Main!
"""
- run()
\ No newline at end of file
+ run()
+ #db connection
diff --git a/scripts/runall.py b/scripts/runall.py
index 39935f0..01e481d 100644
--- a/scripts/runall.py
+++ b/scripts/runall.py
@@ -11,9 +11,13 @@ print(env_setup.ENV)
cont = input("Are you willing to continue with the current environmnet? y/[n]")
if cont == "Y" or cont == "y":
- hts.run()
- hnd.run()
+ print("SelectingTrials")
+ #hts.run()
+ print("downloading trials")
+ #hnd.run()
+ print("extracting trials")
hne.run()
+ exit(0)
daen.run()
mm2p.run()
else: