From 2a9b8349bafebb2e5a81623dbe58c9c9d4d561d0 Mon Sep 17 00:00:00 2001 From: youainti Date: Fri, 31 Mar 2023 16:56:05 -0700 Subject: [PATCH] storing local details --- .../drugtools/historical_nct_downloader.py | 74 +++++++++++++------ scripts/runall.py | 8 +- 2 files changed, 59 insertions(+), 23 deletions(-) diff --git a/scripts/drugtools/historical_nct_downloader.py b/scripts/drugtools/historical_nct_downloader.py index af3ce3e..31cf456 100644 --- a/scripts/drugtools/historical_nct_downloader.py +++ b/scripts/drugtools/historical_nct_downloader.py @@ -5,11 +5,13 @@ from multiprocess import Pool, Value import math import time from drugtools.env_setup import postgres_conn, ENV +from tqdm import tqdm ############ GLOBALS RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"])) DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"])) TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"]) +TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"]) ############ Functions def get_highest_version_number(response): @@ -25,7 +27,10 @@ def get_highest_version_number(response): soup = BeautifulSoup(response.text, features="lxml") #get version table rows - table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr") + try: + table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr") + except IndexError as ie: + raise ie for row in reversed(table_rows): # if it is xx then it contains what we need. @@ -44,7 +49,11 @@ def make_request(nct_id,version1,version2): url = baseurl.format(nct_id,version1,version2) #make request - response = requests.get(url) + try: + time.sleep(0.02) + response = requests.get(url) + except requests.exceptions.ConnectionError as ce: + raise ce #return the response return response @@ -85,16 +94,24 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, #check for if r.status_code == 200: upload_response(cursor,nct_id,version_a, version_b, r) + elif r.status_code == 404: + upload_response(cursor, nct_id, version_a, version_b, r) + write_incomplete(cursor,nct_id) + return None elif r.status_code == 503: # write http code to http.responses upload_response(cursor, nct_id, version_a, version_b, r) # write incomplete to http.download_status write_incomplete(cursor,nct_id) # tell all other processes to slow down the request speed - delay_time.value += 1 # Delay - print("Recieved 503 on {}, increasing delay count to {}".format(nct_id, delay_tiome)) - time.sleep(reset_time) + with delay_time.get_lock(): + delay_time.value += 1 + time.sleep(reset_time.value) + print("Recieved 503 on {}, increasing delay count to {}".format( + nct_id, + delay_time) + ) else: #TODO: this should handle errors by # write http code to http.responses @@ -104,8 +121,13 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, # raise exception #raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code)) + print("Recieved {} on {}, increasing delay count to {}".format( + r.status_code, + nct_id, + delay_time)) # Delay - time.sleep(reset_time) + with reset_time.get_lock(): + time.sleep(reset_time.value) return r def write_incomplete(cursor, nct_id): @@ -128,9 +150,6 @@ def download_trial_records(nct_id, delay_time, reset_time): This doesn't reserve a trial for download, but it does release the reservation. """ - #for testing - print(nct_id) - # A new connection is created every time the function is called so that this # function can be run using a multiprocessing pool @@ -140,8 +159,14 @@ def download_trial_records(nct_id, delay_time, reset_time): #upload the first two versions r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time) #extract last version - v = get_highest_version_number(r) + if r is None: + return None + + try: + v = get_highest_version_number(r) + except IndexError as ie: + raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id)) #download and upload the remaining versions if v == 2: @@ -205,23 +230,29 @@ def reserve_trials(db_connection, limit=10): return nctids_list +def chunker(seq, size): + return [seq[pos:pos + size] for pos in range(0, len(seq), size)] def reserve_and_download_versions(limit): - #db connection - with postgres_conn() as con: - - #get list of nct_ids - nctids = reserve_trials(con, limit) - print("reserving_trials: ", nctids) - #lambda that parameterizes the downloader, allowing it to be passed to the pool. def downloader(nct): download_trial_records(nct, DELAY_TIME, RESET_TIME) - #start analyzing them - with Pool(processes=12) as process_pool: - process_pool.map(downloader, nctids) + #db connection + with postgres_conn() as con: + itt = 0 + while (nctids := reserve_trials(con,TRIAL_RESERVATION_BATCH_SIZE)) and \ + itt < TRIAL_RESERVATION_LIMIT: + print(nctids) + with Pool(processes=12) as process_pool: + l = len(nctids) + itt += l + with tqdm(total=l) as prog_bar: + for _ in process_pool.imap_unordered(downloader, nctids): + prog_bar.update() + con.commit() + def run(): @@ -231,4 +262,5 @@ if __name__ == "__main__": """ Main! """ - run() \ No newline at end of file + run() + #db connection diff --git a/scripts/runall.py b/scripts/runall.py index 39935f0..01e481d 100644 --- a/scripts/runall.py +++ b/scripts/runall.py @@ -11,9 +11,13 @@ print(env_setup.ENV) cont = input("Are you willing to continue with the current environmnet? y/[n]") if cont == "Y" or cont == "y": - hts.run() - hnd.run() + print("SelectingTrials") + #hts.run() + print("downloading trials") + #hnd.run() + print("extracting trials") hne.run() + exit(0) daen.run() mm2p.run() else: