You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ClinicalTrialsDataProcessing/Parser/extraction-lib.py

288 lines
9.4 KiB
Python

from collections import namedtuple
from copy import copy
from datetime import datetime
from ensurepip import version
from bs4 import BeautifulSoup
import abc
import textprocessing as tp #cuz tp is important
#requires Python 3.10
###CLASSES AND CONSTRUCTORS
# Lightweight records returned by the "<value> [<tag>]" parsers below:
# `tag` is the bracketed annotation (or None), and the second field is the
# parsed value itself.
TagDatePair = namedtuple("TagDatePair", "tag date")
TagTextPair = namedtuple("TagTextPair", "tag text")
#superclasses
class VersionData:
    """
    Container for the data extracted for one trial/version pair.

    It is meant to hold two kinds of data:
    - values with a 1-to-1 relationship to the trial/version pair, and
    - values with a child relationship to the trial/version pair.

    Apart from the two identifiers, every attribute starts out as None and
    is filled in later by the extract_* functions of this module, so a
    single instance can be passed around and populated step by step.

    Loading the collected data into the database is planned but is not
    implemented in this class.
    """

    def __init__(self, nct_id, version_id):
        # Identifiers
        self.nct_id = nct_id
        self.version_id = version_id

        # Study Status: each date is stored next to its bracketed category.
        self._primary_completion_date = self._primary_completion_date_category = None
        self._completion_date = self._completion_date_category = None
        self._overall_status = None

        # Study Design
        self._enrollment = self._enrollment_category = None

        # Sponsors and Collaborators
        # No *_category attributes for sponsor / responsible party: they do
        # not appear to be included in the raw data.
        # Collaborators are ignored for now; not found in AACT so far.
        self._sponsor = None
        self._responsible_party = None
def extract_study_statuses(study_status_form, version_a,version_b):
"""
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
"""
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
#print(trow.__str__()[:80])
match tr_to_td(trow):
case ["Primary Completion:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_date_and_tag(old.text,date_MMMM_YYYY)
version_a._primary_completion_date = tagdate1.date
version_a._primary_completion_date_category = tagdate1.tag
tagdate2 = extract_date_and_tag(new.text,date_MMMM_YYYY)
version_b._primary_completion_date = tagdate2.date
version_b._primary_completion_date_category = tagdate2.tag
case ["Study Completion:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_date_and_tag(old.text,date_MMMM_YYYY)
version_a._completion_date = tagdate1.date
version_a._completion_date_category = tagdate1.tag
tagdate2 = extract_date_and_tag(new.text,date_MMMM_YYYY)
version_b._completion_date = tagdate2.date
version_b._completion_date_category = tagdate2.tag
case ["Overall Status:" as row_label, tag]:
old,new = split_by_version(tag)
version_a._overall_status = old.text
version_b._overall_status = new.text
def extract_study_design(study_status_form, version_a,version_b):
"""
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
"""
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
#print(trow.__str__()[:80])
match tr_to_td(trow):
case ["Enrollment:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_text_and_tag(old.text)
version_a._enrollment = tagdate1.text
version_a._enrollment_category = tagdate1.tag
tagdate2 = extract_text_and_tag(new.text)
version_b._enrollment = tagdate2.text
version_b._enrollment_category = tagdate2.tag
def extract_sponsor_data(study_status_form, version_a,version_b):
"""
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
"""
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
#print(trow.__str__()[:80])
match tr_to_td(trow):
case ["Sponsor:" as row_label, tag]:
old, new = split_by_version(tag)
version_a._sponsor = old.text
version_b._sponsor = new.text
case ["Responsible Party:" as row_label, tag]:
old, new = split_by_version(tag)
version_a._responsible_party = old.text
version_b._responsible_party = new.text
case ["Collaborators:" as row_label, tag]:
#old, new = split_by_version(tag)
#TODO: find a trial with multiple collaborators and figure out how to identify/count them:w
# So far can't figure out where this is in AACT, so I'm going to ignore it.
pass
def split_by_version(tag):
    """
    Produce the "old" and "new" renderings of a cell that shows a diff.

    Two independent copies of `tag` are made. Elements with class
    "add_hilite" are removed from the first copy and elements with class
    "drop_hilite" are removed from the second. `tag` itself is not modified.

    Returns the pair (old, new).
    """
    def _copy_without(css_class):
        # Work on a clone so the caller's element stays intact.
        clone = copy(tag)
        for highlighted in clone.find_all(class_=css_class):
            highlighted.extract()
        return clone

    old = _copy_without("add_hilite")
    new = _copy_without("drop_hilite")
    return old, new
def extract_date_and_tag(text, date_format):
    """
    Parse a string of the form "<date> [<tag>]" into a TagDatePair.

    The part before the first "[" is parsed as a date and the part between
    the first "[" and the following "]" is returned as the tag. The tag is
    None when the text contains no "[".

    Several date layouts are mixed in the source data, sometimes within the
    same field across two versions. `date_format` is therefore tried first,
    followed by the other known layouts ("%B %Y" and "%B %d, %Y").

    Parameters:
        text: raw cell text; None, empty and whitespace-only values are
            treated as "no data".
        date_format: preferred datetime.strptime format string.

    Returns:
        TagDatePair(tag, date). Both fields are None for empty input, and
        `date` is None when only a bracketed tag is present.

    Raises:
        ValueError: if the date part matches none of the known layouts.
    """
    text = (text or "").strip()
    if not text:
        return TagDatePair(None, None)

    parts = text.split("[")
    estimate_tag = parts[1].split("]")[0].strip() if len(parts) > 1 else None

    date_text = parts[0].strip()
    if not date_text:
        # Only a bracketed tag, no date to parse.
        return TagDatePair(estimate_tag, None)

    # dict.fromkeys removes duplicates while keeping the caller's format first.
    candidate_formats = dict.fromkeys((date_format, "%B %Y", "%B %d, %Y"))
    for candidate in candidate_formats:
        try:
            return TagDatePair(estimate_tag, datetime.strptime(date_text, candidate))
        except ValueError:
            continue

    raise ValueError(
        f"date {date_text!r} does not match any of the formats "
        f"{list(candidate_formats)!r}"
    )
def extract_text_and_tag(text):
    """
    Parse a string of the form "<text> [<tag>]" into a TagTextPair.

    The part before the first "[" is returned (stripped) as the text and the
    part between the first "[" and the following "]" as the tag. The tag is
    None when the input contains no "[".

    Parameters:
        text: raw cell text; None, empty and whitespace-only values are
            treated as "no data".

    Returns:
        TagTextPair(tag, text); both fields are None for empty input, so
        callers can always read `.tag` and `.text` from the result.
    """
    text = (text or "").strip()
    if not text:
        # Must be a TagTextPair: callers read `.text`, which a TagDatePair
        # does not provide.
        return TagTextPair(None, None)

    parts = text.split("[")
    estimate_tag = parts[1].split("]")[0].strip() if len(parts) > 1 else None
    return TagTextPair(estimate_tag, parts[0].strip())
### FUNCTIONS
def tr_to_td(tr) -> tuple[str, str]:
"""
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
For the data, it just extracts the text.
The text itself then needs processed separately, based on what it should contain.
"""
#get list of cells
td_list = tr.find_all("td")
if len(td_list) == 2:
return td_list[0].text, td_list[1]
else:
return None, None
def get_forms(soup, version_a, version_b):
    """
    Dispatch every identified <form> in the document to its extractor.

    All <form> elements under `soup.body` are visited. Forms without an
    "id" attribute are skipped. The study-status, sponsor/collaborator and
    study-design forms are passed, together with `version_a` and
    `version_b`, to their extract_* function. The other known form ids are
    ignored, and any id that is not known at all is printed.
    """
    # Known form ids for which nothing is extracted.
    ignored_form_ids = (
        "form_Oversight",
        "form_StudyDescription",
        "form_Conditions",
        "form_ArmsandInterventions",
        "form_ProtocolOutcomeMeasures",
        "form_Eligibility",
        "form_ContactsLocations",
        "form_IPDSharing",
        "form_References",
        "form_ParticipantFlow",
        "form_BaselineCharacteristics",
        "form_ROutcomeMeasures",
        "form_AdverseEvents",
        "form_LimitationsandCaveats",
        "form_MoreInformation",
    )

    for form in soup.body.find_all("form"):
        if "id" not in form.attrs:
            continue

        form_id = form.attrs["id"]
        if form_id == "form_StudyStatus":
            extract_study_statuses(form, version_a, version_b)
        elif form_id == "form_SponsorCollaborators":
            extract_sponsor_data(form, version_a, version_b)
        elif form_id == "form_StudyDesign":
            extract_study_design(form, version_a, version_b)
        elif form_id not in ignored_form_ids:
            # Unknown form: print its id.
            print(form_id)
### CONSTANTS
# datetime.strptime format: full month name followed by a four-digit year.
date_MMMM_YYYY = "%B %Y"
# datetime.strptime format: full month name, day of month, comma, four-digit year.
date_MMMM_DD_YYYY = "%B %d, %Y"
if __name__ == "__main__":
    # Manual smoke test: parse two saved history pages and dump the result.
    from pathlib import Path

    for file in ["./NCT00658567.html", "./NCT01303796.html"]:
        # Each input file is named after its trial, so take the NCT id from
        # the file name instead of hard-coding one id for every file.
        nct_id = Path(file).stem

        with open(file) as fh:
            # BeautifulSoup is built while the file handle is still open.
            soup = BeautifulSoup(fh, "lxml")

        version1 = VersionData(nct_id, 1)
        version2 = VersionData(nct_id, 2)
        get_forms(soup, version1, version2)

        # TODO: the order is messed up somewhere.
        print(version1.__dict__)
        print(version2.__dict__)