Skip to content

Commit

Permalink
Made pipeline compatible with other ads pipelines (#9)
Browse files Browse the repository at this point in the history
* changed  to  and updated logging for compat

* changed settings to ref_extract_paths to avoid conflict with settings variable

* added graylog-friendly logging to all scripts

* renaming output files to copy over to proj for testing

---------

Co-authored-by: Mugdha Polimera <mugdhapolimera@dhcp-10-250-231-106.harvard.edu>
  • Loading branch information
mugdhapolimera and Mugdha Polimera committed Mar 19, 2024
1 parent d2e0fa8 commit ebcd36e
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 224 deletions.
113 changes: 73 additions & 40 deletions ads_ref_extract/classic_analytics.py
Expand Up @@ -8,13 +8,15 @@
import hashlib
import logging
import math
import os
from pathlib import Path
import subprocess
from typing import List, Optional, Set

from .config import Config
from .ref_extract_paths import Filepaths
from .resolver_cache import ResolverCache
from .utils import split_item_path
from adsputils import setup_logging, load_config

__all__ = [
"ClassicSessionAnalytics",
Expand All @@ -26,9 +28,14 @@
]

default_logger = logging.getLogger(__name__)
proj_home = os.path.realpath(os.path.join(os.path.dirname(__file__), '../'))
config = load_config(proj_home=proj_home)
ads_logger = setup_logging(__name__, proj_home=proj_home,
level=config.get('LOGGING_LEVEL', 'INFO'),
attach_stdout=config.get('LOG_STDOUT', False))


def _target_refs_for_session(extractrefs_out_path, reconstruct_targets, config, logger):
def _target_refs_for_session(extractrefs_out_path, reconstruct_targets, filepaths, logger, ads_logger):
"""
Yields sequence of ``(item_stem, item_ext, target_refs_path)``, eg:
Expand All @@ -40,14 +47,15 @@ def _target_refs_for_session(extractrefs_out_path, reconstruct_targets, config,
bits = line.strip().split()
if not bits:
logger.warn(f"unexpected empty line in `{extractrefs_out_path}`")
ads_logger.warn(f"unexpected empty line in `{extractrefs_out_path}`")
continue

item_stem, item_ext = split_item_path(bits[0])

if len(bits) < 2:
p = None
elif reconstruct_targets:
p = config.target_refs_base / (item_stem + ".raw")
p = filepaths.target_refs_base / (item_stem + ".raw")
else:
p = Path(bits[1])

Expand Down Expand Up @@ -151,8 +159,9 @@ def as_csv_row(self):

def analyze_session(
session_id,
config,
filepaths,
logger=default_logger,
ads_logger=ads_logger,
reconstruct_targets=False,
check_resolved=True,
):
Expand All @@ -172,7 +181,7 @@ def analyze_session(
session that you are analyzing.
"""
log_dir = config.classic_session_log_path(session_id)
log_dir = filepaths.classic_session_log_path(session_id)

# First: analyze items that were in the update

Expand All @@ -188,6 +197,7 @@ def analyze_session(
bits = line.strip().split()
if not bits:
logger.warn(f"unexpected empty line in `{fth_path}`")
ads_logger.warn(f"unexpected empty line in `{fth_path}`")
continue

n_items += 1
Expand All @@ -201,14 +211,17 @@ def analyze_session(
logger.warn(
f"unexpected Arxiv item source type `{item_ext}` in `{fth_path}`"
)
ads_logger.warn(
f"unexpected Arxiv item source type `{item_ext}` in `{fth_path}`"
)

if len(bits) > 3 and bits[3] == short_sid:
n_new += 1

# Next: analyze results of that update

tref_info = _target_refs_for_session(
log_dir / "extractrefs.out", reconstruct_targets, config, logger
log_dir / "extractrefs.out", reconstruct_targets, filepaths, logger, ads_logger
)
raw_paths = [t[2] for t in tref_info if t[2] is not None]
n_emitted = len(raw_paths)
Expand Down Expand Up @@ -247,11 +260,17 @@ def analyze_session(
logger.warn(
f"unexpected missing ref target file `{raw_path}` for Arxiv session `{session_id}`"
)
ads_logger.warn(
f"unexpected missing ref target file `{raw_path}` for Arxiv session `{session_id}`"
)
continue
except Exception as e:
logger.warn(
f"exception parsing ref target file `{raw_path}` for Arxiv session `{session_id}`: {e} ({e.__class__.__name__})"
)
ads_logger.warn(
f"exception parsing ref target file `{raw_path}` for Arxiv session `{session_id}`: {e} ({e.__class__.__name__})"
)
continue

# Resolved
Expand All @@ -269,6 +288,7 @@ def analyze_session(
bits = line.strip().split()
if not bits:
logger.warn(f"unexpected empty line in `{resolved_path}`")
ads_logger.warn(f"unexpected empty line in `{resolved_path}`")
continue

if bits[0] == b"1":
Expand All @@ -279,11 +299,17 @@ def analyze_session(
logger.warn(
f"unexpected missing ref resolved file `{resolved_path}` for Arxiv session `{session_id}`"
)
ads_logger.warn(
f"unexpected missing ref resolved file `{resolved_path}` for Arxiv session `{session_id}`"
)
continue
except Exception as e:
logger.warn(
f"exception parsing ref resolved file `{resolved_path}` for Arxiv session `{session_id}`: {e} ({e.__class__.__name__})"
)
ads_logger.warn(
f"exception parsing ref resolved file `{resolved_path}` for Arxiv session `{session_id}`: {e} ({e.__class__.__name__})"
)
continue

# All done
Expand All @@ -301,7 +327,7 @@ def analyze_session(


def compare_outcomes(
session_id, A_config, B_config, logger=default_logger, ignore_pdfonly: bool = False
session_id, A_config, B_config, logger=default_logger, ads_logger=ads_logger, ignore_pdfonly: bool = False
):
"""
Given two processing passes of a single Arxiv update session, generate
Expand All @@ -316,7 +342,7 @@ def compare_outcomes(
The ``session_id`` is a string session ID, something like ``"2021-11-07"``.
The ``A_config`` and ``B_config`` variables are Config objects that give
The ``A_config`` and ``B_config`` variables are Filepaths objects that give
path information about the files produced by the two processing runs.
"""

Expand All @@ -327,10 +353,10 @@ def compare_outcomes(
er2 = B_config.classic_session_log_path(session_id) / "extractrefs.out"

A_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger, ads_logger)
)
B_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger, ads_logger)
)

# Set up to deal with withdrawals. They're not failures, but they can't
Expand Down Expand Up @@ -411,7 +437,7 @@ def compare_outcomes(


def compare_refstrings(
session_id, A_config, B_config, show_diff=False, logger=default_logger
session_id, A_config, B_config, show_diff=False, logger=default_logger, ads_logger=ads_logger
):
"""
Given two processing passes of a single Arxiv update session, generate
Expand All @@ -428,7 +454,7 @@ def compare_refstrings(
The ``session_id`` is a string session ID, something like ``"2021-11-07"``.
The ``A_config`` and ``B_config`` variables are Config objects that give
The ``A_config`` and ``B_config`` variables are Filepaths objects that give
path information about the files produced by the two processing runs.
"""

Expand All @@ -439,10 +465,10 @@ def compare_refstrings(
er2 = B_config.classic_session_log_path(session_id) / "extractrefs.out"

A_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger, ads_logger)
)
B_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger, ads_logger)
)

# Set up to deal with withdrawals. They're not failures, but they can't
Expand Down Expand Up @@ -657,7 +683,7 @@ class ClassicSessionReprocessor(object):
image_name = None
"The name of the Docker image with the classic-style reference extractor."

config = None
filepaths = None
"The data path configuration."

logs_out_base = None
Expand All @@ -675,9 +701,9 @@ class ClassicSessionReprocessor(object):
extra_args: Optional[List[str]] = None
"Extra CLI arguments to pass to the processing program"

def __init__(self, config=None, image_name=None, logs_out_base=None):
if config is not None:
self.config = config
def __init__(self, filepaths=None, image_name=None, logs_out_base=None):
if filepaths is not None:
self.filepaths = filepaths

if image_name is not None:
self.image_name = image_name
Expand All @@ -688,11 +714,11 @@ def __init__(self, config=None, image_name=None, logs_out_base=None):
def _validate(self):
if self.image_name is None:
raise Exception("must set `image_name` before reprocessing")
if self.config is None:
raise Exception("must set `config` before reprocessing")
if str(self.config.target_refs_base).startswith("/proj/ads/"):
if self.filepaths is None:
raise Exception("must set `filepaths` before reprocessing")
if str(self.filepaths.target_refs_base).startswith("/proj/ads/"):
raise Exception(
f"refusing to reprocess into target ref basedir `{self.config.target_refs_base}`"
f"refusing to reprocess into target ref basedir `{self.filepaths.target_refs_base}`"
)
if self.logs_out_base is None:
raise Exception("must set `logs_out_base` before reprocessing")
Expand All @@ -716,14 +742,14 @@ def reprocess(self, session_id):
self._validate()

# The *input* log directory, which we use to know what items to process,
# is derived from `config.logs_base`. This is not the same thing as
# is derived from `filepaths.logs_base`. This is not the same thing as
# `self.logs_out_base`, where the pipeline log files should land. We
# have to get the session's correct log directory (which may be in a
# year-based subdirectory) and make sure to mount it into the container
# so that the pipeline can actually access it. The in-container filename
# has to be in a directory whose name is `session_id` since the pipeline
# code infers the session ID from that name.
log_dir = self.config.classic_session_log_path(session_id)
log_dir = self.filepaths.classic_session_log_path(session_id)

argv = [
"docker",
Expand All @@ -732,7 +758,7 @@ def reprocess(self, session_id):
"--name",
f"arxiv_refextract_repro_{session_id}",
"-v",
f"{self.config.fulltext_base}:/fulltext:ro,Z",
f"{self.filepaths.fulltext_base}:/fulltext:ro,Z",
"-v",
f"{log_dir}:/input_logs/{session_id}:ro,Z",
]
Expand All @@ -746,7 +772,7 @@ def reprocess(self, session_id):

argv += [
"-v",
f"{self.config.target_refs_base}:/{spfx}/results/testing/references/sources:rw,Z",
f"{self.filepaths.target_refs_base}:/{spfx}/results/testing/references/sources:rw,Z",
"-v",
f"{self.logs_out_base}:/{spfx}/logs:rw,Z",
"-e",
Expand Down Expand Up @@ -775,7 +801,7 @@ def reprocess(self, session_id):
subprocess.check_call(argv, shell=False, close_fds=True)


def _maybe_load_raw_file(path, logger) -> Set[str]:
def _maybe_load_raw_file(path, logger, ads_logger) -> Set[str]:
MAX_RS_LEN = 512
refstrings = set()

Expand All @@ -796,6 +822,9 @@ def _maybe_load_raw_file(path, logger) -> Set[str]:
logger.debug(
f'truncating reference string "{rs[:20]}..." in `{path}`'
)
ads_logger.debug(
f'truncating reference string "{rs[:20]}..." in `{path}`'
)
rs = rs[:MAX_RS_LEN]

refstrings.add(rs)
Expand Down Expand Up @@ -837,11 +866,12 @@ def md5(text: str) -> bytes:

def compare_resolved(
session_id: str,
A_config: Config,
B_config: Config,
A_config: Filepaths,
B_config: Filepaths,
rcache: ResolverCache,
max_resolves: Optional[int] = None,
logger=default_logger,
ads_logger=default_logger,
**kwargs,
):
"""
Expand All @@ -865,10 +895,10 @@ def compare_resolved(
er2 = B_config.classic_session_log_path(session_id) / "extractrefs.out"

A_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er1, True, A_config, logger, ads_logger)
)
B_results = dict(
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger)
(t[0], t[1:]) for t in _target_refs_for_session(er2, True, B_config, logger, ads_logger)
)

stems = set(A_results.keys())
Expand Down Expand Up @@ -900,8 +930,8 @@ def source():
A_ext, A_path = A_results.get(stem, (None, None))
B_ext, B_path = B_results.get(stem, (None, None))

A_refstrings = _maybe_load_raw_file(A_path, logger)
B_refstrings = _maybe_load_raw_file(B_path, logger)
A_refstrings = _maybe_load_raw_file(A_path, logger, ads_logger)
B_refstrings = _maybe_load_raw_file(B_path, logger, ads_logger)

A_uniques[stem] = A_refstrings - B_refstrings
B_uniques[stem] = B_refstrings - A_refstrings
Expand Down Expand Up @@ -936,7 +966,9 @@ def source():
logger.warn(
f"stopping at {len(results)} items (out of {len(stems)}) to keep number of resolutions below {max_resolves}"
)

ads_logger.warn(
f"stopping at {len(results)} items (out of {len(stems)}) to keep number of resolutions below {max_resolves}"
)
# Resolve all the things!

resolved = rcache.resolve(to_resolve, **kwargs)
Expand Down Expand Up @@ -976,16 +1008,17 @@ def source():

def compare_item_resolutions(
stem: str,
A_config: Config,
B_config: Config,
A_config: Filepaths,
B_config: Filepaths,
rcache: ResolverCache,
logger=default_logger,
ads_logger=default_logger
):
A_path = A_config.target_refs_base / (stem + ".raw")
B_path = B_config.target_refs_base / (stem + ".raw")
A_path = A_config.target_refs_base / (stem + "_pipeline.raw")
B_path = B_config.target_refs_base / (stem + "_pipeline.raw")

A_refstrings = _maybe_load_raw_file(A_path, logger)
B_refstrings = _maybe_load_raw_file(B_path, logger)
A_refstrings = _maybe_load_raw_file(A_path, logger, ads_logger)
B_refstrings = _maybe_load_raw_file(B_path, logger, ads_logger)

resolved = rcache.resolve(A_refstrings | B_refstrings)

Expand Down

0 comments on commit ebcd36e

Please sign in to comment.