Skip to main content

doiget_core/
provenance.rs

1//! JSON Lines + SHA-256 hash-chained provenance log.
2//!
3//! Binding spec: `docs/PROVENANCE_LOG.md` (NORMATIVE, §3 row schema, §4 hash
4//! chain). Failure semantics: **fail-closed** — callers MUST abort the fetch
5//! if a log write returns `Err`. See `docs/SECURITY.md` §1.8 and ADR-0006.
6//!
7//! # On-disk format
8//!
9//! - JSON Lines (`.jsonl`): one JSON object per line, terminated by `\n` (LF).
10//! - UTF-8. Timestamps are RFC3339 in UTC.
11//! - Each row is appended via a single `write_all` whose payload always ends
12//!   in `\n`, so a partially-written row is detectable as a missing trailing
13//!   newline rather than a torn JSON record.
14//! - In audit-grade mode (the only mode shipped here), the writer flushes the
15//!   `BufWriter` and `fsync`s the file after every row.
16//!
17//! # Hash chain (PROVENANCE_LOG.md §4)
18//!
19//! Each row carries a `prev_hash` and a `this_hash`. The first row's
20//! `prev_hash` is the literal string `"GENESIS"`. Every subsequent row's
21//! `prev_hash` MUST equal the previous row's `this_hash`.
22//!
//! When a log file rotates (§6 — see "Log rotation and retention" below),
//! the first row of the NEW log file also uses `prev_hash = "GENESIS"`,
//! restarting the chain.
26//!
27//! `this_hash` is computed as:
28//!
29//! ```text
30//! this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
31//! ```
32//!
//! where `canonical_json` is **compact JSON (no whitespace) with object keys
//! sorted lexicographically** (PROVENANCE_LOG.md §4). For a v2 row with fields
//! `{ts: "...", ts_seq: 1, event: "fetch", ...}`, the canonical bytes begin
//! with `{"canonical_digest":...` because `canonical_digest` is the lex-first
//! top-level key (`"canonical_digest"` < `"capability"`; before v2 the
//! lex-first key was `capability`). Downstream `doiget audit-log --verify`
//! (Phase 1+) relies on this exact rule — do not change the canonicalization
//! without bumping the spec.
39//!
40//! # In-process serialization
41//!
42//! `ProvenanceLog` holds a `Mutex<LogState>`. All `append` calls within the
43//! same process serialize on this mutex, satisfying the "process-local mutex
44//! on log appender" requirement of `docs/SECURITY.md` §1.8. Cross-process
45//! coordination (multiple `doiget` invocations) is out of scope here and
46//! handled by the higher-level `flock`-based store layer.
47//!
48//! # Session id
49//!
50//! `session_id` (PROVENANCE_LOG.md §3) is a 26-char ULID generated **once per
51//! process invocation** by the caller and stamped into every row written
52//! through the resulting [`ProvenanceLog`]. This crate does not generate the
53//! ULID itself — see [`ProvenanceLog::open`] for the contract.
54//!
55//! # Log rotation and retention (§6)
56//!
57//! Implemented (PROVENANCE_LOG.md §6): when `access.log` exceeds
58//! `ROTATE_BYTES` (100 MiB) a subsequent [`ProvenanceLog::append`]
59//! gzip-compresses the full file to `access.log.<YYYY-MM-DD-HHMMSS>.gz`,
60//! removes the old `access.log`, and writes the incoming row as the
61//! first row of a fresh file with `prev_hash = "GENESIS"` (the hash
62//! chain **restarts** per segment — segments are NOT linked). Rotation
63//! is fail-closed: any gzip / rename / unlink failure aborts the
64//! `append` (the caller's fetch aborts) so the chain never silently
65//! skips. At [`ProvenanceLog::open`], rotated `.gz` segments older than
66//! the retention window (`DOIGET_LOG_RETENTION_DAYS`, default 90; `0`
67//! disables) are deleted **best-effort** (a prune failure is logged,
68//! not fatal — pruning is housekeeping, not integrity).
69//! [`verify_all`] verifies the current file plus every rotated `.gz`
70//! segment (each its own GENESIS-rooted chain).
71
72use std::collections::BTreeMap;
73use std::fs::{File, OpenOptions};
74use std::io::{BufRead, BufReader, BufWriter, Write};
75use std::sync::Mutex;
76
77use flate2::read::GzDecoder;
78use flate2::write::GzEncoder;
79use flate2::Compression;
80
81use camino::{Utf8Path, Utf8PathBuf};
82use chrono::{DateTime, Utc};
83use serde::{Deserialize, Serialize};
84use sha2::{Digest, Sha256};
85
86/// One row of the provenance log (PROVENANCE_LOG.md §3).
87///
88/// The on-disk wire field names match the spec table; struct-field order is
89/// **not** load-bearing for the hash because canonicalization sorts keys
90/// lexicographically (see PROVENANCE_LOG.md §4).
91///
92/// **Schema version**: this struct is the **v2** row shape (ADR-0024).
93/// Every v2 row carries `schema_version = "v2"` literally; the
94/// `canonical_digest` field carries the ADR-0021 §1 audit identity of
95/// the fetch on rows where one applies (`Fetch` / `Resolve` /
96/// `StoreWrite`) and is `None` on session bookend rows
97/// (`SessionStart` / `SessionEnd` / `CapabilityResolved`) that have no
98/// ref. v1 rows (pre-Slice-4) lack both fields and MUST be migrated via
99/// [`migrate_v1_to_v2`] before the v2 binary can read them — the
100/// `deny_unknown_fields` + non-defaulted `schema_version` shape ensures
101/// v1 rows fail to parse loudly rather than producing silent hash-chain
102/// mismatches.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(deny_unknown_fields)]
105pub struct LogRow {
106    /// RFC3339 UTC timestamp of the append (millisecond precision).
107    pub ts: DateTime<Utc>,
108    /// Per-session monotonic sequence number, starting at 1.
109    pub ts_seq: u64,
110    /// Event class (see [`LogEvent`]).
111    pub event: LogEvent,
112    /// Optional reference (DOI / arXiv id). Wire field name is `ref`.
113    #[serde(rename = "ref")]
114    pub ref_: Option<String>,
115    /// Optional source name (e.g. `unpaywall`).
116    pub source: Option<String>,
117    /// Result (see [`LogResult`]).
118    pub result: LogResult,
119    /// OA license string (`event=fetch`, `result=ok`); `None` otherwise.
120    pub license: Option<String>,
121    /// Bytes written / fetched, on success rows.
122    pub size_bytes: Option<u64>,
123    /// Path to the stored payload, relative to the store root
124    /// (`event=fetch`, `result=ok`); `None` otherwise.
125    pub store_path: Option<String>,
126    /// Capability under which the row was written (REQUIRED, every row).
127    pub capability: Capability,
128    /// 26-char ULID identifying the process invocation (REQUIRED).
129    pub session_id: String,
130    /// Stable error code on failure rows.
131    pub error_code: Option<String>,
132    /// Row schema version. Always [`LOG_SCHEMA_VERSION`] (`"v2"`) for
133    /// new rows written by this build (ADR-0024). v1 rows lack this
134    /// field; they MUST be migrated via [`migrate_v1_to_v2`] first.
135    pub schema_version: String,
136    /// Canonical-digest of the fetch's audit identity (ADR-0021 §1) as
137    /// 64 lowercase hex chars. Present on rows with a `ref` (`Fetch`,
138    /// `Resolve`, `StoreWrite`); `None` on session bookend rows. The
139    /// digest is computed from a [`crate::CanonicalRef`] whose
140    /// `resolver_profile` matches this row's `source` field for
141    /// migrated v1 rows; new v2 rows MAY pass an explicit
142    /// `resolver_profile` distinct from `source`.
143    pub canonical_digest: Option<String>,
144    /// 64 lowercase hex chars, OR the literal string `"GENESIS"` for the
145    /// first row of a fresh log file.
146    pub prev_hash: String,
147    /// 64 lowercase hex chars. SHA-256 of canonical JSON of THIS row with
148    /// the `this_hash` field removed. See module docs.
149    pub this_hash: String,
150}
151
/// Schema version stamped on every provenance-log row this build writes
/// (`docs/PROVENANCE_LOG.md` §3, ADR-0024).
///
/// The previous version, `"v1"`, was implicit: pre-Slice-4 rows carried
/// no `schema_version` field at all. The bump to `"v2"` came with the
/// `canonical_digest` column. Use [`migrate_v1_to_v2`] for the v1→v2
/// migration, which is one-shot, idempotent, and dry-runnable.
pub const LOG_SCHEMA_VERSION: &str = "v2";
160
161/// Event class for a log row (PROVENANCE_LOG.md §3).
162///
163/// Note: result-status (`ok`/`err`/`denied`) lives in [`LogResult`], NOT in
164/// the event variant. So `Fetch` covers both successful and failed fetch
165/// attempts; the row's `result` distinguishes them.
166///
167/// `non_exhaustive` so adding new variants is non-breaking.
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
169#[serde(rename_all = "snake_case")]
170#[non_exhaustive]
171pub enum LogEvent {
172    /// Process started; first row of a new session.
173    SessionStart,
174    /// Capability resolution finished (allowed / denied / which env var).
175    CapabilityResolved,
176    /// Reference resolved to a fetch URL.
177    Resolve,
178    /// Fetch attempt (success or failure determined by `result`).
179    Fetch,
180    /// Store write attempt (success or failure determined by `result`).
181    StoreWrite,
182    /// Process ended cleanly.
183    SessionEnd,
184}
185
186/// Per-row outcome (PROVENANCE_LOG.md §3). `non_exhaustive` for forward
187/// compatibility.
188#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
189#[serde(rename_all = "snake_case")]
190#[non_exhaustive]
191pub enum LogResult {
192    /// The operation succeeded.
193    Ok,
194    /// The operation failed with an error.
195    Err,
196    /// The operation was denied (e.g. capability gate).
197    Denied,
198}
199
200/// Capability under which a row was written (PROVENANCE_LOG.md §3).
201///
202/// `kebab-case` serde rename emits `oa`, `metadata`, `tdm-elsevier`,
203/// `tdm-aps`, `tdm-springer` exactly as the spec requires. `non_exhaustive`
204/// for forward compatibility.
205#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
206#[serde(rename_all = "kebab-case")]
207#[non_exhaustive]
208pub enum Capability {
209    /// Open access tier.
210    Oa,
211    /// Metadata-only access.
212    Metadata,
213    /// Elsevier TDM (Tier 3, opt-in build).
214    TdmElsevier,
215    /// APS TDM (Tier 3, opt-in build).
216    TdmAps,
217    /// Springer TDM (Tier 3, opt-in build).
218    TdmSpringer,
219}
220
221/// Errors emitted by the provenance log writer. Callers MUST treat any
222/// variant as a fail-closed signal and abort the surrounding fetch.
223#[derive(Debug, thiserror::Error)]
224#[non_exhaustive]
225pub enum LogError {
226    /// I/O error opening, reading, writing, or syncing the log file. Includes
227    /// recovery-time corruption detection where the synthetic message is
228    /// `"corrupted log at line N: …"`.
229    #[error("provenance log io error: {0}")]
230    Io(#[from] std::io::Error),
231    /// Serialization of a row to canonical JSON failed.
232    #[error("provenance log serialization error: {0}")]
233    Serialize(#[from] serde_json::Error),
234    /// Path supplied to [`ProvenanceLog::open`] exists but is not a regular
235    /// file (e.g. a directory or symlink).
236    #[error("provenance log path is not a regular file: {0}")]
237    NotARegularFile(Utf8PathBuf),
238}
239
240/// Append-only writer with in-process serialization.
241#[derive(Debug)]
242pub struct ProvenanceLog {
243    path: Utf8PathBuf,
244    state: Mutex<LogState>,
245    session_id: String,
246    /// §6 rotation threshold, resolved ONCE at [`ProvenanceLog::open`]
247    /// (not per-`append`). Reading `DOIGET_LOG_ROTATE_BYTES` once at
248    /// open — rather than on every append — means a log opened without
249    /// the env set keeps the real 100 MiB threshold for its whole life
250    /// even if another (test) thread later mutates that process-global
251    /// env var; this removes a parallel-test race without serializing
252    /// every multi-append test. `0` = rotation disabled.
253    rotate_threshold: u64,
254}
255
/// The mutable part of the writer, protected by [`ProvenanceLog::state`].
#[derive(Debug)]
struct LogState {
    /// Sequence number (`ts_seq`) the **next** appended row will get.
    next_seq: u64,
    /// `this_hash` of the most recent row (64 lowercase hex chars), or
    /// [`GENESIS_HASH`] while the log is still empty.
    last_hash: String,
}
264
/// Sentinel `prev_hash` of the first row in a log file
/// (PROVENANCE_LOG.md §3, §6). The same literal is written on the first
/// row after a rotation, because the chain restarts with every segment.
const GENESIS_HASH: &str = "GENESIS";
269
/// Default size at which `access.log` is rotated: 100 MiB
/// (PROVENANCE_LOG.md §6 words it as "100 MB").
///
/// The `DOIGET_LOG_ROTATE_BYTES` env var overrides it. That variable is
/// an internal ops/testing knob, NOT a documented public surface: tests
/// set it to a tiny value so rotation can be exercised without writing
/// 100 MiB, and a value of `0` switches rotation off.
const ROTATE_BYTES: u64 = 100 << 20;
276
/// How long rotated segments are kept when nothing else is configured
/// (PROVENANCE_LOG.md §6: "90 days"). `DOIGET_LOG_RETENTION_DAYS`
/// overrides it, and `0` turns pruning off.
const DEFAULT_RETENTION_DAYS: i64 = 90;
280
281/// Resolve the rotation threshold: `DOIGET_LOG_ROTATE_BYTES` if set and
282/// parseable, else [`ROTATE_BYTES`]. `0` (or unparsable) → returns the
283/// value as-is (`0` means "never rotate").
284fn rotate_threshold_bytes() -> u64 {
285    match std::env::var("DOIGET_LOG_ROTATE_BYTES") {
286        Ok(s) => s.trim().parse::<u64>().unwrap_or(ROTATE_BYTES),
287        Err(_) => ROTATE_BYTES,
288    }
289}
290
291/// Resolve retention days from `DOIGET_LOG_RETENTION_DAYS`
292/// (default [`DEFAULT_RETENTION_DAYS`]). `0` disables pruning. A
293/// negative / unparsable value falls back to the default with a warn.
294fn retention_days() -> i64 {
295    match std::env::var("DOIGET_LOG_RETENTION_DAYS") {
296        Ok(s) => match s.trim().parse::<i64>() {
297            Ok(n) if n >= 0 => n,
298            _ => {
299                tracing::warn!(
300                    value = %s,
301                    "DOIGET_LOG_RETENTION_DAYS is not a non-negative integer; \
302                     using the {DEFAULT_RETENTION_DAYS}-day default"
303                );
304                DEFAULT_RETENTION_DAYS
305            }
306        },
307        Err(_) => DEFAULT_RETENTION_DAYS,
308    }
309}
310
311/// gzip-compress `path` to `<file_name>.<YYYY-MM-DD-HHMMSS>.gz` (in the
312/// same directory) and unlink `path` (PROVENANCE_LOG.md §6).
313///
314/// Atomic & fail-closed: the gzip is written to a `.tmp`, fsynced, then
315/// `rename`d into place (so a partial `.gz` is never observable), and
316/// only then is the original removed. Every step propagates its error
317/// to the caller (`ProvenanceLog::append`), which is fail-closed — a
318/// rotation failure aborts the surrounding fetch. Crash safety: a crash
319/// after the rename but before the unlink leaves both the full `.gz`
320/// and the (over-size) `access.log`; the next `append` simply rotates
321/// again, producing a second independently-valid segment — wasteful but
322/// never lossy or corrupt.
323fn rotate_log(path: &Utf8Path) -> Result<(), LogError> {
324    let file_name = path.file_name().ok_or_else(|| {
325        LogError::Io(std::io::Error::other(
326            "provenance log path has no file name; cannot rotate",
327        ))
328    })?;
329    let ts = Utc::now().format("%Y-%m-%d-%H%M%S");
330    let gz_name = format!("{file_name}.{ts}.gz");
331    let dir = path.parent().unwrap_or_else(|| Utf8Path::new("."));
332    let gz_path = dir.join(&gz_name);
333    let tmp_path = dir.join(format!("{gz_name}.tmp"));
334
335    {
336        let mut src = File::open(path)?;
337        let tmp = File::create(&tmp_path)?;
338        let mut enc = GzEncoder::new(BufWriter::new(tmp), Compression::default());
339        std::io::copy(&mut src, &mut enc)?;
340        let bufw = enc.finish()?;
341        let tmp = bufw.into_inner().map_err(|e| {
342            LogError::Io(std::io::Error::other(format!(
343                "gz tmp buf flush failed: {}",
344                e.error()
345            )))
346        })?;
347        tmp.sync_all()?;
348    }
349    std::fs::rename(&tmp_path, &gz_path)?;
350    std::fs::remove_file(path)?;
351    Ok(())
352}
353
354/// Rotated `.gz` segments siblings of `current`, sorted ascending. The
355/// embedded `YYYY-MM-DD-HHMMSS` timestamp makes lexicographic order ==
356/// chronological order.
357fn rotated_segments(current: &Utf8Path) -> Vec<Utf8PathBuf> {
358    let Some(file_name) = current.file_name() else {
359        return Vec::new();
360    };
361    let dir = current.parent().unwrap_or_else(|| Utf8Path::new("."));
362    let prefix = format!("{file_name}.");
363    let mut segs: Vec<Utf8PathBuf> = match std::fs::read_dir(dir.as_std_path()) {
364        Ok(rd) => rd
365            .filter_map(|e| e.ok())
366            .filter_map(|e| Utf8PathBuf::from_path_buf(e.path()).ok())
367            .filter(|p| {
368                p.file_name()
369                    .map(|n| n.starts_with(&prefix) && n.ends_with(".gz"))
370                    .unwrap_or(false)
371            })
372            .collect(),
373        Err(_) => Vec::new(),
374    };
375    segs.sort();
376    segs
377}
378
379/// Delete rotated `.gz` segments older than `days` (PROVENANCE_LOG.md
380/// §6 retention). `days <= 0` is a no-op (disabled). **Best-effort**:
381/// pruning is housekeeping, not integrity, so any failure is logged and
382/// skipped — `ProvenanceLog::open` still succeeds.
383fn prune_rotated_segments(current: &Utf8Path, days: i64) {
384    if days <= 0 {
385        return;
386    }
387    let Some(cutoff) = std::time::SystemTime::now()
388        .checked_sub(std::time::Duration::from_secs(days as u64 * 86_400))
389    else {
390        return;
391    };
392    for seg in rotated_segments(current) {
393        let aged = std::fs::metadata(seg.as_std_path())
394            .and_then(|m| m.modified())
395            .map(|mt| mt < cutoff)
396            .unwrap_or(false);
397        if !aged {
398            continue;
399        }
400        match std::fs::remove_file(seg.as_std_path()) {
401            Ok(()) => tracing::info!(
402                segment = %seg,
403                "provenance: pruned rotated segment past retention"
404            ),
405            Err(e) => tracing::warn!(
406                segment = %seg, error = %e,
407                "provenance: failed to prune rotated segment (best-effort; continuing)"
408            ),
409        }
410    }
411}
412
413/// Verify the full provenance history: every rotated `.gz` segment
414/// (oldest→newest) followed by the current `access.log`. Each segment
415/// is its own GENESIS-rooted hash chain (segments are deliberately NOT
416/// linked across a rotation, PROVENANCE_LOG.md §6), so they are
417/// verified independently and reported per-segment.
418///
419/// The audited [`verify`] function itself is unchanged; this only
420/// orchestrates it over the segment set (gunzipping each `.gz` to a
421/// tempfile first).
422///
423/// # Errors
424///
425/// [`LogError::Io`] on a gunzip / tempfile failure. A missing current
426/// `access.log` is not an error ([`verify`] reports it empty).
427pub fn verify_all(current: &Utf8Path) -> Result<Vec<(Utf8PathBuf, VerifyReport)>, LogError> {
428    let mut out = Vec::new();
429    for seg in rotated_segments(current) {
430        let gz = File::open(seg.as_std_path())?;
431        let mut dec = GzDecoder::new(gz);
432        let tmp = tempfile::NamedTempFile::new().map_err(|e| {
433            LogError::Io(std::io::Error::other(format!(
434                "verify_all: tempfile for {seg}: {e}"
435            )))
436        })?;
437        {
438            let mut w = File::create(tmp.path())?;
439            std::io::copy(&mut dec, &mut w)?;
440            w.sync_all()?;
441        }
442        let tmp_utf8 = Utf8Path::from_path(tmp.path()).ok_or_else(|| {
443            LogError::Io(std::io::Error::other("verify_all: non-utf8 tempfile path"))
444        })?;
445        let report = verify(tmp_utf8)?;
446        out.push((seg, report));
447        // `tmp` (and the gunzipped file) drop here, after verify.
448    }
449    let report = verify(current)?;
450    out.push((current.to_path_buf(), report));
451    Ok(out)
452}
453
454/// Caller-supplied fields for a row. The writer fills in `ts`, `ts_seq`,
455/// `session_id`, `prev_hash`, `this_hash`, and the literal
456/// `schema_version = "v2"` (`LOG_SCHEMA_VERSION`).
457///
458/// Callers SHOULD populate [`Self::canonical_digest`] on rows that have
459/// a meaningful audit identity (`Fetch` / `Resolve` / `StoreWrite` rows
460/// with a `ref`), leaving it `None` on session bookend rows. The digest
461/// is produced by [`crate::CanonicalRef::digest_hex`] from a
462/// `(source_type, source_id, resolver_profile, version)` tuple — see
463/// ADR-0021 §1 for the algorithm and ADR-0024 for the implementation
464/// surface.
465#[derive(Debug, Clone)]
466pub struct RowInput<'a> {
467    /// Event class.
468    pub event: LogEvent,
469    /// Result.
470    pub result: LogResult,
471    /// Capability under which the row is written (REQUIRED for every row).
472    pub capability: Capability,
473    /// Optional DOI / arXiv id.
474    pub ref_: Option<&'a str>,
475    /// Optional source name.
476    pub source: Option<&'a str>,
477    /// Optional error code on failure rows.
478    pub error_code: Option<&'a str>,
479    /// Optional payload size in bytes.
480    pub size_bytes: Option<u64>,
481    /// Optional OA license string (set on `event=fetch`, `result=ok`).
482    pub license: Option<&'a str>,
483    /// Optional store path relative to the store root (set on `event=fetch`,
484    /// `result=ok`).
485    pub store_path: Option<&'a str>,
486    /// Optional canonical-digest (ADR-0021 §1) as 64 lowercase hex
487    /// chars. `None` for session bookend / capability-resolution rows;
488    /// SHOULD be `Some` for `Fetch` / `Resolve` / `StoreWrite` rows
489    /// whose `source` field names the resolver. Build via
490    /// [`crate::Ref::promote`] + [`crate::CanonicalRef::digest_hex`].
491    pub canonical_digest: Option<&'a str>,
492}
493
494// ---------------------------------------------------------------------------
495// Canonical-JSON helper (PROVENANCE_LOG.md §4)
496//
497// Hashing rule (CRITICAL — this is the spec contract for `audit-log --verify`):
498//
499//   this_hash = lower_hex(SHA-256(canonical_json(row \ {this_hash})))
500//
501// Canonical JSON = **compact (no whitespace), keys sorted lexicographically,
502// no trailing whitespace** (§4). Struct field order is deliberately NOT
503// load-bearing here; the canonicalizer sorts the resulting object keys via
504// `BTreeMap<String, Value>`, which serializes in lex-sorted key order.
505//
506// Worked example: for the row fragment `{ts_seq: 1, ts: "..."}` (input order),
507// the canonical bytes after lex sort are `{"ts":"...","ts_seq":1}` because
508// `"ts"` < `"ts_seq"` lexicographically. In v2 (ADR-0024) the lex-first
509// top-level key is `"canonical_digest"` — `"canonical_digest"` < `"capability"`
510// because 'n'(110) < 'p'(112) at byte index 2 (both share the `"ca"`
511// prefix). The pre-v2 lex-first key was `"capability"`.
512// ---------------------------------------------------------------------------
513
514/// Serializable shadow of [`LogRow`] **without** `this_hash`. Used solely as
515/// an intermediate to compute the canonical bytes that `this_hash` is the
516/// SHA-256 of. The wire key names match [`LogRow`]'s `serde` attributes.
517///
518/// v2 shape (ADR-0024): includes `schema_version` and
519/// `canonical_digest`. Both fields participate in the hash chain — a
520/// tampered `canonical_digest` is detected by `audit-log --verify`
521/// exactly like a tampered `ref` or `source` would be.
522#[derive(Serialize)]
523struct RowForHash<'a> {
524    ts: DateTime<Utc>,
525    ts_seq: u64,
526    event: LogEvent,
527    #[serde(rename = "ref")]
528    ref_: Option<&'a str>,
529    source: Option<&'a str>,
530    result: LogResult,
531    license: Option<&'a str>,
532    size_bytes: Option<u64>,
533    store_path: Option<&'a str>,
534    capability: Capability,
535    session_id: &'a str,
536    error_code: Option<&'a str>,
537    schema_version: &'a str,
538    canonical_digest: Option<&'a str>,
539    prev_hash: &'a str,
540}
541
542/// Produce canonical-JSON bytes for a row-without-hash, with object keys
543/// sorted lexicographically per PROVENANCE_LOG.md §4.
544///
545/// Implementation: serialize via `serde_json::to_value` to get a `Value`,
546/// require it be an object, then move its entries into a
547/// `BTreeMap<String, Value>` (which serializes with lex-sorted keys) and
548/// re-serialize compactly. No new dependency required.
549fn canonical_json_for_hash(rfh: &RowForHash<'_>) -> Result<Vec<u8>, LogError> {
550    let value = serde_json::to_value(rfh)?;
551    let map = match value {
552        serde_json::Value::Object(m) => m,
553        // RowForHash is always a struct, so this branch is unreachable in
554        // practice; surface as a serde error if it ever changes.
555        _ => {
556            return Err(LogError::Serialize(serde::de::Error::custom(
557                "RowForHash did not serialize to a JSON object",
558            )));
559        }
560    };
561    let sorted: BTreeMap<String, serde_json::Value> = map.into_iter().collect();
562    Ok(serde_json::to_vec(&sorted)?)
563}
564
565/// Compute `this_hash` for the given row-without-hash. Returns 64 lowercase
566/// hex chars.
567fn compute_this_hash(rfh: &RowForHash<'_>) -> Result<String, LogError> {
568    let bytes = canonical_json_for_hash(rfh)?;
569    let digest = Sha256::digest(&bytes);
570    Ok(hex::encode(digest))
571}
572
573impl ProvenanceLog {
574    /// Open or create the log at `path`, stamping every row with
575    /// `session_id`.
576    ///
577    /// `session_id` MUST be a 26-char ULID generated **once per process**
578    /// invocation by the caller. Re-opening the log within the same process
579    /// reuses the same `session_id`; re-opening in a new process gets a new
580    /// one. This crate intentionally does NOT generate the ULID itself —
581    /// callers are responsible for creating one (e.g. via the `ulid` crate
582    /// already present in the workspace) and threading it through.
583    ///
584    /// If the file exists, scan it once to recover the last `ts_seq` and
585    /// `this_hash`. If the file is missing or empty, the first row will use
586    /// `prev_hash = "GENESIS"` and `ts_seq = 1`.
587    ///
588    /// # Errors
589    ///
590    /// Returns [`LogError::Io`] for I/O failures or if any line fails to
591    /// parse as a [`LogRow`] (synthetic message: `"corrupted log at line N: …"`).
592    /// The writer never silently truncates a corrupt log.
593    ///
594    /// Returns [`LogError::NotARegularFile`] if `path` exists but is not a
595    /// regular file (e.g. a directory).
596    pub fn open(path: impl Into<Utf8PathBuf>, session_id: String) -> Result<Self, LogError> {
597        // Production path: the §6 threshold comes from
598        // `DOIGET_LOG_ROTATE_BYTES` (default 100 MiB), resolved ONCE here.
599        Self::open_with_rotate_threshold(path, session_id, rotate_threshold_bytes())
600    }
601
602    /// [`open`](Self::open) with an explicit rotation threshold instead
603    /// of reading `DOIGET_LOG_ROTATE_BYTES`.
604    ///
605    /// This exists so the rotation tests inject a tiny threshold WITHOUT
606    /// mutating the process-global env var: a global env knob raced
607    /// non-`#[serial]` tests (a concurrent test's `open` would cache the
608    /// tiny threshold and spuriously rotate). `#[serial]` only
609    /// serializes `#[serial]` tests, so injection — not serialization —
610    /// is the robust fix. `0` disables rotation.
611    pub(crate) fn open_with_rotate_threshold(
612        path: impl Into<Utf8PathBuf>,
613        session_id: String,
614        rotate_threshold: u64,
615    ) -> Result<Self, LogError> {
616        let path: Utf8PathBuf = path.into();
617
618        // Reject obvious non-files up front so later `OpenOptions::append`
619        // doesn't produce a confusing platform-dependent error.
620        if path.exists() {
621            let md = std::fs::metadata(&path)?;
622            if !md.is_file() {
623                return Err(LogError::NotARegularFile(path));
624            }
625        }
626
627        let (next_seq, last_hash) = recover_state(&path)?;
628
629        // §6 retention: prune rotated `.gz` segments older than the
630        // window. Best-effort — pruning is housekeeping, not integrity,
631        // so a failure is logged and `open` still succeeds (unlike
632        // rotation, which is fail-closed).
633        prune_rotated_segments(&path, retention_days());
634
635        Ok(Self {
636            path,
637            state: Mutex::new(LogState {
638                next_seq,
639                last_hash,
640            }),
641            session_id,
642            rotate_threshold,
643        })
644    }
645
646    /// Append a row. Computes `prev_hash`, `ts_seq`, `ts`, `session_id`, and
647    /// `this_hash`; the caller only supplies the semantic fields via
648    /// [`RowInput`].
649    ///
650    /// Returns the assigned `ts_seq` on success.
651    ///
652    /// # Errors
653    ///
654    /// Returns [`LogError`] on serialization, I/O, or fsync failure. Callers
655    /// MUST treat this as fail-closed and abort the surrounding fetch.
656    pub fn append(&self, input: RowInput<'_>) -> Result<u64, LogError> {
657        // Hold the mutex for the entire append: serialize + write + flush +
658        // fsync + state update. This is the in-process serialization point
659        // promised by `docs/SECURITY.md` §1.8.
660        //
661        // A poisoned mutex only happens if a previous `append` panicked
662        // mid-write. Surface that as an I/O error rather than propagating
663        // a panic.
664        let mut state = self
665            .state
666            .lock()
667            .map_err(|_| LogError::Io(std::io::Error::other("provenance log mutex poisoned")))?;
668
669        // §6 rotation, BEFORE this row is written. If `access.log` has
670        // reached the threshold, gzip+rename it and reset the in-memory
671        // chain state so this row becomes the GENESIS-rooted first row of
672        // a fresh file. Fail-closed: a rotation error aborts the append
673        // (the `?`), so the caller's fetch aborts and the chain never
674        // silently continues in an over-size or half-rotated file. The
675        // `state` mutex is held, so rotation is serialized with appends.
676        let threshold = self.rotate_threshold;
677        if threshold > 0 {
678            let size = match std::fs::metadata(&self.path) {
679                Ok(m) => m.len(),
680                Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
681                Err(e) => return Err(LogError::Io(e)),
682            };
683            if size >= threshold {
684                rotate_log(&self.path)?;
685                state.next_seq = 1;
686                state.last_hash = GENESIS_HASH.to_string();
687            }
688        }
689
690        let ts_seq = state.next_seq;
691        let prev_hash = state.last_hash.clone();
692        let ts = Utc::now();
693
694        let rfh = RowForHash {
695            ts,
696            ts_seq,
697            event: input.event,
698            ref_: input.ref_,
699            source: input.source,
700            result: input.result,
701            license: input.license,
702            size_bytes: input.size_bytes,
703            store_path: input.store_path,
704            capability: input.capability,
705            session_id: &self.session_id,
706            error_code: input.error_code,
707            schema_version: LOG_SCHEMA_VERSION,
708            canonical_digest: input.canonical_digest,
709            prev_hash: &prev_hash,
710        };
711
712        let this_hash = compute_this_hash(&rfh)?;
713
714        // Build the on-disk row. Owned strings here because `LogRow` does
715        // not borrow.
716        let row = LogRow {
717            ts,
718            ts_seq,
719            event: input.event,
720            ref_: input.ref_.map(str::to_string),
721            source: input.source.map(str::to_string),
722            result: input.result,
723            license: input.license.map(str::to_string),
724            size_bytes: input.size_bytes,
725            store_path: input.store_path.map(str::to_string),
726            capability: input.capability,
727            session_id: self.session_id.clone(),
728            error_code: input.error_code.map(str::to_string),
729            schema_version: LOG_SCHEMA_VERSION.to_string(),
730            canonical_digest: input.canonical_digest.map(str::to_string),
731            prev_hash,
732            this_hash: this_hash.clone(),
733        };
734
735        // Serialize, append `\n`, write_all in one syscall, flush BufWriter,
736        // fsync the underlying file. `\n` is part of the same buffer, so a
737        // crash mid-write leaves at most a partial line (no trailing `\n`),
738        // which is detectable on recovery as a corrupted final line.
739        let mut bytes = serde_json::to_vec(&row)?;
740        bytes.push(b'\n');
741
742        let file = OpenOptions::new()
743            .create(true)
744            .append(true)
745            .open(&self.path)?;
746        let mut writer = BufWriter::new(file);
747        writer.write_all(&bytes)?;
748        writer.flush()?;
749        // `into_inner` to recover the underlying File for `sync_all`.
750        let file = writer.into_inner().map_err(|e| {
751            LogError::Io(std::io::Error::other(format!(
752                "buf writer flush failed: {}",
753                e.error()
754            )))
755        })?;
756        file.sync_all()?;
757
758        // Only after a successful fsync do we advance the in-memory state.
759        // If any of the above fails, the next `append` retries from the
760        // same `(ts_seq, prev_hash)` — at most a torn last line on disk.
761        state.next_seq = ts_seq + 1;
762        state.last_hash = this_hash;
763
764        Ok(ts_seq)
765    }
766
767    /// Returns the path the log was opened at. Useful for tests and audit tooling.
768    pub fn path(&self) -> &Utf8Path {
769        &self.path
770    }
771
772    /// Returns the session id stamped into every row written through this
773    /// writer.
774    pub fn session_id(&self) -> &str {
775        &self.session_id
776    }
777}
778
779/// Scan an existing log to recover `(next_seq, last_hash)`.
780///
781/// Walk every line, parse as [`LogRow`], track the last successfully parsed
782/// row. If parsing fails, return [`LogError::Io`] with a synthetic
783/// `"corrupted log at line N: …"` message — never silently truncate.
784fn recover_state(path: &Utf8Path) -> Result<(u64, String), LogError> {
785    let file = match File::open(path) {
786        Ok(f) => f,
787        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
788            return Ok((1, GENESIS_HASH.to_string()));
789        }
790        Err(e) => return Err(LogError::Io(e)),
791    };
792
793    let reader = BufReader::new(file);
794    let mut last_seq: u64 = 0;
795    let mut last_hash: String = GENESIS_HASH.to_string();
796
797    for (idx, line_res) in reader.lines().enumerate() {
798        let line_no = idx + 1;
799        let line = line_res?;
800        if line.is_empty() {
801            // Tolerate trailing/empty lines silently — they are not data.
802            continue;
803        }
804        let row: LogRow = serde_json::from_str(&line).map_err(|e| {
805            LogError::Io(std::io::Error::new(
806                std::io::ErrorKind::InvalidData,
807                format!("corrupted log at line {}: {}", line_no, e),
808            ))
809        })?;
810        last_seq = row.ts_seq;
811        last_hash = row.this_hash;
812    }
813
814    if last_seq == 0 {
815        Ok((1, GENESIS_HASH.to_string()))
816    } else {
817        Ok((last_seq + 1, last_hash))
818    }
819}
820
821// ---------------------------------------------------------------------------
822// Verification (`doiget audit-log --verify`)
823//
824// The provenance log is a JSON Lines file with a SHA-256 hash chain
825// (PROVENANCE_LOG.md §4). Tampering is detected by recomputing every row's
826// `this_hash` and validating the chain. This module provides the offline
827// verifier; the CLI wrapper lives in `doiget-cli::commands::audit_log`.
828//
829// Failure model: returning `Err` is reserved for I/O failures opening / reading
830// the file. Per-row issues (parse failures, hash/chain mismatches, sequence
831// regressions) are accumulated into [`VerifyReport::errors`] so callers can
832// report them all in one pass — this is the contract Phase 1 ships.
833// ---------------------------------------------------------------------------
834
/// Outcome of [`verify`]: per-row chain status across the entire log.
///
/// The log validated cleanly when `errors` is empty.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyReport {
    /// Number of non-empty lines processed, including lines that failed to
    /// parse.
    pub total_rows: usize,
    /// Rows whose hash, chain link, and `ts_seq` all validated.
    pub ok_rows: usize,
    /// Issues encountered, in encounter order. Line numbers are 1-based.
    pub errors: Vec<VerifyIssue>,
}
846
847impl VerifyReport {
848    /// An empty, all-clear report — used when the log file is absent.
849    fn empty() -> Self {
850        Self {
851            total_rows: 0,
852            ok_rows: 0,
853            errors: Vec::new(),
854        }
855    }
856}
857
/// A single issue discovered by [`verify`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct VerifyIssue {
    /// 1-based line number where the issue was detected. Empty lines are
    /// skipped as data but still count toward the numbering.
    pub line: usize,
    /// Classification of the issue (see [`VerifyIssueKind`]).
    pub kind: VerifyIssueKind,
    /// Human-readable description (caller may format for stderr/stdout).
    pub message: String,
}
869
/// Classification of a [`VerifyIssue`]. `non_exhaustive` for forward
/// compatibility — future kinds may include `SessionIdChange`, etc.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VerifyIssueKind {
    /// Row failed to parse as [`LogRow`] (corrupted JSON or unknown field).
    ParseError,
    /// `prev_hash` did not match the previous row's `this_hash` (or the
    /// genesis sentinel on row 1).
    PrevHashMismatch,
    /// Row's stored `this_hash` did not match the recomputed canonical-JSON
    /// SHA-256. Also reported when the recomputation itself fails.
    ThisHashMismatch,
    /// `ts_seq` did not increase strictly monotonically (within a session;
    /// see PROVENANCE_LOG.md §3 + §6 — chain restarts after rotation are
    /// permitted to reset `ts_seq` and are detected via the genesis sentinel).
    SequenceJump,
}
888
889/// Verify the entire log file at `path`.
890///
891/// Returns `Ok(VerifyReport)` regardless of whether the chain validates;
892/// callers inspect `report.errors.is_empty()` to determine pass/fail.
893/// Returns `Err` only when the file itself cannot be opened or read at the
894/// I/O level.
895///
896/// Behavior:
897///
898/// - A missing file is treated as a clean, empty log (no tampering possible
899///   on bytes that don't exist) and returns an empty report after a `warn!`.
900/// - Empty / blank lines are skipped — they are not data per the writer's
901///   on-disk format (PROVENANCE_LOG.md §2).
902/// - On a row that fails to parse as [`LogRow`], a `ParseError` is recorded
903///   and verification continues on the next line. The chain anchor does NOT
904///   advance through an unparsable row, so the next valid row's `prev_hash`
905///   is checked against the last successfully parsed row (or against
906///   `"GENESIS"` if no valid row has been seen yet).
907/// - A `prev_hash == "GENESIS"` sentinel marks a chain restart (first row of
908///   a fresh / rotated log per §6) and resets the `ts_seq` monotonicity
909///   anchor — `ts_seq` is NOT compared to the prior row across a restart.
910pub fn verify(path: &Utf8Path) -> Result<VerifyReport, LogError> {
911    let file = match File::open(path) {
912        Ok(f) => f,
913        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
914            tracing::warn!(
915                path = %path,
916                "audit-log verify: log file does not exist; reporting empty"
917            );
918            return Ok(VerifyReport::empty());
919        }
920        Err(e) => return Err(LogError::Io(e)),
921    };
922
923    let reader = BufReader::new(file);
924    let mut report = VerifyReport::empty();
925
926    // Anchor for the chain check: the LAST SUCCESSFULLY PARSED row. The chain
927    // is anchored to the bytes on disk, not to a hypothetical "should have
928    // been". This matches the spec — tampering at row N must surface both as
929    // a hash mismatch on N and as a chain break on N+1.
930    let mut prev_row: Option<LogRow> = None;
931
932    for (idx, line_res) in reader.lines().enumerate() {
933        let line_no = idx + 1;
934        let line = line_res?;
935        if line.is_empty() {
936            continue;
937        }
938
939        report.total_rows += 1;
940
941        let row: LogRow = match serde_json::from_str(&line) {
942            Ok(r) => r,
943            Err(e) => {
944                report.errors.push(VerifyIssue {
945                    line: line_no,
946                    kind: VerifyIssueKind::ParseError,
947                    message: format!("failed to parse row as LogRow: {e}"),
948                });
949                // Chain anchor cannot advance through an unparsable row;
950                // leave `prev_row` untouched so the next valid row's
951                // `prev_hash` is checked against the last-known anchor (or
952                // GENESIS if we never had one).
953                continue;
954            }
955        };
956
957        let mut row_ok = true;
958
959        // 1. Recompute `this_hash` from canonical JSON (row \ {this_hash}).
960        let rfh = RowForHash {
961            ts: row.ts,
962            ts_seq: row.ts_seq,
963            event: row.event,
964            ref_: row.ref_.as_deref(),
965            source: row.source.as_deref(),
966            result: row.result,
967            license: row.license.as_deref(),
968            size_bytes: row.size_bytes,
969            store_path: row.store_path.as_deref(),
970            capability: row.capability,
971            session_id: &row.session_id,
972            error_code: row.error_code.as_deref(),
973            schema_version: &row.schema_version,
974            canonical_digest: row.canonical_digest.as_deref(),
975            prev_hash: &row.prev_hash,
976        };
977        match compute_this_hash(&rfh) {
978            Ok(recomputed) => {
979                if recomputed != row.this_hash {
980                    report.errors.push(VerifyIssue {
981                        line: line_no,
982                        kind: VerifyIssueKind::ThisHashMismatch,
983                        message: format!(
984                            "this_hash mismatch: stored={}, recomputed={}",
985                            row.this_hash, recomputed
986                        ),
987                    });
988                    row_ok = false;
989                }
990            }
991            Err(e) => {
992                // Canonicalization itself failed — surface as a hash
993                // mismatch with the underlying error in the message.
994                report.errors.push(VerifyIssue {
995                    line: line_no,
996                    kind: VerifyIssueKind::ThisHashMismatch,
997                    message: format!("failed to recompute this_hash: {e}"),
998                });
999                row_ok = false;
1000            }
1001        }
1002
1003        // 2. Chain link: `prev_hash` matches anchor (GENESIS on row 1 / after
1004        //    a chain restart, prior row's `this_hash` otherwise).
1005        let is_genesis = row.prev_hash == GENESIS_HASH;
1006        match &prev_row {
1007            None => {
1008                // First non-empty row in the file: must declare GENESIS.
1009                if !is_genesis {
1010                    report.errors.push(VerifyIssue {
1011                        line: line_no,
1012                        kind: VerifyIssueKind::PrevHashMismatch,
1013                        message: format!(
1014                            "first row must have prev_hash=\"GENESIS\", got {:?}",
1015                            row.prev_hash
1016                        ),
1017                    });
1018                    row_ok = false;
1019                }
1020            }
1021            Some(prev) => {
1022                if is_genesis {
1023                    // Chain restart (rotation per §6) — accepted, no link
1024                    // check, and the `ts_seq` monotonicity anchor resets
1025                    // (handled below via `is_genesis`).
1026                } else if row.prev_hash != prev.this_hash {
1027                    report.errors.push(VerifyIssue {
1028                        line: line_no,
1029                        kind: VerifyIssueKind::PrevHashMismatch,
1030                        message: format!(
1031                            "prev_hash mismatch: row stores {}, previous row's this_hash is {}",
1032                            row.prev_hash, prev.this_hash
1033                        ),
1034                    });
1035                    row_ok = false;
1036                }
1037            }
1038        }
1039
1040        // 3. ts_seq monotonicity — strictly greater than the previous row's
1041        //    `ts_seq`, EXCEPT across a chain restart (where `ts_seq` resets).
1042        if let Some(prev) = &prev_row {
1043            if !is_genesis && row.ts_seq <= prev.ts_seq {
1044                report.errors.push(VerifyIssue {
1045                    line: line_no,
1046                    kind: VerifyIssueKind::SequenceJump,
1047                    message: format!(
1048                        "ts_seq did not increase strictly: previous={}, current={}",
1049                        prev.ts_seq, row.ts_seq
1050                    ),
1051                });
1052                row_ok = false;
1053            }
1054        }
1055
1056        if row_ok {
1057            report.ok_rows += 1;
1058        }
1059
1060        // Advance the anchor to the just-parsed row (whether or not it had
1061        // issues — the on-disk bytes ARE the chain).
1062        prev_row = Some(row);
1063    }
1064
1065    Ok(report)
1066}
1067
1068// ---------------------------------------------------------------------------
1069// v1 → v2 migration (ADR-0024, `docs/PROVENANCE_LOG.md` §"Schema migration").
1070//
1071// v1 rows lack `schema_version` and `canonical_digest`; the v2 binary
1072// fails loudly when asked to read them (see `recover_state` /
1073// `verify`). The migration recovers a v2 log from a v1 file by:
1074//
1075//   1. Parsing every v1 row via the [`V1LogRow`] shadow struct.
1076//   2. Deriving a [`crate::CanonicalRef`] from the v1 `(ref, source)`
1077//      pair — `source` becomes `resolver_profile`, `version` is `None`
1078//      (ADR-0021 §1 → ADR-0024 migration recipe).
//   3. Re-computing the SHA-256 hash chain across the new row
//      payloads. The v1 chain is invalidated by the schema change; the
//      v2 chain is rebuilt starting from `"GENESIS"` at the first row,
//      regardless of the `prev_hash` values stored in the input.
1083//   4. Writing the new rows to `<log_path>.v2-migrated`, then
1084//      atomically renaming it onto `<log_path>` after backing up the
1085//      original to `<log_path>.v1-backup`.
1086//
1087// The migration is **idempotent**: running it on an already-v2 log
1088// re-parses every row as v2, recomputes the same hash chain, and
1089// produces a byte-equivalent output.
1090//
1091// The migration is **dry-runnable**: `dry_run = true` returns a
1092// [`MigrationReport`] summarizing what would change without touching
1093// disk.
1094// ---------------------------------------------------------------------------
1095
/// Summary of a [`migrate_v1_to_v2`] run.
///
/// Marked `#[non_exhaustive]` so future fields (e.g. a per-row error
/// list, an aborted-row count) can be added without breaking callers
/// that pattern-match.
///
/// `Serialize` enables `provenance migrate --mode json` (#204) — the
/// wire form is `{"rows_rewritten": N, "dry_run": bool,
/// "first_row_v1_chain_hash": "...", "first_row_v2_chain_hash": "..."}`.
///
/// # Wire-format stability (post-#208 self-review §1)
///
/// Once a release ships with the [`Serialize`] derive, the field
/// **names** below become part of the public API. Renaming a field is
/// then a semver minor bump and warrants a CHANGELOG \[BREAKING\] note;
/// new fields are still safe (per `#[non_exhaustive]`).
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct MigrationReport {
    /// Number of rows rewritten (or that WOULD be rewritten under
    /// `dry_run`).
    pub rows_rewritten: u64,
    /// Whether this was a dry-run preview (`true`) or a live rewrite
    /// (`false`).
    pub dry_run: bool,
    /// Stored `this_hash` of the first input row (the v1 chain anchor).
    /// When the log is missing or contains no rows, this is the literal
    /// `"GENESIS"`.
    pub first_row_v1_chain_hash: String,
    /// Recomputed `this_hash` of the first migrated row under the v2
    /// canonicalization. Equal to [`Self::first_row_v1_chain_hash`]
    /// only if the input was already v2 (idempotent case) or had no rows
    /// (both fields are then `"GENESIS"`).
    pub first_row_v2_chain_hash: String,
}
1130
/// v1 row shadow struct used ONLY by [`migrate_v1_to_v2`]. The
/// non-defaulted v2 fields (`schema_version`, `canonical_digest`) are
/// absent here; `deny_unknown_fields` rejects unexpected v2 fields so a
/// v2 row on disk fails to parse as v1, letting the migrator detect
/// already-v2 input via fallback to the v2 parser.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct V1LogRow {
    ts: DateTime<Utc>,
    ts_seq: u64,
    event: LogEvent,
    // `ref` is a Rust keyword, hence the trailing underscore and the rename
    // to the on-disk key.
    #[serde(rename = "ref")]
    ref_: Option<String>,
    source: Option<String>,
    result: LogResult,
    license: Option<String>,
    size_bytes: Option<u64>,
    store_path: Option<String>,
    capability: Capability,
    session_id: String,
    error_code: Option<String>,
    // v1 chain fields. The migrator reads only `this_hash` (for the
    // report); `prev_hash` is parsed but the chain is rebuilt from scratch.
    prev_hash: String,
    this_hash: String,
}
1155
/// Minimal in-memory representation a v1 OR v2 row can be promoted to
/// before re-hashing.
///
/// Carries the payload fields shared by both schemas. The input's
/// `prev_hash` and `schema_version` are intentionally not kept: the
/// migrator recomputes the chain and stamps the current schema version.
#[derive(Debug, Clone)]
struct MigrationRowSeed {
    ts: DateTime<Utc>,
    ts_seq: u64,
    event: LogEvent,
    ref_: Option<String>,
    source: Option<String>,
    result: LogResult,
    license: Option<String>,
    size_bytes: Option<u64>,
    store_path: Option<String>,
    capability: Capability,
    session_id: String,
    error_code: Option<String>,
    /// `None` for v1 inputs (the digest is computed during migration);
    /// `Some(...)` for already-v2 inputs (carried through verbatim for
    /// idempotency).
    canonical_digest_in: Option<String>,
    /// As stored on disk in the input. Used only for the
    /// `first_row_v1_chain_hash` field of [`MigrationReport`].
    stored_this_hash: String,
}
1180
1181/// Migrate a v1 provenance log to v2 (ADR-0024).
1182///
1183/// Returns a [`MigrationReport`] describing how many rows were (or
1184/// would be) rewritten and the first-row chain-anchor delta. The
1185/// migration is idempotent: running it twice produces byte-equivalent
1186/// output the second time.
1187///
1188/// On a missing log file, returns a no-op report (`rows_rewritten = 0`,
1189/// `first_row_v1_chain_hash = "GENESIS"`, `first_row_v2_chain_hash =
1190/// "GENESIS"`) — there is nothing to migrate.
1191///
1192/// # Errors
1193///
1194/// Returns [`LogError::Io`] on I/O failures and on rows that fail to
1195/// parse as either v1 or v2 (the synthetic message names the line
1196/// number). Returns [`LogError::Serialize`] on canonicalization
1197/// failures.
1198pub fn migrate_v1_to_v2(log_path: &Utf8Path, dry_run: bool) -> Result<MigrationReport, LogError> {
1199    use std::io::BufRead;
1200
1201    // -- 1. Read the input log, parsing each line as v1 OR (idempotent
1202    //       fallback) v2. --------------------------------------------------
1203    let file = match File::open(log_path) {
1204        Ok(f) => f,
1205        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1206            return Ok(MigrationReport {
1207                rows_rewritten: 0,
1208                dry_run,
1209                first_row_v1_chain_hash: GENESIS_HASH.to_string(),
1210                first_row_v2_chain_hash: GENESIS_HASH.to_string(),
1211            });
1212        }
1213        Err(e) => return Err(LogError::Io(e)),
1214    };
1215    let reader = BufReader::new(file);
1216    let mut seeds: Vec<MigrationRowSeed> = Vec::new();
1217
1218    for (idx, line_res) in reader.lines().enumerate() {
1219        let line_no = idx + 1;
1220        let line = line_res?;
1221        if line.is_empty() {
1222            continue;
1223        }
1224        // Try v1 first. If it fails, try v2 (idempotency: re-migrating
1225        // a v2 log MUST succeed and produce equivalent output).
1226        let seed = if let Ok(v1) = serde_json::from_str::<V1LogRow>(&line) {
1227            MigrationRowSeed {
1228                ts: v1.ts,
1229                ts_seq: v1.ts_seq,
1230                event: v1.event,
1231                ref_: v1.ref_,
1232                source: v1.source,
1233                result: v1.result,
1234                license: v1.license,
1235                size_bytes: v1.size_bytes,
1236                store_path: v1.store_path,
1237                capability: v1.capability,
1238                session_id: v1.session_id,
1239                error_code: v1.error_code,
1240                canonical_digest_in: None,
1241                stored_this_hash: v1.this_hash,
1242            }
1243        } else {
1244            match serde_json::from_str::<LogRow>(&line) {
1245                Ok(v2) => MigrationRowSeed {
1246                    ts: v2.ts,
1247                    ts_seq: v2.ts_seq,
1248                    event: v2.event,
1249                    ref_: v2.ref_,
1250                    source: v2.source,
1251                    result: v2.result,
1252                    license: v2.license,
1253                    size_bytes: v2.size_bytes,
1254                    store_path: v2.store_path,
1255                    capability: v2.capability,
1256                    session_id: v2.session_id,
1257                    error_code: v2.error_code,
1258                    canonical_digest_in: v2.canonical_digest,
1259                    stored_this_hash: v2.this_hash,
1260                },
1261                Err(e) => {
1262                    return Err(LogError::Io(std::io::Error::new(
1263                        std::io::ErrorKind::InvalidData,
1264                        format!("migration: line {line_no} is neither v1 nor v2: {e}"),
1265                    )));
1266                }
1267            }
1268        };
1269        seeds.push(seed);
1270    }
1271
1272    // -- 2. Derive `canonical_digest` for each seed that lacks one. ------
1273    //
1274    // For v1 rows: build a CanonicalRef from
1275    //   - source_type from `event`/`ref` shape (DOI prefix `10.` vs
1276    //     arXiv) — we use a heuristic that matches `Ref::parse`'s rule
1277    //     (`starts_with "10."` ⇒ DOI; else arXiv).
1278    //   - source_id = ref value (verbatim).
1279    //   - resolver_profile = source value (verbatim, ADR-0021 §3
1280    //     migration recipe).
1281    //   - version = None.
1282    //
1283    // Rows without a `ref` (session bookend) keep `canonical_digest =
1284    // None` per the v2 row contract.
1285
1286    fn derive_digest(seed: &MigrationRowSeed) -> Option<String> {
1287        let ref_str = seed.ref_.as_deref()?;
1288        let source_key = seed.source.as_deref().unwrap_or("");
1289        // Heuristic: bare DOIs always start `10.`; everything else is
1290        // treated as an arXiv id. Mirrors `Ref::parse` rule 3/4.
1291        let source_type = if ref_str.starts_with("10.") {
1292            crate::SourceType::Doi
1293        } else {
1294            crate::SourceType::Arxiv
1295        };
1296        let c = crate::CanonicalRef::new(source_type, ref_str, source_key, None);
1297        Some(c.digest_hex())
1298    }
1299
1300    let digests: Vec<Option<String>> = seeds
1301        .iter()
1302        .map(|s| s.canonical_digest_in.clone().or_else(|| derive_digest(s)))
1303        .collect();
1304
1305    // -- 3. Rebuild the hash chain across the v2 payloads. ----------------
1306    let mut out_rows: Vec<LogRow> = Vec::with_capacity(seeds.len());
1307    let mut prev_hash: String = GENESIS_HASH.to_string();
1308
1309    for (seed, digest) in seeds.iter().zip(digests.iter()) {
1310        let rfh = RowForHash {
1311            ts: seed.ts,
1312            ts_seq: seed.ts_seq,
1313            event: seed.event,
1314            ref_: seed.ref_.as_deref(),
1315            source: seed.source.as_deref(),
1316            result: seed.result,
1317            license: seed.license.as_deref(),
1318            size_bytes: seed.size_bytes,
1319            store_path: seed.store_path.as_deref(),
1320            capability: seed.capability,
1321            session_id: &seed.session_id,
1322            error_code: seed.error_code.as_deref(),
1323            schema_version: LOG_SCHEMA_VERSION,
1324            canonical_digest: digest.as_deref(),
1325            prev_hash: &prev_hash,
1326        };
1327        let this_hash = compute_this_hash(&rfh)?;
1328        let row = LogRow {
1329            ts: seed.ts,
1330            ts_seq: seed.ts_seq,
1331            event: seed.event,
1332            ref_: seed.ref_.clone(),
1333            source: seed.source.clone(),
1334            result: seed.result,
1335            license: seed.license.clone(),
1336            size_bytes: seed.size_bytes,
1337            store_path: seed.store_path.clone(),
1338            capability: seed.capability,
1339            session_id: seed.session_id.clone(),
1340            error_code: seed.error_code.clone(),
1341            schema_version: LOG_SCHEMA_VERSION.to_string(),
1342            canonical_digest: digest.clone(),
1343            prev_hash: prev_hash.clone(),
1344            this_hash: this_hash.clone(),
1345        };
1346        prev_hash = this_hash;
1347        out_rows.push(row);
1348    }
1349
1350    // -- 4. Build the report. --------------------------------------------
1351    let first_v1_hash = seeds
1352        .first()
1353        .map(|s| s.stored_this_hash.clone())
1354        .unwrap_or_else(|| GENESIS_HASH.to_string());
1355    let first_v2_hash = out_rows
1356        .first()
1357        .map(|r| r.this_hash.clone())
1358        .unwrap_or_else(|| GENESIS_HASH.to_string());
1359    let report = MigrationReport {
1360        rows_rewritten: out_rows.len() as u64,
1361        dry_run,
1362        first_row_v1_chain_hash: first_v1_hash,
1363        first_row_v2_chain_hash: first_v2_hash,
1364    };
1365
1366    if dry_run {
1367        return Ok(report);
1368    }
1369
1370    // -- 5. Live write: stage to `<log_path>.v2-migrated`, back up the
1371    //       v1, then atomically rename. -----------------------------------
1372    let staged_path = with_suffix(log_path, ".v2-migrated");
1373    let backup_path = with_suffix(log_path, ".v1-backup");
1374
1375    {
1376        let staged_file = OpenOptions::new()
1377            .create(true)
1378            .write(true)
1379            .truncate(true)
1380            .open(&staged_path)?;
1381        let mut writer = BufWriter::new(staged_file);
1382        for row in &out_rows {
1383            let mut bytes = serde_json::to_vec(row)?;
1384            bytes.push(b'\n');
1385            writer.write_all(&bytes)?;
1386        }
1387        writer.flush()?;
1388        let file = writer.into_inner().map_err(|e| {
1389            LogError::Io(std::io::Error::other(format!(
1390                "migration buf writer flush failed: {}",
1391                e.error()
1392            )))
1393        })?;
1394        file.sync_all()?;
1395    }
1396
1397    // Sanity-check: the staged file MUST verify clean before we
1398    // commit the swap. If it doesn't, the migration is buggy — abort
1399    // without touching the live log.
1400    let verify_report = verify(&staged_path)?;
1401    if !verify_report.errors.is_empty() {
1402        return Err(LogError::Io(std::io::Error::other(format!(
1403            "migration: staged v2 log failed verify; first issue: {:?}",
1404            verify_report.errors.first()
1405        ))));
1406    }
1407
1408    // Move the original aside as `<log_path>.v1-backup`. Overwriting
1409    // any prior backup is intentional — the user re-running migrate
1410    // expects the most recent original preserved.
1411    if log_path.exists() {
1412        if backup_path.exists() {
1413            std::fs::remove_file(&backup_path)?;
1414        }
1415        std::fs::rename(log_path, &backup_path)?;
1416    }
1417    // Atomically promote the staged file to the live path.
1418    std::fs::rename(&staged_path, log_path)?;
1419
1420    Ok(report)
1421}
1422
1423/// Append a literal suffix to a [`Utf8Path`], producing a sibling path
1424/// in the same directory. Avoids `std::path::PathBuf` per the workspace
1425/// posture rule (`docs/SECURITY.md` §3 — camino-only file paths in
1426/// production code).
1427fn with_suffix(path: &Utf8Path, suffix: &str) -> Utf8PathBuf {
1428    let s = format!("{path}{suffix}");
1429    Utf8PathBuf::from(s)
1430}
1431
1432// ---------------------------------------------------------------------------
1433// Tests
1434// ---------------------------------------------------------------------------
1435
1436#[cfg(test)]
1437#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1438mod tests {
1439    use super::*;
1440    use std::fs;
1441    use std::sync::Arc;
1442    use std::thread;
1443
1444    use tempfile::TempDir;
1445
1446    /// Convert a `TempDir`'s `&std::path::Path` to a `Utf8PathBuf`. Tests
1447    /// always run on UTF-8 temp paths in CI; if the OS returns a non-UTF-8
1448    /// path we panic, which is acceptable for a unit test.
1449    fn tmp_dir_utf8(dir: &TempDir) -> Utf8PathBuf {
1450        Utf8PathBuf::from_path_buf(dir.path().to_path_buf()).expect("temp dir path must be UTF-8")
1451    }
1452
1453    /// A fixed 26-char ULID-shaped string used in tests. Real callers use
1454    /// the `ulid` crate; tests pin a constant so output is reproducible.
1455    const TEST_SESSION_ID: &str = "01JCKZ7Q0000000000000000AB";
1456
1457    fn open_log(path: &Utf8Path) -> ProvenanceLog {
1458        ProvenanceLog::open(path, TEST_SESSION_ID.to_string()).expect("open")
1459    }
1460
1461    fn empty_input() -> RowInput<'static> {
1462        RowInput {
1463            event: LogEvent::Fetch,
1464            result: LogResult::Ok,
1465            capability: Capability::Oa,
1466            ref_: None,
1467            source: None,
1468            error_code: None,
1469            size_bytes: None,
1470            license: None,
1471            store_path: None,
1472            canonical_digest: None,
1473        }
1474    }
1475
1476    /// Read the on-disk log and parse every line into a `LogRow`.
1477    fn read_rows(path: &Utf8Path) -> Vec<LogRow> {
1478        let raw = fs::read_to_string(path).expect("read log");
1479        raw.lines()
1480            .filter(|l| !l.is_empty())
1481            .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
1482            .collect()
1483    }
1484
1485    /// Recompute `this_hash` for a stored row and assert it matches the
1486    /// stored value. Walks the same canonicalization rule as
1487    /// [`compute_this_hash`].
1488    fn verify_this_hash(row: &LogRow) {
1489        let rfh = RowForHash {
1490            ts: row.ts,
1491            ts_seq: row.ts_seq,
1492            event: row.event,
1493            ref_: row.ref_.as_deref(),
1494            source: row.source.as_deref(),
1495            result: row.result,
1496            license: row.license.as_deref(),
1497            size_bytes: row.size_bytes,
1498            store_path: row.store_path.as_deref(),
1499            capability: row.capability,
1500            session_id: &row.session_id,
1501            error_code: row.error_code.as_deref(),
1502            schema_version: &row.schema_version,
1503            canonical_digest: row.canonical_digest.as_deref(),
1504            prev_hash: &row.prev_hash,
1505        };
1506        let recomputed = compute_this_hash(&rfh).expect("hash");
1507        assert_eq!(
1508            recomputed, row.this_hash,
1509            "this_hash mismatch on ts_seq {}",
1510            row.ts_seq
1511        );
1512    }
1513
1514    #[test]
1515    fn first_row_uses_genesis_prev_hash() {
1516        let dir = TempDir::new().expect("tmp");
1517        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1518        let log = open_log(&path);
1519        let seq = log.append(empty_input()).expect("append");
1520        assert_eq!(seq, 1);
1521
1522        let rows = read_rows(&path);
1523        assert_eq!(rows.len(), 1);
1524        assert_eq!(rows[0].ts_seq, 1);
1525        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1526        assert_eq!(rows[0].this_hash.len(), 64);
1527        assert_eq!(rows[0].session_id, TEST_SESSION_ID);
1528        verify_this_hash(&rows[0]);
1529    }
1530
1531    #[test]
1532    fn subsequent_rows_chain_correctly() {
1533        let dir = TempDir::new().expect("tmp");
1534        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1535        let log = open_log(&path);
1536
1537        for _ in 0..3 {
1538            log.append(empty_input()).expect("append");
1539        }
1540
1541        let rows = read_rows(&path);
1542        assert_eq!(rows.len(), 3);
1543        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1544        assert_eq!(rows[1].prev_hash, rows[0].this_hash);
1545        assert_eq!(rows[2].prev_hash, rows[1].this_hash);
1546        for r in &rows {
1547            verify_this_hash(r);
1548        }
1549        assert_eq!(rows[0].ts_seq, 1);
1550        assert_eq!(rows[1].ts_seq, 2);
1551        assert_eq!(rows[2].ts_seq, 3);
1552    }
1553
1554    #[test]
1555    fn recovery_after_reopen() {
1556        let dir = TempDir::new().expect("tmp");
1557        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1558
1559        {
1560            let log = open_log(&path);
1561            for _ in 0..3 {
1562                log.append(empty_input()).expect("append");
1563            }
1564        } // drop writer
1565
1566        let log2 = open_log(&path);
1567        let seq = log2.append(empty_input()).expect("append after reopen");
1568        assert_eq!(seq, 4);
1569
1570        let rows = read_rows(&path);
1571        assert_eq!(rows.len(), 4);
1572        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1573        for i in 1..rows.len() {
1574            assert_eq!(
1575                rows[i].prev_hash,
1576                rows[i - 1].this_hash,
1577                "chain break at row {}",
1578                i + 1
1579            );
1580        }
1581        for (i, r) in rows.iter().enumerate() {
1582            assert_eq!(r.ts_seq, (i + 1) as u64);
1583            verify_this_hash(r);
1584        }
1585    }
1586
1587    #[test]
1588    fn concurrent_writers_in_same_process_serialize() {
1589        let dir = TempDir::new().expect("tmp");
1590        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1591        let log = Arc::new(open_log(&path));
1592
1593        let mut handles = Vec::with_capacity(8);
1594        for _ in 0..8 {
1595            let log = Arc::clone(&log);
1596            handles.push(thread::spawn(move || {
1597                log.append(empty_input()).expect("append")
1598            }));
1599        }
1600        let mut returned: Vec<u64> = handles
1601            .into_iter()
1602            .map(|h| h.join().expect("join"))
1603            .collect();
1604        returned.sort_unstable();
1605        assert_eq!(returned, vec![1, 2, 3, 4, 5, 6, 7, 8]);
1606
1607        let rows = read_rows(&path);
1608        assert_eq!(rows.len(), 8);
1609
1610        // The in-process mutex serializes appends, so file order MUST equal
1611        // ts_seq order: row N (0-indexed) on disk has ts_seq = N+1.
1612        for (i, r) in rows.iter().enumerate() {
1613            assert_eq!(r.ts_seq, (i + 1) as u64, "ts_seq gap at file row {}", i + 1);
1614        }
1615        // Hash chain follows file order.
1616        assert_eq!(rows[0].prev_hash, GENESIS_HASH);
1617        for i in 1..rows.len() {
1618            assert_eq!(
1619                rows[i].prev_hash,
1620                rows[i - 1].this_hash,
1621                "chain break at file row {}",
1622                i + 1
1623            );
1624        }
1625        for r in &rows {
1626            verify_this_hash(r);
1627        }
1628    }
1629
1630    #[test]
1631    fn corrupted_existing_log_fails_open() {
1632        let dir = TempDir::new().expect("tmp");
1633        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1634
1635        // JSON but not a valid LogRow: missing required fields, has unknown
1636        // field. `deny_unknown_fields` ensures the parser refuses.
1637        fs::write(&path, "{\"ts_seq\": 1, \"garbage\": true}\n").expect("write");
1638
1639        let err =
1640            ProvenanceLog::open(&path, TEST_SESSION_ID.to_string()).expect_err("must fail open");
1641        match err {
1642            LogError::Io(io) => {
1643                let msg = io.to_string();
1644                assert!(
1645                    msg.contains("corrupted log at line 1"),
1646                    "expected synthetic corruption message, got: {}",
1647                    msg
1648                );
1649            }
1650            other => panic!("expected LogError::Io, got {:?}", other),
1651        }
1652    }
1653
1654    #[test]
1655    fn rejects_non_regular_file() {
1656        // Pointing the log at a directory must fail with NotARegularFile.
1657        let dir = TempDir::new().expect("tmp");
1658        let err = ProvenanceLog::open(tmp_dir_utf8(&dir), TEST_SESSION_ID.to_string())
1659            .expect_err("must fail");
1660        match err {
1661            LogError::NotARegularFile(_) => {}
1662            other => panic!("expected NotARegularFile, got {:?}", other),
1663        }
1664    }
1665
1666    #[test]
1667    fn canonical_json_excludes_this_hash_field() {
1668        // Spec contract: the hashed bytes do not include `this_hash`. If
1669        // this ever regresses, every previously-written log becomes
1670        // unverifiable.
1671        let rfh = RowForHash {
1672            ts: Utc::now(),
1673            ts_seq: 1,
1674            event: LogEvent::Fetch,
1675            ref_: None,
1676            source: None,
1677            result: LogResult::Ok,
1678            license: None,
1679            size_bytes: None,
1680            store_path: None,
1681            capability: Capability::Oa,
1682            session_id: TEST_SESSION_ID,
1683            error_code: None,
1684            schema_version: LOG_SCHEMA_VERSION,
1685            canonical_digest: None,
1686            prev_hash: GENESIS_HASH,
1687        };
1688        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1689        let s = std::str::from_utf8(&bytes).expect("utf8");
1690        assert!(!s.contains("this_hash"), "this_hash leaked into hash input");
1691        assert!(s.contains("\"prev_hash\":"));
1692    }
1693
1694    #[test]
1695    fn canonical_json_keys_are_lexicographically_sorted() {
1696        // PROVENANCE_LOG.md §4: canonical JSON uses keys sorted
1697        // lexicographically. The lex-first top-level key of a row is
1698        // `capability` ("c..." < "e..." < ...). Build a row and assert the
1699        // canonical bytes start with that key.
1700        let rfh = RowForHash {
1701            ts: Utc::now(),
1702            ts_seq: 1,
1703            event: LogEvent::Fetch,
1704            ref_: Some("10.1234/example"),
1705            source: Some("unpaywall"),
1706            result: LogResult::Ok,
1707            license: Some("CC-BY-4.0"),
1708            size_bytes: Some(1234),
1709            store_path: Some("papers/x.pdf"),
1710            capability: Capability::Oa,
1711            session_id: TEST_SESSION_ID,
1712            error_code: None,
1713            schema_version: LOG_SCHEMA_VERSION,
1714            canonical_digest: Some(
1715                "0000000000000000000000000000000000000000000000000000000000000000",
1716            ),
1717            prev_hash: GENESIS_HASH,
1718        };
1719        let bytes = canonical_json_for_hash(&rfh).expect("canonicalize");
1720        let s = std::str::from_utf8(&bytes).expect("utf8");
1721        // v2: lex-first key is `canonical_digest` (< `capability` because
1722        // 'n' < 'p' at byte index 2). Pre-v2 it was `capability`.
1723        assert!(
1724            s.starts_with("{\"canonical_digest\":"),
1725            "canonical bytes must start with lex-first v2 key, got: {}",
1726            s
1727        );
1728        // Spot-check ordering: `prev_hash` (p) must come before `ref` (r),
1729        // which must come before `result` (re...) — wait, "ref" < "result"
1730        // lexicographically because 'f' < 's' in ascii at index 2 vs 'e' at
1731        // index 2 of "result"... let me just check a couple of unambiguous
1732        // pairs: `event` < `prev_hash`, and `ts` < `ts_seq`.
1733        let event_idx = s.find("\"event\":").expect("event key present");
1734        let prev_idx = s.find("\"prev_hash\":").expect("prev_hash key present");
1735        assert!(event_idx < prev_idx, "event must precede prev_hash");
1736        let ts_idx = s.find("\"ts\":").expect("ts key present");
1737        let tsseq_idx = s.find("\"ts_seq\":").expect("ts_seq key present");
1738        assert!(ts_idx < tsseq_idx, "ts must precede ts_seq");
1739    }
1740
1741    // -----------------------------------------------------------------
1742    // verify() tests — Phase 1 surface for `doiget audit-log --verify`.
1743    // -----------------------------------------------------------------
1744
1745    /// Rewrite a single field's quoted-string value on a specific 1-based
1746    /// line of `path`. Used to simulate tampering. Panics on malformed input
1747    /// — only valid inputs are produced by the test harness.
1748    ///
1749    /// `field_key` is matched as `"field_key":"...old..."` (quoted string
1750    /// JSON value). The new value is the literal string `new_value` (no
1751    /// JSON escaping needed for the test fixtures we use).
1752    fn tamper_string_field(
1753        path: &Utf8Path,
1754        line_no_1based: usize,
1755        field_key: &str,
1756        new_value: &str,
1757    ) {
1758        let raw = fs::read_to_string(path).expect("read log");
1759        let mut lines: Vec<String> = raw.lines().map(str::to_string).collect();
1760        let target = &lines[line_no_1based - 1];
1761        let needle = format!("\"{field_key}\":\"");
1762        let start = target
1763            .find(&needle)
1764            .unwrap_or_else(|| panic!("field {field_key} not found on line {line_no_1based}"))
1765            + needle.len();
1766        let end_rel = target[start..]
1767            .find('"')
1768            .unwrap_or_else(|| panic!("unterminated string for field {field_key}"));
1769        let end = start + end_rel;
1770        let mut new_line = String::with_capacity(target.len());
1771        new_line.push_str(&target[..start]);
1772        new_line.push_str(new_value);
1773        new_line.push_str(&target[end..]);
1774        lines[line_no_1based - 1] = new_line;
1775        let mut out = lines.join("\n");
1776        out.push('\n');
1777        fs::write(path, out).expect("write tampered log");
1778    }
1779
1780    #[test]
1781    fn verify_empty_log_is_ok() {
1782        // Missing file is a clean log — no tampering possible on bytes that
1783        // don't exist. `verify` returns an empty report, not an error.
1784        let dir = TempDir::new().expect("tmp");
1785        let path = tmp_dir_utf8(&dir).join("nonexistent.jsonl");
1786        assert!(!path.exists(), "precondition: file must not exist");
1787
1788        let report = verify(&path).expect("verify must not error on missing file");
1789        assert_eq!(report.total_rows, 0);
1790        assert_eq!(report.ok_rows, 0);
1791        assert!(report.errors.is_empty(), "errors: {:?}", report.errors);
1792    }
1793
1794    #[test]
1795    fn verify_well_formed_chain_passes() {
1796        // Three rows written via the real writer must verify clean.
1797        let dir = TempDir::new().expect("tmp");
1798        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1799        let log = open_log(&path);
1800        for _ in 0..3 {
1801            log.append(empty_input()).expect("append");
1802        }
1803
1804        let report = verify(&path).expect("verify must succeed");
1805        assert_eq!(report.total_rows, 3);
1806        assert_eq!(report.ok_rows, 3);
1807        assert!(
1808            report.errors.is_empty(),
1809            "expected no issues on a well-formed log; got: {:?}",
1810            report.errors
1811        );
1812    }
1813
1814    #[test]
1815    fn verify_detects_tampered_row_hash() {
1816        // Mutate the SECOND row's `this_hash` to a syntactically-valid but
1817        // wrong hash. The recomputed canonical-JSON SHA-256 will not match.
1818        let dir = TempDir::new().expect("tmp");
1819        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1820        let log = open_log(&path);
1821        log.append(empty_input()).expect("append 1");
1822        log.append(empty_input()).expect("append 2");
1823        drop(log);
1824
1825        // 64 lowercase hex chars, all zeros — passes `LogRow` parse, fails hash check.
1826        tamper_string_field(
1827            &path,
1828            2,
1829            "this_hash",
1830            "0000000000000000000000000000000000000000000000000000000000000000",
1831        );
1832
1833        let report = verify(&path).expect("verify must succeed");
1834        assert_eq!(report.total_rows, 2);
1835        // Row 2's hash mismatch breaks both the hash check on row 2 AND the
1836        // chain link from row 2's stored `prev_hash` (still correct) into the
1837        // forward direction. There's no row 3 to fail forward, so we expect
1838        // exactly one issue: the this-hash mismatch on line 2.
1839        let hash_issues: Vec<_> = report
1840            .errors
1841            .iter()
1842            .filter(|e| e.kind == VerifyIssueKind::ThisHashMismatch)
1843            .collect();
1844        assert_eq!(
1845            hash_issues.len(),
1846            1,
1847            "expected exactly one ThisHashMismatch, got {:?}",
1848            report.errors
1849        );
1850        assert_eq!(hash_issues[0].line, 2);
1851    }
1852
1853    #[test]
1854    fn verify_detects_tampered_prev_hash() {
1855        // Mutate the SECOND row's `prev_hash` to a wrong value. This
1856        // invalidates the chain link but the row's own `this_hash` was
1857        // computed with the original `prev_hash`, so the this-hash check
1858        // ALSO fails (hash input changed). We assert at least the prev-hash
1859        // issue is reported on line 2.
1860        let dir = TempDir::new().expect("tmp");
1861        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1862        let log = open_log(&path);
1863        log.append(empty_input()).expect("append 1");
1864        log.append(empty_input()).expect("append 2");
1865        drop(log);
1866
1867        tamper_string_field(
1868            &path,
1869            2,
1870            "prev_hash",
1871            "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1872        );
1873
1874        let report = verify(&path).expect("verify must succeed");
1875        assert_eq!(report.total_rows, 2);
1876        let prev_issues: Vec<_> = report
1877            .errors
1878            .iter()
1879            .filter(|e| e.kind == VerifyIssueKind::PrevHashMismatch)
1880            .collect();
1881        assert_eq!(
1882            prev_issues.len(),
1883            1,
1884            "expected exactly one PrevHashMismatch, got {:?}",
1885            report.errors
1886        );
1887        assert_eq!(prev_issues[0].line, 2);
1888    }
1889
1890    #[test]
1891    fn verify_detects_corrupted_json() {
1892        // One valid row plus a literal `{"garbage":true}` line. The garbage
1893        // line fails `serde_json::from_str::<LogRow>` (missing fields +
1894        // `deny_unknown_fields`) and surfaces as a `ParseError` on line 2.
1895        let dir = TempDir::new().expect("tmp");
1896        let path = tmp_dir_utf8(&dir).join("log.jsonl");
1897        let log = open_log(&path);
1898        log.append(empty_input()).expect("append 1");
1899        drop(log);
1900
1901        // Append a garbage line directly.
1902        let mut existing = fs::read_to_string(&path).expect("read");
1903        if !existing.ends_with('\n') {
1904            existing.push('\n');
1905        }
1906        existing.push_str("{\"garbage\":true}\n");
1907        fs::write(&path, existing).expect("write");
1908
1909        let report = verify(&path).expect("verify must succeed");
1910        // total_rows counts non-empty lines, so both lines are counted.
1911        assert_eq!(report.total_rows, 2);
1912        let parse_issues: Vec<_> = report
1913            .errors
1914            .iter()
1915            .filter(|e| e.kind == VerifyIssueKind::ParseError)
1916            .collect();
1917        assert_eq!(
1918            parse_issues.len(),
1919            1,
1920            "expected exactly one ParseError, got {:?}",
1921            report.errors
1922        );
1923        assert_eq!(parse_issues[0].line, 2);
1924    }
1925
1926    #[test]
1927    fn capability_serializes_kebab_case() {
1928        // PROVENANCE_LOG.md §3 requires `oa`, `metadata`, `tdm-elsevier`,
1929        // `tdm-aps`, `tdm-springer` on the wire (kebab-case).
1930        let cases = [
1931            (Capability::Oa, "\"oa\""),
1932            (Capability::Metadata, "\"metadata\""),
1933            (Capability::TdmElsevier, "\"tdm-elsevier\""),
1934            (Capability::TdmAps, "\"tdm-aps\""),
1935            (Capability::TdmSpringer, "\"tdm-springer\""),
1936        ];
1937        for (cap, expected) in cases {
1938            let got = serde_json::to_string(&cap).expect("serialize");
1939            assert_eq!(
1940                got, expected,
1941                "capability wire format mismatch for {:?}",
1942                cap
1943            );
1944        }
1945    }
1946
1947    // -----------------------------------------------------------------
1948    // #140 — §6 rotation, retention, multi-segment verify.
1949    // -----------------------------------------------------------------
1950
1951    fn gunzip_to_string(gz: &Utf8Path) -> String {
1952        use std::io::Read;
1953        let f = std::fs::File::open(gz.as_std_path()).expect("open gz");
1954        let mut dec = GzDecoder::new(f);
1955        let mut s = String::new();
1956        dec.read_to_string(&mut s).expect("gunzip");
1957        s
1958    }
1959
1960    #[test]
1961    fn rotation_archives_to_gz_and_restarts_genesis_chain() {
1962        let dir = TempDir::new().expect("tmp");
1963        let path = tmp_dir_utf8(&dir).join("access.log");
1964        // Inject a tiny threshold (NOT a global env var — that raced
1965        // non-#[serial] tests): row 1 fits, so the SECOND append
1966        // (size>=50) rotates before it writes. A freshly rotated `.gz`
1967        // is not retention-aged, so the default prune at open is a no-op.
1968        let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
1969            .expect("open");
1970        log.append(empty_input()).expect("append 1");
1971        let row1 = read_rows(&path);
1972        assert_eq!(row1.len(), 1);
1973        assert_eq!(row1[0].prev_hash, GENESIS_HASH);
1974
1975        log.append(empty_input()).expect("append 2 (rotates first)");
1976
1977        // Exactly one rotated segment; it gunzips to the original row 1.
1978        let segs = rotated_segments(&path);
1979        assert_eq!(segs.len(), 1, "one .gz segment expected; got {segs:?}");
1980        let archived: Vec<LogRow> = gunzip_to_string(&segs[0])
1981            .lines()
1982            .filter(|l| !l.is_empty())
1983            .map(|l| serde_json::from_str(l).expect("row"))
1984            .collect();
1985        assert_eq!(archived.len(), 1);
1986        assert_eq!(archived[0].this_hash, row1[0].this_hash);
1987
1988        // The fresh access.log restarts the chain at GENESIS, ts_seq 1.
1989        let cur = read_rows(&path);
1990        assert_eq!(cur.len(), 1, "fresh segment holds only the post-rotate row");
1991        assert_eq!(cur[0].prev_hash, GENESIS_HASH);
1992        assert_eq!(cur[0].ts_seq, 1);
1993
1994        // verify_all sees both segments, each its own clean chain.
1995        let reports = verify_all(&path).expect("verify_all");
1996        assert_eq!(reports.len(), 2, "rotated .gz + current");
1997        for (p, r) in &reports {
1998            assert!(r.errors.is_empty(), "segment {p} must verify clean: {r:?}");
1999        }
2000    }
2001
2002    #[test]
2003    fn rotate_log_is_fail_closed_on_missing_source() {
2004        // The append path propagates this via `?`, so a rotation failure
2005        // aborts the fetch (fail-closed) rather than silently continuing.
2006        let dir = TempDir::new().expect("tmp");
2007        let missing = tmp_dir_utf8(&dir).join("nope.log");
2008        let err = rotate_log(&missing).expect_err("missing source must error");
2009        assert!(matches!(err, LogError::Io(_)), "got {err:?}");
2010    }
2011
2012    #[test]
2013    #[serial_test::serial]
2014    fn prune_respects_retention_window_and_disable() {
2015        let dir = TempDir::new().expect("tmp");
2016        let base = tmp_dir_utf8(&dir);
2017        let path = base.join("access.log");
2018        let old_gz = base.join("access.log.2020-01-01-000000.gz");
2019        let new_gz = base.join("access.log.2999-01-01-000000.gz");
2020
2021        let mk = |p: &Utf8Path, aged: bool| {
2022            let f = std::fs::File::create(p.as_std_path()).expect("create gz");
2023            if aged {
2024                // 100 days ago — older than the 90-day default & a 1-day window.
2025                let when =
2026                    std::time::SystemTime::now() - std::time::Duration::from_secs(100 * 86_400);
2027                f.set_modified(when).expect("set mtime");
2028            }
2029        };
2030
2031        // (a) days=0 disables pruning entirely.
2032        mk(&old_gz, true);
2033        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2034        let _ = open_log(&path);
2035        assert!(old_gz.exists(), "days=0 must NOT prune");
2036
2037        // (b) days=1 prunes the aged segment, keeps a fresh one.
2038        mk(&new_gz, false);
2039        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "1");
2040        let _ = open_log(&path);
2041        assert!(!old_gz.exists(), "aged segment must be pruned at days=1");
2042        assert!(new_gz.exists(), "fresh segment must survive");
2043
2044        std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2045    }
2046
2047    #[test]
2048    #[serial_test::serial]
2049    fn retention_days_env_parsing() {
2050        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "0");
2051        assert_eq!(retention_days(), 0);
2052        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "30");
2053        assert_eq!(retention_days(), 30);
2054        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "garbage");
2055        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2056        std::env::set_var("DOIGET_LOG_RETENTION_DAYS", "-5");
2057        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2058        std::env::remove_var("DOIGET_LOG_RETENTION_DAYS");
2059        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
2060    }
2061
2062    #[test]
2063    fn verify_all_flags_tampered_segment_independently() {
2064        let dir = TempDir::new().expect("tmp");
2065        let path = tmp_dir_utf8(&dir).join("access.log");
2066        // Inject the tiny threshold (no global env → no cross-test race).
2067        let log = ProvenanceLog::open_with_rotate_threshold(&path, TEST_SESSION_ID.to_string(), 50)
2068            .expect("open");
2069        log.append(empty_input()).expect("append 1");
2070        log.append(empty_input()).expect("append 2 (rotates)");
2071        drop(log);
2072
2073        // Tamper the CURRENT segment's row: set this_hash to a
2074        // syntactically-valid 64-hex string that cannot be the SHA-256
2075        // of any row (all zeros). NOTE: the previous "flip the last char
2076        // to '0'" was a no-op ~1/16 of runs when the real hash already
2077        // ended in '0' (this_hash depends on `Utc::now()`), which is the
2078        // flake this fixes — mirrors `verify_detects_tampered_row_hash`.
2079        let mut cur = read_rows(&path);
2080        let mut bad = cur.remove(0);
2081        bad.this_hash =
2082            "0000000000000000000000000000000000000000000000000000000000000000".to_string();
2083        std::fs::write(
2084            path.as_std_path(),
2085            format!("{}\n", serde_json::to_string(&bad).expect("ser")),
2086        )
2087        .expect("rewrite tampered current");
2088
2089        let reports = verify_all(&path).expect("verify_all");
2090        assert_eq!(reports.len(), 2);
2091        // Oldest first = the rotated .gz (clean); current last (tampered).
2092        let (gz_path, gz_rep) = &reports[0];
2093        let (cur_path, cur_rep) = &reports[1];
2094        assert!(
2095            gz_path.as_str().ends_with(".gz") && gz_rep.errors.is_empty(),
2096            "rotated segment must stay clean: {gz_path} {gz_rep:?}"
2097        );
2098        assert!(
2099            cur_path.file_name() == Some("access.log") && !cur_rep.errors.is_empty(),
2100            "tampered current segment must report issues: {cur_path} {cur_rep:?}"
2101        );
2102    }
2103}