You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ClinicalTrialsDataProcessing/Parser/extraction-lib.py

405 lines
14 KiB
Python

from collections import namedtuple
from copy import copy
from datetime import datetime
import psycopg2
from bs4 import BeautifulSoup
#import textprocessing as tp #cuz tp is important
#requires Python 3.10
###CLASSES AND CONSTRUCTORS
# Result of extract_date_and_tag: the bracketed tag (or None) and the parsed date (or None).
TagDatePair = namedtuple("TagDatePair", ["tag","date"])
# Result of extract_text_and_tag: the bracketed tag (or None) and the text in front of it.
TagTextPair = namedtuple("TagTextPair", ["tag","text"])
#superclasses
class VersionData():
    """
    Holds the data extracted for a single trial/version pair.

    The class is meant to carry two types of data:
    - Data with a 1-to-1 relationship with the trial/version pair.
    - Data with a child relationship with the trial/version pair.

    Every extracted field starts out as None. The extraction functions
    assign the underscore-prefixed attributes directly while they walk the
    page, so one instance can be passed around and filled in as data is
    found. `load_to_db` then writes the collected values to the database.
    """

    def __init__(self,nct_id,version_id):
        """
        nct_id: trial identifier string; surrounding whitespace is removed.
        version_id: identifier of the version this snapshot describes.
        """
        #identifiers
        self.nct_id = nct_id.strip()
        self.version_id = version_id
        #Study Status
        self._primary_completion_date = None
        self._primary_completion_date_category = None
        self._start_date = None
        self._start_date_category = None
        self._completion_date = None
        self._completion_date_category = None
        self._overall_status = None
        self._enrollment = None
        self._enrollment_category = None
        self._sponsor = None
        #self._sponsor_category = None #I don't believe this is included in the raw data
        self._responsible_party = None
        #self._responsible_party_category = None #I don't believe this is included in the raw data
        #self._collaborators = None #currently going to ignore as I've not found it in AACT

    def __repr__(self):
        """List every attribute and its value, so `print(self)` is useful when a load fails."""
        fields = ", ".join(
            f"{name}={value!r}" for name, value in vars(self).items()
        )
        return f"{type(self).__name__}({fields})"

    def load_to_db(self,db_connection):
        """
        Insert this snapshot as one row of history.trial_snapshots.

        db_connection: object whose cursor() returns a context manager
            yielding a cursor with an execute(sql, params) method.

        Raises: whatever the cursor raises. The snapshot is printed first
        so the offending values are visible next to the traceback.
        """
        #load to initial table, then load any extra details into other tables
        sql = """
            INSERT INTO history.trial_snapshots
            (
                nct_id,
                version,
                primary_completion_date,
                primary_completion_date_category,
                start_date,
                start_date_category,
                completion_date,
                completion_date_category,
                overall_status,
                enrollment,
                enrollment_category,
                sponsor,
                responsible_party
            )
            VALUES
            (
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s,
                %s
            )
            """
        # One value per placeholder, in the same order as the column list above.
        values = (
            self.nct_id,
            self.version_id,
            self._primary_completion_date,
            self._primary_completion_date_category,
            self._start_date,
            self._start_date_category,
            self._completion_date,
            self._completion_date_category,
            self._overall_status,
            self._enrollment,
            self._enrollment_category,
            self._sponsor,
            self._responsible_party,
        )
        with db_connection.cursor() as db_cursor:
            try:
                db_cursor.execute(sql, values)
            except Exception:
                #catch any error, print the applicable information, and re-raise it unchanged.
                print(self)
                raise
def optional_strip(possible_string):
    """
    Strip surrounding whitespace from `possible_string` if it is a string.

    Any str instance, including instances of str subclasses, is stripped.
    Every other value (for example None) is returned unchanged, so callers
    can pass optional values without checking them first.
    """
    if isinstance(possible_string, str):
        return possible_string.strip()
    return possible_string
def extract_study_statuses(study_status_form, version_a, version_b):
    """
    Copy the values of a study-status form onto the two version objects.

    Every <tr> of the form's table is reduced to a (label, cell) pair by
    tr_to_td. Recognised rows are split into their old and new variants
    with split_by_version; the old value is stored on `version_a` and the
    new value on `version_b`. Rows with any other label are ignored.

    Nothing is returned: both objects are modified in place.
    """
    # row label -> attribute receiving the date; the matching category is
    # stored under the same attribute name with "_category" appended
    date_rows = {
        "Primary Completion:": "_primary_completion_date",
        "Study Start:": "_start_date",
        "Study Completion:": "_completion_date",
    }
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        if label in date_rows:
            date_attr = date_rows[label]
            category_attr = date_attr + "_category"
            before, after = split_by_version(cell)
            parsed_before = extract_date_and_tag(before.text)
            setattr(version_a, date_attr, parsed_before.date)
            setattr(version_a, category_attr, optional_strip(parsed_before.tag))
            parsed_after = extract_date_and_tag(after.text)
            setattr(version_b, date_attr, parsed_after.date)
            setattr(version_b, category_attr, optional_strip(parsed_after.tag))
        elif label == "Overall Status:":
            before, after = split_by_version(cell)
            version_a._overall_status = optional_strip(before.text)
            version_b._overall_status = optional_strip(after.text)
def extract_study_design(study_status_form, version_a, version_b):
    """
    Copy the enrollment value of a form onto the two version objects.

    Despite the parameter name, get_forms passes the "form_StudyDesign"
    form here. Only the row labelled "Enrollment:" is used: its cell is
    split into old and new variants, each variant is parsed into text plus
    bracketed tag, and the results are stored on `version_a` (old) and
    `version_b` (new).

    Nothing is returned: both objects are modified in place.
    """
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        if label != "Enrollment:":
            continue
        before, after = split_by_version(cell)
        parsed_before = extract_text_and_tag(before.text)
        version_a._enrollment = parsed_before.text
        version_a._enrollment_category = optional_strip(parsed_before.tag)
        parsed_after = extract_text_and_tag(after.text)
        version_b._enrollment = parsed_after.text
        version_b._enrollment_category = optional_strip(parsed_after.tag)
def extract_sponsor_data(study_status_form, version_a, version_b):
    """
    Copy sponsor information from a form onto the two version objects.

    Despite the parameter name, get_forms passes the
    "form_SponsorCollaborators" form here. The "Sponsor:" and
    "Responsible Party:" rows are split into old and new variants whose
    stripped text is stored on `version_a` (old) and `version_b` (new).

    Nothing is returned: both objects are modified in place.
    """
    # row label -> attribute receiving the stripped cell text
    text_rows = {
        "Sponsor:": "_sponsor",
        "Responsible Party:": "_responsible_party",
    }
    for table_row in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(table_row)
        target = text_rows.get(label)
        if target is None:
            # "Collaborators:" rows end up here as well and are skipped on purpose.
            #TODO: find a trial with multiple collaborators and figure out how to identify/count them
            # So far can't figure out where this is in AACT, so I'm going to ignore it.
            continue
        before, after = split_by_version(cell)
        setattr(version_a, target, optional_strip(before.text))
        setattr(version_b, target, optional_strip(after.text))
def split_by_version(tag):
    """
    Build the old and the new variant of a cell.

    Two separate copies of `tag` are made with copy(). Every element with
    class "add_hilite" is extracted from the first copy, and every element
    with class "drop_hilite" from the second.

    Returns the pair (old, new).
    """
    def without(css_class):
        #clone the element and remove the sub-tags that are not needed.
        clone = copy(tag)
        for marked in clone.find_all(class_=css_class):
            marked.extract()
        return clone

    return without("add_hilite"), without("drop_hilite")
def extract_date_and_tag(text):
    """
    Parse a cell of the form "<date> [<tag>]" into a TagDatePair.

    The part before the first "[" is parsed as a date, first with the
    date_MMMM_YYYY format and, if that fails, with date_MMMM_DD_YYYY.
    The tag is whatever sits between that "[" and the following "]",
    stripped; it is None when the text has no "[".

    Returns TagDatePair(None, None) for empty or whitespace-only text.
    Raises ValueError when the date part matches neither format.
    """
    cleaned = text.strip()
    #handle various empty cases
    if not cleaned:
        return TagDatePair(None, None)

    pieces = cleaned.split("[")
    date_text = pieces[0].strip()
    tag = pieces[1].split("]")[0].strip() if len(pieces) > 1 else None

    try:
        parsed = datetime.strptime(date_text, date_MMMM_YYYY)
    except ValueError:
        parsed = datetime.strptime(date_text, date_MMMM_DD_YYYY)
    return TagDatePair(tag, parsed)
def extract_text_and_tag(text):
    """
    Split a cell of the form "<text> [<tag>]" into a TagTextPair.

    The text is the stripped part before the first "[". The tag is
    whatever sits between that "[" and the following "]", stripped; it is
    None when the input has no "[".

    Returns TagTextPair(None, None) for empty or whitespace-only input.
    The same tuple type is returned in every case so callers can always
    read `.text` and `.tag`.
    """
    text = text.strip()
    #handle various empty cases
    if not text:
        return TagTextPair(None, None)
    pieces = text.split("[")
    if len(pieces) > 1:
        tag = pieces[1].split("]")[0].strip()
    else:
        tag = None
    return TagTextPair(tag, pieces[0].strip())
### FUNCTIONS
def tr_to_td(tr) -> tuple[str, str]:
"""
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
For the data, it just extracts the text.
The text itself then needs processed separately, based on what it should contain.
"""
#get list of cells
td_list = tr.find_all("td")
if len(td_list) == 2:
return td_list[0].text, td_list[1]
else:
return None, None
def get_forms(soup,version_a,version_b):
#extract all forms
for form in soup.body.find_all("form"):
#Match forms against ID types
if not "id" in form.attrs:
continue
match form.attrs["id"]:
case "form_StudyStatus":
extract_study_statuses(form,version_a,version_b)
case "form_SponsorCollaborators":
extract_sponsor_data(form, version_a, version_b)
case "form_Oversight":
pass
case "form_StudyDescription":
pass
case "form_Conditions":
pass
case "form_StudyDesign":
extract_study_design(form,version_a,version_b)
case "form_ArmsandInterventions":
pass
case "form_ProtocolOutcomeMeasures":
pass
case "form_Eligibility":
pass
case "form_ContactsLocations":
pass
case "form_IPDSharing":
pass
case "form_References":
pass
case "form_ParticipantFlow":
pass
case "form_BaselineCharacteristics":
pass
case "form_ROutcomeMeasures":
pass
case "form_AdverseEvents":
pass
case "form_LimitationsandCaveats":
pass
case "form_MoreInformation":
pass
case _:
print(form.attrs["id"])
### CONSTANTS
# strptime format tried first by extract_date_and_tag: month name and year.
date_MMMM_YYYY = "%B %Y"
# strptime fallback format: month name, day of month, comma, year.
date_MMMM_DD_YYYY = "%B %d, %Y"
def get_data_from_versions(nct_id,html, version_a_int, version_b_int):
    """
    Parse one comparison page and return the data of both versions.

    The html is parsed with BeautifulSoup using the "lxml" parser. A
    VersionData is created for each of the two version numbers, and
    get_forms fills them in from the page.

    Returns the pair (data for version_a_int, data for version_b_int).
    """
    parsed_page = BeautifulSoup(html, "lxml")
    older = VersionData(nct_id, version_a_int)
    newer = VersionData(nct_id, version_b_int)
    get_forms(parsed_page, older, newer)
    return older, newer
if __name__ == "__main__":
    # NOTE(review): the connection settings, including the password, are hard-coded here;
    # consider reading them from the environment or a config file.
    with psycopg2.connect(dbname="aact_db", user="root", password="root",host="localhost") as db_connection:
        #pull the requests from the db
        with db_connection.cursor() as curse:
            sql = """
                SELECT nct_id, version_a,version_b, html
                FROM http.responses
                """
            # psycopg2's execute() returns None; the rows are fetched from the cursor itself.
            curse.execute(sql)
            for nct_id, version_a, version_b, html in curse.fetchall():
                version1, version2 = get_data_from_versions(nct_id, html, version_a, version_b)
                # The older side of the comparison is stored only when the two
                # versions are consecutive; the newer side is always stored.
                if version_b == version_a + 1:
                    version1.load_to_db(db_connection)
                version2.load_to_db(db_connection)
"""
Documentation:
To add a new field to extraction-lib:
1. Locate the field in the HTML
- form id (e.g. <form id="form_StudyStatus"> gives the form id "form_StudyStatus")
- Table row's data label. This corresponds to the text of first column in the row and will
look something like
<td class="rowLabel" style="min-width: 210px;">Record Verification:</td>.
"Record Verification:" is the data label in the example above.
2. Identify what data you will be extracting
- type (date, text, int, etc)
- if it contains a category ([Actual] vs [Anticipated] etc)
3. Add data to:
- sql table: history.trial_snapshots
- the VersionData class
- the VersionData.load_to_db() function
4. Ensure the field matcher in `get_forms(*)` is matching on the form ID and has a function processing the form
5. Ensure the function processing the form has a match entry to process the row
- This should match on data label and then process the data by
- splitting into old and new versions
- Extracting the data for both old and new
- add the data to the passed VersionData objects
"""