From ff2c5b9ddd43a2ef870d41606108ac3f2a88668d Mon Sep 17 00:00:00 2001 From: youainti Date: Fri, 3 Jun 2022 18:09:20 -0700 Subject: [PATCH] Added rate limiting functionality. Not tested as things ran just fine for the 1700 trials I downloaded. --- history_downloader/downloader.py | 73 ++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/history_downloader/downloader.py b/history_downloader/downloader.py index ab4b363..a93d9c4 100644 --- a/history_downloader/downloader.py +++ b/history_downloader/downloader.py @@ -3,8 +3,9 @@ import requests import psycopg2 as psyco from datetime import datetime from bs4 import BeautifulSoup - -from multiprocessing import Pool +from multiprocessing import Pool, Value +import math +import time def get_highest_version_number(response): @@ -25,7 +26,6 @@ def get_highest_version_number(response): for row in reversed(table_rows): # if it is xx then it contains what we need. for td in row.findChildren("td"): - print("\n", td) if ("headers" in td.attrs) and (td.attrs["headers"][0]=="VersionNumber"): #Note the use of [0] above. attribute elements are lists. version_number = int(td.text) @@ -69,26 +69,52 @@ def upload_response(db_cursor, nct_id, version_a, version_b, response): ) ) -def download_and_handle_errors(cursor, nct_id, version_a, version_b): +def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time): """ Request a page, checking for http error codes, and handle the errors as requested. 
"""
+    #sleep log10(counts of delays)
+    time.sleep(math.log10(delay_time.value))
+
+    #request page
     r = make_request(nct_id, version_a, version_b)
 
     #check for
     if r.status_code == 200:
         upload_response(cursor,nct_id,version_a, version_b, r)
+    elif r.status_code == 503:
+        # write http code to http.responses
+        upload_response(cursor, nct_id, version_a, version_b, r)
+        # write incomplete to http.download_status
+        write_incomplete(cursor,nct_id)
+        # tell all other processes to slow down the request speed
+        with delay_time.get_lock(): delay_time.value += 1
+        # Delay
+        print("Received 503 on {}, increasing delay count to {}".format(nct_id, delay_time.value))
+        time.sleep(reset_time)
     else:
         #TODO: this should handle errors by
-        # - [ ] write http code to http.responses
-        # - [ ] write incomplete to http.download_status
-        # - [ ] tell all other processes to slow down the request speed
-        # - [x] raise exception
-        raise Exception("Download of {} (versions {},{}) returned http code != 200".format(nct_id,version_a,version_b))
+        # write http code to http.responses
+        upload_response(cursor, nct_id, version_a, version_b, r)
+        # write incomplete to http.download_status
+        write_incomplete(cursor,nct_id)
+        # raise exception
+        #raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code))
+        # Delay
+        time.sleep(reset_time)
 
     return r
 
-def download_trial_records(nct_id, db_connection_specs):
+def write_incomplete(cursor, nct_id):
+    """
+    Flags a trial as not having been fully downloaded.
+    """
+    query = """
+        INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS)
+        VALUES (%s, 'Incomplete'::HTTP.HISTORY_DOWNLOAD_STATUS)
+    """
+    cursor.execute(query, [nct_id] )
+
+def download_trial_records(nct_id, db_connection_specs, delay_time, reset_time):
     """
     Manage the download of all records associated with a given trial.
     It uses a single connection and cursor for downloading the entire trial.
@@ -108,7 +134,7 @@ def download_trial_records(nct_id, db_connection_specs): with db_conn.cursor() as cursor: #upload the first two versions - r = download_and_handle_errors(cursor, nct_id, 1, 2) + r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time) #extract last version v = get_highest_version_number(r) @@ -119,13 +145,13 @@ def download_trial_records(nct_id, db_connection_specs): elif v % 2 == 0: for version_a, version_b in step_generator(v): #download the history, handling any errors as they come up, and submitting it to the database. - download_and_handle_errors(cursor, nct_id, version_a, version_b) + download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time) elif v % 2 == 1: #if there are an odd number of submissions treat at as even for version_a, version_b in step_generator(v): - download_and_handle_errors(cursor, nct_id, version_a, version_b) + download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time) #now handle an odd number of versions by downloading the 1 vs (end) comparison. - download_and_handle_errors(cursor, nct_id, 1, v) + download_and_handle_errors(cursor, nct_id, 1, v, delay_time, reset_time) #now mark the trial as having been downloaded cursor.execute( @@ -229,26 +255,27 @@ if __name__ == "__main__": dbc = DBConnectionCreator( dbname="aact_db" ,user="python_downloader" - ,host="localhost" + ,host="will-office" ,port=5432 ,password="download") - #lambda that parameterizes the downloader, allowing it to be passed to the pool. - def downloader(nct): - download_trial_records(nct, dbc) #db connection with dbc.new() as con: - #select lists to download - #flag_trials_of_interest(con) #SHould only be run once - #get list of nct_ids - nctids = reserve_trials(con, 10) + nctids = reserve_trials(con, 1500) print(nctids) + + reset_time = 10 + delay_time = Value("I",1) + #lambda that parameterizes the downloader, allowing it to be passed to the pool. 
+ def downloader(nct): + download_trial_records(nct, dbc, delay_time, reset_time) + #start analyzing them - with Pool(processes=4) as process_pool: + with Pool(processes=12) as process_pool: process_pool.map(downloader, nctids)