from tkinter import W import requests import psycopg2 as psyco from datetime import datetime from bs4 import BeautifulSoup from multiprocessing import Pool def get_highest_version_number(response): """ Extract the highest version currently available from the version number. """ #navigate to a specific part of the returned html and extract the highest posted version. soup = BeautifulSoup(response.text, features="lxml") version_value = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")[-1].td.text return int(version_value) def make_request(nct_id,version1,version2): """ Request a page comparing two snapshots """ #create url baseurl = "https://clinicaltrials.gov/ct2/history/{}?A={}&B={}&C=Side-by-Side" url = baseurl.format(nct_id,version1,version2) #make request response = requests.get(url) #return the response return response def upload_response(db_cursor, nct_id, version_a, version_b, response): """ Upload a requested page (with versions) to the database. """ timestamp = datetime.strptime(response.headers['date'], "%a, %d %b %Y %H:%M:%S %Z") #this uploads the response values. db_cursor.execute(""" INSERT INTO http.responses (nct_id,version_a,version_b,url,response_code,response_date, html) VALUES (%s,%s,%s,%s,%s,%s,%s) ; """ ,(nct_id ,version_a ,version_b ,response.url ,response.status_code ,datetime.isoformat(timestamp) ,response.text ) ) def download_and_handle_errors(cursor, nct_id, version_a, version_b): """ Request a page, checking for http error codes, and handle the errors as requested. """ #request page r = make_request(nct_id, version_a, version_b) #check for if r.status_code == 200: upload_response(cursor,nct_id,version_a, version_b, r) else: #TODO: this should handle errors by # - [ ] write http code to http.responses # - [ ] write incomplete to http.download_status # - [ ] tell all other processes to slow down the request speed # - [x] raise exception raise Exception("Download of {} (versions {},{}) returned http code != 200".format(nct_id,version_a,version_b)) return r def download_trial_records(nct_id, db_connection_specs): """ Manage the download of all records associated with a given trial. It uses a single connection and cursor for downloading the entire trial. The benefit of distributing the work at the trial level is that errors related to a trial can be handled at that level. This doesn't reserve a trial for download, but it does release the reservation. """ #for testing print(nct_id) # A new connection is created every time the function is called so that this # function can be run using a multiprocessing pool with db_connection_specs.new() as db_conn: with db_conn.cursor() as cursor: #upload the first two versions r = download_and_handle_errors(cursor, nct_id, 1, 2) #extract last version v = get_highest_version_number(r) #download and upload the remaining versions if v == 2: return None elif v % 2 == 0: for version_a, version_b in step_generator(v): #download the history, handling any errors as they come up, and submitting it to the database. download_and_handle_errors(cursor, nct_id, version_a, version_b) elif v % 2 == 1: #if there are an odd number of submissions treat at as even for version_a, version_b in step_generator(v): download_and_handle_errors(cursor, nct_id, version_a, version_b) #now handle an odd number of versions by downloading the 1 vs (end) comparison. download_and_handle_errors(cursor, nct_id, 1, v) #now mark the trial as having been downloaded cursor.execute( """ INSERT INTO http.download_status (nct_id,status) VALUES (%s, 'Downloaded'::http.history_download_status) """ , [nct_id] ) class DBConnectionCreator(): """ Creates new database connections based on a specified set of parameters. This simplifies connection creation by allowing the programmer to pass around the preconfigured connection creator. """ def __init__(self,dbname, user, host, port, password): self.dbname = dbname self.user = user self.host = host self.port = port self.password=password def new(self): return psyco.connect( dbname=self.dbname ,user=self.user ,host=self.host ,port=self.port ,password=self.password ) def step_generator(max_version): """ Used to generate a list of versions to request The specific pattern generated is (3,4), (5,6), (7,8),...,(max_version-1,max_version) """ old=3 for i in range(4,max_version,2): yield (old,i) old = i + 1 def flag_trials_of_interest(db_connection): """ Mark the queries of interest as "of interest" INCOMPLETE """ query = """ INSERT INTO http.download_status (nct_id, status) SELECT nct_id, 'Of Interest'::http.history_download_status AS status FROM ctgov.studies WHERE is_fda_regulated_drug=TRUE AND study_type = 'Interventional' AND phase='Phase 3' AND overall_status in ('Terminated', 'Completed') AND start_date > '2008-01-01' AND completion_date < '2022-01-01' ; """ #TODO: actually send it to the database. def reserve_trials(db_connection, limit=10): """ Reserves a certain number of trials for processing in the DB. """ query = """ WITH OF_INTEREST AS (SELECT NCT_ID FROM HTTP.MOST_RECENT_DOWNLOAD_STATUS WHERE HTTP.MOST_RECENT_DOWNLOAD_STATUS.STATUS = 'Of Interest'::HTTP.HISTORY_DOWNLOAD_STATUS LIMIT %s ) INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS) SELECT OF_INTEREST.NCT_ID, 'Reserved'::HTTP.HISTORY_DOWNLOAD_STATUS AS STATUS FROM OF_INTEREST RETURNING NCT_ID; """ with db_connection.cursor() as cursor: cursor.execute(query, [limit] ) nctids_list = cursor.fetchall() nctids_list = [ x[0] for x in nctids_list] return nctids_list if __name__ == "__main__": """ Main! """ #instantiate a database connnection creator dbc = DBConnectionCreator( dbname="aact_db" ,user="python_downloader" ,host="localhost" ,port=5432 ,password="download") #lambda that parameterizes the downloader, allowing it to be passed to the pool. def downloader(nct): download_trial_records(nct, dbc) #db connection with dbc.new() as con: #select lists to download #flag_trials_of_interest(con) #SHould only be run once #get list of nct_ids nctids = reserve_trials(con, 4) print(nctids) #start analyzing them with Pool(processes=3) as process_pool: process_pool.map(downloader, nctids)