Mostly working data extraction, removed dependency on text processing (will remove in future commit)

parser
will king 4 years ago
parent 71e87a9abe
commit b1c146d550

@ -1,79 +1,227 @@
from tokenize import String
from collections import namedtuple
from copy import copy
from datetime import datetime
from ensurepip import version
from bs4 import BeautifulSoup
import abc
import textprocessing as tp #cuz tp is important
#requires Python 3.10
def extract_data_from_tr(tr) -> tuple[String, String]:
"""
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
For the data, it will split between old and new data, making copies of each and returning them.
###CLASSES AND CONSTRUCTORS
Uses functionality from ./textprocessing.py (separated because it is important to test that functionality)
to get extract data from tags.
"""
#get list of cells
#for cell in cells
#if class_=="rowLabel", extract text
#else parse out new and old text
#return triple: row_label, old, new
pass
# Small records pairing a bracketed qualifier ("tag") with the value it annotates.
TagDatePair = namedtuple("TagDatePair", field_names=("tag", "date"))
TagTextPair = namedtuple("TagTextPair", field_names=("tag", "text"))
#superclasses
class VersionData{abc.ABC}:
class VersionData():
"""
This abstract class holds two types of data:
This class holds two types of data:
- Data with a 1-to-1 relationship with the trial/version pair.
- Data with a child relationship with the trial/version pair.
Each subclass will return the 1-to-1 data for another system to add to the DB.
This is so that a single record can be created in one go.
Each subclass will load the child data to the database directly.
This initializes with None attributes, and implements setter
methods to load them (just to double check types)
That way I can just pass around the VersionData instance
and add data as I go.
It will also implement the ability to load the data to the database
"""
@abc.abstractmethod
def version_fields(self):
def __init__(self,nct_id,version_id):
#identifiers
self.nct_id = nct_id
self.version_id = version_id
#Study Status
self._primary_completion_date = None
self._primary_completion_date_category = None
self._completion_date = None
self._completion_date_category = None
self._overall_status = None
#Study Design
self._enrollment = None
self._enrollment_category = None
#Sponsors and Collaborators
self._sponsor = None
#self._sponsor_category = None #I don't believe this is included in the raw data
self._responsible_party = None
#self._responsible_party_category = None #I don't believe this is included in the raw data
#self._collaborators = None #currently going to ignore as I've not found it in AACT
def extract_study_statuses(study_status_form, version_a,version_b):
"""
This function returns data that should be included in a standard table
related to version_x of the record.
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
It also returns the columns?
"""
pass
@abc.abstractmethod
def version_records(self, foreign_key, db_cursor):
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
#print(trow.__str__()[:80])
match tr_to_td(trow):
case ["Primary Completion:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_date_and_tag(old.text,date_MMMM_YYYY)
version_a._primary_completion_date = tagdate1.date
version_a._primary_completion_date_category = tagdate1.tag
tagdate2 = extract_date_and_tag(new.text,date_MMMM_YYYY)
version_b._primary_completion_date = tagdate2.date
version_b._primary_completion_date_category = tagdate2.tag
case ["Study Completion:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_date_and_tag(old.text,date_MMMM_YYYY)
version_a._completion_date = tagdate1.date
version_a._completion_date_category = tagdate1.tag
tagdate2 = extract_date_and_tag(new.text,date_MMMM_YYYY)
version_b._completion_date = tagdate2.date
version_b._completion_date_category = tagdate2.tag
case ["Overall Status:" as row_label, tag]:
old,new = split_by_version(tag)
version_a._overall_status = old.text
version_b._overall_status = new.text
def extract_study_design(study_status_form, version_a,version_b):
"""
This function loads data that needs to be held in auxiliary tables
into the database.
For example, the list of sponsors will need to be tracked separately from
trial status.
This extracts data from a study_status form and returns one or two
StudyStatusData objects,
"""
pass
#get rows
rows = study_status_form.table.tbody.find_all("tr")
#iterate through rows,
for trow in rows:
#matching on rowLabels
#print(trow.__str__()[:80])
match tr_to_td(trow):
case ["Enrollment:" as row_label, tag]:
old,new = split_by_version(tag)
tagdate1 = extract_text_and_tag(old.text)
version_a._enrollment = tagdate1.text
version_a._enrollment_category = tagdate1.tag
tagdate2 = extract_text_and_tag(new.text)
version_b._enrollment = tagdate2.text
version_b._enrollment_category = tagdate2.tag
class StudyStatusData(VersionData):
columns = ["primary_completion_date", "completion_date", "last_update_posted_date"]
def __init__(self ,primary_completion_date, completion_date, last_update_posted_date) -> None:
def extract_sponsor_data(study_status_form, version_a, version_b):
    """
    Copy sponsor information from a form onto the two version objects.

    Every <tr> under study_status_form.table.tbody is inspected. For the
    "Sponsor:" and "Responsible Party:" rows the data cell is split into its
    old and new variants; the old text is stored on version_a and the new
    text on version_b (attributes _sponsor and _responsible_party).
    "Collaborators:" rows and rows with any other label are left alone.

    Nothing is returned; the two version objects are updated in place.
    """
    #row label -> attribute that receives the cell text
    destinations = {
        "Sponsor:": "_sponsor",
        "Responsible Party:": "_responsible_party",
    }
    for trow in study_status_form.table.tbody.find_all("tr"):
        label, cell = tr_to_td(trow)
        attribute = destinations.get(label)
        if attribute is None:
            #"Collaborators:" rows end up here as well.
            #TODO: find a trial with multiple collaborators and figure out
            # how to identify/count them.
            # So far can't figure out where this is in AACT, so it is ignored.
            continue
        old, new = split_by_version(cell)
        setattr(version_a, attribute, old.text)
        setattr(version_b, attribute, new.text)
def extract_study_statuses(study_status_form, version_a,version_b):
def split_by_version(tag):
    """
    Build the "old" and "new" variants of a cell.

    Two separate copies of tag are made. Elements with class "add_hilite"
    are detached from the first copy and elements with class "drop_hilite"
    from the second, so the work is done on the copies rather than on tag.

    Returns the pair (old, new).
    """
    def copy_without(css_class):
        #clone the element, then detach every highlighted sub-element of that class
        clone = copy(tag)
        for highlighted in clone.find_all(class_=css_class):
            highlighted.extract()
        return clone

    return copy_without("add_hilite"), copy_without("drop_hilite")
def extract_date_and_tag(text, date_format):
"""
This extracts data from a study_status form and returns one or two StudyStatusData objects
Extracts a datetype according to the date format
and the estimate tag based on
"""
pass
#FIX: Currently, there are multiple (mixed) data formats in use.
#These can exist in the same data field, in two different versions,
#so instead of using a single (passed) data format, I need to
#select between various data formats.
class SponsorCollaboratorsData(VersionData):
columns=[]
def __init__(self) -> None:
pass
text = text.strip()
#handle various empty cases
if not text or text == '':
return TagDatePair(None, None)
date_split = text.split("[")
if len(date_split) > 1:
estimate_tag = date_split[1].split("]")[0].strip()
else:
estimate_tag = None
date_object = datetime.strptime(date_split[0].strip(), date_format)
return TagDatePair(estimate_tag, date_object)
def extract_text_and_tag(text):
    """
    Split a cell's text into its value and an optional bracketed tag.

    The input is expected to look like "value [Tag]" or just "value".
    Surrounding whitespace is ignored.

    Returns a TagTextPair(tag, text):
      - tag is the stripped text between the first "[" and the following
        "]", or None when there is no "[".
      - text is the stripped text before the first "[".
    Both fields are None when the input is empty or only whitespace.
    """
    text = text.strip()
    #empty cell: return the same pair type as the normal path so callers can
    #read .text and .tag without special-casing
    if not text:
        return TagTextPair(None, None)

    pieces = text.split("[")
    estimate_tag = pieces[1].split("]")[0].strip() if len(pieces) > 1 else None
    return TagTextPair(estimate_tag, pieces[0].strip())
def get_forms(soup):
### FUNCTIONS
def tr_to_td(tr) -> tuple[str, str]:
"""
Takes an html data row of interest, extracts the record_name from the first <td>, and the data from the second <td>.
For the data, it just extracts the text.
The text itself then needs processed separately, based on what it should contain.
"""
#get list of cells
td_list = tr.find_all("td")
if len(td_list) == 2:
return td_list[0].text, td_list[1]
else:
return None, None
data_list = []
def get_forms(soup,version_a,version_b):
#extract all forms
for form in soup.body.find_all("form"):
@ -83,9 +231,9 @@ def get_forms(soup):
match form.attrs["id"]:
case "form_StudyStatus":
print("test successful 2")
extract_study_statuses(form,version_a,version_b)
case "form_SponsorCollaborators":
pass
extract_sponsor_data(form, version_a, version_b)
case "form_Oversight":
pass
case "form_StudyDescription":
@ -93,7 +241,7 @@ def get_forms(soup):
case "form_Conditions":
pass
case "form_StudyDesign":
pass
extract_study_design(form,version_a,version_b)
case "form_ArmsandInterventions":
pass
case "form_ProtocolOutcomeMeasures":
@ -121,7 +269,19 @@ def get_forms(soup):
case _:
print(form.attrs["id"])
### CONSTANTS
# datetime.strptime format strings for the date layouts handled so far.
date_MMMM_YYYY = "%B %Y"  # full month name and year, e.g. "December 2009"
date_MMMM_DD_YYYY = "%B %d, %Y"  # full month name, day of month, comma, year
if __name__ == "__main__":
with open("./NCT00658567.html") as fh:
for file in ["./NCT00658567.html", "./NCT01303796.html"]:
with open(file) as fh:
soup = BeautifulSoup(fh, "lxml")
get_forms(soup)
version1 = VersionData("NCT00658567",1)
version2 = VersionData("NCT00658567",2)
get_forms(soup, version1, version2)
print(version1.__dict__) #order messed up somewhere:w
print(version2.__dict__) #order messed up somewhere:w

@ -6,9 +6,9 @@ CREATE TABLE history.versions
nct_id
version
--Study Status
overall_status
primary_completion_date
completion_date
overall_status^
primary_completion_date^
completion_date^
last_update_submitted_date
--SponsorCollaborators
sponsor (multi?)
@ -31,7 +31,7 @@ CREATE TABLE history.versions
Number of Arms
Masking
Allocation
Enrollment
Enrollment ^
--ArmsAndInterventions
Arms (multiple) (Ignore)
--ProtocolOutcomeMeasures

@ -1,4 +1,6 @@
from cgitb import html
from copy import copy
from datetime import datetime
from bs4 import BeautifulSoup
import re
form = """
@ -106,16 +108,77 @@ entry1 = """
"""
drop_old_re = re.compile('<span class="drop_hilite">\w*</span>\s?')
drop_new_re = re.compile('<span class="add_hilite">\w*</span>\s?')
drop_tags_re = re.compile('<[=-_,.:;"/\w\s]+>')
entry2 = '<td> <span class="add_hilite">December 2009 [Actual]</span> </td>'
# Raw strings keep the regex escapes (\w, \s, \[) out of Python's own
# string-escape processing.
DROP_HILITE_re = re.compile(r'<span class="drop_hilite">[\[\]\w]*</span>\s?')
ADD_HILITE_re = re.compile(r'<span class="add_hilite">\w*</span>\s?')
# The hyphen is escaped so it is a literal "-". Unescaped, "=-_" is a character
# range from "=" to "_", which lets ">" match inside a tag (so a match can run
# past the end of the tag) and leaves "-" itself unmatched.
TAGS_RE = re.compile(r'<[=\-_,.:;"/\w\s]+>')
print(drop_new_re.sub("",entry1))
print(drop_old_re.sub("",entry1))
print(drop_tags_re.sub("",entry1))
def extract_new_data(td):
    """
    Return the text of td with dropped highlights and markup removed.

    The cell is rendered to markup, every span matched by DROP_HILITE_re is
    replaced by a single space, every remaining tag matched by TAGS_RE is
    deleted, and leading/trailing whitespace is trimmed.
    """
    markup = str(td)
    without_dropped = DROP_HILITE_re.sub(" ", markup)
    return TAGS_RE.sub("", without_dropped).strip()
print(drop_tags_re.sub("",drop_new_re.sub("",entry1)))
def extract_old_data(td):
    """
    Return the text of td with added highlights and markup removed.

    The cell is rendered to markup, every span matched by ADD_HILITE_re is
    replaced by a single space, every remaining tag matched by TAGS_RE is
    deleted, and leading/trailing whitespace is trimmed.
    """
    markup = str(td)
    without_added = ADD_HILITE_re.sub(" ", markup)
    return TAGS_RE.sub("", without_added).strip()
def delete_tags(td):
    """
    Return td rendered to markup with every tag matched by TAGS_RE replaced
    by a single space and the ends trimmed.
    """
    markup = str(td)
    spaced_out = TAGS_RE.sub(" ", markup)
    return spaced_out.strip()
print(drop_tags_re.sub("",drop_new_re.sub("",form)))
def extract_date_and_tag(text, date_format):
    """
    Parse a date followed by an optional bracketed tag, e.g. "April 2008 [Actual]".

    Parameters:
        text: the cell text; may be None, empty, or only whitespace.
        date_format: datetime.strptime format describing the date portion.

    Returns:
        A pair (estimate_tag, date_object). estimate_tag is the stripped text
        between the first "[" and the following "]", or None when there is no
        "[". date_object is the datetime parsed from the text before the "[".
        When there is no text to parse the pair is (None, None), so the result
        can always be unpacked the same way.

    Raises:
        ValueError: if the date portion does not match date_format.
    """
    #nothing to parse: also covers whitespace-only cells, which would
    #otherwise reach strptime and raise
    if text is None or not text.strip():
        return None, None

    date_split = text.split("[")
    estimate_tag = date_split[1].split("]")[0].strip() if len(date_split) > 1 else None
    date_object = datetime.strptime(date_split[0].strip(), date_format)
    return estimate_tag, date_object
#TODO: Write test
def extract_text_and_tag(text):
    """
    Split text of the form "value [Tag]" into its tag and its value.

    Mirrors extract_date_and_tag, except that the part before the bracket is
    returned as stripped text instead of being parsed as a date.

    Returns:
        A pair (estimate_tag, value). estimate_tag is the stripped text
        between the first "[" and the following "]", or None when there is no
        "[". value is the stripped text before the "[". When there is no text
        (None, empty, or only whitespace) the pair is (None, None).
    """
    if text is None or not text.strip():
        return None, None

    pieces = text.split("[")
    estimate_tag = pieces[1].split("]")[0].strip() if len(pieces) > 1 else None
    return estimate_tag, pieces[0].strip()
# Manual smoke run: prints what the helpers in this module produce for the
# fixtures entry1, entry2 and form.
if __name__ == "__main__":
# Parse the fixtures with the lxml parser.
Entry = BeautifulSoup(entry1, "lxml")
Form = BeautifulSoup(form, "lxml")
# New and old readings of the second <td> of entry1.
print(extract_new_data(Entry.find_all("td")[1]))
print(extract_old_data(Entry.find_all("td")[1]))
# For each row of the form, report by cell count: nothing, the single cell,
# or the cell count followed by the new | old readings of the second cell.
for tr in Form.find_all("tr"):
data = tr.find_all("td")
match len(data):
case 0: print("no data")
case 1: print("1\t",data[0])
case _: print(len(data), "\t", extract_new_data(data[1]) ,"\t|\t", extract_old_data(data[1]))
#print(extract_date_and_tag(extract_old_data(Entry.find_all("td")[1]), "%B %Y"))
# Date parsing with a bracketed tag that carries extra whitespace.
print(extract_date_and_tag("April 2008 [ test ]", "%B %Y"))
# Same old/new extraction, applied to the whole parsed entry2 document.
Entry2 = BeautifulSoup(entry2,"lxml")
print(extract_old_data(Entry2)) #error here.
print(extract_new_data(Entry2))
# Experiment: copy the parsed entry, detach its first add_hilite element, then
# print the text left in the copy and the text of the detached element.
Entry3 = copy(Entry2)
print(Entry3)
Entry4 = Entry3.find_all(class_="add_hilite")[0].extract()
print(Entry3.text)
print(Entry4.text)
Loading…
Cancel
Save