You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
468 lines
16 KiB
Python
468 lines
16 KiB
Python
from collections import namedtuple
|
|
from copy import copy
|
|
from datetime import datetime
|
|
from bs4 import BeautifulSoup
|
|
from drugtools.env_setup import ENV,postgres_conn
|
|
from tqdm import tqdm
|
|
#requires Python 3.10
|
|
|
|
#### GLOBALS
|
|
VERBOSE = True if ENV["VERBOSE"] == "True" else False
|
|
|
|
###CLASSES AND CONSTRUCTORS
|
|
|
|
TagDatePair = namedtuple("TagDatePair", ["tag","date"])
|
|
TagTextPair = namedtuple("TagTextPair", ["tag","text"])
|
|
|
|
#superclasses
|
|
class VersionData():
|
|
"""
|
|
This class holds two types of data:
|
|
- Data with a 1-to-1 relationship with the trial/version pair.
|
|
- Data with a child relationship with the trial/version pair.
|
|
|
|
This initializes with None attributes, and implements setter
|
|
methods to load them (just to double check types)
|
|
That way I can just pass around the VersionData instance
|
|
and add data as I go.
|
|
|
|
It will also implement the ability to load the data to the database
|
|
"""
|
|
def __init__(self,nct_id,version_id,submission_date):
|
|
#identifiers
|
|
self.nct_id = nct_id.strip()
|
|
self.version_id = version_id
|
|
self.submission_date = submission_date
|
|
|
|
#Study Status
|
|
self._primary_completion_date = None
|
|
self._primary_completion_date_category = None
|
|
self._start_date = None
|
|
self._start_date_category = None
|
|
self._completion_date = None
|
|
self._completion_date_category = None
|
|
self._overall_status = None
|
|
self._enrollment = None
|
|
self._enrollment_category = None
|
|
self._sponsor = None
|
|
#self._sponsor_category = None #I don't believe this is included in the raw data
|
|
self._responsible_party = None
|
|
#self._responsible_party_category = None #I don't believe this is included in the raw data
|
|
#self._collaborators = None #currently going to ignore as I've not fount it in AACT
|
|
|
|
def load_to_db(self,db_connection):
|
|
#load to initial table, then load any extra details into other tables
|
|
sql = """
|
|
INSERT INTO history.trial_snapshots
|
|
(
|
|
nct_id,
|
|
version,
|
|
submission_date,
|
|
primary_completion_date,
|
|
primary_completion_date_category,
|
|
start_date,
|
|
start_date_category,
|
|
completion_date,
|
|
completion_date_category,
|
|
overall_status,
|
|
enrollment,
|
|
enrollment_category,
|
|
sponsor,
|
|
responsible_party
|
|
)
|
|
VALUES
|
|
(
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s,
|
|
%s
|
|
)
|
|
"""
|
|
|
|
with db_connection.cursor() as db_cursor:
|
|
try:
|
|
db_cursor.execute(
|
|
sql,
|
|
(
|
|
self.nct_id,
|
|
self.version_id,
|
|
self.submission_date,
|
|
self._primary_completion_date,
|
|
self._primary_completion_date_category,
|
|
self._start_date,
|
|
self._start_date_category,
|
|
self._completion_date,
|
|
self._completion_date_category,
|
|
self._overall_status,
|
|
self._enrollment,
|
|
self._enrollment_category,
|
|
self._sponsor,
|
|
self._responsible_party
|
|
)
|
|
)
|
|
except Exception as err:
|
|
#catch any error, print the applicable information, and raise the error.
|
|
print(self)
|
|
raise err
|
|
|
|
db_connection.commit()
|
|
|
|
############ Functions
|
|
def extract_submission_dates(soup):
|
|
"""
|
|
Extract dates for each version
|
|
"""
|
|
table_rows = soup.findChildren("fieldset")[0].table.tbody.findChildren("tr")
|
|
|
|
version_date_dict = {}
|
|
|
|
for row in reversed(table_rows):
|
|
# if it is <td headers="VersionNumber">xx</td> then it contains what we need.
|
|
for td in row.findChildren("td"):
|
|
if ("headers" in td.attrs):
|
|
if (td.attrs["headers"][0]=="VersionNumber"):
|
|
version_number = int(td.text)
|
|
elif (td.attrs["headers"][0]=="VersionDate"):
|
|
version_date = td.text
|
|
version_date_dict[version_number] = datetime.strptime(version_date , "%B %d, %Y")
|
|
|
|
print(version_date_dict)
|
|
return version_date_dict
|
|
|
|
def optional_strip(possible_string):
|
|
if type(possible_string) == str:
|
|
return possible_string.strip()
|
|
else:
|
|
return possible_string
|
|
|
|
def extract_study_statuses(study_status_form, version_a,version_b):
|
|
"""
|
|
This extracts data from a study_status form and returns one or two
|
|
StudyStatusData objects,
|
|
|
|
"""
|
|
|
|
#get rows
|
|
rows = study_status_form.table.tbody.find_all("tr")
|
|
#iterate through rows,
|
|
for trow in rows:
|
|
#matching on rowLabels
|
|
|
|
match tr_to_td(trow):
|
|
case ["Primary Completion:" as row_label, old,new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
|
|
tagdate1 = extract_date_and_tag(old.text)
|
|
version_a._primary_completion_date = tagdate1.date
|
|
version_a._primary_completion_date_category = optional_strip(tagdate1.tag)
|
|
|
|
tagdate2 = extract_date_and_tag(new.text)
|
|
version_b._primary_completion_date = tagdate2.date
|
|
version_b._primary_completion_date_category = optional_strip(tagdate2.tag)
|
|
|
|
case ["Study Start:" as row_label, old, new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
tagdate1 = extract_date_and_tag(old.text)
|
|
version_a._start_date = tagdate1.date
|
|
version_a._start_date_category = optional_strip(tagdate1.tag)
|
|
|
|
tagdate2 = extract_date_and_tag(new.text)
|
|
version_b._start_date = tagdate2.date
|
|
version_b._start_date_category = optional_strip(tagdate2.tag)
|
|
|
|
case ["Study Completion:" as row_label, old,new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
tagdate1 = extract_date_and_tag(old.text)
|
|
version_a._completion_date = tagdate1.date
|
|
version_a._completion_date_category = optional_strip(tagdate1.tag)
|
|
tagdate2 = extract_date_and_tag(new.text)
|
|
version_b._completion_date = tagdate2.date
|
|
version_b._completion_date_category = optional_strip(tagdate2.tag)
|
|
|
|
case ["Overall Status:" as row_label, old,new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
#split out any notes such as "Suspended [reason for suspenstion ]"
|
|
version_a._overall_status = optional_strip(old.text.split("[")[0])
|
|
#split out any notes such as "Suspended [reason for suspenstion ]"
|
|
version_b._overall_status = optional_strip(new.text.split("[")[0])
|
|
#FIX: There is an issue with NCT00789633 where the overall status includes information as to why it was suspended.
|
|
|
|
case _ as row_label:
|
|
print("row not matched: {}".format(row_label)) if VERBOSE else ""
|
|
|
|
|
|
def extract_study_design(study_status_form, version_a,version_b):
|
|
"""
|
|
This extracts data from a study_status form and returns one or two
|
|
StudyStatusData objects,
|
|
|
|
"""
|
|
#get rows
|
|
rows = study_status_form.table.tbody.find_all("tr")
|
|
#iterate through rows,
|
|
for trow in rows:
|
|
#matching on rowLabels
|
|
match tr_to_td(trow):
|
|
case ["Enrollment:" as row_label, old, new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
|
|
#Extract tag and text, add them to preallocated object
|
|
tagtext1 = extract_text_and_tag(old.text)
|
|
version_a._enrollment = tagtext1.text
|
|
version_a._enrollment_category = optional_strip(tagtext1.tag)
|
|
|
|
tagtext2 = extract_text_and_tag(new.text)
|
|
version_b._enrollment = tagtext2.text
|
|
version_b._enrollment_category = optional_strip(tagtext2.tag)
|
|
|
|
case _ as row_label:
|
|
print("row not matched: {}".format(row_label)) if VERBOSE else ""
|
|
|
|
def extract_sponsor_data(study_status_form, version_a,version_b):
|
|
"""
|
|
This extracts data from a study_status form and returns one or two
|
|
StudyStatusData objects,
|
|
|
|
"""
|
|
#get rows
|
|
rows = study_status_form.table.tbody.find_all("tr")
|
|
#iterate through rows,
|
|
for trow in rows:
|
|
#matching on rowLabels
|
|
match tr_to_td(trow):
|
|
case ["Sponsor:" as row_label, old, new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
version_a._sponsor = optional_strip(old.text)
|
|
version_b._sponsor = optional_strip(new.text)
|
|
|
|
case ["Responsible Party:" as row_label, old, new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
version_a._responsible_party = optional_strip(old.text)
|
|
version_b._responsible_party = optional_strip(new.text)
|
|
|
|
case ["Collaborators:" as row_label, old, new]:
|
|
print("row matched: {}".format(row_label)) if VERBOSE else ""
|
|
#TODO: find a trial with multiple collaborators and figure out how to identify/count them:w
|
|
# So far can't figure out where this is in AACT, so I'm going to ignore it.
|
|
pass
|
|
|
|
case _ as row_label:
|
|
print("row not matched: {}".format(row_label)) if VERBOSE else ""
|
|
|
|
|
|
def split_by_version(tag):
|
|
'''
|
|
OUTDATED: With the new format that separates old and new versions, I don't technically need this. It is a nice place to identify exact changes if those are every needed though and it removes the highlights cleanly.
|
|
'''
|
|
#clone elements and remove sub-tags that are not needed.
|
|
old = copy(tag)
|
|
for span in old.find_all(class_="add_hilite"):
|
|
span.extract()
|
|
|
|
new = copy(tag)
|
|
for span in new.find_all(class_="drop_hilite"):
|
|
span.extract()
|
|
return old,new
|
|
|
|
|
|
def extract_date_and_tag(text):
|
|
"""
|
|
Extracts a datetype according to the date format
|
|
and the estimate tag based on
|
|
|
|
"""
|
|
|
|
text = text.strip()
|
|
|
|
#handle various empty cases
|
|
if not text or text == '':
|
|
return TagDatePair(None, None)
|
|
|
|
date_split = text.split("[")
|
|
if len(date_split) > 1:
|
|
estimate_tag = date_split[1].split("]")[0].strip()
|
|
else:
|
|
estimate_tag = None
|
|
|
|
try:
|
|
date_object = datetime.strptime(date_split[0].strip(), date_MMMM_YYYY)
|
|
except ValueError as ve:
|
|
date_object = datetime.strptime(date_split[0].strip(), date_MMMM_DD_YYYY)
|
|
|
|
return TagDatePair(estimate_tag, date_object)
|
|
|
|
|
|
def extract_text_and_tag(text):
|
|
"""
|
|
Extracts a datetype according to the date format
|
|
and the estimate tag based on
|
|
|
|
"""
|
|
text = text.strip()
|
|
|
|
#handle various empty cases
|
|
if not text or text == '':
|
|
return TagTextPair(None, None)
|
|
|
|
date_split = text.split("[")
|
|
if len(date_split) > 1:
|
|
estimate_tag = date_split[1].split("]")[0].strip()
|
|
else:
|
|
estimate_tag = None
|
|
text_object = date_split[0].strip()
|
|
|
|
return TagTextPair(estimate_tag, text_object)
|
|
|
|
### FUNCTIONS
|
|
|
|
def tr_to_td(tr) -> tuple[str, str, str]:
|
|
"""
|
|
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
|
|
|
|
For the data, it just extracts the text.
|
|
The text itself then needs processed separately, based on what it should contain.
|
|
"""
|
|
#get list of cells
|
|
td_list = tr.find_all("td")
|
|
if len(td_list) == 3:
|
|
return td_list[0].text, td_list[1], td_list[2]
|
|
else:
|
|
return None, None, None
|
|
|
|
def get_forms(soup,version_a,version_b):
|
|
|
|
#extract all forms
|
|
for form in soup.body.find_all("form"):
|
|
#Match forms against ID types
|
|
if not "id" in form.attrs:
|
|
continue
|
|
|
|
#for each type of form (identified by the ID field)
|
|
# extract and add the data to the preallocated objects
|
|
match form.attrs["id"]:
|
|
case "form_StudyStatus":
|
|
extract_study_statuses(form,version_a,version_b)
|
|
case "form_SponsorCollaborators":
|
|
extract_sponsor_data(form, version_a, version_b)
|
|
case "form_Oversight":
|
|
pass
|
|
case "form_StudyDescription":
|
|
pass
|
|
case "form_Conditions":
|
|
pass
|
|
case "form_StudyDesign":
|
|
extract_study_design(form,version_a,version_b)
|
|
case "form_ArmsandInterventions":
|
|
pass
|
|
case "form_ProtocolOutcomeMeasures":
|
|
pass
|
|
case "form_Eligibility":
|
|
pass
|
|
case "form_ContactsLocations":
|
|
pass
|
|
case "form_IPDSharing":
|
|
pass
|
|
case "form_References":
|
|
pass
|
|
case "form_ParticipantFlow":
|
|
pass
|
|
case "form_BaselineCharacteristics":
|
|
pass
|
|
case "form_ROutcomeMeasures":
|
|
pass
|
|
case "form_AdverseEvents":
|
|
pass
|
|
case "form_LimitationsandCaveats":
|
|
pass
|
|
case "form_MoreInformation":
|
|
pass
|
|
case _ as form_name:
|
|
print("form not matched: {}".format(form_name)) if VERBOSE else ""
|
|
|
|
|
|
### CONSTANTS
|
|
date_MMMM_YYYY = "%B %Y"
|
|
date_MMMM_DD_YYYY = "%B %d, %Y"
|
|
|
|
def get_data_from_versions(nct_id,html, version_a_int, version_b_int):
|
|
soup = BeautifulSoup(html,"lxml")
|
|
print(getting_data_from_versions)
|
|
|
|
version_date_dict = extract_submission_dates(soup)
|
|
print(version_date_dict)
|
|
|
|
#preallocate version data
|
|
version_a = VersionData(nct_id, version_a_int, version_date_dict[version_a_int])
|
|
version_b = VersionData(nct_id, version_b_int, version_date_dict[version_b_int])
|
|
|
|
#extract data from html and put it in the preallocated objects
|
|
get_forms(soup, version_a, version_b)
|
|
|
|
return version_a,version_b
|
|
|
|
|
|
|
|
def run():
|
|
with postgres_conn() as db_connection:
|
|
#pull the requests from the db
|
|
with db_connection.cursor() as curse:
|
|
sql = """
|
|
SELECT nct_id, version_a,version_b, html
|
|
FROM http.responses
|
|
WHERE response_code = 200
|
|
"""
|
|
curse.execute(sql)
|
|
for response in tqdm(curse.fetchall()):
|
|
nct_id, version_a, version_b, html = response
|
|
print(nct_id)
|
|
|
|
print(nct_id, version_a, version_b) if VERBOSE else ""
|
|
|
|
version1, version2 = get_data_from_versions(nct_id, html, version_a, version_b)
|
|
|
|
if version_b == version_a + 1:
|
|
version1.load_to_db(db_connection)
|
|
version2.load_to_db(db_connection)
|
|
else:
|
|
version2.load_to_db(db_connection)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run()
|
|
|
|
"""
|
|
Documentation:
|
|
|
|
TO add a new field to extraction-lib
|
|
|
|
1. Locate the field in the HTML
|
|
- form id (e.g. <form id="form_StudyStatus> gives the form id "form_StudyStatus)
|
|
- Table row's data label. This corresponds to the text of first column in the row and will
|
|
look something like
|
|
<td class="rowLabel" style="min-width: 210px;">Record Verification:</td>.
|
|
"Record Verification:" is the data label in the example above.
|
|
2. Identify what data you will be extracting
|
|
- type (date, text, int, etc)
|
|
- if it contains a category ([Actual] vs [Anticipated] etc)
|
|
3. Add data to:
|
|
- sql table: history.trial_snapshots
|
|
- the VersionData class
|
|
- the VersionData.load_to_db() function
|
|
4. Ensure the field matcher in `get_forms(*)` is matching on the form ID and has a function processing the form
|
|
5. Ensure the function processing the form has a match entry to proceess the row
|
|
- This should match on data label and then process the data by
|
|
- splitting into old and new versions
|
|
- Extracting the data for both old and new
|
|
- add the data to the passed VersionData objects
|
|
"""
|