You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ClinicalTrialsDataProcessing/Parser/extraction-lib.py

293 lines
9.4 KiB
Python

from collections import namedtuple
from copy import copy
from datetime import datetime
from bs4 import BeautifulSoup
import textprocessing as tp #cuz tp is important
#requires Python 3.10
###CLASSES AND CONSTRUCTORS
# Result of extract_date_and_tag: `tag` is the text found between square
# brackets after the date (or None), `date` is the parsed datetime (or None).
TagDatePair = namedtuple("TagDatePair", ["tag","date"])
# Result of extract_text_and_tag: `tag` is the text found between square
# brackets after the value (or None), `text` is the value before the bracket.
TagTextPair = namedtuple("TagTextPair", ["tag","text"])
#superclasses
class VersionData():
    """
    Holds the data extracted for one trial/version pair.

    This class holds two types of data:
    - Data with a 1-to-1 relationship with the trial/version pair.
    - Data with a child relationship with the trial/version pair.

    Every extracted attribute is initialized to None so that a single
    instance can be passed around and filled in as data is parsed.
    Type-checking setter methods are planned but not implemented here,
    and load_to_db is currently a placeholder.
    """

    def __init__(self,nct_id,version_id):
        """
        Create an empty record for one version of one trial.

        nct_id: identifier of the trial, stored unchanged.
        version_id: identifier of the version, stored unchanged.
        """
        #identifiers
        self.nct_id = nct_id
        self.version_id = version_id
        #Study Status
        self._primary_completion_date = None
        self._primary_completion_date_category = None
        self._completion_date = None
        self._completion_date_category = None
        self._overall_status = None
        #Study Design
        self._enrollment = None
        self._enrollment_category = None
        #Sponsors and Collaborators
        self._sponsor = None
        #self._sponsor_category = None #I don't believe this is included in the raw data
        self._responsible_party = None
        #self._responsible_party_category = None #I don't believe this is included in the raw data
        #self._collaborators = None #currently going to ignore as I've not found it in AACT

    def load_to_db(self, db_cursor):
        """
        Placeholder for persisting this record through db_cursor.

        Intended flow: load to the initial table, then load any extra
        details into other tables. Currently does nothing and returns None.
        """
        # `self` was missing from the signature, so calling this on an
        # instance raised TypeError.
        pass
def extract_study_statuses(study_status_form, version_a,version_b):
    """
    Copy the study-status fields of a form onto two version records.

    Each <tr> of the form's table body is reduced to a (label, cell) pair
    by tr_to_td, and the cell is split into its "old" and "new" variants by
    split_by_version. The old variant is written to version_a and the new
    variant to version_b. Handled labels:
    - "Primary Completion:" -> _primary_completion_date(+_category)
    - "Study Completion:"   -> _completion_date(+_category)
    - "Overall Status:"     -> _overall_status (raw cell text)

    Nothing is returned; version_a and version_b are modified in place.
    """
    # Row label -> name of the date attribute; the estimate tag goes into
    # the attribute of the same name with a "_category" suffix.
    date_attributes = {
        "Primary Completion:": "_primary_completion_date",
        "Study Completion:": "_completion_date",
    }
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        if label in date_attributes:
            attribute = date_attributes[label]
            before, after = split_by_version(cell)
            # Parse and store the old side first, then the new side.
            for version, fragment in ((version_a, before), (version_b, after)):
                parsed = extract_date_and_tag(fragment.text)
                setattr(version, attribute, parsed.date)
                setattr(version, attribute + "_category", parsed.tag)
        elif label == "Overall Status:":
            before, after = split_by_version(cell)
            version_a._overall_status = before.text
            version_b._overall_status = after.text
def extract_study_design(study_status_form, version_a,version_b):
    """
    Copy the enrollment field of a study-design form onto two version records.

    Only rows labelled "Enrollment:" are used. The cell is split into its
    "old" and "new" variants by split_by_version; the old value and its
    bracketed tag are stored on version_a (_enrollment,
    _enrollment_category) and the new ones on version_b.

    Nothing is returned; version_a and version_b are modified in place.
    """
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        if label != "Enrollment:":
            continue
        before, after = split_by_version(cell)
        # Parse and store the old side first, then the new side.
        for version, fragment in ((version_a, before), (version_b, after)):
            parsed = extract_text_and_tag(fragment.text)
            version._enrollment = parsed.text
            version._enrollment_category = parsed.tag
def extract_sponsor_data(study_status_form, version_a,version_b):
    """
    Copy sponsor information from a form onto two version records.

    Rows labelled "Sponsor:" and "Responsible Party:" are split into
    their "old" and "new" variants by split_by_version; the raw text of
    the old variant is stored on version_a and that of the new variant on
    version_b (_sponsor / _responsible_party).

    Rows labelled "Collaborators:" are deliberately ignored for now.
    Nothing is returned; version_a and version_b are modified in place.
    """
    # Row label -> attribute that receives the cell text.
    text_attributes = {
        "Sponsor:": "_sponsor",
        "Responsible Party:": "_responsible_party",
    }
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        attribute = text_attributes.get(label)
        if attribute is None:
            # Includes "Collaborators:".
            #TODO: find a trial with multiple collaborators and figure out how to identify/count them
            # So far can't figure out where this is in AACT, so it is ignored.
            continue
        before, after = split_by_version(cell)
        setattr(version_a, attribute, before.text)
        setattr(version_b, attribute, after.text)
def split_by_version(tag):
    """
    Produce the "old" and "new" variants of a highlighted element.

    Two copies of tag are made. Elements with class "add_hilite" are
    removed from the first copy (old) and elements with class
    "drop_hilite" from the second (new). The original tag is not modified
    by this function.

    Returns the pair (old, new).
    """
    def _copy_without(css_class):
        # Work on a clone so that the caller's element stays intact.
        clone = copy(tag)
        for highlighted in clone.find_all(class_=css_class):
            highlighted.extract()
        return clone

    return _copy_without("add_hilite"), _copy_without("drop_hilite")
def extract_date_and_tag(text):
    """
    Parse a date followed by an optional bracketed tag.

    The part of text before the first "[" is parsed as a date, first with
    the date_MMMM_YYYY format and, if that fails, with date_MMMM_DD_YYYY.
    The text between that "[" and the next "]" (or "[") is the tag.

    Returns TagDatePair(tag, date). Both fields are None when text is empty
    or whitespace only; tag is None when there is no "[".
    Raises ValueError when the date matches neither format.
    """
    cleaned = text.strip()
    if not cleaned:
        return TagDatePair(None, None)

    date_part, bracket, remainder = cleaned.partition("[")
    if bracket:
        # Stop at whichever of "[" or "]" comes first after the opening bracket.
        estimate_tag = remainder.partition("[")[0].partition("]")[0].strip()
    else:
        estimate_tag = None

    date_part = date_part.strip()
    try:
        parsed_date = datetime.strptime(date_part, date_MMMM_YYYY)
    except ValueError:
        # Fall back to the format that includes the day of the month.
        parsed_date = datetime.strptime(date_part, date_MMMM_DD_YYYY)
    return TagDatePair(estimate_tag, parsed_date)
def extract_text_and_tag(text):
    """
    Split a value followed by an optional bracketed tag.

    The part of text before the first "[" is the value; the text between
    that "[" and the following "]" is the tag. Both are stripped of
    surrounding whitespace.

    Returns TagTextPair(tag, text). Both fields are None when text is
    empty or whitespace only; tag is None when there is no "[".
    """
    text = text.strip()
    #handle various empty cases
    if not text:
        # Must be a TagTextPair: callers read `.text`, which the
        # previously returned TagDatePair does not have.
        return TagTextPair(None, None)

    text_split = text.split("[")
    if len(text_split) > 1:
        estimate_tag = text_split[1].split("]")[0].strip()
    else:
        estimate_tag = None
    text_object = text_split[0].strip()

    return TagTextPair(estimate_tag, text_object)
### FUNCTIONS
def tr_to_td(tr) -> tuple[str, str]:
"""
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
For the data, it just extracts the text.
The text itself then needs processed separately, based on what it should contain.
"""
#get list of cells
td_list = tr.find_all("td")
if len(td_list) == 2:
return td_list[0].text, td_list[1]
else:
return None, None
def get_forms(soup,version_a,version_b):
    """
    Dispatch every <form> in the document body to its extractor.

    Forms without an "id" attribute are skipped. The study-status,
    sponsor/collaborator and study-design forms are passed, together with
    version_a and version_b, to their extract_* function. The other known
    form ids are recognised but not processed. Any id that is not known
    is printed.

    Nothing is returned; version_a and version_b are filled in by the
    extractors.
    """
    # Known forms for which no data is extracted.
    ignored_form_ids = (
        "form_Oversight",
        "form_StudyDescription",
        "form_Conditions",
        "form_ArmsandInterventions",
        "form_ProtocolOutcomeMeasures",
        "form_Eligibility",
        "form_ContactsLocations",
        "form_IPDSharing",
        "form_References",
        "form_ParticipantFlow",
        "form_BaselineCharacteristics",
        "form_ROutcomeMeasures",
        "form_AdverseEvents",
        "form_LimitationsandCaveats",
        "form_MoreInformation",
    )
    for form in soup.body.find_all("form"):
        if "id" not in form.attrs:
            continue
        form_id = form.attrs["id"]
        if form_id == "form_StudyStatus":
            extract_study_statuses(form, version_a, version_b)
        elif form_id == "form_SponsorCollaborators":
            extract_sponsor_data(form, version_a, version_b)
        elif form_id == "form_StudyDesign":
            extract_study_design(form, version_a, version_b)
        elif form_id in ignored_form_ids:
            pass
        else:
            # Surface form ids that have not been seen before.
            print(form_id)
### CONSTANTS
# datetime.strptime formats tried by extract_date_and_tag, in this order.
# Full month name and four-digit year, e.g. "March 2010".
date_MMMM_YYYY = "%B %Y"
# Full month name, day of month, comma, four-digit year, e.g. "March 15, 2010".
date_MMMM_DD_YYYY = "%B %d, %Y"
def get_data_from_versions(nct_id,html, version_a_int, version_b_int):
    """
    Parse one html document into a pair of VersionData records.

    nct_id: identifier stored on both records.
    html: markup to parse (with the "lxml" parser).
    version_a_int, version_b_int: version identifiers of the two records.

    Returns (version_a, version_b) after get_forms has filled them in.
    """
    parsed_page = BeautifulSoup(html, "lxml")
    versions = (
        VersionData(nct_id, version_a_int),
        VersionData(nct_id, version_b_int),
    )
    get_forms(parsed_page, *versions)
    return versions
if __name__ == "__main__":
    # Manual smoke test: parse two saved pages and dump what was extracted.
    sample_pages = ["./NCT00658567.html", "./NCT01303796.html"]
    for page_path in sample_pages:
        with open(page_path) as page:
            # NOTE(review): the file path is passed as nct_id here.
            older, newer = get_data_from_versions(page_path, page.read(), 1, 2)
            # Original note: order messed up somewhere.
            print(vars(older))
            print(vars(newer))