from collections import namedtuple from copy import copy from datetime import datetime from bs4 import BeautifulSoup from drugtools.env_setup import ENV,postgres_conn #requires Python 3.10 #### GLOBALS VERBOSE = True if ENV["VERBOSE"] == "True" else False ###CLASSES AND CONSTRUCTORS TagDatePair = namedtuple("TagDatePair", ["tag","date"]) TagTextPair = namedtuple("TagTextPair", ["tag","text"]) #superclasses class VersionData(): """ This class holds two types of data: - Data with a 1-to-1 relationship with the trial/version pair. - Data with a child relationship with the trial/version pair. This initializes with None attributes, and implements setter methods to load them (just to double check types) That way I can just pass around the VersionData instance and add data as I go. It will also implement the ability to load the data to the database """ def __init__(self,nct_id,version_id): #identifiers self.nct_id = nct_id.strip() self.version_id = version_id #Study Status self._primary_completion_date = None self._primary_completion_date_category = None self._start_date = None self._start_date_category = None self._completion_date = None self._completion_date_category = None self._overall_status = None self._enrollment = None self._enrollment_category = None self._sponsor = None #self._sponsor_category = None #I don't believe this is included in the raw data self._responsible_party = None #self._responsible_party_category = None #I don't believe this is included in the raw data #self._collaborators = None #currently going to ignore as I've not fount it in AACT def load_to_db(self,db_connection): #load to initial table, then load any extra details into other tables sql = """ INSERT INTO history.trial_snapshots ( nct_id, version, primary_completion_date, primary_completion_date_category, start_date, start_date_category, completion_date, completion_date_category, overall_status, enrollment, enrollment_category, sponsor, responsible_party ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ with db_connection.cursor() as db_cursor: try: db_cursor.execute( sql, ( self.nct_id, self.version_id, self._primary_completion_date, self._primary_completion_date_category, self._start_date, self._start_date_category, self._completion_date, self._completion_date_category, self._overall_status, self._enrollment, self._enrollment_category, self._sponsor, self._responsible_party ) ) except Exception as err: #catch any error, print the applicable information, and raise the error. print(self) raise err db_connection.commit() def optional_strip(possible_string): if type(possible_string) == str: return possible_string.strip() else: return possible_string def extract_study_statuses(study_status_form, version_a,version_b): """ This extracts data from a study_status form and returns one or two StudyStatusData objects, """ #get rows rows = study_status_form.table.tbody.find_all("tr") #iterate through rows, for trow in rows: #matching on rowLabels match tr_to_td(trow): case ["Primary Completion:" as row_label, old,new]: print("row matched: {}".format(row_label)) if VERBOSE else "" tagdate1 = extract_date_and_tag(old.text) version_a._primary_completion_date = tagdate1.date version_a._primary_completion_date_category = optional_strip(tagdate1.tag) tagdate2 = extract_date_and_tag(new.text) version_b._primary_completion_date = tagdate2.date version_b._primary_completion_date_category = optional_strip(tagdate2.tag) case ["Study Start:" as row_label, old, new]: print("row matched: {}".format(row_label)) if VERBOSE else "" tagdate1 = extract_date_and_tag(old.text) version_a._start_date = tagdate1.date version_a._start_date_category = optional_strip(tagdate1.tag) tagdate2 = extract_date_and_tag(new.text) version_b._start_date = tagdate2.date version_b._start_date_category = optional_strip(tagdate2.tag) case ["Study Completion:" as row_label, old,new]: print("row matched: {}".format(row_label)) if VERBOSE else "" tagdate1 = extract_date_and_tag(old.text) version_a._completion_date = tagdate1.date version_a._completion_date_category = optional_strip(tagdate1.tag) tagdate2 = extract_date_and_tag(new.text) version_b._completion_date = tagdate2.date version_b._completion_date_category = optional_strip(tagdate2.tag) case ["Overall Status:" as row_label, old,new]: print("row matched: {}".format(row_label)) if VERBOSE else "" #split out any notes such as "Suspended [reason for suspenstion ]" version_a._overall_status = optional_strip(old.text.split("[")[0]) #split out any notes such as "Suspended [reason for suspenstion ]" version_b._overall_status = optional_strip(new.text.split("[")[0]) #FIX: There is an issue with NCT00789633 where the overall status includes information as to why it was suspended. case _ as row_label: print("row not matched: {}".format(row_label)) if VERBOSE else "" def extract_study_design(study_status_form, version_a,version_b): """ This extracts data from a study_status form and returns one or two StudyStatusData objects, """ #get rows rows = study_status_form.table.tbody.find_all("tr") #iterate through rows, for trow in rows: #matching on rowLabels match tr_to_td(trow): case ["Enrollment:" as row_label, old, new]: print("row matched: {}".format(row_label)) if VERBOSE else "" #Extract tag and text, add them to preallocated object tagtext1 = extract_text_and_tag(old.text) version_a._enrollment = tagtext1.text version_a._enrollment_category = optional_strip(tagtext1.tag) tagtext2 = extract_text_and_tag(new.text) version_b._enrollment = tagtext2.text version_b._enrollment_category = optional_strip(tagtext2.tag) case _ as row_label: print("row not matched: {}".format(row_label)) if VERBOSE else "" def extract_sponsor_data(study_status_form, version_a,version_b): """ This extracts data from a study_status form and returns one or two StudyStatusData objects, """ #get rows rows = study_status_form.table.tbody.find_all("tr") #iterate through rows, for trow in rows: #matching on rowLabels match tr_to_td(trow): case ["Sponsor:" as row_label, old, new]: print("row matched: {}".format(row_label)) if VERBOSE else "" version_a._sponsor = optional_strip(old.text) version_b._sponsor = optional_strip(new.text) case ["Responsible Party:" as row_label, old, new]: print("row matched: {}".format(row_label)) if VERBOSE else "" version_a._responsible_party = optional_strip(old.text) version_b._responsible_party = optional_strip(new.text) case ["Collaborators:" as row_label, old, new]: print("row matched: {}".format(row_label)) if VERBOSE else "" #TODO: find a trial with multiple collaborators and figure out how to identify/count them:w # So far can't figure out where this is in AACT, so I'm going to ignore it. pass case _ as row_label: print("row not matched: {}".format(row_label)) if VERBOSE else "" def split_by_version(tag): ''' OUTDATED: With the new format that separates old and new versions, I don't technically need this. It is a nice place to identify exact changes if those are every needed though and it removes the highlights cleanly. ''' #clone elements and remove sub-tags that are not needed. old = copy(tag) for span in old.find_all(class_="add_hilite"): span.extract() new = copy(tag) for span in new.find_all(class_="drop_hilite"): span.extract() return old,new def extract_date_and_tag(text): """ Extracts a datetype according to the date format and the estimate tag based on """ text = text.strip() #handle various empty cases if not text or text == '': return TagDatePair(None, None) date_split = text.split("[") if len(date_split) > 1: estimate_tag = date_split[1].split("]")[0].strip() else: estimate_tag = None try: date_object = datetime.strptime(date_split[0].strip(), date_MMMM_YYYY) except ValueError as ve: date_object = datetime.strptime(date_split[0].strip(), date_MMMM_DD_YYYY) return TagDatePair(estimate_tag, date_object) def extract_text_and_tag(text): """ Extracts a datetype according to the date format and the estimate tag based on """ text = text.strip() #handle various empty cases if not text or text == '': return TagTextPair(None, None) date_split = text.split("[") if len(date_split) > 1: estimate_tag = date_split[1].split("]")[0].strip() else: estimate_tag = None text_object = date_split[0].strip() return TagTextPair(estimate_tag, text_object) ### FUNCTIONS def tr_to_td(tr) -> tuple[str, str, str]: """ Takes an html data row of interest, extracts the record_name from the first