"""
Copyright 2021 Gabriele Pisciotta - ga.pisciotta@gmail.com
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted,
provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
OF THIS SOFTWARE.
"""
__author__ = "Gabriele Pisciotta"
import contextlib
import datetime
import sys
from typing import Union
import requests_cache
from oc_graphenricher.APIs import Crossref, ORCID, VIAF, WikiData
from oc_ocdm import Storer
from oc_ocdm.graph import GraphSet
from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.prov import ProvSet
from tqdm import tqdm
from tqdm.contrib import DummyTqdmFile
class GraphEnricher:
    """
    The GraphEnricher class is responsible for enriching all the entities of a given graph set compliant with
    the OpenCitations Data Model (OCDM). You have to specify the graph set, the output file name of the
    enriched graph and the provenance file name. It is also possible to set a debug flag to get more details
    about the enrichment process.
    """

    # Number of processed BRs between two intermediate serializations.
    _SERIALIZE_EVERY = 50

    # Short schema name accepted by `_add_id` -> name of the identifier method that stores the literal.
    _ID_CREATORS = {
        'issn': 'create_issn',
        'doi': 'create_doi',
        'orcid': 'create_orcid',
        'viaf': 'create_viaf',
        'crossref': 'create_crossref',
        'wikidata': 'create_wikidata',
    }

    def __init__(self,
                 g_set: "GraphSet",
                 graph_filename: str = "enriched.rdf",
                 provenance_filename: str = "provenance.rdf",
                 info_dir: str = "",
                 debug: bool = False,
                 serialize_in_the_middle: bool = False):
        """
        Set up the API clients and the enrichment options.

        Note that a `requests_cache` cache named ``GraphEnricher_cache`` is installed as a side effect.

        :param g_set: graph set to be enriched
        :param graph_filename: file name of the enriched graph set that will be serialized
        :param provenance_filename: file name of the provenance that will be serialized
        :param info_dir: the path to the counters directory
        :param debug: a bool flag to enable richer output
        :param serialize_in_the_middle: a bool flag to enable the serialization every 50 Bibliographic Resources
            (BRs) processed (the resulting file is overwritten each time, so this may slow down the whole process)
        """
        requests_cache.install_cache('GraphEnricher_cache')

        # Responsible agent recorded on every identifier created by the enricher.
        self.resp_agent = 'https://w3id.org/oc/meta/prov/pa/2'

        self.crossref_api = Crossref()
        self.orcid_api = ORCID()
        self.viaf_api = VIAF()
        self.wikidata_api = WikiData()

        self.g_set = g_set
        self.debug = debug
        # Running count of the identifiers added so far (shown in the progress bar).
        self.new_id_found = 0
        self.graph_filename = graph_filename
        self.provenance_filename = provenance_filename
        self.info_dir = info_dir
        self.serialize_in_the_middle = serialize_in_the_middle

    def enrich(self) -> None:
        """ The enricher iterates over each BR contained in the graph set.

        For each BR (skipping journal issues and journal volumes), it reads the identifiers already contained in
        the graph set and checks whether the BR already has a DOI, an ISSN and a Wikidata ID:
            - If an ISSN is specified, it queries Crossref to extract other ISSNs
            - If there's no DOI, it queries Crossref to get one by means of the other data available
            - If there's no Wikidata ID, it queries Wikidata to get one by means of the other identifiers
        Any new identifier found is added to the BR.

        Then, for each AR related to the BR, it reads the identifiers already held by the agent and:
            - If it doesn't have an ORCID, it queries ORCID to get it
            - If it doesn't have a VIAF, it queries VIAF to get it
            - If it doesn't have a Wikidata ID, it queries Wikidata by means of the other identifiers to get one
            - If the AR is related to a publisher, it queries Crossref to get its ID by means of the BR's DOI
        Any new identifier found is added to the agent.

        In the end it stores the enriched graph set and its provenance.

        NB: Even if it's not possible to have a duplicated identifier for the same entity, in the whole graph set
        you could find different identifiers that share the same schema and literal. For this purpose, you should
        use the `instancematching` module after you've enriched the graph set.
        """
        with self.__std_out_err_redirect_tqdm() as orig_stdout:
            progress_bar = tqdm(self.g_set.get_br(), file=orig_stdout, dynamic_ncols=True)
            for processed, br in enumerate(progress_bar, start=1):
                progress_bar.set_description(desc=f"New ID found: {self.new_id_found}")

                # The periodic dump is triggered by the number of BRs seen, including the skipped ones.
                if self.serialize_in_the_middle and processed % self._SERIALIZE_EVERY == 0:
                    self._store_graph_set()

                br_types = br.get_types()
                if GraphEntity.iri_journal_issue in br_types or GraphEntity.iri_journal_volume in br_types:
                    continue

                doi = self._enrich_br(br)
                self._enrich_contributors(br, doi)

        self._store_graph_set()

        prov = ProvSet(self.g_set, self.g_set.base_iri, info_dir=self.info_dir)
        prov.generate_provenance()
        prov_storer = Storer(prov, output_format="nquads")
        prov_storer.store_graphs_in_file(self.provenance_filename, "")

    def _store_graph_set(self) -> None:
        """ Serialize the (possibly partially) enriched graph set to `graph_filename`, overwriting it. """
        gs_storer = Storer(self.g_set, output_format="nt11")
        gs_storer.store_graphs_in_file(self.graph_filename, "")

    def _enrich_br(self, br: "BibliographicResource") -> Union[str, None]:
        """ Look for additional ISSNs, a DOI and a Wikidata ID for a single BR.

        :param br: the bibliographic resource to enrich
        :return: the DOI of the BR (either already present or just found), or None if it has none
        """
        # Collect what the BR already has
        doi = None
        issns = []
        has_wikidata = False
        for identifier in br.get_identifiers():
            scheme = identifier.get_scheme()
            if scheme == br.iri_doi:
                doi = identifier.get_literal_value()
            elif scheme == br.iri_issn:
                issns.append(identifier.get_literal_value())
            elif scheme == br.iri_wikidata:
                has_wikidata = True

        # Get more ISSNs
        for issn in issns:
            other_issns = self.crossref_api.query_journal(issn)
            if other_issns:
                for candidate in other_issns:
                    # To avoid adding already present ISSNs
                    if candidate not in issns:
                        self._add_id(br, candidate, 'issn', "its ISSN {}".format(issn))
                # NOTE(review): the remaining ISSNs are not queried once Crossref has answered for one of
                # them -- confirm that this early exit is the intended nesting.
                break

        # If there's no DOI, try to get it
        if doi is None:
            # NOTE(review): no author information is available at this point (contributors are only visited
            # afterwards), so the lookup relies on title and publication date alone. Confirm whether the
            # authors should be collected first, and in which shape `Crossref.query` expects them.
            found_doi = self.crossref_api.query([], br.get_title(), br.get_pub_date())
            if found_doi:
                self._add_id(br, found_doi, 'doi', "Crossref query")
                doi = found_doi

        # If there's no Wikidata ID, search Wikidata by means of the other identifiers of the BR
        if not has_wikidata:
            lookup_keys = (
                (br.iri_doi, 'doi'),
                (br.iri_issn, 'issn'),
                (br.iri_pmid, 'pmid'),
                (br.iri_pmcid, 'pmcid'),
            )
            for identifier in br.get_identifiers():
                scheme = identifier.get_scheme()
                id_kind = next((kind for iri, kind in lookup_keys if scheme == iri), None)
                if id_kind is None:
                    continue
                literal = identifier.get_literal_value()
                wikidata_id = self.wikidata_api.query(literal, id_kind)
                if wikidata_id:
                    self._add_id(br, wikidata_id, 'wikidata', "its {} {}".format(id_kind.upper(), literal))
                    # One Wikidata ID is enough
                    break

        return doi

    def _enrich_contributors(self, br: "BibliographicResource", doi: Union[str, None]) -> None:
        """ Enrich the agents behind every agent role (AR) of a BR, depending on the role they play.

        :param br: the bibliographic resource whose contributors are visited
        :param doi: the DOI of the BR, if any (needed to look up the publisher on Crossref)
        """
        # The flag is shared by all the publishers of the same BR: once one of them carries a Crossref ID,
        # no lookup is attempted for the following ones.
        publisher_has_crossrefid = False

        for ar in br.get_contributors():
            role = ar.get_role_type()
            ra: ResponsibleAgent = ar.get_is_held_by()

            if role == GraphEntity.iri_author:
                self._enrich_author(br, ra)

            if role == GraphEntity.iri_publisher:
                if any(GraphEntity.iri_crossref in publisher_id.get_scheme()
                       for publisher_id in ra.get_identifiers()):
                    publisher_has_crossrefid = True

                # If the Crossref ID is not there, search for it
                if not publisher_has_crossrefid and doi is not None:
                    crossref_id = self.crossref_api.query_publisher(doi)
                    if crossref_id:
                        self._add_id(ra, crossref_id, 'crossref')

    def _enrich_author(self, br: "BibliographicResource", ra: "ResponsibleAgent") -> None:
        """ Look for the ORCID, the VIAF and the Wikidata ID of an author.

        :param br: the bibliographic resource the author contributed to
        :param ra: the responsible agent to enrich
        """
        has_orcid = None
        has_viaf = None
        has_wikidata = None
        # (literal, schema) pairs usable later as Wikidata lookup keys
        author_id_found = []

        for author_identifier in ra.get_identifiers():
            scheme = author_identifier.get_scheme()
            if ra.iri_orcid in scheme:
                has_orcid = author_identifier.get_literal_value()
                author_id_found.append((has_orcid, 'orcid'))
            if br.iri_viaf in scheme:
                has_viaf = author_identifier.get_literal_value()
                author_id_found.append((has_viaf, 'viaf'))
            if br.iri_wikidata in scheme:
                has_wikidata = author_identifier.get_literal_value()
            # Nothing else to learn once all the three identifiers have been seen
            if has_viaf is not None and has_orcid is not None and has_wikidata is not None:
                break

        # Search for the author on ORCID
        if has_orcid is None:
            matches = self.orcid_api.query(
                [(ra.get_given_name(), ra.get_family_name(), None, ra)],
                [(x.get_scheme(), x.get_literal_value()) for x in br.get_identifiers()])
            if matches:
                # The agent returned by the API gets its own name, so that the agent being processed
                # is not replaced for the VIAF and Wikidata steps below.
                for _, _, orcid, matched_ra in matches:
                    if orcid is not None:
                        self._add_id(matched_ra, orcid, 'orcid')
                        author_id_found.append((orcid, 'orcid'))

        # Search for the author on VIAF
        if not has_viaf:
            viaf = self.viaf_api.query(ra.get_given_name(), ra.get_family_name(), br.get_title())
            if viaf is not None:
                self._add_id(ra, viaf, 'viaf')
                author_id_found.append((viaf, 'viaf'))

        # If the author doesn't have a Wikidata ID, search for it by means of the identifiers collected so far
        if not has_wikidata:
            for literal, scheme in author_id_found:
                wikidata_id = self.wikidata_api.query(literal, scheme)
                if wikidata_id:
                    self._add_id(ra, wikidata_id, 'wikidata', "its {} {}".format(scheme.upper(), literal))
                    break

    def _add_id(self, entity: Union["BibliographicResource", "ResponsibleAgent"], literal: str, schema: str,
                by_means_of: Union[str, None] = None) -> None:
        """ Method that lets you add a new identifier to an entity,
        having specified the literal value, the schema and optionally the API used.

        Nothing is added if the entity already has an identifier with the same literal value.

        :param entity: a bibliographic resource or a responsible agent
        :param literal: the literal value of the identifier
        :param schema: the schema of the identifier, one of 'issn', 'doi', 'orcid', 'viaf', 'crossref', 'wikidata'
        :param by_means_of: an optional string that lets you specify how the identifier was found
        :raises ValueError: if the schema is not supported (the graph set is left untouched)
        """
        # Reject unknown schemas up front, so that no identifier without a literal ends up in the graph set
        creator_name = self._ID_CREATORS.get(schema)
        if creator_name is None:
            raise ValueError("Unsupported identifier schema: {!r}".format(schema))

        # Snapshot taken before the insertion: used both for the duplicate check and for the debug output
        old_literals = [x.get_literal_value() for x in entity.get_identifiers()]

        # Check if the ID is already associated to the entity
        if literal in old_literals:
            if self.debug:
                print("Identifier {} already present".format(literal))
            return

        new_id = self.g_set.add_id(self.resp_agent)
        getattr(new_id, creator_name)(literal)
        entity.has_identifier(new_id)
        self.new_id_found += 1

        if self.debug:
            # Plain print(): inside `enrich` the standard streams are redirected to tqdm's write()
            to_print = "[{}] FOUND {}: {}".format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), schema,
                                                  literal)
            if by_means_of is not None:
                to_print += ", by means of {}".format(by_means_of)
            print(to_print)
            print("\tOLD: {}".format(old_literals))
            print("\tNEW : {}".format([x.get_literal_value() for x in entity.get_identifiers()]))

    @contextlib.contextmanager
    def __std_out_err_redirect_tqdm(self):
        """ This method is used to print messages together with the TQDM progress bar.

        While the context is active, `sys.stdout` and `sys.stderr` are wrapped in `DummyTqdmFile`; the original
        standard output is yielded so that the progress bar can be drawn on it. Both streams are restored on
        exit, even if the body raises.
        """
        orig_out_err = sys.stdout, sys.stderr
        try:
            sys.stdout, sys.stderr = map(DummyTqdmFile, orig_out_err)
            yield orig_out_err[0]
        finally:
            sys.stdout, sys.stderr = orig_out_err