You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ClinicalTrialsDataProcessing/data_mgmt_scripts/drugtools/historical_nct_downloader.py

267 lines
8.9 KiB
Python

import requests
from datetime import datetime
from bs4 import BeautifulSoup
from multiprocess import Pool, Value
import math
import time
from drugtools.env_setup import postgres_conn, ENV
from tqdm import tqdm
############ GLOBALS
# Shared cross-process rate-limiting state.  multiprocess.Value('I', ...) is an
# unsigned int living in shared memory, so every pool worker sees updates.
# RESET_TIME: seconds slept after an error response (used via .value).
RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"]))
# DELAY_TIME: accumulated delay counter; log10 of it is slept before each request.
DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"]))
# Approximate total number of trials to process in one run.
TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"])
# Number of trials reserved (and downloaded) per batch.
TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"])
############ Functions
def get_highest_version_number(response):
    """
    Navigate to the version table and extract the highest posted version.

    As there are cases where the last element in the table IS NOT a
    version entry, this function iterates from the last row to the first,
    looking for cells with the correct header, indicating that they contain
    version information.  The first match found while scanning backwards is
    the highest version.

    :param response: requests.Response for a trial history page.
    :returns: the highest version number as an int, or None if the table
        contains no version cell at all.
    :raises IndexError: if the page has no <fieldset> (the expected version
        table is missing).  The caller annotates this with the nct_id.
    """
    soup = BeautifulSoup(response.text, features="lxml")
    # The version table is the first <fieldset> on the page.  The original
    # wrapped this in a try/except that only re-raised the same IndexError,
    # which is a no-op; letting it propagate is equivalent.
    table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
    for row in reversed(table_rows):
        # A cell like <td headers="VersionNumber">xx</td> contains what we need.
        for td in row.findChildren("td"):
            # Note the [0]: attribute values parsed by bs4 are lists.
            if ("headers" in td.attrs) and (td.attrs["headers"][0] == "VersionNumber"):
                return int(td.text)
    # No version cell found anywhere in the table.
    return None
def make_request(nct_id, version1, version2):
    """
    Request a page comparing two snapshots of a trial.

    :param nct_id: trial identifier, e.g. "NCT00000000".
    :param version1: first (A) version number of the comparison.
    :param version2: second (B) version number of the comparison.
    :returns: the requests.Response for the side-by-side comparison page.
    :raises requests.exceptions.ConnectionError: if the connection fails
        (propagates naturally; the original try/except only re-raised it).
    """
    # Build the side-by-side history comparison URL.
    baseurl = "https://clinicaltrials.gov/ct2/history/{}?A={}&B={}&C=Side-by-Side"
    url = baseurl.format(nct_id, version1, version2)
    # Small unconditional pause keeps per-worker request bursts down.
    time.sleep(0.02)
    # A timeout prevents a pool worker from hanging forever on a stalled
    # connection; 60s is generous for a single page.
    response = requests.get(url, timeout=60)
    return response
def upload_response(db_cursor, nct_id, version_a, version_b, response):
    """
    Insert one fetched comparison page (with its version pair) into
    http.responses via the supplied cursor.
    """
    # The HTTP Date header looks like "Mon, 01 Jan 2024 12:00:00 GMT".
    fetched_at = datetime.strptime(response.headers['date'], "%a, %d %b %Y %H:%M:%S %Z")
    query = """
    INSERT INTO http.responses
    (nct_id,version_a,version_b,url,response_code,response_date, html)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    ;
    """
    # Parameterized insert keeps the HTML payload safely escaped.
    params = (
        nct_id,
        version_a,
        version_b,
        response.url,
        response.status_code,
        datetime.isoformat(fetched_at),
        response.text,
    )
    db_cursor.execute(query, params)
def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time):
    """
    Request a comparison page, record it in the DB, and react to HTTP errors.

    :param cursor: open database cursor used for all inserts.
    :param nct_id: trial identifier.
    :param version_a: first version of the comparison.
    :param version_b: second version of the comparison.
    :param delay_time: shared multiprocess counter; log10 of it is slept
        before every request and it is incremented on 503 responses.
    :param reset_time: shared value holding seconds to sleep after an error.
    :returns: the response object, or None when the page returned 404.
    """
    # Back off in proportion to the globally accumulated delay count.
    time.sleep(math.log10(delay_time.value))
    r = make_request(nct_id, version_a, version_b)
    if r.status_code == 200:
        upload_response(cursor, nct_id, version_a, version_b, r)
    elif r.status_code == 404:
        # Page is gone: record the response, flag the trial incomplete, stop.
        upload_response(cursor, nct_id, version_a, version_b, r)
        write_incomplete(cursor, nct_id)
        return None
    elif r.status_code == 503:
        # Server is throttling us: record the attempt and flag the trial,
        # then bump the shared counter so every worker slows down.
        upload_response(cursor, nct_id, version_a, version_b, r)
        write_incomplete(cursor, nct_id)
        with delay_time.get_lock():
            delay_time.value += 1
        time.sleep(reset_time.value)
        # Print the numeric value, not the Value wrapper object.
        print("Received 503 on {}, increasing delay count to {}".format(
            nct_id,
            delay_time.value)
        )
    else:
        # Any other status: record it, flag the trial, and pause briefly.
        # TODO: decide whether these codes should raise instead of only log.
        upload_response(cursor, nct_id, version_a, version_b, r)
        write_incomplete(cursor, nct_id)
        print("Received {} on {}, increasing delay count to {}".format(
            r.status_code,
            nct_id,
            delay_time.value))
        # Sleep WITHOUT holding reset_time's lock: the original slept inside
        # `with reset_time.get_lock():`, which needlessly blocked every other
        # worker for the whole sleep (the 503 branch sleeps lock-free).
        time.sleep(reset_time.value)
    return r
def write_incomplete(cursor, nct_id):
    """
    Flags a trial as not having been fully downloaded by inserting an
    'Incomplete' row into HTTP.DOWNLOAD_STATUS.
    """
    cursor.execute(
        """
    INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS) VALUES
    (%s, 'Incomplete'::HTTP.HISTORY_DOWNLOAD_STATUS);
    """,
        [nct_id],
    )
def download_trial_records(nct_id, delay_time, reset_time):
    """
    Manage the download of all history records associated with a given trial.

    It uses a single connection and cursor for downloading the entire trial;
    the benefit of distributing the work at the trial level is that errors
    related to a trial can be handled at that level.
    This doesn't reserve a trial for download, but it does release the
    reservation by writing a terminal status row.

    :param nct_id: trial identifier.
    :param delay_time: shared delay counter, passed through to the downloader.
    :param reset_time: shared error-sleep seconds, passed through as well.
    """
    # A new connection is created every time the function is called so that
    # this function can be run from a multiprocessing pool.
    with postgres_conn() as db_conn:
        with db_conn.cursor() as cursor:
            # Always fetch the first comparison (versions 1 vs 2).
            r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time)
            # None means a 404: the trial was already flagged incomplete.
            if r is None:
                return None
            try:
                v = get_highest_version_number(r)
            except IndexError as ie:
                # Re-raise with the trial id so the failure is attributable.
                raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id))
            if v == 2:
                # Versions 1 and 2 were already fetched above.
                # NOTE(review): this path never marks the trial 'Downloaded',
                # leaving it 'Reserved' — confirm that is intended.
                return None
            # The even/odd branches in the original ran the identical loop;
            # merged here, with the odd-count extra request kept separate.
            for version_a, version_b in step_generator(v):
                download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time, reset_time)
            if v % 2 == 1:
                # Odd number of versions: cover the last one by downloading
                # the 1 vs (end) comparison.
                download_and_handle_errors(cursor, nct_id, 1, v, delay_time, reset_time)
            # Now mark the trial as having been downloaded.
            cursor.execute(
                """
                INSERT INTO http.download_status (nct_id,status)
                VALUES (%s, 'Downloaded'::http.history_download_status)
                """
                , [nct_id]
            )
def step_generator(max_version):
    """
    Generate the version pairs to request for a trial.

    The specific pattern generated is
    (3,4), (5,6), (7,8), ..., (max_version-1, max_version)
    when max_version is even.  Pair (1,2) is always requested separately by
    the caller, and an odd final version is covered there by a 1-vs-last
    comparison, so neither appears here.

    :param max_version: highest version number posted for the trial.
    :yields: (version_a, version_b) tuples.
    """
    version_a = 3
    # Stop at max_version + 1 so the final even pair is included: the
    # original range(4, max_version, 2) silently dropped
    # (max_version-1, max_version), e.g. step_generator(4) yielded nothing.
    for version_b in range(4, max_version + 1, 2):
        yield (version_a, version_b)
        version_a = version_b + 1
def reserve_trials(db_connection, limit=10):
    """
    Reserve up to `limit` trials for processing by inserting 'Reserved'
    rows into HTTP.DOWNLOAD_STATUS, returning the reserved nct_ids.
    """
    sql = """
    WITH OF_INTEREST AS
    (SELECT NCT_ID
    FROM HTTP.TRIALS_TO_DOWNLOAD
    LIMIT %s
    )
    INSERT INTO HTTP.DOWNLOAD_STATUS (NCT_ID,STATUS)
    SELECT OF_INTEREST.NCT_ID, 'Reserved'::HTTP.HISTORY_DOWNLOAD_STATUS AS STATUS
    FROM OF_INTEREST
    RETURNING NCT_ID;
    """
    with db_connection.cursor() as cursor:
        cursor.execute(sql, [limit])
        rows = cursor.fetchall()
        # Each fetched row is a 1-tuple; unwrap to a flat list of ids.
        return [row[0] for row in rows]
def chunker(seq, size):
    """Split `seq` into consecutive slices of at most `size` elements."""
    starts = range(0, len(seq), size)
    return [seq[start:start + size] for start in starts]
def reserve_and_download_versions(limit, processes=12):
    """
    Repeatedly reserve batches of trials and download them in parallel until
    no trials remain or roughly `limit` trials have been processed.

    :param limit: approximate maximum number of trials to process in total.
        (The original ignored this parameter and read the module-level
        TRIAL_RESERVATION_LIMIT instead; run() passes that same constant,
        so honoring the parameter is behavior-identical for that caller.)
    :param processes: worker-pool size; default preserves the previously
        hard-coded 12.
    """
    # Closure binds the shared rate-limit values so the single-argument
    # function can be mapped over nct_ids.  The "multiprocess" package
    # (imported at the top of the file) pickles closures via dill, which
    # the stdlib multiprocessing cannot do.
    def downloader(nct):
        download_trial_records(nct, DELAY_TIME, RESET_TIME)
    with postgres_conn() as con:
        processed = 0
        # Walrus stops the loop as soon as a reservation round is empty.
        while (nctids := reserve_trials(con, TRIAL_RESERVATION_BATCH_SIZE)) and \
                processed < limit:
            print(nctids)
            with Pool(processes=processes) as process_pool:
                batch_size = len(nctids)
                processed += batch_size
                with tqdm(total=batch_size) as prog_bar:
                    for _ in process_pool.imap_unordered(downloader, nctids):
                        prog_bar.update()
            # Commit after each batch so finished statuses are durable.
            con.commit()
def run():
    """Entry point: reserve and download up to TRIAL_RESERVATION_LIMIT trials."""
    reserve_and_download_versions(TRIAL_RESERVATION_LIMIT)
# Script entry point: run the reserve-and-download loop when executed directly.
if __name__ == "__main__":
    """
    Main!
    """
    run()
# NOTE(review): stray leftover comment below — apparently from an earlier edit.
#db connection