Source code for scrapemed.paper

"""
ScrapeMed's Paper Module
============================

The scrapemed `paper` module is intended as the primary point of contact for
scrapemed end users.

Paper objects are defined here, as well end-user functionality for scraping
data from PubMed Central without stressing about the details.

..warnings::
    - :class:`emptyTextWarning` - Warned when trying to perform a text
        operation on a Paper which has no text.
    - :class:`pubmedHTTPError` - Warned when unable to retrieve a PMC XML
        repeatedly. Can occasionally happen with PMC due to high traffic.
        Also may be caused by broken XML formatting.
"""

import scrapemed._parse as parse
import lxml.etree as ET
import pandas as pd
import datetime
import chromadb
from langchain.text_splitter import CharacterTextSplitter
from typing import Union, Dict
from difflib import SequenceMatcher
import uuid
import re
import warnings
from urllib.error import HTTPError
import time


[docs] class emptyTextWarning(Warning): """ Warned when trying to perform a text operation on a Paper which has no text. """ pass
[docs] class pubmedHTTPError(Warning): """ Warned when unable to retrieve a PMC XML repeatedly. Can occasionally happen with PMC due to high traffic. Also may be caused by broken XML formatting. """ pass
# --------------------PAPER OBJECT SCHEMA-------------------------------------
[docs] class Paper: """ Class for storing paper data downloaded from PMC. This class provides methods for initializing papers via PMCID and directly from XML, paper chunking and vectorization, conversion to relational format (pandas Series), printing methods, and equality checking. Class data members include all of the data defined via the method :meth:`~Paper.info`. :raises pubmedHTTPError: Raised if there are HTTP errors when retrieving data from PMC. :raises emptyTextWarning: Raised if an attempt is made to vectorize a paper with no text. :Example: To initialize a Paper object with paper_dict: >>> paper = Paper(paper_dict) """ __tablename__ = "Papers" def __init__(self, paper_dict: dict) -> None: """ Initialize a Paper object with paper information parsed from a PMC download. :param dict paper_dict: A dictionary containing paper information, typically obtained from the parse.generate_paper_dict method. """ if not paper_dict: self.has_data = False return None else: self.has_data = True # capture current time as time of last update. Note that this date # may not be synced with PMC paper updates if using # initialization via Paper.from_xml. Use Paper.from_pmc to update # papers directly via PMC current_datetime = datetime.datetime.now() current_year = current_datetime.year current_month = current_datetime.month current_day = current_datetime.day self.last_updated = (current_month, current_day, current_year) # read in the Paper data from the parsed paper_dict self.pmcid = paper_dict["PMCID"] self.title = paper_dict["Title"] self.authors = paper_dict["Authors"] self.non_author_contributors = paper_dict["Non-Author Contributors"] self.abstract = paper_dict["Abstract"] self.body = paper_dict["Body"] self.journal_id = paper_dict["Journal ID"] self.journal_title = paper_dict["Journal Title"] self.issn = paper_dict["ISSN"] self.publisher_name = paper_dict["Publisher Name"] self.publisher_location = paper_dict["Publisher Location"] self.article_id = paper_dict["Article ID"] self.article_types = paper_dict["Article Types"] self.article_categories = paper_dict["Article Categories"] self.published_date = paper_dict["Published Date"] self.volume = paper_dict["Volume"] self.issue = paper_dict["Issue"] self.fpage = paper_dict["First Page"] self.lpage = paper_dict["Last Page"] self.permissions = paper_dict["Permissions"] if self.permissions: self.copyright = self.permissions["Copyright Statement"] self.license = self.permissions["License Type"] else: self.copyright = None self.license = None self.funding = paper_dict["Funding"] self.footnote = paper_dict["Footnote"] self.acknowledgements = paper_dict["Acknowledgements"] self.notes = paper_dict["Notes"] self.custom_meta = paper_dict["Custom Meta"] self.ref_map = paper_dict["Ref Map"] self._ref_map_with_tags = paper_dict["Ref Map With Tags"] self.citations = paper_dict["Citations"] self.tables = paper_dict["Tables"] self.figures = paper_dict["Figures"] self.data_dict = parse.define_data_dict() self.vector_collection = None return None
[docs] @classmethod def from_pmc( cls, pmcid: int, email: str, download: bool = False, validate: bool = True, verbose: bool = False, suppress_warnings: bool = False, suppress_errors: bool = False, ): """ Generate a Paper from a PMCID with optional parameters. :param int pmcid: Unique PMCID for the article to parse. :param str email: Provide your email address for authentication with PMC. :param bool download: Whether or not to download the XML retrieved from PMC. :param bool validate: Whether or not to validate the XML from PMC against NLM articleset 2.0 DTD (HIGHLY RECOMMENDED). :param bool verbose: Whether or not to have verbose output for testing. :param bool suppress_warnings: Whether to suppress warnings while parsing XML. Note: Warnings are frequent, because of the variable nature of PMC XML data. Recommended to suppress when parsing many XMLs at once. :param bool suppress_errors: Return None on failed XML parsing, instead of raising an error. :return: A Paper object initialized via the passed PMCID and optional parameters. :rtype: Paper """ NUM_TRIES = 3 paper_dict = None for i in range(NUM_TRIES): try: paper_dict = parse.paper_dict_from_pmc( pmcid=pmcid, email=email, download=download, validate=validate, verbose=verbose, suppress_warnings=suppress_warnings, suppress_errors=suppress_errors, ) break except HTTPError: time.sleep(5) if not paper_dict: warnings.warn( ( f"Unable to retrieve PMCID {pmcid} from PMC. May be due to " "HTTP traffic or broken XML formatting, try again later if " "the former." ), pubmedHTTPError, ) return None return cls(paper_dict)
[docs] @classmethod def from_xml( cls, pmcid: int, root: ET.Element, verbose: bool = False, suppress_warnings: bool = False, suppress_errors: bool = False, ): """ Generate a Paper straight from PMC XML. :param int pmcid: PMCID for the XML. THis is required intentionally, to ensure trustworthy unique indexing of PMC XMLs. :param ET.Element root: Root element of the PMC XML tree. :param bool verbose: Report verbose output or not. Intended for testing. :param bool suppress_warnings: Suppress warnings while parsing XML or not. Note: Warnings are frequent, because of the variable nature of PMC XML data. Recommended to suppress when parsing many XMLs at once. :param bool suppress_errors: Return None on failed XML parsing, instead of raising an error. Recommended to suppress when parsing many XMLs at once, unless failure is not an option. :returns: A Paper object initialized via the passed XML. :rtype: Paper """ paper_dict = parse.generate_paper_dict( pmcid, root, verbose=verbose, suppress_warnings=suppress_warnings, suppress_errors=suppress_errors, ) return cls(paper_dict)
[docs] def info(self) -> Dict[str, str]: """ Return the data definition dictionary. :return: A dictionary containing paper information. :rtype: dict[str, str] """ return self.data_dict
[docs] def print_abstract(self) -> str: """ Print and return a string representation of the abstract. :return: A string containing the abstract text. :rtype: str """ s = self.abstract_as_str() print(s) return s
[docs] def abstract_as_str(self) -> str: """ Return a string representation of the abstract of a paper. This method retrieves the abstract text without MHTML data references. :return: A string containing the abstract text. :rtype: str """ s = "" if self.abstract: for sec in self.abstract: s += "\n" s += str(sec) return s
[docs] def print_body(self) -> str: """ Print and return a string representation of the body of a paper. This method retrieves the body text without MHTML data references. :return: A string containing the body text. :rtype: str """ s = self.body_as_str() print(s) return s
[docs] def body_as_str(self) -> str: """ Return a string representation of the body of a paper. :return: A string containing the body text. :rtype: str """ s = "" if self.body: for sec in self.body: s += "\n" s += str(sec) return s
def __bool__(self): """ Determine the truth value of a Paper object based on successful initialization. :return: True if the Paper object was successfully initialized with data, False otherwise. :rtype: bool """ return self.has_data
[docs] def full_text(self, print_text: bool = False): """ Return the full abstract and/or body text of this Paper as a string. Optionally, you can choose to print the text. :param bool print_text: If True, print the text; if False, return it as a string. :return: A string containing the full text of the abstract and/or body. :rtype: str """ s = "" if self.abstract: s += "Abstract: \n" s += self.abstract_as_str() if self.body: s += "Body: \n" s += self.body_as_str() if print_text: print(s) return s
def __str__(self): """ Return a string representation of the Paper object. :return: A string containing the PMCID, title, abstract, and body text of the paper. :rtype: str """ s = "" s += f"\nPMCID: {self.pmcid}\n" s += f"Title: {self.title}\n" # Append all text from abstract PaperSections s += "\nAbstract:\n" if self.abstract: for sec in self.abstract: s += str(sec) # Append all text from body PaperSections s += "\nBody:\n" if self.body: for sec in self.body: s += str(sec) return s def __eq__(self, other): """ Check if two Paper objects are equal. Two Paper objects are considered equal if they share the same PMCID and have the same date of last update. Papers with the same content but downloaded or parsed on different dates are not considered equal. To compare Paper objects based solely on their PMCID, use `Paper1.pmcid == Paper2.pmcid`. Note that articles that are not open access on PMC may not have a PMCID, and a unique comparison method will be needed for these cases. However, most papers downloaded via ScrapeMed should have a PMCID. :param other: The other Paper object to compare. :type other: Paper :return: True if the two Paper objects are equal, False otherwise. :rtype: bool """ if not self: return False return self.pmcid == other.pmcid and self.last_updated == other.last_updated
[docs] def to_relational(self) -> pd.Series: """ Generate a pandas Series representation of the paper. This method creates a pandas Series containing a relational representation of the paper's data. Some data may be lost in this process, but most useful text data and metadata will be retained in a structured form. :return: A pandas Series representing the paper's data. :rtype: pd.Series """ data = { "PMCID": self.pmcid, "Last_Updated": self.last_updated, "Title": self.title, "Authors": self._extract_names(self.authors) if isinstance(self.authors, pd.DataFrame) else None, "Non_Author_Contributors": self._extract_names(self.non_author_contributors) if isinstance(self.non_author_contributors, pd.DataFrame) else None, "Abstract": self.abstract_as_str(), "Body": self.body_as_str(), "Journal_ID": self.journal_id, "Journal_Title": self.journal_title, "ISSN": self.issn, "Publisher_Name": self.publisher_name, "Publisher_Location": self.publisher_location, "Article_ID": self.article_id, "Article_Types": self.article_types, "Article_Categories": self.article_categories, "Published_Date": self._serialize_dict(self.published_date) if isinstance(self.published_date, dict) else None, "Volume": self.volume, "Issue": self.issue, "First_Page": self.fpage, "Last_Page": self.lpage, "Copyright": self.copyright, "License": self.license, "Funding": self.funding, "Footnote": self.footnote, "Acknowledgements": self.acknowledgements, "Notes": self.notes, "Custom_Meta": self.custom_meta, "Ref_Map": self.ref_map, "Citations": [ self._serialize_dict(c) for c in self.citations if isinstance(c, dict) ], "Tables": [ self._serialize_df(t) for t in self.tables if isinstance(t, (pd.io.formats.style.Styler, pd.DataFrame)) ], "Figures": self.figures, } return pd.Series(data)
# ---------------Helper functions for to_relational--------------------- def _extract_names(self, df): """ Extract and format names from a DataFrame. :param df: The DataFrame containing name data. :type df: pd.DataFrame :return: A list of formatted names. :rtype: List[str] """ return df.apply( lambda row: f"{row['First_Name']} {row['Last_Name']}", axis=1 ).tolist() def _serialize_dict(self, data_dict): """ Serialize a dictionary into a string. :param data_dict: The dictionary to serialize. :type data_dict: dict :return: A string representation of the serialized dictionary. :rtype: str """ return "; ".join([f"{key}: {value}" for key, value in data_dict.items()]) def _serialize_df(self, df): """ Serialize a DataFrame into an HTML string. :param df: The DataFrame to serialize. :type df: pd.DataFrame :return: An HTML representation of the serialized DataFrame. :rtype: str """ return df.to_html() # ---------------End Helper functions for to_relational--------------------
[docs] def vectorize( self, chunk_size: int = 100, chunk_overlap: int = 20, refresh: bool = False ): """ Generate an in-memory vector database representation of the paper. This method generates an in-memory vector database representation of the paper, stored in `paper.vector_collection`. It focuses on vectorizing the abstract and body text. :param int chunk_size: An approximate chunk size to split the paper into (measured in characters). :param int chunk_overlap: An approximate desired chunk overlap (measured in characters). :param bool refresh: Whether or not to clear and re-vectorize the paper with new settings. :return: None """ if not refresh and self.vector_collection: print( ( "Paper already vectorized! To re-vectorize with new " "settings, pass refresh=True." ) ) return None print("Vectorizing Paper (This may take a little while)...") if len(self.full_text()) == 0: warnings.warn( "Attempted to vectorize a Paper with no text. Aborting.", emptyTextWarning, ) return None # Set up an in-memory chromadb collection for this paper client = chromadb.Client() try: self.vector_collection = client.get_or_create_collection( f"Paper-PMCID-{self.pmcid}" ) except AttributeError: self.vector_collection = client.get_or_create_collection( f"Paper-Random-UUID-{uuid.uuid4()}" ) # setup chunk model chunk_model = CharacterTextSplitter( separator="\\n\\n|\\n|\\.|\\s", is_separator_regex=True, chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len, keep_separator=True, ) # chunk the text, add metadata for the PMCID each chunk # originates from, add unique chunk ids p_chunks = chunk_model.split_text(self.full_text()) p_metadatas = [{"pmcid": self.pmcid}] * len(p_chunks) try: pmcid = self.pmcid except AttributeError: pmcid = uuid.uuid4() p_ids = [self._generate_chunk_id(pmcid, i) for i in range(len(p_chunks))] # upload the chunked texts into the vector collection self.vector_collection.add(documents=p_chunks, metadatas=p_metadatas, ids=p_ids) print( ( "Done Vectorizing Paper! Natural language query with " "Paper.query() now available." ) ) return None
# -----------------helper funcs for self.vectorize----------------- def _generate_chunk_id(self, pmcid: str, index: Union[int, str]): """ Generate an ID for a PMC text chunk using the PMCID and the chunk's index. The chunk indices should be unique. It is recommended to use indexes from the result of the chunk model. :param str pmcid: The PMCID of the paper. :param Union[int, str] index: The index of the chunk. :return: A unique chunk ID. :rtype: str """ return f"pmcid-{pmcid}-chunk-{str(index)}" def _get_chunk_index_from_chunk_id(self, chunk_id: str) -> str: """ Given a PMCID Chunk ID in the format generated by `_generate_chunk_id`, extract the index of the chunk. :param str chunk_id: The chunk ID. :return: The index of the chunk. :rtype: str """ pattern = re.compile(r"chunk-(\d+)") # Compile the regex pattern match = pattern.search(chunk_id) index = None if match: index = match.group(1) return index def _get_pmcid_from_chunk_id(self, chunk_id: str) -> str: """ Given a PMCID Chunk ID in the format generated by `_generate_chunk_id`, extract the PMCID of the chunk. :param str chunk_id: The chunk ID. :return: The PMCID of the chunk. :rtype: str """ pattern = re.compile(r"pmcid-(\d+)") # Compile the regex pattern match = pattern.search(chunk_id) pmcid = None if match: pmcid = match.group(1) return pmcid # -----------------end helper funcs for self.vectorize-----------------
[docs] def query( self, query: str, n_results: int = 1, n_before: int = 2, n_after: int = 2 ) -> Dict[str, str]: """ Query the paper with natural language questions. :param str query: The natural language question/query. :param int n_results: The number of most semantically similar paper sections to retrieve. :param int n_before: The number of chunks before the match to include in the combined output. :param int n_after: The number of chunks after the match to include in the combined output. :return: A dictionary with keys representing the most semantically similar result chunk(s) and values representing the paper text(s) around the most semantically similar result chunk(s). The text length is determined by the chunk size used in `self.vectorize()` and the params `n_before` and `n_after`. :rtype: dict[str, str] """ result = self.expanded_query( query=query, n_results=n_results, n_before=n_before, n_after=n_after ) return result
# -----------------helper funcs for self.query----------------------
[docs] def expanded_query( self, query: str, n_results: int = 1, n_before: int = 2, n_after: int = 2 ) -> Dict[str, str]: """ Query the paper with an expanded natural language question/query. This method matches a natural language query with the vectorized Paper. It retrieves and expands the text sections around the most semantically similar result chunk(s). :param str query: The natural language query. :param int n_results: The number of most semantically similar paper sections to retrieve. :param int n_before: The number of chunks before the match to include in the combined output. :param int n_after: The number of chunks after the match to include in the combined output. :return: A dictionary with keys representing the most semantically similar result chunk(s) and values representing the expanded paper text(s) around the result chunk(s). :rtype: dict[str, str] """ # if the paper has not already been vectorized, vectorize if not self.vector_collection: self.vectorize() # if vectorization fails, abort if not self.vector_collection: return None result = self.vector_collection.query( query_texts=[query], include=["documents"], n_results=n_results ) expanded_results = {} for id in result["ids"][0]: chunk_index = self._get_chunk_index_from_chunk_id(id) pmcid = self._get_pmcid_from_chunk_id(id) # get the texts before and after the result chunk expanded_ids = [] for i in range(1, n_before + 1): expanded_ids.append( self._generate_chunk_id(pmcid, int(chunk_index) - i) ) expanded_ids.append(id) for i in range(1, n_after + 1): expanded_ids.append( self._generate_chunk_id(pmcid, int(chunk_index) + i) ) expanded_results[f"Match on {id}"] = self.vector_collection.get( ids=expanded_ids, )["documents"] cleaned_results = {} # append docs together two at a time, removing overlap for match, docs in expanded_results.items(): combined_result = "" # combined docs together if len(docs) == 0: combined_result = None elif len(docs) == 1: combined_result = docs[0] else: # combine first two docs, removing overlap, to # start the combined result substring_match = SequenceMatcher( None, docs[0], docs[1] ).find_longest_match(0, len(docs[0]), 0, len(docs[1])) combined_docs = ( docs[0][: substring_match.a] + docs[1][substring_match.b :] ) combined_result += combined_docs # eat these first two docs if len(docs) >= 3: docs = docs[2:] else: docs = [] # continue eating the rest one by one while len(docs) >= 1: substring_match = SequenceMatcher(None, combined_result, docs[0]) substring_match = substring_match.find_longest_match( 0, len(combined_result), 0, len(docs[0]) ) combined_result = ( combined_result[: substring_match.a] + docs[0][substring_match.b :] ) # eat the processed doc if len(docs) >= 2: docs = docs[1:] else: docs = [] cleaned_results[match] = "..." + combined_result + "..." return cleaned_results
# --------------------END PAPER OBJECT SCHEMA-------------------------------