"""
Copyright 2021 Gabriele Pisciotta - ga.pisciotta@gmail.com
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted,
provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.
"""
__author__ = 'Gabriele Pisciotta'
import json
import os
import re
import sys
import time
import unicodedata
from abc import ABC, abstractmethod
from time import sleep
from urllib.parse import quote
import Levenshtein
import requests
import requests_cache
from oc_ocdm.graph.graph_entity import GraphEntity
from requests.exceptions import ReadTimeout, ConnectTimeout
class QueryInterface(ABC):
    """
    Abstract base class shared by the API clients defined in this module.

    Concrete clients subclass it and implement :meth:`query`.
    """

    def __init__(self):
        # Registers a requests_cache cache named 'GraphEnricher_cache' every time
        # a client is instantiated, so subclasses must call super().__init__().
        requests_cache.install_cache('GraphEnricher_cache')

    @abstractmethod
    def query(self, entity):
        """
        Look up ``entity`` on the remote service. Must be overridden by subclasses.

        :param entity: the value to look up
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError
class VIAF(QueryInterface):
    """
    Client that extracts the VIAF identifier of an author by querying the viaf.org search API.
    """

    # Total number of HTTP attempts made for one lookup before giving up.
    max_attempts = 6
    # Seconds to wait before repeating a request after a 503 page or a connection timeout.
    retry_wait = 5.0

    def __init__(self):
        super().__init__()
        self.headers = {
            "User-Agent": "GraphEnricher (via OpenCitations - http://opencitations.net; mailto:contact@opencitations.net)",
            "Accept": "application/json"}
        self.api_url = 'http://www.viaf.org/viaf/search?local.title+all+"{}"&query=local.names+all+"{}"&sortKeys=holdingscount&recordSchema=BriefVIAF'

    def query(self, given_name: str, family_name: str, title: str):
        """
        Having specified the author's names and the title of a paper, extract a VIAF.

        The lookup is best-effort: every failure is reported on stdout and results in ``None``.
        A request is repeated (at most ``max_attempts`` times in total) when the connection
        times out or when the service answers with a plain-text/HTML page mentioning "503".

        :param given_name: author's given name
        :param family_name: author's family name
        :param title: paper's title
        :return: the VIAF identifier when the search returns exactly one record, otherwise None
        """
        try:
            name = f"{given_name} {family_name}".strip()
            url = self.api_url.format(quote(title), quote(name))
        except TypeError as ex:
            # quote() rejects non-string input such as a missing (None) title.
            print("[GraphEnricher-VIAF]:" + repr(ex))
            return None

        for attempt in range(self.max_attempts):
            if attempt:
                time.sleep(self.retry_wait)

            try:
                response = requests.get(url, headers=self.headers, timeout=60)
            except ConnectTimeout as ex:
                print("[GraphEnricher-VIAF]:" + repr(ex) + "__" + url)
                continue
            except Exception as ex:
                print("[GraphEnricher-VIAF]:" + repr(ex) + "__" + url)
                return None

            try:
                result = response.json()['searchRetrieveResponse']
                # Only an unambiguous answer (a single record) is accepted.
                if int(result['numberOfRecords']) != 1:
                    return None
                return result['records'][0]['record']['recordData']['viafID']['#text']
            except Exception as ex:
                # The header may be absent or carry parameters ("text/html; charset=UTF-8"),
                # so only the bare media type is compared.
                media_type = response.headers.get("content-type", "").split(";")[0].strip().lower()
                if media_type not in ('text/plain', 'text/html'):
                    print("[GraphEnricher-VIAF]:" + repr(ex) + "__" + url + "__" + media_type)
                    return None
                body = response.text
                if "503" in body:
                    # The service reported itself unavailable: wait and try again.
                    continue
                print("[GraphEnricher-VIAF]:" + repr(ex) + "__" + url + "__" + body)
                return None

        return None
class WikiData(QueryInterface):
    """
    Client that queries the WikiData SPARQL endpoint by means of another identifier, in order
    to check the existence of a related entity on WikiData.
    """

    # Total number of HTTP attempts made for one lookup before giving up.
    max_attempts = 6
    # Seconds to wait before repeating a request after a 503 page.
    retry_wait = 5.0

    def __init__(self):
        super().__init__()
        self.headers = {
            "User-Agent": "GraphEnricher (via OpenCitations - http://opencitations.net; mailto:contact@opencitations.net)",
            "Accept": "application/json"}
        self.api_url = 'https://query.wikidata.org/sparql'
        self.base_query = '''
            SELECT ?item WHERE {{
                ?item p:{property} ?x.
                ?x ps:{property} "{literal}".
            }} LIMIT 1
        '''
        # WikiData property used for each supported identifier schema.
        self.doi_property = "P356"
        self.issn_property = "P236"
        self.orcid_property = "P496"
        self.viaf_property = "P214"
        self.pmid_property = "P698"
        self.pmcid_property = "P932"

    def query(self, entity: str, schema: str):
        """
        Query WikiData, given the literal of an identifier and its schema.

        Supported schemas are 'doi', 'issn', 'orcid', 'viaf', 'pmid' and 'pmcid'; any other
        value is reported on stdout and yields ``None``. DOIs are upper-cased before the lookup.
        Errors raised by ``requests.get`` itself (e.g. timeouts) are not caught here.

        :param entity: the literal of the given identifier
        :param schema: the schema of the given identifier
        :return: Wikidata ID if found, otherwise None
        """
        properties = {
            'doi': self.doi_property,
            'issn': self.issn_property,
            'orcid': self.orcid_property,
            'viaf': self.viaf_property,
            'pmid': self.pmid_property,
            'pmcid': self.pmcid_property,
        }
        wd_property = properties.get(schema)
        if wd_property is None:
            print("[GraphEnricher-WikiData]:unsupported schema " + repr(schema))
            return None

        literal = entity.upper() if schema == 'doi' else entity
        # The literal ends up inside a double-quoted SPARQL string: escape what would close it.
        literal = literal.replace("\\", "\\\\").replace('"', '\\"')
        sparql = self.base_query.format(property=wd_property, literal=literal)

        for attempt in range(self.max_attempts):
            if attempt:
                time.sleep(self.retry_wait)

            response = requests.get(self.api_url, headers=self.headers, timeout=60,
                                    params={'format': 'json', 'query': sparql})
            try:
                data = response.json()
                # The entity IRI ends with the Wikidata ID (last path segment).
                return data['results']['bindings'][0]['item']['value'].split("/")[-1]
            except IndexError:
                # Empty bindings: nothing on WikiData carries this identifier.
                return None
            except Exception as ex:
                # The header may be absent or carry parameters ("text/html; charset=UTF-8"),
                # so only the bare media type is compared.
                media_type = response.headers.get("content-type", "").split(";")[0].strip().lower()
                if media_type not in ('text/plain', 'text/html'):
                    print("[GraphEnricher-WikiData]:" + repr(ex) + "__" + sparql + "__" + media_type)
                    return None
                body = response.text
                if "503" in body:
                    # The service reported itself unavailable: wait and try again.
                    continue
                print("[GraphEnricher-WikiData]:" + repr(ex) + "__" + sparql + "__" + body)
                return None

        return None
class Crossref(QueryInterface):
    """
    Client that queries Crossref in order to extract DOIs, ISSNs and publishers' IDs.
    """

    def __init__(self,
                 crossref_min_similarity_score=0.95,
                 max_iteration=6,
                 sec_to_wait=10,
                 headers=None,
                 timeout=30,
                 is_json=True):
        """
        :param crossref_min_similarity_score: stored on the instance; not used by the methods below
        :param max_iteration: maximum number of HTTP attempts per request
        :param sec_to_wait: seconds to wait between two attempts
        :param headers: HTTP headers to send; defaults to the GraphEnricher User-Agent
        :param timeout: timeout, in seconds, of every HTTP request
        :param is_json: stored on the instance; not used by the methods below
        """
        super().__init__()
        if headers is None:
            # Built per instance so that instances never share one mutable default dict.
            headers = {"User-Agent": "GraphEnricher (via OpenCitations - http://opencitations.net; "
                                     "mailto:contact@opencitations.net)"}
        self.max_iteration = max_iteration
        self.sec_to_wait = sec_to_wait
        self.headers = headers
        self.timeout = timeout
        self.is_json = is_json
        self.crossref_min_similarity_score = crossref_min_similarity_score
        self.__crossref_doi_url = 'https://api.crossref.org/works/'
        self.__crossref_entry_url = 'https://api.crossref.org/works?query.bibliographic='
        self.__crossref_journal_url = 'https://api.crossref.org/journals/'

        # The stopword list is looked up next to this module, whatever the module is called.
        stopwords_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "stopwords-it.txt")
        with open(stopwords_path, 'rt', encoding='utf-8') as f:
            self.stoplist = {line.strip() for line in f}

    def _cleaning_title(self, title: str):
        """ Clean a given title, filtering the words according to a stoplist
        and keeping only the first four remaining keywords.

        :param title: the title string
        :return: the cleaned title
        """
        n = 4
        # split() without argument also drops the empty tokens produced by repeated blanks.
        keywords = [w for w in title.split() if w not in self.stoplist]
        return " ".join(keywords[:n])

    @staticmethod
    def _cleaning_name(name_raw: str):
        """ Clean the name of an author: strip diacritics, lowercase it and drop
        every character that is neither a word character nor whitespace.

        :param name_raw: the name string
        :return: the cleaned name
        """
        decomposed = unicodedata.normalize("NFKD", name_raw)
        name_clean = "".join(c for c in decomposed if not unicodedata.combining(c))
        return re.sub(r"[^\w\d\s]", "", name_clean.lower())

    def _get_json(self, url: str, tag: str, text_suffix: str = "", type_suffix: str = "",
                  silent_markers: tuple = ()):
        """ Send a GET request and decode the JSON answer, retrying on temporary failures.

        A request is repeated (up to ``self.max_iteration`` attempts, ``self.sec_to_wait``
        seconds apart) when the connection times out or when the answer is a plain-text/HTML
        page mentioning "503". Every other failure is reported on stdout.

        :param url: the URL to request
        :param tag: label used in the log messages
        :param text_suffix: appended to ``tag`` when logging an unexpected text answer
        :param type_suffix: appended to ``tag`` when logging an unexpected content type
        :param silent_markers: substrings of a text answer that mean "nothing found" and are not logged
        :return: the decoded JSON document, otherwise None
        """
        for attempt in range(max(1, int(self.max_iteration))):
            if attempt:
                time.sleep(self.sec_to_wait)

            try:
                response = requests.get(url, headers=self.headers, timeout=self.timeout)
            except ConnectTimeout as ex:
                print("[" + tag + "]:" + repr(ex) + "__" + url)
                continue
            except Exception as ex:
                print("[" + tag + "]:" + repr(ex) + "__" + url)
                return None

            try:
                return response.json()
            except ValueError as ex:
                # The header may be absent or carry parameters ("text/html; charset=UTF-8"),
                # so only the bare media type is compared.
                media_type = response.headers.get("content-type", "").split(";")[0].strip().lower()
                if media_type not in ('text/plain', 'text/html'):
                    print("[" + tag + type_suffix + "]:" + repr(ex) + "__" + url + "__" + media_type)
                    return None
                body = response.text
                if any(marker in body for marker in silent_markers):
                    return None
                if "503" in body:
                    # The service reported itself unavailable: wait and try again.
                    continue
                print("[" + tag + text_suffix + "]:" + repr(ex) + "__" + url + "__" + body)
                return None

        return None

    def query_journal(self, issn: str):
        """ Query Crossref to get a list of any other ISSN known, related to an entity described
        by the ISSN given in input. The ISSN already known is removed from the returned list.

        :param issn: the ISSN of the bibliographic entity
        :return: a list with the other ISSNs known to Crossref (possibly empty), or None when
            the journal is unknown, lists no ISSN, or the request fails
        """
        url = self.__crossref_journal_url + issn
        payload = self._get_json(url, "GraphEnricher-Crossref", silent_markers=("Resource not found",))
        message = payload.get("message") if isinstance(payload, dict) else None
        issn_list = message.get("ISSN") if isinstance(message, dict) else None
        if not issn_list:
            return None
        return [other for other in issn_list if other != issn]

    def query_publisher(self, doi: str):
        """ Extract the identifier of a publisher starting from a given DOI.

        :param doi: the DOI of the paper
        :return: the value of the "member" field of the Crossref record, otherwise None
        """
        url_cr = self.__crossref_doi_url + doi
        payload = self._get_json(url_cr, "GraphEnricher-Crossref-publisher")
        message = payload.get("message") if isinstance(payload, dict) else None
        if isinstance(message, dict):
            return message.get("member")
        return None

    @staticmethod
    def _parse_year(year):
        """ Return the publication year as an int, or None when it cannot be understood.
        Accepts plain years as well as dash-separated dates (the first 4-digit token is used).
        """
        if year is None:
            return None
        text = str(year)
        for token in text.split("-"):
            if len(token) == 4 and token.isdigit():
                return int(token)
        try:
            return int(text)
        except ValueError:
            return None

    @staticmethod
    def _issued_year(item):
        """ Return the year found in the "issued" date of a Crossref item, or None. """
        try:
            first_part = item["issued"]["date-parts"][0][0]
            return None if first_part is None else int(first_part)
        except (KeyError, IndexError, TypeError, ValueError):
            return None

    @staticmethod
    def _score_authors(item, name, surname):
        """ Score the authors of a Crossref item against one author: 2 points when family and
        given name are equal, 1 point when the family name is equal and the given names share
        the initial (or no given name is available for the comparison).
        """
        points = 0
        for author in item.get("author", []):
            family = author.get("family")
            if family is None or family.lower() != surname:
                continue
            given = author.get("given")
            if given is None or not name:
                # Nothing to compare the given name with: the family name is all we have.
                points += 1
                continue
            given = given.lower()
            if given == name:
                points += 2
            elif given[:1] == name[0]:
                points += 1
        return points

    def _best_doi(self, payload, title, year, name, surname, exist_author):
        """ Rank the items of a Crossref answer by (title similarity, author points, year points)
        and return the DOI of the best one, if it is similar enough to the requested paper.
        """
        message = payload.get("message") if isinstance(payload, dict) else None
        items = message.get("items") if isinstance(message, dict) else None
        if not items:
            return None

        wanted_year = self._parse_year(year)
        title_lower = title.lower()
        candidates = []
        for idx, item in enumerate(items):
            point_year = 0
            if wanted_year is not None and self._issued_year(item) == wanted_year:
                point_year = 3

            point_authors = self._score_authors(item, name, surname) if exist_author else 0

            point_title = 0
            item_titles = item.get("title")
            if item_titles:
                # Both titles are lowercased so that the similarity ignores letter case.
                point_title = Levenshtein.ratio(title_lower, item_titles[0].lower())

            candidates.append((point_title, point_authors, point_year, idx))

        best_title, best_authors, _, best_idx = max(candidates)
        # NOTE(review): the threshold is the literal 0.8; crossref_min_similarity_score is
        # stored by __init__ but was never applied here. Left unchanged on purpose, since
        # switching would alter which DOIs are accepted.
        if best_title <= 0.8:
            return None
        if exist_author and best_authors < 1:
            return None
        return items[best_idx].get("DOI")

    def query(self, fullnames: list, title: str, year: str):
        """
        Extract the DOI, given the names of the authors, the title of the paper and the year
        of publication.

        :param fullnames: a list of tuples <name, family_name> (e.g.: [ ("Gabriele", "Pisciotta") ]).
            A plain string is also accepted and is read as "given-name ... family-name"
            (first and last blank-separated token)
        :param title: the title of the paper
        :param year: a string that represents the year of publication
        :return: the DOI found, otherwise None
        """
        keywords = self._cleaning_title(title)
        params = ["query.bibliographic=" + quote(keywords)]

        # NOTE(review): as in the original code, every author is sent to Crossref but only the
        # last one of the list is used when scoring the candidates.
        name = ""
        surname = ""
        exist_author = False
        for fullname in fullnames or []:
            if isinstance(fullname, str):
                tokens = fullname.split()
                name = self._cleaning_name(tokens[0]) if len(tokens) > 1 else ""
                surname = self._cleaning_name(tokens[-1]) if tokens else ""
            else:
                name = fullname[0].lower() if fullname[0] is not None else ""
                surname = fullname[1].lower() if fullname[1] is not None else ""
            if surname:
                exist_author = True
            author = " ".join(part for part in (name, surname) if part)
            if author:
                params.append("query.author=" + quote(author))

        params.append("rows=4")
        params.append("select=DOI,title,author,issued")
        url_cr = "https://api.crossref.org/works?" + "&".join(params)

        payload = self._get_json(url_cr, "GraphEnricher-Crossref", text_suffix="-std1", type_suffix="-std2")
        if payload is None:
            return None
        try:
            return self._best_doi(payload, title, year, name, surname, exist_author)
        except (AttributeError, IndexError, KeyError, TypeError, ValueError) as ex:
            # An answer with an unexpected structure must not break the enrichment process.
            print("[GraphEnricher-Crossref-std2]:" + repr(ex) + "__" + url_cr)
            return None
class ORCID(QueryInterface):
    """
    Client that queries ORCID in order to extract ORCID IDs.
    """

    def __init__(self,
                 max_iteration=6,
                 sec_to_wait=10,
                 headers=None,
                 timeout=30,
                 repok=None,
                 reperr=None,
                 is_json=True):
        """
        :param max_iteration: maximum number of HTTP attempts per request
        :param sec_to_wait: seconds to wait between two attempts
        :param headers: HTTP headers to send; defaults to the GraphEnricher User-Agent
            plus a JSON Content-Type
        :param timeout: timeout, in seconds, of every HTTP request
        :param repok: accepted for compatibility; not used
        :param reperr: accepted for compatibility; not used
        :param is_json: when True the answers are decoded as JSON, otherwise returned as text
        """
        super().__init__()
        if headers is None:
            # Built per instance so that instances never share one mutable default dict.
            headers = {"User-Agent": "GraphEnricher (via OpenCitations - http://opencitations.net; "
                                     "mailto:contact@opencitations.net)",
                       "Content-Type": "application/json"}
        self.max_iteration = max_iteration
        self.sec_to_wait = sec_to_wait
        self.headers = headers
        self.timeout = timeout
        self.is_json = is_json
        self.__orcid_api_url = 'https://pub.orcid.org/v2.1/search?q='
        self.__personal_url = "https://pub.orcid.org/v2.1/%s/personal-details"

    def query(self, authors: list, identifiers: list):
        """
        Given a list of authors and a list of identifiers, return the ORCIDs of the authors.

        Every ORCID record returned by the search is assigned to the authors that have no ORCID
        yet and whose family name (lowercased) is contained in the record's family name. The
        first record matching an author wins.

        :param authors: a list of tuples in the following form [ (name, family_name, ORCID, ar_object) ]
        :param identifiers: a list of identifiers of the bibliographic resource
        :return: a list of tuples (name, family_name, ORCID, ar_object), one for each author an
            ORCID has been found for; None when no identifier is given
        """
        if not identifiers:
            return None

        found = {}
        records = self._get_orcid_records(identifiers, authors)
        if records is not None:
            # "result" is missing or null when the search matches nothing.
            orcid_ids = self.__dict_get(records, ["result", "orcid-identifier", "path"]) or []
            if isinstance(orcid_ids, str):
                orcid_ids = [orcid_ids]

            for orcid_id in orcid_ids:
                if not orcid_id:
                    continue
                orcid_id = orcid_id.upper()
                personal_details = self.__get_data(self.__personal_url % orcid_id)
                if personal_details is None:
                    continue
                family_name = self.__dict_get(personal_details, ["name", "family-name", "value"])
                if family_name is None:
                    continue

                for a in authors:
                    key = (a[0], a[1])
                    if a[2] is not None or a[1] is None or found.get(key) is not None:
                        continue
                    if a[1].lower() in family_name:
                        found[key] = orcid_id

        authors_to_return = []
        for a in authors:
            orcid = found.get((a[0], a[1]))
            if orcid is not None:
                authors_to_return.append((a[0], a[1], orcid, a[3]))
        return authors_to_return

    @staticmethod
    def _query_value(text):
        """ Fold a string to ASCII and escape it for use inside a double-quoted search term. """
        folded = unicodedata.normalize('NFKD', "" + text).encode("ASCII", "ignore").decode("utf-8")
        return folded.replace("\\", "\\\\").replace('"', '\\"')

    def _get_orcid_records(self, identifiers: list, family_names: list = None):
        """
        Search ORCID for the records related to the given identifiers and author names.

        The search expression has the form ``(id OR id ...) AND ((author) OR (author) ...)``;
        either group is left out when it would be empty. DOIs are searched as given, lowercased
        and uppercased. Identifiers other than DOI, ISBN and PMID are ignored.

        :param identifiers: a list of (schema, literal) pairs, the schema being a GraphEntity IRI
        :param family_names: a list of tuples whose first two items are (given name, family name)
        :return: the decoded answer of the ORCID search API, or None when there is nothing to
            search for or the request fails
        """
        id_clauses = []
        for i in identifiers:
            schema, value = i[0], i[1]
            if value is None:
                continue
            if schema == GraphEntity.iri_doi:
                # dict.fromkeys drops duplicated spellings while keeping their order.
                for variant in dict.fromkeys((value, value.lower(), value.upper())):
                    id_clauses.append("doi-self:\"%s\"" % variant)
            elif schema == GraphEntity.iri_isbn:
                id_clauses.append("isbn:\"%s\"" % value)
            elif schema == GraphEntity.iri_pmid:
                id_clauses.append("pmid-self:\"%s\"" % value)

        name_clauses = []
        for full_name in family_names or []:
            given_names, family_name = full_name[0], full_name[1]
            if not family_name:
                continue
            clause = "family-name:\"%s\"" % self._query_value(family_name)
            if given_names:
                clause += " AND given-names:\"%s\"" % self._query_value(given_names)
            # Each author is grouped explicitly so that AND/OR cannot be mis-associated.
            name_clauses.append("(" + clause + ")")

        groups = []
        if id_clauses:
            groups.append("(" + " OR ".join(id_clauses) + ")")
        if name_clauses:
            groups.append("(" + " OR ".join(name_clauses) + ")")
        cur_query = " AND ".join(groups)

        if cur_query == "":
            return None
        self.__last_query_done = self.__orcid_api_url + quote(cur_query)
        return self.__get_data(self.__last_query_done)

    def __dict_get(self, d, key_list):
        """
        Follow ``key_list`` inside nested dictionaries/lists and return the lowercased string
        found at the end of the path. A list met along the path is traversed item by item and
        produces the list of the values found (items without a value are skipped).

        :param d: the structure to traverse
        :param key_list: the keys to follow, outermost first
        :return: a lowercased string, a list of results, or None when the path does not lead
            to a string
        """
        if not key_list:
            return d.lower() if isinstance(d, str) else None
        if isinstance(d, dict):
            k = key_list[0]
            if k in d:
                return self.__dict_get(d[k], key_list[1:])
            return None
        if isinstance(d, list):
            result = []
            for item in d:
                value = self.__dict_get(item, key_list)
                if value is not None:
                    result.append(value)
            return result
        return None

    @staticmethod
    def __dict_add(d):
        """ Return a copy of ``d`` without the entries whose value is None. """
        return {k: value for k, value in d.items() if value is not None}

    def __get_data(self, get_url):
        """
        Send a GET request, repeating it up to ``max_iteration`` times (``sec_to_wait`` seconds
        apart) until an answer with status 200 arrives. A 404 stops the process immediately.
        When every attempt fails, the collected errors (one per kind) are printed.

        :param get_url: the URL to query
        :return: results if found, otherwise None
        """
        errors = []
        reported = set()

        def report(kind, message):
            # Keep only the first message of each kind, to avoid repeating it at every attempt.
            if kind not in reported:
                reported.add(kind)
                errors.append(message)

        for tentative in range(self.max_iteration):
            if tentative:
                sleep(self.sec_to_wait)
            try:
                response = requests.get(get_url, headers=self.headers, timeout=self.timeout)
                if response.status_code == 200:
                    return json.loads(response.text) if self.is_json else response.text
                if response.status_code == 404:
                    # The resource does not exist: retrying is pointless, let the caller decide.
                    return None
                report("http", "We got an HTTP error when retrieving data (HTTP status code: %s)." %
                       str(response.status_code))
            except ReadTimeout as e:
                report("read", "A timeout error happened when reading results from the API "
                               "when retrieving data. %s" % e)
            except ConnectTimeout as e:
                report("connect", "A timeout error happened when connecting to the API "
                                  "when retrieving data. %s" % e)
            except Exception as e:
                report("generic", "A generic error happened when trying to use the API "
                                  "when retrieving data. %s" % type(e))

        # If the process comes here, no valid result has been returned
        print(" | ".join(errors) + "\n\tRequested URL: " + get_url)
        return None