storing local details

llm-extraction
youainti 3 years ago
parent e88f450b8c
commit 2a9b8349ba

@ -5,11 +5,13 @@ from multiprocess import Pool, Value
import math
import time
from drugtools.env_setup import postgres_conn, ENV
from tqdm import tqdm
############ GLOBALS
RESET_TIME = Value('I',int(ENV["TRIAL_DOWNLOAD_RESET_TIME"]))
DELAY_TIME = Value("I",int(ENV["TRIAL_DOWNLOAD_DELAY_TIME"]))
TRIAL_RESERVATION_LIMIT=int(ENV["TRIAL_RESERVATION_LIMIT"])
TRIAL_RESERVATION_BATCH_SIZE=int(ENV["TRIAL_RESERVATION_BATCH_SIZE"])
############ Functions
def get_highest_version_number(response):
@ -25,7 +27,10 @@ def get_highest_version_number(response):
soup = BeautifulSoup(response.text, features="lxml")
#get version table rows
table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
try:
table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
except IndexError as ie:
raise ie
for row in reversed(table_rows):
# if it is <td headers="VersionNumber">xx</td> then it contains what we need.
@ -44,7 +49,11 @@ def make_request(nct_id,version1,version2):
url = baseurl.format(nct_id,version1,version2)
#make request
response = requests.get(url)
try:
time.sleep(0.02)
response = requests.get(url)
except requests.exceptions.ConnectionError as ce:
raise ce
#return the response
return response
@ -85,16 +94,24 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
#check for
if r.status_code == 200:
upload_response(cursor,nct_id,version_a, version_b, r)
elif r.status_code == 404:
upload_response(cursor, nct_id, version_a, version_b, r)
write_incomplete(cursor,nct_id)
return None
elif r.status_code == 503:
# write http code to http.responses
upload_response(cursor, nct_id, version_a, version_b, r)
# write incomplete to http.download_status
write_incomplete(cursor,nct_id)
# tell all other processes to slow down the request speed
delay_time.value += 1
# Delay
print("Recieved 503 on {}, increasing delay count to {}".format(nct_id, delay_tiome))
time.sleep(reset_time)
with delay_time.get_lock():
delay_time.value += 1
time.sleep(reset_time.value)
print("Recieved 503 on {}, increasing delay count to {}".format(
nct_id,
delay_time)
)
else:
#TODO: this should handle errors by
# write http code to http.responses
@ -104,8 +121,13 @@ def download_and_handle_errors(cursor, nct_id, version_a, version_b, delay_time,
# raise exception
#raise Exception("Download of {} (versions {},{}) returned http code {}".format(nct_id,version_a,version_b, r.status_code))
print("Recieved {} on {}, increasing delay count to {}".format(
r.status_code,
nct_id,
delay_time))
# Delay
time.sleep(reset_time)
with reset_time.get_lock():
time.sleep(reset_time.value)
return r
def write_incomplete(cursor, nct_id):
@ -128,9 +150,6 @@ def download_trial_records(nct_id, delay_time, reset_time):
This doesn't reserve a trial for download, but it does release the reservation.
"""
#for testing
print(nct_id)
# A new connection is created every time the function is called so that this
# function can be run using a multiprocessing pool
@ -140,8 +159,14 @@ def download_trial_records(nct_id, delay_time, reset_time):
#upload the first two versions
r = download_and_handle_errors(cursor, nct_id, 1, 2, delay_time, reset_time)
#extract last version
v = get_highest_version_number(r)
if r is None:
return None
try:
v = get_highest_version_number(r)
except IndexError as ie:
raise RuntimeError(ie.__str__() + " | nct_id {}".format(nct_id))
#download and upload the remaining versions
if v == 2:
@ -205,23 +230,29 @@ def reserve_trials(db_connection, limit=10):
return nctids_list
def chunker(seq, size):
    """Split *seq* into consecutive slices of at most *size* elements.

    The last chunk is shorter when ``len(seq)`` is not an exact multiple
    of *size*.  Works on any sliceable sequence (list, tuple, str).
    """
    chunks = []
    for start in range(0, len(seq), size):
        chunks.append(seq[start:start + size])
    return chunks
def reserve_and_download_versions(limit):
#db connection
with postgres_conn() as con:
#get list of nct_ids
nctids = reserve_trials(con, limit)
print("reserving_trials: ", nctids)
#lambda that parameterizes the downloader, allowing it to be passed to the pool.
def downloader(nct):
download_trial_records(nct, DELAY_TIME, RESET_TIME)
#start analyzing them
with Pool(processes=12) as process_pool:
process_pool.map(downloader, nctids)
#db connection
with postgres_conn() as con:
itt = 0
while (nctids := reserve_trials(con,TRIAL_RESERVATION_BATCH_SIZE)) and \
itt < TRIAL_RESERVATION_LIMIT:
print(nctids)
with Pool(processes=12) as process_pool:
l = len(nctids)
itt += l
with tqdm(total=l) as prog_bar:
for _ in process_pool.imap_unordered(downloader, nctids):
prog_bar.update()
con.commit()
def run():
@ -231,4 +262,5 @@ if __name__ == "__main__":
"""
Main!
"""
run()
run()
#db connection

@ -11,9 +11,13 @@ print(env_setup.ENV)
cont = input("Are you willing to continue with the current environmnet? y/[n]")
if cont == "Y" or cont == "y":
hts.run()
hnd.run()
print("SelectingTrials")
#hts.run()
print("downloading trials")
#hnd.run()
print("extracting trials")
hne.run()
exit(0)
daen.run()
mm2p.run()
else:

Loading…
Cancel
Save