import math
import time
from datetime import datetime
from functools import partial
from multiprocessing import Pool, Value

import psycopg2 as psyco
import requests
from bs4 import BeautifulSoup


def get_highest_version_number(response):
    """
    Navigate to the version table and extract the highest posted version.

    As there are cases where the last element in the table IS NOT a
    version entry, this function iterates from the last row entry to the
    first, looking for cells whose "headers" attribute indicates that
    they contain version information.  The last such cell in the
    unreversed list is the highest version.

    :param response: requests.Response for a trial's history page.
    :return: highest version number as an int, or None when no version
        cell is found in the table.
    """
    soup = BeautifulSoup(response.text, features="lxml")
    # The first <fieldset> on the page wraps the version table.
    table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
    for row in reversed(table_rows):
        for td in row.findChildren("td"):
            # Attribute values are lists, hence the [0] below.
            if ("headers" in td.attrs) and (td.attrs["headers"][0] == "VersionNumber"):
                return int(td.text)


def make_request(nct_id, version1, version2):
    """
    Request the side-by-side page comparing two snapshots of a trial.

    :param nct_id: trial identifier (e.g. "NCT01234567").
    :param version1: first version number in the comparison.
    :param version2: second version number in the comparison.
    :return: the requests.Response for the comparison page.
    """
    baseurl = "https://clinicaltrials.gov/ct2/history/{}?A={}&B={}&C=Side-by-Side"
    url = baseurl.format(nct_id, version1, version2)
    return requests.get(url)


def upload_response(db_cursor, nct_id, version_a, version_b, response):
    """
    Upload a requested page (with versions) to the database.

    :param db_cursor: open psycopg2 cursor used for the INSERT.
    :param nct_id: trial identifier the page belongs to.
    :param version_a: first version in the comparison.
    :param version_b: second version in the comparison.
    :param response: requests.Response whose url/status/body are stored.
    """
    # The HTTP Date header is RFC 1123 formatted,
    # e.g. "Tue, 15 Nov 1994 08:12:31 GMT".
    timestamp = datetime.strptime(response.headers['date'],
                                  "%a, %d %b %Y %H:%M:%S %Z")
    db_cursor.execute(
        """
        INSERT INTO http.responses
        (nct_id,version_a,version_b,url,response_code,response_date, html)
        VALUES
        (%s,%s,%s,%s,%s,%s,%s)
        ;
        """,
        (nct_id,
         version_a,
         version_b,
         response.url,
         response.status_code,
         datetime.isoformat(timestamp),
         response.text,
         )
    )


def download_and_handle_errors(cursor, nct_id, version_a, version_b,
                               delay_time, reset_time):
    """
    Request a comparison page, check for HTTP error codes, and record the
    outcome in the database.

    :param cursor: open psycopg2 cursor.
    :param nct_id: trial identifier.
    :param version_a: first version in the comparison.
    :param version_b: second version in the comparison.
    :param delay_time: multiprocessing.Value shared across workers; its
        value grows every time any worker receives a 503, slowing all
        workers down together.
    :param reset_time: seconds to sleep after receiving an error code.
    :return: the requests.Response, whatever its status code.
    """
    # Sleep log10 of the shared delay counter (starts at log10(1) == 0s).
    time.sleep(math.log10(delay_time.value))
    r = make_request(nct_id, version_a, version_b)
    if r.status_code == 200:
        upload_response(cursor, nct_id, version_a, version_b, r)
    elif r.status_code == 503:
        # Server is throttling us: record the response, flag the trial as
        # incomplete, and tell every worker to slow down.
        upload_response(cursor, nct_id, version_a, version_b, r)
        write_incomplete(cursor, nct_id)
        # NOTE(review): += on a shared Value is not guaranteed atomic; a
        # lost increment only makes the back-off slightly less aggressive.
        delay_time.value += 1
        # BUGFIX: this print used the misspelled name `delay_tiome`,
        # raising NameError whenever a 503 actually occurred.
        print("Recieved 503 on {}, increasing delay count to {}".format(
            nct_id, delay_time.value))
        time.sleep(reset_time)
    else:
        # Any other error: record what we got and flag the trial so it
        # can be retried later.
        upload_response(cursor, nct_id, version_a, version_b, r)
        write_incomplete(cursor, nct_id)
        time.sleep(reset_time)
    return r


def write_incomplete(cursor, nct_id):
    """
    Flags a trial as not having been fully downloaded.

    BUGFIX: the original statement was missing its VALUES clause, so it
    raised a SQL syntax error whenever an error response needed flagging.

    :param cursor: open psycopg2 cursor.
    :param nct_id: trial identifier to flag.
    """
    query = """
        INSERT INTO HTTP.DOWNLOAD_STATUS
        (NCT_ID,STATUS)
        VALUES
        (%s, 'Incomplete'::HTTP.HISTORY_DOWNLOAD_STATUS)
        ;
    """
    cursor.execute(query, [nct_id])


def download_trial_records(nct_id, db_connection_specs, delay_time, reset_time):
    """
    Manage the download of all records associated with a given trial,
    using a single connection and cursor for the entire trial.

    The benefit of distributing the work at the trial level is that
    errors related to a trial can be handled at that level.  This doesn't
    reserve a trial for download, but it does release the reservation by
    inserting a 'Downloaded' status row on success.

    :param nct_id: trial identifier.
    :param db_connection_specs: DBConnectionCreator used to open a fresh
        connection (a new connection per call so this function can run in
        a multiprocessing pool).
    :param delay_time: shared multiprocessing.Value throttle counter.
    :param reset_time: seconds to sleep after an error response.
    """
    # For testing / progress monitoring.
    print(nct_id)
    with db_connection_specs.new() as db_conn:
        with db_conn.cursor() as cursor:
            # The first comparison (1 vs 2) also reveals how many
            # versions exist in total.
            r = download_and_handle_errors(cursor, nct_id, 1, 2,
                                           delay_time, reset_time)
            v = get_highest_version_number(r)
            # NOTE(review): v is None when the page had no version table;
            # that raises TypeError below, matching the original behavior.
            if v > 2:
                # Download the remaining pairwise comparisons.
                for version_a, version_b in step_generator(v):
                    download_and_handle_errors(cursor, nct_id,
                                               version_a, version_b,
                                               delay_time, reset_time)
                if v % 2 == 1:
                    # Odd number of versions: the last one has no pair
                    # partner, so fetch the 1-vs-final comparison.
                    download_and_handle_errors(cursor, nct_id, 1, v,
                                               delay_time, reset_time)
            # BUGFIX: trials with exactly two versions previously
            # returned early and were never marked 'Downloaded', leaving
            # them permanently 'Reserved'.
            cursor.execute(
                """
                INSERT INTO http.download_status
                (nct_id,status)
                VALUES
                (%s, 'Downloaded'::http.history_download_status)
                """,
                [nct_id]
            )


class DBConnectionCreator():
    """
    Creates new database connections based on a specified set of
    parameters.  This simplifies connection creation by allowing the
    programmer to pass around the preconfigured connection creator
    (e.g. into pool workers, which must each open their own connection).
    """

    def __init__(self, dbname, user, host, port, password):
        self.dbname = dbname
        self.user = user
        self.host = host
        self.port = port
        self.password = password

    def new(self):
        """Open and return a fresh psycopg2 connection."""
        return psyco.connect(
            dbname=self.dbname,
            user=self.user,
            host=self.host,
            port=self.port,
            password=self.password,
        )


def step_generator(max_version):
    """
    Generate the version pairs to request:
    (3,4), (5,6), (7,8), ..., (max_version-1, max_version).

    BUGFIX: the range previously stopped at max_version EXCLUSIVE, so for
    an even max_version the final pair was silently skipped (e.g.
    step_generator(6) yielded only (3,4)).  The odd-max_version behavior
    is unchanged: the caller covers the last version via a 1-vs-final
    comparison.

    :param max_version: highest version number to cover.
    """
    old = 3
    for i in range(4, max_version + 1, 2):
        yield (old, i)
        old = i + 1


def flag_trials_of_interest(db_connection):
    """
    Mark the trials of interest as "Of Interest" in the status table.

    Selection: FDA-regulated interventional Phase 3 drug trials, either
    terminated or completed, started after 2008-01-01 and completed
    before 2022-01-01.

    :param db_connection: open psycopg2 connection.
    """
    query = """
        INSERT INTO http.download_status
        (nct_id, status)
        SELECT nct_id, 'Of Interest'::http.history_download_status AS status
        FROM ctgov.studies
        WHERE is_fda_regulated_drug=TRUE
          AND study_type = 'Interventional'
          AND phase='Phase 3'
          AND overall_status in ('Terminated', 'Completed')
          AND start_date > '2008-01-01'
          AND completion_date < '2022-01-01'
        ;
    """
    # BUGFIX: the query was previously built but never executed
    # (the original carried a TODO to this effect).
    with db_connection.cursor() as cursor:
        cursor.execute(query)


def reserve_trials(db_connection, limit=10):
    """
    Reserves a certain number of trials for processing in the DB.

    :param db_connection: open psycopg2 connection.
    :param limit: maximum number of trials to reserve.
    :return: list of reserved nct_id values.
    """
    query = """
        WITH OF_INTEREST AS
            (SELECT NCT_ID
             FROM HTTP.TRIALS_TO_DOWNLOAD
             LIMIT %s)
        INSERT INTO HTTP.DOWNLOAD_STATUS
        (NCT_ID,STATUS)
        SELECT OF_INTEREST.NCT_ID,
               'Reserved'::HTTP.HISTORY_DOWNLOAD_STATUS AS STATUS
        FROM OF_INTEREST
        RETURNING NCT_ID;
    """
    with db_connection.cursor() as cursor:
        cursor.execute(query, [limit])
        nctids_list = cursor.fetchall()
    return [row[0] for row in nctids_list]


# Shared throttle counter, populated inside each pool worker by _pool_init.
# A multiprocessing.Value cannot be pickled into Pool.map arguments; it must
# be handed to workers through the pool initializer instead.
_shared_delay = None


def _pool_init(delay_value):
    """Pool initializer: stash the shared delay counter in this worker."""
    global _shared_delay
    _shared_delay = delay_value


def _download_one(nct_id, db_specs, reset_time):
    """Picklable module-level wrapper for Pool.map (a closure defined
    inside the __main__ block cannot be pickled)."""
    download_trial_records(nct_id, db_specs, _shared_delay, reset_time)


if __name__ == "__main__":
    # Instantiate a database connection creator.
    dbc = DBConnectionCreator(
        dbname="aact_db",
        user="python_downloader",
        host="will-office",
        port=5432,
        password="download")

    # Reserve trials in a short transaction so the reservation is
    # committed before the workers start.
    with dbc.new() as con:
        # Get the list of nct_ids to download.
        nctids = reserve_trials(con, 1500)
    print(nctids)

    reset_time = 10
    delay_time = Value("I", 1)

    # BUGFIX: the original passed a locally defined closure (capturing a
    # shared Value) to Pool.map; local functions are unpicklable and
    # shared Values cannot travel through the task queue, so the pool
    # crashed.  A module-level partial plus initializer/initargs works.
    downloader = partial(_download_one, db_specs=dbc, reset_time=reset_time)
    with Pool(processes=12, initializer=_pool_init,
              initargs=(delay_time,)) as process_pool:
        process_pool.map(downloader, nctids)