You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
97 lines
2.7 KiB
Python
97 lines
2.7 KiB
Python
import json
|
|
from psycopg2.extras import execute_values
|
|
import datetime as dt
|
|
from drugtools.env_setup import postgres_conn, ENV
|
|
import requests
|
|
import zipfile
|
|
import io
|
|
|
|
# URL prefix for the openFDA NSDE downloads; run() passes it to
# download_and_extract_zip(), which appends a zip file name to it.
URL_STEM = 'https://download.open.fda.gov/other/nsde/'
|
|
# Count of NSDE zip parts to fetch, read from ENV and converted to int;
# run() hands it to filename_generator() as the upper bound and the "of" total.
NUMBER_OF_NSDE_FILES = int(ENV["NUMBER_OF_NSDE_FILES"])
|
|
|
|
def filename_generator(max_num):
    """Yield the NSDE zip file names for parts 1 through ``max_num``.

    Each name looks like ``other-nsde-XXXX-of-YYYY.json.zip``, where XXXX is
    the part number and YYYY is ``max_num``; both are left-padded with zeros
    to a minimum width of four characters. Nothing is yielded when
    ``max_num`` is less than 1.
    """
    total = max_num
    for part_number in range(1, total + 1):
        yield f"other-nsde-{part_number:0>4}-of-{total:0>4}.json.zip"
|
|
|
|
def get_date(result, key):
    """Return ``result[key]`` parsed as a ``YYYYMMDD`` date, or ``None``.

    The value is fetched with ``result.get(key)``. A missing key or any falsy
    value (``None``, ``""``) gives ``None``; otherwise the value is parsed
    with ``datetime.strptime`` using the ``%Y%m%d`` format, which raises
    ``ValueError`` if the text does not match.
    """
    raw = result.get(key)
    return dt.datetime.strptime(raw, "%Y%m%d") if raw else None
|
|
|
|
def build_values(result):
    """Turn one NSDE result record into a 12-item row tuple.

    The first eight items are read from ``result`` unchanged (``None`` when
    the key is absent); the last four are date fields converted by
    ``get_date``. The item order matches the column list of the INSERT
    statement used in ``run``.
    """
    # Fields copied through as-is.
    plain_fields = (
        "proprietary_name",
        "application_number_or_citation",
        "product_type",
        "package_ndc",
        "marketing_category",
        "package_ndc11",
        "dosage_form",
        "billing_unit",
    )
    # Fields whose values are converted to datetime objects by get_date.
    date_fields = (
        "marketing_start_date",
        "marketing_end_date",
        "inactivation_date",
        "reactivation_date",
    )

    plain_values = [result.get(name) for name in plain_fields]
    date_values = [get_date(result, name) for name in date_fields]
    return tuple(plain_values + date_values)
|
|
|
|
def download_and_extract_zip(base_url, filename, timeout=60):
    """Download a zip archive and return the bytes of its first member.

    Args:
        base_url: URL prefix; ``filename`` is appended to it verbatim.
        filename: Name of the zip file to fetch.
        timeout: Seconds handed to ``requests.get`` as its timeout so that a
            stalled server cannot block the job indefinitely.

    Returns:
        The uncompressed bytes of the first entry in the archive, or ``None``
        when the archive contains no entries.

    Raises:
        requests.HTTPError: The server answered with an error status.
        zipfile.BadZipFile: The response body is not a valid zip archive.
    """
    response = requests.get(base_url + filename, timeout=timeout)
    # Surface the real HTTP failure instead of a confusing BadZipFile raised
    # while trying to unzip an error page.
    response.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(response.content)) as the_zip:
        members = the_zip.infolist()
        if not members:
            return None
        # Only the first member is read; any further members are ignored.
        return the_zip.read(members[0])
|
|
|
|
def run():
    """Download every NSDE file and bulk-insert its records into ``spl.nsde``.

    For each name produced by ``filename_generator(NUMBER_OF_NSDE_FILES)`` the
    archive is fetched from ``URL_STEM``, the ``"results"`` list of its JSON
    payload is converted to row tuples with ``build_values``, and the rows are
    inserted with ``execute_values``. The file name is printed as progress
    output before each download.
    """
    # The statement is identical for every file, so build it once.
    query = """
        INSERT INTO spl.nsde (
            proprietary_name
            ,application_number_or_citation
            ,product_type
            ,package_ndc
            ,marketing_category
            ,package_ndc11
            ,dosage_form
            ,billing_unit
            ,marketing_start_date
            ,marketing_end_date
            ,inactivation_date
            ,reactivation_date
        )
        VALUES %s;
    """

    # TODO: It would be nice to replace filename_generator with something
    # that retrieves and unzips the files directly.
    for filename in filename_generator(NUMBER_OF_NSDE_FILES):
        print(filename)

        # Download and parse before opening the connection so the database
        # connection is not held while waiting on the network.
        payload = download_and_extract_zip(URL_STEM, filename)
        results = json.loads(payload)["results"]
        values = [build_values(record) for record in results]

        # NOTE(review): if postgres_conn() returns a raw psycopg2 connection,
        # leaving this block ends the transaction but does not close the
        # connection - confirm it is closed elsewhere.
        with postgres_conn() as con, con.cursor() as curse:
            execute_values(curse, query, values)
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: perform the NSDE load only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
|