Skip to content

actuarialpy

Experience analysis on a single tidy table. You build one DataFrame — claims/expense, revenue, exposure, by period — and Experience gives you views (by, rolling, trend, completion, seasonality, credibility, pooling) without re-pivoting. numpy and pandas only; no scipy.

Quickstart

import pandas as pd
import actuarialpy as ap

df = pd.DataFrame({
    "month": pd.period_range("2024-01", periods=6, freq="M").astype(str),
    "product": ["PPO"] * 6,
    "paid":    [120_000, 118_000, 125_000, 130_000, 128_000, 135_000],
    "premium": [150_000] * 6,
    "member_months": [1000, 1005, 1010, 1008, 1012, 1015],
})

exp = ap.Experience(df, expense="paid", revenue="premium",
                    exposure="member_months", date="month")

exp.by("product")                              # grouped view
exp.loss_ratio                                 # paid / premium

ap.pmpm(df["paid"], df["member_months"])       # per-member-per-month
ap.loss_ratio(df["paid"], df["premium"])       # as a free function

Retention primitives

The pooling module includes two general retention-stability primitives:

  • retained_cv(outcomes, retention, n_units=1) — coefficient of variation of the retained aggregate of n_units i.i.d. units each capped at retention.
  • retention_for_target_cv(outcomes, n_units, target_cv, ...) — inverts it: the retention at which retained CV hits a target. Basis for a size-graded pooling schedule.

API reference

actuarialpy

ActuarialPy: tools for actuarial experience analysis.

Experience dataclass

Bind an experience dataset to its actuarial column roles.

Experience is the recommended entry point for repeated experience-analysis workflows. It stores common column roles once and delegates calculations to the package's free functions. The object is immutable: methods return DataFrames or new Experience objects rather than changing stored data in place.

Bind count (a claim or service count) to unlock the frequency-severity views: :meth:frequency_severity and :meth:decompose_trend (utilization x unit cost, optionally x mix). :meth:fit_trend regresses a developed trend on the bound history.

Grain matters. Experience aggregates by summing the bound columns, so it expects rows at the grain of the exposure unit -- one row per member-month, with member_months = 1 (or the eligible fraction). If your data is long (one row per service line, so the same member-month repeats across several rows), summing the exposure column overcounts it, and every per-exposure figure -- PMPM, frequency, the loss-ratio denominator -- is wrong by the number of rows per member-month. Experience does not detect this: it has no member key, so it cannot tell a long frame from a wide one. For long or multi-table warehouse data, either aggregate to member-month grain first, or use :meth:bind, which sources exposure from a correctly-grained table (e.g. eligibility) via :class:~actuarialpy.Count and never sums a repeated column.

Source code in actuarialpy/frame.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
@dataclass(frozen=True)
class Experience:
    """Bind an experience dataset to its actuarial column roles.

    ``Experience`` is the recommended entry point for repeated experience-analysis
    workflows. It stores common column roles once and delegates calculations to
    the package's free functions. The object is immutable: methods return
    DataFrames or new ``Experience`` objects rather than changing stored data in
    place.

    Bind ``count`` (a claim or service count) to unlock the frequency-severity views:
    :meth:`frequency_severity` and :meth:`decompose_trend` (utilization x unit cost,
    optionally x mix). :meth:`fit_trend` regresses a developed trend on the bound history.

    **Grain matters.** ``Experience`` aggregates by *summing* the bound columns, so it
    expects rows at the grain of the exposure unit -- one row per member-month, with
    ``member_months`` = 1 (or the eligible fraction). If your data is *long* (one row per
    service line, so the same member-month repeats across several rows), summing the
    exposure column overcounts it, and every per-exposure figure -- PMPM, frequency, the
    loss-ratio denominator -- is wrong by the number of rows per member-month. ``Experience``
    does not detect this: it has no member key, so it cannot tell a long frame from a wide
    one. For long or multi-table warehouse data, either aggregate to member-month grain
    first, or use :meth:`bind`, which sources exposure from a correctly-grained table (e.g.
    eligibility) via :class:`~actuarialpy.Count` and never sums a repeated column.
    """

    data: pd.DataFrame
    expense: str | list[str]
    revenue: str | list[str]
    exposure: str | list[str] | None = None
    date: str | None = None
    profile: str | None = None
    count: str | None = None
    copy: bool = False

    def __post_init__(self) -> None:
        object.__setattr__(self, "expense", as_list(self.expense))
        object.__setattr__(self, "revenue", as_list(self.revenue))
        object.__setattr__(self, "exposure", as_list(self.exposure))
        if self.copy:
            object.__setattr__(self, "data", self.data.copy())

        required = as_list(self.expense) + as_list(self.revenue) + as_list(self.exposure)
        if self.date is not None:
            required.append(self.date)
        if self.count is not None:
            required.append(self.count)
        validate_columns(self.data, required)
        _validate_exposure_names(as_list(self.exposure))
        _validate_numeric_columns(self.data, as_list(self.expense), role="Expense")
        _validate_numeric_columns(self.data, as_list(self.revenue), role="Revenue")
        _validate_numeric_columns(self.data, as_list(self.exposure), role="Exposure")
        if self.count is not None:
            _validate_numeric_columns(self.data, [self.count], role="Count")

    def with_roles(
        self,
        *,
        data: pd.DataFrame | None = None,
        expense: str | list[str] | None = None,
        revenue: str | list[str] | None = None,
        exposure: str | list[str] | None = None,
        date: str | None = None,
        profile: str | None = None,
        count: str | None = None,
        copy: bool | None = None,
    ) -> "Experience":
        """Return a new ``Experience`` object with updated data or roles."""
        return replace(
            self,
            data=self.data if data is None else data,
            expense=self.expense if expense is None else expense,
            revenue=self.revenue if revenue is None else revenue,
            exposure=self.exposure if exposure is None else exposure,
            date=self.date if date is None else date,
            profile=self.profile if profile is None else profile,
            count=self.count if count is None else count,
            copy=self.copy if copy is None else copy,
        )

    def filter(
        self,
        mask: Any | None = None,
        *,
        query: str | None = None,
        copy: bool = True,
    ) -> "Experience":
        """Return a new ``Experience`` object over a filtered dataset.

        Use either a boolean mask or a pandas query string.
        """
        if (mask is None) == (query is None):
            raise ValueError("Pass exactly one of mask or query.")
        if query is not None:
            data = self.data.query(query)
        else:
            data = cast("pd.DataFrame", self.data.loc[mask])
        if copy:
            data = data.copy()
        return self.with_roles(data=data, copy=False)

    def deseasonalize(
        self,
        factors: pd.Series,
        *,
        columns: str | list[str] | None = None,
        freq: str = "M",
        by: str | list[str] | None = None,
        date_col: str | None = None,
    ) -> "Experience":
        """Return a new ``Experience`` with the seasonal pattern divided out.

        Each selected column is divided by its row's seasonal factor (as produced by
        :func:`seasonality_factors`), in place under the same name, so every
        downstream view -- :meth:`trend`, :meth:`rolling`, :meth:`by`, and the rest --
        then operates on the deseasonalized series. By default the expense
        (loss / claims) columns are adjusted; pass ``columns`` to choose others. Only
        the numerator is touched: exposure is left alone, so a deseasonalized PMPM is
        simply deseasonalized claims over unchanged member months.

        ``factors`` may be a flat Series (one pattern) or a tidy per-segment table from
        :func:`seasonality_factors_by`; with the latter, pass ``by`` naming the grouping
        column(s) to join on group plus season. Estimate factors on the broader pool,
        not on this object's own (often thin) data. To put the pattern back, apply
        :func:`apply_seasonality` to ``.data``.
        """
        resolved_date = self._resolve_date_col(date_col)
        cols = as_list(columns) if columns is not None else as_list(self.expense)
        if not cols:
            raise ValueError("No columns to deseasonalize; pass columns=... or bind an expense role.")
        validate_columns(self.data, cols + [resolved_date] + as_list(by))
        data = self.data.copy()
        for col in cols:
            data = _deseasonalize(
                data, factors, date_col=resolved_date, value_col=col, freq=freq, by=by, out_col=col, copy=False
            )
        return self.with_roles(data=data, copy=False)

    def complete(
        self,
        factors: pd.Series,
        *,
        valuation_date: Any = None,
        columns: str | list[str] | None = None,
        development_col: str | None = None,
        by: str | list[str] | None = None,
        date_col: str | None = None,
    ) -> "Experience":
        """Return a new ``Experience`` with paid amounts developed to ultimate.

        Grosses the expense (loss / claims) columns up to estimated ultimate in place
        under the same names -- ``completed = paid / completion_factor`` -- so downstream
        views (:meth:`trend`, :meth:`rolling`, :meth:`by`, ...) then run on the completed
        series. Each row's development period is
        ``development_months(date, valuation_date)`` (the convention
        :func:`make_completion_triangle` uses), or an explicit ``development_col``. The
        join is by value, so the frame's index is irrelevant; rows past the triangle's
        last development period are taken as fully complete, and only recent, immature
        months actually move.

        ``factors`` may be a flat Series (one pattern, from :func:`completion_factors`)
        or a tidy per-segment table from :func:`completion_factors_by`; with the latter,
        pass ``by`` naming the grouping column(s) to join on group plus development
        period. Only the numerator is developed -- exposure is left untouched. This
        applies to the latest-diagonal shape (one row per incurred month, ``claims``
        paid-to-date as of ``valuation_date``); a frame already on an ultimate basis must
        not be completed again.
        """
        cols = as_list(columns) if columns is not None else as_list(self.expense)
        if not cols:
            raise ValueError("No columns to complete; pass columns=... or bind an expense role.")
        if development_col is None:
            resolved_date = self._resolve_date_col(date_col)
            validate_columns(self.data, cols + [resolved_date] + as_list(by))
        else:
            resolved_date = None
            validate_columns(self.data, cols + [development_col] + as_list(by))
        data = self.data.copy()
        for col in cols:
            data = _apply_completion(
                data,
                factors,
                value_col=col,
                date_col=resolved_date,
                valuation_date=valuation_date,
                development_col=development_col,
                by=by,
                out_col=col,
                copy=False,
            )
        return self.with_roles(data=data, copy=False)

    def adjust(
        self,
        factors: float | int | pd.Series | pd.DataFrame,
        *,
        on: str | list[str] | None = None,
        columns: str | list[str] | None = None,
        by: str | list[str] | None = None,
        how: str = "multiply",
        factor_col: str = "factor",
        audit_col: str | None = None,
        default: float | None = None,
    ) -> "Experience":
        """Return a new ``Experience`` with an expense column restated by a factor.

        The general counterpart to :meth:`complete` and :meth:`deseasonalize`: joins a
        factor by the key ``on`` (a column already in the frame, optionally within ``by``
        segments) and multiplies -- or, with ``how="divide"``, divides -- the selected
        column(s) in place under the same name, so every downstream view composes on the
        restated series. ``factors`` is a scalar (one factor for all rows), a Series
        indexed by ``on``, or a tidy DataFrame keyed by ``by + on``.

        This is the spine of experience-period restatement -- trend, benefit / area /
        demographic relativities, network discounts -- where the methodology is supplied
        as the factors rather than encoded here. Chain freely
        (``exp.complete(...).adjust(trend).adjust(area, on="region")``); with ``audit_col``
        the cumulative restatement multiplier is carried across the chain, one value per
        row, for a reviewable audit trail. An absent key surfaces as ``NaN`` unless
        ``default`` is given (``default=1.0`` to mean "no adjustment for this key").
        """
        cols = as_list(columns) if columns is not None else as_list(self.expense)
        if not cols:
            raise ValueError("No columns to adjust; pass columns=... or bind an expense role.")
        validate_columns(self.data, cols + as_list(on) + as_list(by))
        data = self.data.copy()
        for col in cols:
            data = _adjust(
                data,
                factors,
                value_col=col,
                on=on,
                by=by,
                how=how,
                factor_col=factor_col,
                out_col=col,
                audit_col=audit_col,
                default=default,
                copy=False,
            )
        return self.with_roles(data=data, copy=False)

    def by(self, groupby: str | list[str] | None = None, **kwargs: Any) -> pd.DataFrame:
        """Summarize experience by optional grouping columns."""
        return summarize_experience(
            self.data,
            groupby=groupby,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            profile=kwargs.pop("profile", self.profile),
            **kwargs,
        )

    def views(self, views: dict[str, str | Iterable[str] | None], **kwargs: Any) -> dict[str, pd.DataFrame]:
        """Create several named grouped experience views."""
        return summarize_views(
            self.data,
            views=views,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            profile=kwargs.pop("profile", self.profile),
            **kwargs,
        )

    def rolling(
        self,
        window: int = 12,
        *,
        groupby: str | list[str] | None = None,
        date_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Create a rolling-period experience summary."""
        resolved_date = self._resolve_date_col(date_col)
        return rolling_summary(
            self.data,
            date_col=resolved_date,
            window=window,
            groupby=groupby,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            **kwargs,
        )

    def trend(
        self,
        *,
        amount_col: str | None = None,
        exposure_col: str | None = None,
        groupby: str | list[str] | None = None,
        date_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Compare amount or per-exposure experience between two periods."""
        data, resolved_amount = self._data_with_amount(amount_col)
        # Use the bound date column only for date-range comparisons. If the
        # caller supplies period_col/prior_period/current_period, passing the
        # bound date column would create two comparison modes and incorrectly
        # raise an error.
        resolved_date = date_col if date_col is not None else self.date
        if "period_col" in kwargs and date_col is None:
            resolved_date = None
        return trend_summary(
            data,
            amount_col=resolved_amount,
            exposure_col=exposure_col or self._single_exposure_or_none(),
            groupby=groupby,
            date_col=resolved_date,
            **kwargs,
        )

    def frequency_severity(
        self,
        *,
        count_col: str | None = None,
        loss_col: str | None = None,
        exposure_col: str | None = None,
        groupby: str | list[str] | None = None,
        annualization: float = 12,
    ) -> pd.DataFrame:
        """Per-group claim frequency, severity, and PMPM (see ``frequency_severity_summary``).

        Uses the bound ``count``, ``expense`` (as the loss), and ``exposure`` roles, so the
        columns are specified once on the object. The identity ``pmpm == frequency *
        severity`` holds for every row.
        """
        data, resolved_loss = self._data_with_amount(loss_col)
        return frequency_severity_summary(
            data,
            count_col=self._resolve_count(count_col),
            loss_col=resolved_loss,
            exposure_col=self._resolve_exposure(exposure_col),
            groupby=groupby,
            annualization=annualization,
        )

    def decompose_trend(
        self,
        *,
        count_col: str | None = None,
        loss_col: str | None = None,
        exposure_col: str | None = None,
        mix_by: str | Iterable[str] | None = None,
        groupby: str | list[str] | None = None,
        period_col: str | None = None,
        prior_period: Any = None,
        current_period: Any = None,
        date_col: str | None = None,
        prior_start: Any = None,
        prior_end: Any = None,
        current_start: Any = None,
        current_end: Any = None,
        prior_filter: Any = None,
        current_filter: Any = None,
        annualization: float = 12,
    ) -> pd.DataFrame:
        """Decompose the PMPM trend between two periods of the bound data.

        Splits the bound frame into prior and current with the same comparison modes as
        :meth:`trend` -- ``period_col`` with ``prior_period`` / ``current_period``, a
        ``date_col`` with prior/current ranges (the bound ``date`` is used when no
        ``date_col`` is passed), or explicit ``prior_filter`` / ``current_filter`` masks --
        then decomposes the change via :func:`decompose_pmpm_trend`, using the bound
        ``count``, ``expense`` (as the loss), and ``exposure`` roles. Pass ``mix_by`` to add
        the third LMDI mix term; ``groupby`` reports one decomposition per group.
        """
        resolved_count = self._resolve_count(count_col)
        resolved_exposure = self._resolve_exposure(exposure_col)
        data, resolved_loss = self._data_with_amount(loss_col)
        date_mode = any(v is not None for v in (date_col, prior_start, prior_end, current_start, current_end))
        resolved_date = (date_col if date_col is not None else self.date) if date_mode else None
        prior_mask, current_mask, _ = _comparison_masks(
            data,
            period_col=period_col,
            prior_period=prior_period,
            current_period=current_period,
            date_col=resolved_date,
            prior_start=prior_start,
            prior_end=prior_end,
            current_start=current_start,
            current_end=current_end,
            prior_filter=prior_filter,
            current_filter=current_filter,
        )
        return decompose_pmpm_trend(
            data.loc[prior_mask],
            data.loc[current_mask],
            count_col=resolved_count,
            loss_col=resolved_loss,
            exposure_col=resolved_exposure,
            on=groupby,
            mix_by=mix_by,
            annualization=annualization,
        )

    def fit_trend(
        self,
        *,
        value_col: str | None = None,
        exposure_col: str | None = None,
        date_col: str | None = None,
        freq: str = "M",
        min_periods: int = 3,
        confidence: float = 0.95,
    ) -> TrendFit:
        """Fit an exponential trend to the bound experience by log-linear regression.

        Defaults to the bound ``expense`` (claims) over the bound ``exposure`` -- the PMPM
        trend -- across the bound ``date``; pass ``value_col`` / ``exposure_col`` to
        override, or leave the exposure unbound to trend the raw amount. Returns a
        ``TrendFit`` (see :func:`fit_trend`). Run on completed, deseasonalized history.
        """
        data, resolved_value = self._data_with_amount(value_col)
        resolved_exposure = exposure_col if exposure_col is not None else self._single_exposure_or_none()
        return _fit_trend(
            data,
            value_col=resolved_value,
            date_col=self._resolve_date_col(date_col),
            exposure_col=resolved_exposure,
            freq=freq,
            min_periods=min_periods,
            confidence=confidence,
        )

    def components(
        self,
        component_cols: str | list[str],
        *,
        exposure_col: str | None = None,
        groupby: str | list[str] | None = None,
        date_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Explain component drivers between two periods."""
        # Use the bound date column only for date-range comparisons. If the
        # caller supplies period_col/prior_period/current_period, passing the
        # bound date column would create two comparison modes and incorrectly
        # raise an error.
        resolved_date = date_col if date_col is not None else self.date
        if "period_col" in kwargs and date_col is None:
            resolved_date = None
        return component_driver_analysis(
            self.data,
            component_cols=component_cols,
            exposure_col=exposure_col or self._single_exposure_or_none(),
            groupby=groupby,
            date_col=resolved_date,
            **kwargs,
        )

    def component_summary(
        self,
        component_cols: str | list[str],
        *,
        groupby: str | list[str] | None = None,
        exposure_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize component amounts, per-exposure values, and shares."""
        return summarize_components(
            self.data,
            groupby=groupby,
            component_cols=component_cols,
            exposure_col=exposure_col or self._single_exposure_or_none(),
            **kwargs,
        )

    def actual_vs_expected(
        self,
        expected: str | list[str],
        *,
        actual: str | list[str] | None = None,
        groupby: str | list[str] | None = None,
        exposure: str | list[str] | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize actual-versus-expected experience.

        If ``actual`` is omitted, the object's bound expense columns are used.
        """
        return summarize_actual_vs_expected(
            self.data,
            groupby=groupby,
            actual_cols=self.expense if actual is None else actual,
            expected_cols=expected,
            exposure_cols=self.exposure if exposure is None else exposure,
            **kwargs,
        )

    def claimants(
        self,
        claimant_col: str,
        *,
        amount_cols: str | list[str] | None = None,
        groupby: str | list[str] | None = None,
        exposure_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Aggregate the experience to claimant/member/risk level."""
        return summarize_claimants(
            self.data,
            claimant_col=claimant_col,
            amount_cols=self.expense if amount_cols is None else amount_cols,
            groupby=groupby,
            exposure_col=exposure_col,
            **kwargs,
        )

    def top_claimants(
        self,
        claimant_col: str,
        *,
        amount_cols: str | list[str] | None = None,
        amount_col: str | None = None,
        groupby: str | list[str] | None = None,
        n: int = 25,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Return top claimants by amount."""
        return top_claimants(
            self.data,
            claimant_col=claimant_col,
            amount_cols=self.expense if amount_cols is None and amount_col is None else amount_cols,
            amount_col=amount_col,
            groupby=groupby,
            n=n,
            **kwargs,
        )

    def claimant_concentration(
        self,
        claimant_col: str,
        *,
        amount_cols: str | list[str] | None = None,
        groupby: str | list[str] | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize how concentrated experience is among top claimants."""
        claimant_summary = summarize_claimants(
            self.data,
            claimant_col=claimant_col,
            amount_cols=self.expense if amount_cols is None else amount_cols,
            groupby=groupby,
        )
        return claim_concentration(claimant_summary, groupby=groupby, **kwargs)

    def cohort(
        self,
        *,
        entity_col: str,
        start_date_col: str,
        duration_months: int = 12,
        groupby: str | list[str] | None = None,
        date_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize each entity's first N months or cohort-duration window."""
        return cohort_summary(
            self.data,
            entity_col=entity_col,
            date_col=self._resolve_date_col(date_col),
            start_date_col=start_date_col,
            duration_months=duration_months,
            groupby=groupby,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            profile=kwargs.pop("profile", self.profile),
            **kwargs,
        )

    def duration(
        self,
        *,
        entity_col: str,
        start_date_col: str,
        max_duration_month: int | None = None,
        date_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize experience by duration month since entity start."""
        return duration_summary(
            self.data,
            entity_col=entity_col,
            date_col=self._resolve_date_col(date_col),
            start_date_col=start_date_col,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            max_duration_month=max_duration_month,
            **kwargs,
        )

    def by_status(self, status_col: str, *, entity_col: str | None = None, **kwargs: Any) -> pd.DataFrame:
        """Summarize experience by a status column."""
        return status_summary(
            self.data,
            status_col=status_col,
            entity_col=entity_col,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            profile=kwargs.pop("profile", self.profile),
            **kwargs,
        )

    def with_status(
        self,
        *,
        effective_col: str,
        as_of: Any,
        termination_col: str | None = None,
        first_year_months: int = 12,
        status_col: str = "status",
        labels: dict[str, str] | None = None,
    ) -> "Experience":
        """Return a new ``Experience`` with a derived lifecycle status column.

        Derives active / first-year / termed from effective and termination dates
        as of a reference date (see :func:`actuarialpy.derive_status`). Summarize
        the result with :meth:`by_status`.
        """
        data = derive_status(
            self.data,
            effective_col=effective_col,
            as_of=as_of,
            termination_col=termination_col,
            first_year_months=first_year_months,
            status_col=status_col,
            labels=labels,
        )
        return self.with_roles(data=data, copy=False)

    def by_band(
        self,
        value_col: str,
        bands: Any,
        *,
        labels: Any = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Summarize experience by a size band on ``value_col`` (see ``summarize_by_band``)."""
        return summarize_by_band(
            self.data,
            value_col,
            bands,
            labels=labels,
            expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
            revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
            exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
            profile=kwargs.pop("profile", self.profile),
            **kwargs,
        )

    def margin(
        self,
        groupby: str | list[str] | None = None,
        *,
        margin_col: str = "margin",
        ratio_col: str = "margin_ratio",
        per_exposure_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Underwriting margin (revenue net of expense) by optional grouping.

        Aggregates the bound expense and revenue roles with :meth:`by`, then adds
        the margin (``total_revenue - total_expense``), the margin ratio, and an
        optional per-exposure margin.
        """
        summary = self.by(groupby, **kwargs)
        summary[margin_col] = summary["total_revenue"] - summary["total_expense"]
        summary[ratio_col] = safe_divide(summary[margin_col], summary["total_revenue"])
        if per_exposure_col is not None:
            exposure = self._single_exposure_or_none()
            if exposure is None:
                raise ValueError("A single bound exposure is required for per_exposure_col.")
            summary[per_exposure_col] = per_exposure(summary[margin_col], summary[exposure])
        return summary

    def credibility_weighted(
        self,
        groupby: str | list[str],
        *,
        z: Any,
        metric: str = "loss_ratio",
        complement: float | None = None,
        out_col: str | None = None,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Blend each group's ``metric`` with a complement at credibility ``z``.

        Computes the grouped summary (:meth:`by`), then blends ``metric`` toward
        ``complement`` using ``z`` (see
        :func:`actuarialpy.credibility_weighted_estimate`). ``z`` may be a scalar
        or values aligned to the grouped rows. When ``complement`` is omitted the
        book-level value of ``metric`` is used as the complement of credibility.
        """
        summary = self.by(groupby, **kwargs)
        if metric not in summary.columns:
            raise ValueError(f"metric '{metric}' is not in the summary columns: {list(summary.columns)}")
        if complement is None:
            complement = self.by(**kwargs)[metric].iloc[0]
        name = out_col or f"credibility_weighted_{metric}"
        summary[name] = credibility_weighted_estimate(summary[metric], complement, z)
        return summary

    def pool_claimants(
        self,
        claimant_col: str,
        pooling_point: float,
        *,
        amount_cols: str | list[str] | None = None,
        groupby: str | list[str] | None = None,
        amount_name: str = "total_expense",
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Aggregate to claimant level and split each claimant into pooled/excess.

        Summarizes the experience to claimant grain (:meth:`claimants`) and caps
        each claimant's total at ``pooling_point`` (see
        :func:`actuarialpy.pool_losses`), returning pooled and excess columns for
        capped experience and the excess hand-off to tail modeling.
        """
        claimant_totals = summarize_claimants(
            self.data,
            claimant_col=claimant_col,
            amount_cols=self.expense if amount_cols is None else amount_cols,
            groupby=groupby,
            amount_name=amount_name,
        )
        return pool_losses(claimant_totals, amount_name, pooling_point, **kwargs)

    def _resolve_date_col(self, date_col: str | None) -> str:
        resolved = date_col or self.date
        if resolved is None:
            raise ValueError("A date column is required. Pass date=... to Experience or date_col=... to this method.")
        return resolved

    def _resolve_count(self, count_col: str | None) -> str:
        resolved = count_col or self.count
        if resolved is None:
            raise ValueError(
                "A count column is required. Pass count=... to Experience or count_col=... to this method."
            )
        validate_columns(self.data, [resolved])
        return resolved

    def _resolve_exposure(self, exposure_col: str | None) -> str:
        if exposure_col is not None:
            validate_columns(self.data, [exposure_col])
            return exposure_col
        resolved = self._single_exposure_or_none()
        if resolved is None:
            raise ValueError(
                "An exposure column is required for this method. Pass exposure=... to Experience "
                "or exposure_col=... to this method."
            )
        return resolved

    def _single_exposure_or_none(self) -> str | None:
        exposures = as_list(self.exposure)
        if not exposures:
            return None
        if len(exposures) > 1:
            raise ValueError("Multiple exposures are bound. Pass exposure_col explicitly for this method.")
        return exposures[0]

    def _data_with_amount(self, amount_col: str | None) -> tuple[pd.DataFrame, str]:
        if amount_col is not None:
            validate_columns(self.data, [amount_col])
            return self.data, amount_col
        expenses = as_list(self.expense)
        if len(expenses) == 1:
            return self.data, expenses[0]
        temp = self.data.copy()
        amount_name = "_actuarialpy_total_expense"
        temp[amount_name] = sum_columns(temp, expenses)
        return temp, amount_name
with_roles
with_roles(
    *,
    data: DataFrame | None = None,
    expense: str | list[str] | None = None,
    revenue: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    date: str | None = None,
    profile: str | None = None,
    count: str | None = None,
    copy: bool | None = None
) -> "Experience"

Return a new Experience object with updated data or roles.

Source code in actuarialpy/frame.py
def with_roles(
    self,
    *,
    data: pd.DataFrame | None = None,
    expense: str | list[str] | None = None,
    revenue: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    date: str | None = None,
    profile: str | None = None,
    count: str | None = None,
    copy: bool | None = None,
) -> "Experience":
    """Return a new ``Experience`` object with updated data or roles."""
    return replace(
        self,
        data=self.data if data is None else data,
        expense=self.expense if expense is None else expense,
        revenue=self.revenue if revenue is None else revenue,
        exposure=self.exposure if exposure is None else exposure,
        date=self.date if date is None else date,
        profile=self.profile if profile is None else profile,
        count=self.count if count is None else count,
        copy=self.copy if copy is None else copy,
    )
filter
filter(
    mask: Any | None = None,
    *,
    query: str | None = None,
    copy: bool = True
) -> "Experience"

Return a new Experience object over a filtered dataset.

Use either a boolean mask or a pandas query string.

Source code in actuarialpy/frame.py
def filter(
    self,
    mask: Any | None = None,
    *,
    query: str | None = None,
    copy: bool = True,
) -> "Experience":
    """Return a new ``Experience`` object over a filtered dataset.

    Use either a boolean mask or a pandas query string.
    """
    if (mask is None) == (query is None):
        raise ValueError("Pass exactly one of mask or query.")
    if query is not None:
        data = self.data.query(query)
    else:
        data = cast("pd.DataFrame", self.data.loc[mask])
    if copy:
        data = data.copy()
    return self.with_roles(data=data, copy=False)
deseasonalize
deseasonalize(
    factors: Series,
    *,
    columns: str | list[str] | None = None,
    freq: str = "M",
    by: str | list[str] | None = None,
    date_col: str | None = None
) -> "Experience"

Return a new Experience with the seasonal pattern divided out.

Each selected column is divided by its row's seasonal factor (as produced by :func:seasonality_factors), in place under the same name, so every downstream view -- :meth:trend, :meth:rolling, :meth:by, and the rest -- then operates on the deseasonalized series. By default the expense (loss / claims) columns are adjusted; pass columns to choose others. Only the numerator is touched: exposure is left alone, so a deseasonalized PMPM is simply deseasonalized claims over unchanged member months.

factors may be a flat Series (one pattern) or a tidy per-segment table from :func:seasonality_factors_by; with the latter, pass by naming the grouping column(s) to join on group plus season. Estimate factors on the broader pool, not on this object's own (often thin) data. To put the pattern back, apply :func:apply_seasonality to .data.

Source code in actuarialpy/frame.py
def deseasonalize(
    self,
    factors: pd.Series,
    *,
    columns: str | list[str] | None = None,
    freq: str = "M",
    by: str | list[str] | None = None,
    date_col: str | None = None,
) -> "Experience":
    """Return a new ``Experience`` with the seasonal pattern divided out.

    Each selected column is divided by its row's seasonal factor (as produced by
    :func:`seasonality_factors`), in place under the same name, so every
    downstream view -- :meth:`trend`, :meth:`rolling`, :meth:`by`, and the rest --
    then operates on the deseasonalized series. By default the expense
    (loss / claims) columns are adjusted; pass ``columns`` to choose others. Only
    the numerator is touched: exposure is left alone, so a deseasonalized PMPM is
    simply deseasonalized claims over unchanged member months.

    ``factors`` may be a flat Series (one pattern) or a tidy per-segment table from
    :func:`seasonality_factors_by`; with the latter, pass ``by`` naming the grouping
    column(s) to join on group plus season. Estimate factors on the broader pool,
    not on this object's own (often thin) data. To put the pattern back, apply
    :func:`apply_seasonality` to ``.data``.
    """
    resolved_date = self._resolve_date_col(date_col)
    cols = as_list(columns) if columns is not None else as_list(self.expense)
    if not cols:
        raise ValueError("No columns to deseasonalize; pass columns=... or bind an expense role.")
    validate_columns(self.data, cols + [resolved_date] + as_list(by))
    data = self.data.copy()
    for col in cols:
        data = _deseasonalize(
            data, factors, date_col=resolved_date, value_col=col, freq=freq, by=by, out_col=col, copy=False
        )
    return self.with_roles(data=data, copy=False)
complete
complete(
    factors: Series,
    *,
    valuation_date: Any = None,
    columns: str | list[str] | None = None,
    development_col: str | None = None,
    by: str | list[str] | None = None,
    date_col: str | None = None
) -> "Experience"

Return a new Experience with paid amounts developed to ultimate.

Grosses the expense (loss / claims) columns up to estimated ultimate in place under the same names -- completed = paid / completion_factor -- so downstream views (:meth:trend, :meth:rolling, :meth:by, ...) then run on the completed series. Each row's development period is development_months(date, valuation_date) (the convention :func:make_completion_triangle uses), or an explicit development_col. The join is by value, so the frame's index is irrelevant; rows past the triangle's last development period are taken as fully complete, and only recent, immature months actually move.

factors may be a flat Series (one pattern, from :func:completion_factors) or a tidy per-segment table from :func:completion_factors_by; with the latter, pass by naming the grouping column(s) to join on group plus development period. Only the numerator is developed -- exposure is left untouched. This applies to the latest-diagonal shape (one row per incurred month, claims paid-to-date as of valuation_date); a frame already on an ultimate basis must not be completed again.

Source code in actuarialpy/frame.py
def complete(
    self,
    factors: pd.Series,
    *,
    valuation_date: Any = None,
    columns: str | list[str] | None = None,
    development_col: str | None = None,
    by: str | list[str] | None = None,
    date_col: str | None = None,
) -> "Experience":
    """Return a new ``Experience`` with paid amounts developed to ultimate.

    Grosses the expense (loss / claims) columns up to estimated ultimate in place
    under the same names -- ``completed = paid / completion_factor`` -- so downstream
    views (:meth:`trend`, :meth:`rolling`, :meth:`by`, ...) then run on the completed
    series. Each row's development period is
    ``development_months(date, valuation_date)`` (the convention
    :func:`make_completion_triangle` uses), or an explicit ``development_col``. The
    join is by value, so the frame's index is irrelevant; rows past the triangle's
    last development period are taken as fully complete, and only recent, immature
    months actually move.

    ``factors`` may be a flat Series (one pattern, from :func:`completion_factors`)
    or a tidy per-segment table from :func:`completion_factors_by`; with the latter,
    pass ``by`` naming the grouping column(s) to join on group plus development
    period. Only the numerator is developed -- exposure is left untouched. This
    applies to the latest-diagonal shape (one row per incurred month, ``claims``
    paid-to-date as of ``valuation_date``); a frame already on an ultimate basis must
    not be completed again.
    """
    cols = as_list(columns) if columns is not None else as_list(self.expense)
    if not cols:
        raise ValueError("No columns to complete; pass columns=... or bind an expense role.")
    if development_col is None:
        resolved_date = self._resolve_date_col(date_col)
        validate_columns(self.data, cols + [resolved_date] + as_list(by))
    else:
        resolved_date = None
        validate_columns(self.data, cols + [development_col] + as_list(by))
    data = self.data.copy()
    for col in cols:
        data = _apply_completion(
            data,
            factors,
            value_col=col,
            date_col=resolved_date,
            valuation_date=valuation_date,
            development_col=development_col,
            by=by,
            out_col=col,
            copy=False,
        )
    return self.with_roles(data=data, copy=False)
adjust
adjust(
    factors: float | int | Series | DataFrame,
    *,
    on: str | list[str] | None = None,
    columns: str | list[str] | None = None,
    by: str | list[str] | None = None,
    how: str = "multiply",
    factor_col: str = "factor",
    audit_col: str | None = None,
    default: float | None = None
) -> "Experience"

Return a new Experience with an expense column restated by a factor.

The general counterpart to :meth:complete and :meth:deseasonalize: joins a factor by the key on (a column already in the frame, optionally within by segments) and multiplies -- or, with how="divide", divides -- the selected column(s) in place under the same name, so every downstream view composes on the restated series. factors is a scalar (one factor for all rows), a Series indexed by on, or a tidy DataFrame keyed by by + on.

This is the spine of experience-period restatement -- trend, benefit / area / demographic relativities, network discounts -- where the methodology is supplied as the factors rather than encoded here. Chain freely (exp.complete(...).adjust(trend).adjust(area, on="region")); with audit_col the cumulative restatement multiplier is carried across the chain, one value per row, for a reviewable audit trail. An absent key surfaces as NaN unless default is given (default=1.0 to mean "no adjustment for this key").

Source code in actuarialpy/frame.py
def adjust(
    self,
    factors: float | int | pd.Series | pd.DataFrame,
    *,
    on: str | list[str] | None = None,
    columns: str | list[str] | None = None,
    by: str | list[str] | None = None,
    how: str = "multiply",
    factor_col: str = "factor",
    audit_col: str | None = None,
    default: float | None = None,
) -> "Experience":
    """Return a new ``Experience`` with an expense column restated by a factor.

    The general counterpart to :meth:`complete` and :meth:`deseasonalize`: joins a
    factor by the key ``on`` (a column already in the frame, optionally within ``by``
    segments) and multiplies -- or, with ``how="divide"``, divides -- the selected
    column(s) in place under the same name, so every downstream view composes on the
    restated series. ``factors`` is a scalar (one factor for all rows), a Series
    indexed by ``on``, or a tidy DataFrame keyed by ``by + on``.

    This is the spine of experience-period restatement -- trend, benefit / area /
    demographic relativities, network discounts -- where the methodology is supplied
    as the factors rather than encoded here. Chain freely
    (``exp.complete(...).adjust(trend).adjust(area, on="region")``); with ``audit_col``
    the cumulative restatement multiplier is carried across the chain, one value per
    row, for a reviewable audit trail. An absent key surfaces as ``NaN`` unless
    ``default`` is given (``default=1.0`` to mean "no adjustment for this key").
    """
    cols = as_list(columns) if columns is not None else as_list(self.expense)
    if not cols:
        raise ValueError("No columns to adjust; pass columns=... or bind an expense role.")
    validate_columns(self.data, cols + as_list(on) + as_list(by))
    data = self.data.copy()
    for col in cols:
        data = _adjust(
            data,
            factors,
            value_col=col,
            on=on,
            by=by,
            how=how,
            factor_col=factor_col,
            out_col=col,
            audit_col=audit_col,
            default=default,
            copy=False,
        )
    return self.with_roles(data=data, copy=False)
by
by(
    groupby: str | list[str] | None = None, **kwargs: Any
) -> pd.DataFrame

Summarize experience by optional grouping columns.

Source code in actuarialpy/frame.py
def by(self, groupby: str | list[str] | None = None, **kwargs: Any) -> pd.DataFrame:
    """Summarize experience by optional grouping columns."""
    return summarize_experience(
        self.data,
        groupby=groupby,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        profile=kwargs.pop("profile", self.profile),
        **kwargs,
    )
views
views(
    views: dict[str, str | Iterable[str] | None],
    **kwargs: Any
) -> dict[str, pd.DataFrame]

Create several named grouped experience views.

Source code in actuarialpy/frame.py
def views(self, views: dict[str, str | Iterable[str] | None], **kwargs: Any) -> dict[str, pd.DataFrame]:
    """Create several named grouped experience views."""
    return summarize_views(
        self.data,
        views=views,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        profile=kwargs.pop("profile", self.profile),
        **kwargs,
    )
rolling
rolling(
    window: int = 12,
    *,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Create a rolling-period experience summary.

Source code in actuarialpy/frame.py
def rolling(
    self,
    window: int = 12,
    *,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Create a rolling-period experience summary."""
    resolved_date = self._resolve_date_col(date_col)
    return rolling_summary(
        self.data,
        date_col=resolved_date,
        window=window,
        groupby=groupby,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        **kwargs,
    )
trend
trend(
    *,
    amount_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Compare amount or per-exposure experience between two periods.

Source code in actuarialpy/frame.py
def trend(
    self,
    *,
    amount_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Compare amount or per-exposure experience between two periods."""
    data, resolved_amount = self._data_with_amount(amount_col)
    # Use the bound date column only for date-range comparisons. If the
    # caller supplies period_col/prior_period/current_period, passing the
    # bound date column would create two comparison modes and incorrectly
    # raise an error.
    resolved_date = date_col if date_col is not None else self.date
    if "period_col" in kwargs and date_col is None:
        resolved_date = None
    return trend_summary(
        data,
        amount_col=resolved_amount,
        exposure_col=exposure_col or self._single_exposure_or_none(),
        groupby=groupby,
        date_col=resolved_date,
        **kwargs,
    )
frequency_severity
frequency_severity(
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    annualization: float = 12
) -> pd.DataFrame

Per-group claim frequency, severity, and PMPM (see frequency_severity_summary).

Uses the bound count, expense (as the loss), and exposure roles, so the columns are specified once on the object. The identity pmpm == frequency * severity holds for every row.

Source code in actuarialpy/frame.py
def frequency_severity(
    self,
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    annualization: float = 12,
) -> pd.DataFrame:
    """Per-group claim frequency, severity, and PMPM (see ``frequency_severity_summary``).

    Uses the bound ``count``, ``expense`` (as the loss), and ``exposure`` roles, so the
    columns are specified once on the object. The identity ``pmpm == frequency *
    severity`` holds for every row.
    """
    data, resolved_loss = self._data_with_amount(loss_col)
    return frequency_severity_summary(
        data,
        count_col=self._resolve_count(count_col),
        loss_col=resolved_loss,
        exposure_col=self._resolve_exposure(exposure_col),
        groupby=groupby,
        annualization=annualization,
    )
decompose_trend
decompose_trend(
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    mix_by: str | Iterable[str] | None = None,
    groupby: str | list[str] | None = None,
    period_col: str | None = None,
    prior_period: Any = None,
    current_period: Any = None,
    date_col: str | None = None,
    prior_start: Any = None,
    prior_end: Any = None,
    current_start: Any = None,
    current_end: Any = None,
    prior_filter: Any = None,
    current_filter: Any = None,
    annualization: float = 12
) -> pd.DataFrame

Decompose the PMPM trend between two periods of the bound data.

Splits the bound frame into prior and current with the same comparison modes as :meth:trend -- period_col with prior_period / current_period, a date_col with prior/current ranges (the bound date is used when no date_col is passed), or explicit prior_filter / current_filter masks -- then decomposes the change via :func:decompose_pmpm_trend, using the bound count, expense (as the loss), and exposure roles. Pass mix_by to add the third LMDI mix term; groupby reports one decomposition per group.

Source code in actuarialpy/frame.py
def decompose_trend(
    self,
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    mix_by: str | Iterable[str] | None = None,
    groupby: str | list[str] | None = None,
    period_col: str | None = None,
    prior_period: Any = None,
    current_period: Any = None,
    date_col: str | None = None,
    prior_start: Any = None,
    prior_end: Any = None,
    current_start: Any = None,
    current_end: Any = None,
    prior_filter: Any = None,
    current_filter: Any = None,
    annualization: float = 12,
) -> pd.DataFrame:
    """Decompose the PMPM trend between two periods of the bound data.

    Splits the bound frame into prior and current with the same comparison modes as
    :meth:`trend` -- ``period_col`` with ``prior_period`` / ``current_period``, a
    ``date_col`` with prior/current ranges (the bound ``date`` is used when no
    ``date_col`` is passed), or explicit ``prior_filter`` / ``current_filter`` masks --
    then decomposes the change via :func:`decompose_pmpm_trend`, using the bound
    ``count``, ``expense`` (as the loss), and ``exposure`` roles. Pass ``mix_by`` to add
    the third LMDI mix term; ``groupby`` reports one decomposition per group.
    """
    resolved_count = self._resolve_count(count_col)
    resolved_exposure = self._resolve_exposure(exposure_col)
    data, resolved_loss = self._data_with_amount(loss_col)
    date_mode = any(v is not None for v in (date_col, prior_start, prior_end, current_start, current_end))
    resolved_date = (date_col if date_col is not None else self.date) if date_mode else None
    prior_mask, current_mask, _ = _comparison_masks(
        data,
        period_col=period_col,
        prior_period=prior_period,
        current_period=current_period,
        date_col=resolved_date,
        prior_start=prior_start,
        prior_end=prior_end,
        current_start=current_start,
        current_end=current_end,
        prior_filter=prior_filter,
        current_filter=current_filter,
    )
    return decompose_pmpm_trend(
        data.loc[prior_mask],
        data.loc[current_mask],
        count_col=resolved_count,
        loss_col=resolved_loss,
        exposure_col=resolved_exposure,
        on=groupby,
        mix_by=mix_by,
        annualization=annualization,
    )
fit_trend
fit_trend(
    *,
    value_col: str | None = None,
    exposure_col: str | None = None,
    date_col: str | None = None,
    freq: str = "M",
    min_periods: int = 3,
    confidence: float = 0.95
) -> TrendFit

Fit an exponential trend to the bound experience by log-linear regression.

Defaults to the bound expense (claims) over the bound exposure -- the PMPM trend -- across the bound date; pass value_col / exposure_col to override, or leave the exposure unbound to trend the raw amount. Returns a TrendFit (see :func:fit_trend). Run on completed, deseasonalized history.

Source code in actuarialpy/frame.py
def fit_trend(
    self,
    *,
    value_col: str | None = None,
    exposure_col: str | None = None,
    date_col: str | None = None,
    freq: str = "M",
    min_periods: int = 3,
    confidence: float = 0.95,
) -> TrendFit:
    """Fit an exponential trend to the bound experience by log-linear regression.

    Defaults to the bound ``expense`` (claims) over the bound ``exposure`` -- the PMPM
    trend -- across the bound ``date``; pass ``value_col`` / ``exposure_col`` to
    override, or leave the exposure unbound to trend the raw amount. Returns a
    ``TrendFit`` (see :func:`fit_trend`). Run on completed, deseasonalized history.
    """
    data, resolved_value = self._data_with_amount(value_col)
    resolved_exposure = exposure_col if exposure_col is not None else self._single_exposure_or_none()
    return _fit_trend(
        data,
        value_col=resolved_value,
        date_col=self._resolve_date_col(date_col),
        exposure_col=resolved_exposure,
        freq=freq,
        min_periods=min_periods,
        confidence=confidence,
    )
components
components(
    component_cols: str | list[str],
    *,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Explain component drivers between two periods.

Source code in actuarialpy/frame.py
def components(
    self,
    component_cols: str | list[str],
    *,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Explain component drivers between two periods."""
    # Use the bound date column only for date-range comparisons. If the
    # caller supplies period_col/prior_period/current_period, passing the
    # bound date column would create two comparison modes and incorrectly
    # raise an error.
    resolved_date = date_col if date_col is not None else self.date
    if "period_col" in kwargs and date_col is None:
        resolved_date = None
    return component_driver_analysis(
        self.data,
        component_cols=component_cols,
        exposure_col=exposure_col or self._single_exposure_or_none(),
        groupby=groupby,
        date_col=resolved_date,
        **kwargs,
    )
component_summary
component_summary(
    component_cols: str | list[str],
    *,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize component amounts, per-exposure values, and shares.

Source code in actuarialpy/frame.py
def component_summary(
    self,
    component_cols: str | list[str],
    *,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize component amounts, per-exposure values, and shares."""
    return summarize_components(
        self.data,
        groupby=groupby,
        component_cols=component_cols,
        exposure_col=exposure_col or self._single_exposure_or_none(),
        **kwargs,
    )
actual_vs_expected
actual_vs_expected(
    expected: str | list[str],
    *,
    actual: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize actual-versus-expected experience.

If actual is omitted, the object's bound expense columns are used.

Source code in actuarialpy/frame.py
def actual_vs_expected(
    self,
    expected: str | list[str],
    *,
    actual: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize actual-versus-expected experience.

    If ``actual`` is omitted, the object's bound expense columns are used.
    """
    return summarize_actual_vs_expected(
        self.data,
        groupby=groupby,
        actual_cols=self.expense if actual is None else actual,
        expected_cols=expected,
        exposure_cols=self.exposure if exposure is None else exposure,
        **kwargs,
    )
claimants
claimants(
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Aggregate the experience to claimant/member/risk level.

Source code in actuarialpy/frame.py
def claimants(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Aggregate the experience to claimant/member/risk level."""
    return summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=self.expense if amount_cols is None else amount_cols,
        groupby=groupby,
        exposure_col=exposure_col,
        **kwargs,
    )
top_claimants
top_claimants(
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    amount_col: str | None = None,
    groupby: str | list[str] | None = None,
    n: int = 25,
    **kwargs: Any
) -> pd.DataFrame

Return top claimants by amount.

Source code in actuarialpy/frame.py
def top_claimants(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    amount_col: str | None = None,
    groupby: str | list[str] | None = None,
    n: int = 25,
    **kwargs: Any,
) -> pd.DataFrame:
    """Return top claimants by amount."""
    return top_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=self.expense if amount_cols is None and amount_col is None else amount_cols,
        amount_col=amount_col,
        groupby=groupby,
        n=n,
        **kwargs,
    )
claimant_concentration
claimant_concentration(
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize how concentrated experience is among top claimants.

Source code in actuarialpy/frame.py
def claimant_concentration(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize how concentrated experience is among top claimants."""
    claimant_summary = summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=self.expense if amount_cols is None else amount_cols,
        groupby=groupby,
    )
    return claim_concentration(claimant_summary, groupby=groupby, **kwargs)
cohort
cohort(
    *,
    entity_col: str,
    start_date_col: str,
    duration_months: int = 12,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize each entity's first N months or cohort-duration window.

Source code in actuarialpy/frame.py
def cohort(
    self,
    *,
    entity_col: str,
    start_date_col: str,
    duration_months: int = 12,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize each entity's first N months or cohort-duration window."""
    return cohort_summary(
        self.data,
        entity_col=entity_col,
        date_col=self._resolve_date_col(date_col),
        start_date_col=start_date_col,
        duration_months=duration_months,
        groupby=groupby,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        profile=kwargs.pop("profile", self.profile),
        **kwargs,
    )
duration
duration(
    *,
    entity_col: str,
    start_date_col: str,
    max_duration_month: int | None = None,
    date_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize experience by duration month since entity start.

Source code in actuarialpy/frame.py
def duration(
    self,
    *,
    entity_col: str,
    start_date_col: str,
    max_duration_month: int | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize experience by duration month since entity start."""
    return duration_summary(
        self.data,
        entity_col=entity_col,
        date_col=self._resolve_date_col(date_col),
        start_date_col=start_date_col,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        max_duration_month=max_duration_month,
        **kwargs,
    )
by_status
by_status(
    status_col: str,
    *,
    entity_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize experience by a status column.

Source code in actuarialpy/frame.py
def by_status(self, status_col: str, *, entity_col: str | None = None, **kwargs: Any) -> pd.DataFrame:
    """Summarize experience by a status column."""
    return status_summary(
        self.data,
        status_col=status_col,
        entity_col=entity_col,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        profile=kwargs.pop("profile", self.profile),
        **kwargs,
    )
with_status
with_status(
    *,
    effective_col: str,
    as_of: Any,
    termination_col: str | None = None,
    first_year_months: int = 12,
    status_col: str = "status",
    labels: dict[str, str] | None = None
) -> "Experience"

Return a new Experience with a derived lifecycle status column.

Derives active / first-year / termed from effective and termination dates as of a reference date (see :func:actuarialpy.derive_status). Summarize the result with :meth:by_status.

Source code in actuarialpy/frame.py
def with_status(
    self,
    *,
    effective_col: str,
    as_of: Any,
    termination_col: str | None = None,
    first_year_months: int = 12,
    status_col: str = "status",
    labels: dict[str, str] | None = None,
) -> "Experience":
    """Return a new ``Experience`` with a derived lifecycle status column.

    Derives active / first-year / termed from effective and termination dates
    as of a reference date (see :func:`actuarialpy.derive_status`). Summarize
    the result with :meth:`by_status`.
    """
    data = derive_status(
        self.data,
        effective_col=effective_col,
        as_of=as_of,
        termination_col=termination_col,
        first_year_months=first_year_months,
        status_col=status_col,
        labels=labels,
    )
    return self.with_roles(data=data, copy=False)
by_band
by_band(
    value_col: str,
    bands: Any,
    *,
    labels: Any = None,
    **kwargs: Any
) -> pd.DataFrame

Summarize experience by a size band on value_col (see summarize_by_band).

Source code in actuarialpy/frame.py
def by_band(
    self,
    value_col: str,
    bands: Any,
    *,
    labels: Any = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize experience by a size band on ``value_col`` (see ``summarize_by_band``)."""
    return summarize_by_band(
        self.data,
        value_col,
        bands,
        labels=labels,
        expense_cols=kwargs.pop("expense_cols", kwargs.pop("expense", self.expense)),
        revenue_cols=kwargs.pop("revenue_cols", kwargs.pop("revenue", self.revenue)),
        exposure_cols=kwargs.pop("exposure_cols", kwargs.pop("exposure", self.exposure)),
        profile=kwargs.pop("profile", self.profile),
        **kwargs,
    )
margin
margin(
    groupby: str | list[str] | None = None,
    *,
    margin_col: str = "margin",
    ratio_col: str = "margin_ratio",
    per_exposure_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Underwriting margin (revenue net of expense) by optional grouping.

Aggregates the bound expense and revenue roles with :meth:by, then adds the margin (total_revenue - total_expense), the margin ratio, and an optional per-exposure margin.

Source code in actuarialpy/frame.py
def margin(
    self,
    groupby: str | list[str] | None = None,
    *,
    margin_col: str = "margin",
    ratio_col: str = "margin_ratio",
    per_exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Underwriting margin (revenue net of expense) by optional grouping.

    Aggregates the bound expense and revenue roles with :meth:`by`, then adds
    the margin (``total_revenue - total_expense``), the margin ratio, and an
    optional per-exposure margin.
    """
    summary = self.by(groupby, **kwargs)
    summary[margin_col] = summary["total_revenue"] - summary["total_expense"]
    summary[ratio_col] = safe_divide(summary[margin_col], summary["total_revenue"])
    if per_exposure_col is not None:
        exposure = self._single_exposure_or_none()
        if exposure is None:
            raise ValueError("A single bound exposure is required for per_exposure_col.")
        summary[per_exposure_col] = per_exposure(summary[margin_col], summary[exposure])
    return summary
credibility_weighted
credibility_weighted(
    groupby: str | list[str],
    *,
    z: Any,
    metric: str = "loss_ratio",
    complement: float | None = None,
    out_col: str | None = None,
    **kwargs: Any
) -> pd.DataFrame

Blend each group's metric with a complement at credibility z.

Computes the grouped summary (:meth:by), then blends metric toward complement using z (see :func:actuarialpy.credibility_weighted_estimate). z may be a scalar or values aligned to the grouped rows. When complement is omitted the book-level value of metric is used as the complement of credibility.

Source code in actuarialpy/frame.py
def credibility_weighted(
    self,
    groupby: str | list[str],
    *,
    z: Any,
    metric: str = "loss_ratio",
    complement: float | None = None,
    out_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Blend each group's ``metric`` with a complement at credibility ``z``.

    Computes the grouped summary (:meth:`by`), then blends ``metric`` toward
    ``complement`` using ``z`` (see
    :func:`actuarialpy.credibility_weighted_estimate`). ``z`` may be a scalar
    or values aligned to the grouped rows. When ``complement`` is omitted the
    book-level value of ``metric`` is used as the complement of credibility.
    """
    summary = self.by(groupby, **kwargs)
    if metric not in summary.columns:
        raise ValueError(f"metric '{metric}' is not in the summary columns: {list(summary.columns)}")
    if complement is None:
        complement = self.by(**kwargs)[metric].iloc[0]
    name = out_col or f"credibility_weighted_{metric}"
    summary[name] = credibility_weighted_estimate(summary[metric], complement, z)
    return summary
pool_claimants
pool_claimants(
    claimant_col: str,
    pooling_point: float,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    amount_name: str = "total_expense",
    **kwargs: Any
) -> pd.DataFrame

Aggregate to claimant level and split each claimant into pooled/excess.

Summarizes the experience to claimant grain (:meth:claimants) and caps each claimant's total at pooling_point (see :func:actuarialpy.pool_losses), returning pooled and excess columns for capped experience and the excess hand-off to tail modeling.

Source code in actuarialpy/frame.py
def pool_claimants(
    self,
    claimant_col: str,
    pooling_point: float,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    amount_name: str = "total_expense",
    **kwargs: Any,
) -> pd.DataFrame:
    """Aggregate to claimant level and split each claimant into pooled/excess.

    Summarizes the experience to claimant grain (:meth:`claimants`) and caps
    each claimant's total at ``pooling_point`` (see
    :func:`actuarialpy.pool_losses`), returning pooled and excess columns for
    capped experience and the excess hand-off to tail modeling.
    """
    claimant_totals = summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=self.expense if amount_cols is None else amount_cols,
        groupby=groupby,
        amount_name=amount_name,
    )
    return pool_losses(claimant_totals, amount_name, pooling_point, **kwargs)

ChainLadder dataclass

Chain-ladder development pattern fitted from a cumulative triangle.

Fit with :meth:fit from a cumulative development triangle (for example the output of :func:make_completion_triangle with cumulative=True):

  • age_to_age -- link (age-to-age) factors, indexed by their starting development period.
  • cdf -- cumulative development factor to ultimate by development period, including the tail.
  • completion_factors -- 1 / cdf by development period: the proportion of ultimate emerged by each development period. These are divide-convention factors in (0, 1] (completed = paid / factor), so they line up with :func:validate_completion_factors and downstream completion.

Use :meth:project to apply the pattern to a triangle and get per-origin ultimate and IBNR.

Source code in actuarialpy/reserving.py
@dataclass(frozen=True)
class ChainLadder:
    """Chain-ladder development pattern fitted from a cumulative triangle.

    Fit with :meth:`fit` from a cumulative development triangle (for example the
    output of :func:`make_completion_triangle` with ``cumulative=True``):

    - ``age_to_age`` -- link (age-to-age) factors, indexed by their starting development period.
    - ``cdf`` -- cumulative development factor to ultimate by development period, including the
      tail.
    - ``completion_factors`` -- ``1 / cdf`` by development period: the proportion of ultimate
      emerged by each development period. These are divide-convention factors in ``(0, 1]``
      (``completed = paid / factor``), so they line up with
      :func:`validate_completion_factors` and downstream completion.

    Use :meth:`project` to apply the pattern to a triangle and get per-origin
    ultimate and IBNR.
    """

    age_to_age: pd.Series
    cdf: pd.Series
    completion_factors: pd.Series
    tail: float
    method: str

    @classmethod
    def fit(cls, triangle: pd.DataFrame, *, method: str = "volume", tail: float = 1.0) -> ChainLadder:
        """Estimate the development pattern from a cumulative triangle.

        ``method`` is ``"volume"`` (volume-weighted age-to-age factors, the
        default) or ``"simple"`` (straight average of individual link ratios).
        ``tail`` (>= 1) extends development beyond the latest observed development period.
        """
        if method not in ("volume", "simple"):
            raise ValueError("method must be 'volume' or 'simple'")
        if tail < 1.0:
            raise ValueError("tail must be >= 1.0")
        if not isinstance(triangle, pd.DataFrame):
            raise TypeError("triangle must be a pandas DataFrame")

        tri = triangle.sort_index(axis=1)
        cols = list(tri.columns)
        if len(cols) < 2:
            raise ValueError("triangle must have at least two development periods")
        if tri.shape[0] < 2:
            raise ValueError("triangle must have at least two origin periods")

        # age-to-age (link) factors between each pair of adjacent development periods
        ratios: dict[object, float] = {}
        for start, end in zip(cols[:-1], cols[1:]):
            pair = tri[[start, end]].dropna()
            if pair.empty:
                raise ValueError(f"no overlapping origins to estimate the {start}->{end} development factor")
            if method == "volume":
                start_sum = float(pair[start].sum())
                if start_sum == 0:
                    raise ValueError(f"zero cumulative at development period {start}; cannot estimate {start}->{end} factor")
                ratios[start] = float(pair[end].sum()) / start_sum
            else:
                ratios[start] = float((pair[end] / pair[start]).mean())
        age_to_age = pd.Series(ratios, name="age_to_age")

        # cumulative development factors to ultimate (with tail), accumulating back
        cdf_vals: dict[object, float] = {cols[-1]: float(tail)}
        running = float(tail)
        for start in reversed(cols[:-1]):
            running *= age_to_age[start]
            cdf_vals[start] = running
        cdf = pd.Series(cdf_vals, name="cdf").reindex(cols)

        completion = (1.0 / cdf).rename("completion_factor")
        return cls(
            age_to_age=age_to_age,
            cdf=cdf,
            completion_factors=completion,
            tail=float(tail),
            method=method,
        )

    def project(self, triangle: pd.DataFrame) -> pd.DataFrame:
        """Project ultimate and IBNR per origin by applying the fitted pattern.

        For each origin, takes its latest observed cumulative amount and multiplies
        by the cumulative development factor at that development period. Returns one row per origin
        with the latest development period, latest cumulative, development factor applied,
        ultimate, and IBNR (ultimate minus latest).
        """
        tri = triangle.sort_index(axis=1)
        records: list[dict[str, float]] = []
        origins: list[object] = []
        for origin, row in tri.iterrows():
            observed = row.dropna()
            if observed.empty:
                continue
            latest_development = max(observed.index)
            if latest_development not in self.cdf.index:
                raise ValueError(f"no development factor for development period {latest_development}; fit on a matching triangle")
            latest = float(observed.loc[latest_development])
            factor = float(self.cdf.loc[latest_development])
            ultimate = latest * factor
            origins.append(origin)
            records.append({
                "latest_development": latest_development,
                "latest": latest,
                "development_factor": factor,
                "ultimate": ultimate,
                "ibnr": ultimate - latest,
            })
        return pd.DataFrame.from_records(records, index=pd.Index(origins, name=tri.index.name))
fit classmethod
fit(
    triangle: DataFrame,
    *,
    method: str = "volume",
    tail: float = 1.0
) -> ChainLadder

Estimate the development pattern from a cumulative triangle.

method is "volume" (volume-weighted age-to-age factors, the default) or "simple" (straight average of individual link ratios). tail (>= 1) extends development beyond the latest observed development period.

Source code in actuarialpy/reserving.py
@classmethod
def fit(cls, triangle: pd.DataFrame, *, method: str = "volume", tail: float = 1.0) -> ChainLadder:
    """Estimate the development pattern from a cumulative triangle.

    ``method`` is ``"volume"`` (volume-weighted age-to-age factors, the
    default) or ``"simple"`` (straight average of individual link ratios).
    ``tail`` (>= 1) extends development beyond the latest observed development period.
    """
    if method not in ("volume", "simple"):
        raise ValueError("method must be 'volume' or 'simple'")
    if tail < 1.0:
        raise ValueError("tail must be >= 1.0")
    if not isinstance(triangle, pd.DataFrame):
        raise TypeError("triangle must be a pandas DataFrame")

    tri = triangle.sort_index(axis=1)
    cols = list(tri.columns)
    if len(cols) < 2:
        raise ValueError("triangle must have at least two development periods")
    if tri.shape[0] < 2:
        raise ValueError("triangle must have at least two origin periods")

    # age-to-age (link) factors between each pair of adjacent development periods
    ratios: dict[object, float] = {}
    for start, end in zip(cols[:-1], cols[1:]):
        pair = tri[[start, end]].dropna()
        if pair.empty:
            raise ValueError(f"no overlapping origins to estimate the {start}->{end} development factor")
        if method == "volume":
            start_sum = float(pair[start].sum())
            if start_sum == 0:
                raise ValueError(f"zero cumulative at development period {start}; cannot estimate {start}->{end} factor")
            ratios[start] = float(pair[end].sum()) / start_sum
        else:
            ratios[start] = float((pair[end] / pair[start]).mean())
    age_to_age = pd.Series(ratios, name="age_to_age")

    # cumulative development factors to ultimate (with tail), accumulating back
    cdf_vals: dict[object, float] = {cols[-1]: float(tail)}
    running = float(tail)
    for start in reversed(cols[:-1]):
        running *= age_to_age[start]
        cdf_vals[start] = running
    cdf = pd.Series(cdf_vals, name="cdf").reindex(cols)

    completion = (1.0 / cdf).rename("completion_factor")
    return cls(
        age_to_age=age_to_age,
        cdf=cdf,
        completion_factors=completion,
        tail=float(tail),
        method=method,
    )
project
project(triangle: DataFrame) -> pd.DataFrame

Project ultimate and IBNR per origin by applying the fitted pattern.

For each origin, takes its latest observed cumulative amount and multiplies by the cumulative development factor at that development period. Returns one row per origin with the latest development period, latest cumulative, development factor applied, ultimate, and IBNR (ultimate minus latest).

Source code in actuarialpy/reserving.py
def project(self, triangle: pd.DataFrame) -> pd.DataFrame:
    """Project ultimate and IBNR per origin by applying the fitted pattern.

    For each origin, takes its latest observed cumulative amount and multiplies
    by the cumulative development factor at that development period. Returns one row per origin
    with the latest development period, latest cumulative, development factor applied,
    ultimate, and IBNR (ultimate minus latest).
    """
    tri = triangle.sort_index(axis=1)
    records: list[dict[str, float]] = []
    origins: list[object] = []
    for origin, row in tri.iterrows():
        observed = row.dropna()
        if observed.empty:
            continue
        latest_development = max(observed.index)
        if latest_development not in self.cdf.index:
            raise ValueError(f"no development factor for development period {latest_development}; fit on a matching triangle")
        latest = float(observed.loc[latest_development])
        factor = float(self.cdf.loc[latest_development])
        ultimate = latest * factor
        origins.append(origin)
        records.append({
            "latest_development": latest_development,
            "latest": latest,
            "development_factor": factor,
            "ultimate": ultimate,
            "ibnr": ultimate - latest,
        })
    return pd.DataFrame.from_records(records, index=pd.Index(origins, name=tri.index.name))

InsufficientDataWarning

Bases: UserWarning

Emitted when a segment has too little data to fit and is skipped or aggregated.

Filter it with the standard :mod:warnings machinery, e.g. warnings.filterwarnings("ignore", category=InsufficientDataWarning).

Source code in actuarialpy/reserving.py
class InsufficientDataWarning(UserWarning):
    """Emitted when a segment has too little data to fit and is skipped or aggregated.

    Filter it with the standard :mod:`warnings` machinery, e.g.
    ``warnings.filterwarnings("ignore", category=InsufficientDataWarning)``.
    """

Buhlmann

Bühlmann credibility model.

This implementation assumes each risk has the same number of observations.

Parameters

overall_mean : float Estimated collective mean. epv : float Estimated expected process variance (EPV). vhm : float Estimated variance of hypothetical means (VHM). n_obs : int Number of observations per risk.

Source code in actuarialpy/credibility.py
class Buhlmann:
    """Bühlmann credibility model.

    This implementation assumes each risk has the same number of observations.

    Parameters
    ----------
    overall_mean : float
        Estimated collective mean.
    epv : float
        Estimated expected process variance (EPV).
    vhm : float
        Estimated variance of hypothetical means (VHM).
    n_obs : int
        Number of observations per risk.
    """

    def __init__(self, overall_mean: float, epv: float, vhm: float, n_obs: int):
        if n_obs <= 0:
            raise ValueError("n_obs must be positive.")
        if epv < 0:
            raise ValueError("epv must be nonnegative.")
        if vhm < 0:
            raise ValueError("vhm must be nonnegative.")

        self.overall_mean = float(overall_mean)
        self.epv = float(epv)
        self.vhm = float(vhm)
        self.n_obs = int(n_obs)

    @property
    def k(self) -> float:
        """K = EPV / VHM. Returns infinity when VHM = 0."""
        if self.vhm == 0:
            return float("inf")
        return self.epv / self.vhm

    @property
    def z(self) -> float:
        """Credibility factor ``Z = n / (n + K)``. Returns 0 when K is infinite."""
        k = self.k
        if not np.isfinite(k):
            return 0.0
        return self.n_obs / (self.n_obs + k)

    def premium(self, risk_mean: Any) -> Any:
        """Compute the Bühlmann credibility premium ``Z * risk_mean + (1 - Z) * overall_mean``.

        Parameters
        ----------
        risk_mean : float or array-like
            Risk-specific sample mean(s).

        Returns
        -------
        float or numpy.ndarray
            Credibility-weighted premium(s).
        """
        risk_mean = np.asarray(risk_mean, dtype=float)
        premium = self.z * risk_mean + (1.0 - self.z) * self.overall_mean
        return float(premium) if premium.ndim == 0 else premium

    @classmethod
    def fit(cls, data: Any) -> Buhlmann:
        """Fit a Bühlmann credibility model from data.

        Parameters
        ----------
        data : array-like, shape (m, n)
            Observations for m risks, each with n observations.

        Returns
        -------
        Buhlmann
            Fitted Bühlmann model.

        Notes
        -----
        Estimators used:

        - overall_mean = mean of all observations
        - EPV = average of within-risk sample variances
        - VHM = sample variance of risk means minus EPV / n, floored at 0
        """
        data = np.asarray(data, dtype=float)

        if data.ndim != 2:
            raise ValueError("data must be a 2D array with shape (n_risks, n_obs).")

        n_risks, n_obs = data.shape

        if n_risks < 2:
            raise ValueError("data must contain at least two risks.")
        if n_obs < 2:
            raise ValueError("each risk must have at least two observations.")

        risk_means = np.mean(data, axis=1)
        overall_mean = float(np.mean(data))

        within_vars = np.var(data, axis=1, ddof=1)
        epv = float(np.mean(within_vars))

        between_var = float(np.var(risk_means, ddof=1))
        vhm = max(between_var - epv / n_obs, 0.0)

        return cls(overall_mean=overall_mean, epv=epv, vhm=vhm, n_obs=n_obs)

    def __repr__(self) -> str:
        return (
            f"Buhlmann(overall_mean={self.overall_mean}, "
            f"epv={self.epv}, vhm={self.vhm}, n_obs={self.n_obs})"
        )
k property
k: float

K = EPV / VHM. Returns infinity when VHM = 0.

z property
z: float

Credibility factor Z = n / (n + K). Returns 0 when K is infinite.

premium
premium(risk_mean: Any) -> Any

Compute the Bühlmann credibility premium Z * risk_mean + (1 - Z) * overall_mean.

Parameters

risk_mean : float or array-like Risk-specific sample mean(s).

Returns

float or numpy.ndarray Credibility-weighted premium(s).

Source code in actuarialpy/credibility.py
def premium(self, risk_mean: Any) -> Any:
    """Compute the Bühlmann credibility premium ``Z * risk_mean + (1 - Z) * overall_mean``.

    Parameters
    ----------
    risk_mean : float or array-like
        Risk-specific sample mean(s).

    Returns
    -------
    float or numpy.ndarray
        Credibility-weighted premium(s).
    """
    risk_mean = np.asarray(risk_mean, dtype=float)
    premium = self.z * risk_mean + (1.0 - self.z) * self.overall_mean
    return float(premium) if premium.ndim == 0 else premium
fit classmethod
fit(data: Any) -> Buhlmann

Fit a Bühlmann credibility model from data.

Parameters

data : array-like, shape (m, n) Observations for m risks, each with n observations.

Returns

Buhlmann Fitted Bühlmann model.

Notes

Estimators used:

  • overall_mean = mean of all observations
  • EPV = average of within-risk sample variances
  • VHM = sample variance of risk means minus EPV / n, floored at 0
Source code in actuarialpy/credibility.py
@classmethod
def fit(cls, data: Any) -> Buhlmann:
    """Fit a Bühlmann credibility model from data.

    Parameters
    ----------
    data : array-like, shape (m, n)
        Observations for m risks, each with n observations.

    Returns
    -------
    Buhlmann
        Fitted Bühlmann model.

    Notes
    -----
    Estimators used:

    - overall_mean = mean of all observations
    - EPV = average of within-risk sample variances
    - VHM = sample variance of risk means minus EPV / n, floored at 0
    """
    data = np.asarray(data, dtype=float)

    if data.ndim != 2:
        raise ValueError("data must be a 2D array with shape (n_risks, n_obs).")

    n_risks, n_obs = data.shape

    if n_risks < 2:
        raise ValueError("data must contain at least two risks.")
    if n_obs < 2:
        raise ValueError("each risk must have at least two observations.")

    risk_means = np.mean(data, axis=1)
    overall_mean = float(np.mean(data))

    within_vars = np.var(data, axis=1, ddof=1)
    epv = float(np.mean(within_vars))

    between_var = float(np.var(risk_means, ddof=1))
    vhm = max(between_var - epv / n_obs, 0.0)

    return cls(overall_mean=overall_mean, epv=epv, vhm=vhm, n_obs=n_obs)

BuhlmannStraub

Bühlmann-Straub credibility model.

This implementation allows different exposure weights by risk and period.

Parameters

overall_mean : float Estimated collective mean. epv : float Estimated expected process variance (EPV). vhm : float Estimated variance of hypothetical means (VHM). weights : array-like Total weight (exposure) for each risk.

Source code in actuarialpy/credibility.py
class BuhlmannStraub:
    """Bühlmann-Straub credibility model.

    This implementation allows different exposure weights by risk and period.

    Parameters
    ----------
    overall_mean : float
        Estimated collective mean.
    epv : float
        Estimated expected process variance (EPV).
    vhm : float
        Estimated variance of hypothetical means (VHM).
    weights : array-like
        Total weight (exposure) for each risk.
    """

    def __init__(self, overall_mean: float, epv: float, vhm: float, weights: Any):
        weights = np.asarray(weights, dtype=float)

        if weights.ndim != 1:
            raise ValueError("weights must be a 1D array.")
        if weights.size == 0:
            raise ValueError("weights must not be empty.")
        if np.any(weights <= 0):
            raise ValueError("weights must be positive.")
        if epv < 0:
            raise ValueError("epv must be nonnegative.")
        if vhm < 0:
            raise ValueError("vhm must be nonnegative.")

        self.overall_mean = float(overall_mean)
        self.epv = float(epv)
        self.vhm = float(vhm)
        self.weights = weights

    @property
    def k(self) -> float:
        """K = EPV / VHM. Returns infinity when VHM = 0."""
        if self.vhm == 0:
            return float("inf")
        return self.epv / self.vhm

    def z(self, weight: Any) -> Any:
        """Credibility factor for a given total risk weight: ``Z_i = w_i / (w_i + K)``.

        Parameters
        ----------
        weight : float or array-like
            Total exposure weight(s).

        Returns
        -------
        float or numpy.ndarray
            Credibility factor(s).
        """
        weight = np.asarray(weight, dtype=float)

        if np.any(weight <= 0):
            raise ValueError("weight must be positive.")

        k = self.k
        if not np.isfinite(k):
            out = np.zeros_like(weight, dtype=float)
        else:
            out = weight / (weight + k)

        return float(out) if out.ndim == 0 else out

    def premium(self, risk_mean: Any, weight: Any) -> Any:
        """Compute the Bühlmann-Straub premium ``Z_i * risk_mean_i + (1 - Z_i) * overall_mean``.

        Parameters
        ----------
        risk_mean : float or array-like
            Risk-specific weighted mean(s).
        weight : float or array-like
            Total exposure weight(s).

        Returns
        -------
        float or numpy.ndarray
            Credibility-weighted premium(s).
        """
        risk_mean = np.asarray(risk_mean, dtype=float)
        z = self.z(weight)
        premium = z * risk_mean + (1.0 - z) * self.overall_mean
        return float(premium) if np.ndim(premium) == 0 else premium

    @classmethod
    def fit(cls, data: Any, weights: Any) -> BuhlmannStraub:
        """Fit a Bühlmann-Straub model from observations and weights.

        Parameters
        ----------
        data : array-like, shape (m, n)
            Observed values X_ij for m risks and n periods.
        weights : array-like, shape (m, n)
            Exposure weights w_ij for m risks and n periods.

        Returns
        -------
        BuhlmannStraub
            Fitted Bühlmann-Straub model.

        Notes
        -----
        Let ``w_i. = sum_j w_ij``, ``Xbar_i = sum_j w_ij X_ij / w_i.``, and
        ``overall_mean = sum_i sum_j w_ij X_ij / sum_i sum_j w_ij``.

        EPV is estimated by ``[sum_i sum_j w_ij (X_ij - Xbar_i)^2] / [m (n - 1)]``.

        VHM is the weighted sample variance of the risk means around the overall
        mean, adjusted by EPV and floored at 0. This is a practical
        implementation intended for equal period counts.
        """
        data = np.asarray(data, dtype=float)
        weights = np.asarray(weights, dtype=float)

        if data.ndim != 2:
            raise ValueError("data must be a 2D array.")
        if weights.ndim != 2:
            raise ValueError("weights must be a 2D array.")
        if data.shape != weights.shape:
            raise ValueError("data and weights must have the same shape.")
        if data.shape[0] < 2:
            raise ValueError("data must contain at least two risks.")
        if data.shape[1] < 2:
            raise ValueError("each risk must have at least two periods.")
        if np.any(weights <= 0):
            raise ValueError("weights must be positive.")

        m, n = data.shape

        risk_weights = np.sum(weights, axis=1)
        weighted_risk_means = np.sum(weights * data, axis=1) / risk_weights

        overall_mean = float(np.sum(weights * data) / np.sum(weights))

        ss_within = np.sum(weights * (data - weighted_risk_means[:, None]) ** 2)
        epv = float(ss_within / (m * (n - 1)))

        mean_risk_weight = float(np.mean(risk_weights))
        between_term = float(
            np.sum(risk_weights * (weighted_risk_means - overall_mean) ** 2) / (m - 1)
        )

        vhm = max((between_term - epv) / mean_risk_weight, 0.0)

        return cls(overall_mean=overall_mean, epv=epv, vhm=vhm, weights=risk_weights)

    def __repr__(self) -> str:
        return (
            f"BuhlmannStraub(overall_mean={self.overall_mean}, "
            f"epv={self.epv}, vhm={self.vhm}, weights={self.weights})"
        )
k property
k: float

K = EPV / VHM. Returns infinity when VHM = 0.

z
z(weight: Any) -> Any

Credibility factor for a given total risk weight: Z_i = w_i / (w_i + K).

Parameters

weight : float or array-like Total exposure weight(s).

Returns

float or numpy.ndarray Credibility factor(s).

Source code in actuarialpy/credibility.py
def z(self, weight: Any) -> Any:
    """Credibility factor for a given total risk weight: ``Z_i = w_i / (w_i + K)``.

    Parameters
    ----------
    weight : float or array-like
        Total exposure weight(s).

    Returns
    -------
    float or numpy.ndarray
        Credibility factor(s).
    """
    weight = np.asarray(weight, dtype=float)

    if np.any(weight <= 0):
        raise ValueError("weight must be positive.")

    k = self.k
    if not np.isfinite(k):
        out = np.zeros_like(weight, dtype=float)
    else:
        out = weight / (weight + k)

    return float(out) if out.ndim == 0 else out
premium
premium(risk_mean: Any, weight: Any) -> Any

Compute the Bühlmann-Straub premium Z_i * risk_mean_i + (1 - Z_i) * overall_mean.

Parameters

risk_mean : float or array-like Risk-specific weighted mean(s). weight : float or array-like Total exposure weight(s).

Returns

float or numpy.ndarray Credibility-weighted premium(s).

Source code in actuarialpy/credibility.py
def premium(self, risk_mean: Any, weight: Any) -> Any:
    """Compute the Bühlmann-Straub premium ``Z_i * risk_mean_i + (1 - Z_i) * overall_mean``.

    Parameters
    ----------
    risk_mean : float or array-like
        Risk-specific weighted mean(s).
    weight : float or array-like
        Total exposure weight(s).

    Returns
    -------
    float or numpy.ndarray
        Credibility-weighted premium(s).
    """
    risk_mean = np.asarray(risk_mean, dtype=float)
    z = self.z(weight)
    premium = z * risk_mean + (1.0 - z) * self.overall_mean
    return float(premium) if np.ndim(premium) == 0 else premium
fit classmethod
fit(data: Any, weights: Any) -> BuhlmannStraub

Fit a Bühlmann-Straub model from observations and weights.

Parameters

data : array-like, shape (m, n) Observed values X_ij for m risks and n periods. weights : array-like, shape (m, n) Exposure weights w_ij for m risks and n periods.

Returns

BuhlmannStraub Fitted Bühlmann-Straub model.

Notes

Let w_i. = sum_j w_ij, Xbar_i = sum_j w_ij X_ij / w_i., and overall_mean = sum_i sum_j w_ij X_ij / sum_i sum_j w_ij.

EPV is estimated by [sum_i sum_j w_ij (X_ij - Xbar_i)^2] / [m (n - 1)].

VHM is the weighted sample variance of the risk means around the overall mean, adjusted by EPV and floored at 0. This is a practical implementation intended for equal period counts.

Source code in actuarialpy/credibility.py
@classmethod
def fit(cls, data: Any, weights: Any) -> BuhlmannStraub:
    """Fit a Bühlmann-Straub model from observations and weights.

    Parameters
    ----------
    data : array-like, shape (m, n)
        Observed values X_ij for m risks and n periods.
    weights : array-like, shape (m, n)
        Exposure weights w_ij for m risks and n periods.

    Returns
    -------
    BuhlmannStraub
        Fitted Bühlmann-Straub model.

    Notes
    -----
    Let ``w_i. = sum_j w_ij``, ``Xbar_i = sum_j w_ij X_ij / w_i.``, and
    ``overall_mean = sum_i sum_j w_ij X_ij / sum_i sum_j w_ij``.

    EPV is estimated by ``[sum_i sum_j w_ij (X_ij - Xbar_i)^2] / [m (n - 1)]``.

    VHM is the weighted sample variance of the risk means around the overall
    mean, adjusted by EPV and floored at 0. This is a practical
    implementation intended for equal period counts.
    """
    data = np.asarray(data, dtype=float)
    weights = np.asarray(weights, dtype=float)

    if data.ndim != 2:
        raise ValueError("data must be a 2D array.")
    if weights.ndim != 2:
        raise ValueError("weights must be a 2D array.")
    if data.shape != weights.shape:
        raise ValueError("data and weights must have the same shape.")
    if data.shape[0] < 2:
        raise ValueError("data must contain at least two risks.")
    if data.shape[1] < 2:
        raise ValueError("each risk must have at least two periods.")
    if np.any(weights <= 0):
        raise ValueError("weights must be positive.")

    m, n = data.shape

    risk_weights = np.sum(weights, axis=1)
    weighted_risk_means = np.sum(weights * data, axis=1) / risk_weights

    overall_mean = float(np.sum(weights * data) / np.sum(weights))

    ss_within = np.sum(weights * (data - weighted_risk_means[:, None]) ** 2)
    epv = float(ss_within / (m * (n - 1)))

    mean_risk_weight = float(np.mean(risk_weights))
    between_term = float(
        np.sum(risk_weights * (weighted_risk_means - overall_mean) ** 2) / (m - 1)
    )

    vhm = max((between_term - epv) / mean_risk_weight, 0.0)

    return cls(overall_mean=overall_mean, epv=epv, vhm=vhm, weights=risk_weights)

TrendFit dataclass

Result of :func:fit_trend: an exponential trend fitted to a rate series.

annual_trend is the fitted multiplicative annual trend (exp(slope) - 1 on the log scale). r_squared is the goodness of fit, std_error the delta-method standard error of annual_trend, and (ci_low, ci_high) its confidence interval (asymmetric -- the endpoints are transformed from the log-scale slope interval). slope and intercept describe the underlying log(value) = intercept + slope * t fit with t measured in years from the first period.

Source code in actuarialpy/trend.py
@dataclass(frozen=True)
class TrendFit:
    """Result of :func:`fit_trend`: an exponential trend fitted to a rate series.

    ``annual_trend`` is the fitted multiplicative annual trend (``exp(slope) - 1`` on the
    log scale). ``r_squared`` is the goodness of fit, ``std_error`` the delta-method
    standard error of ``annual_trend``, and ``(ci_low, ci_high)`` its confidence interval
    (asymmetric -- the endpoints are transformed from the log-scale slope interval).
    ``slope`` and ``intercept`` describe the underlying ``log(value) = intercept + slope * t``
    fit with ``t`` measured in years from the first period.
    """

    annual_trend: float
    r_squared: float
    std_error: float
    ci_low: float
    ci_high: float
    confidence: float
    n_periods: int
    slope: float
    intercept: float

    @property
    def ci(self) -> tuple[float, float]:
        """The confidence interval as a ``(low, high)`` tuple."""
        return (self.ci_low, self.ci_high)

    def factor(self, months: float) -> float:
        """Trend factor over ``months`` at the fitted rate: ``(1 + annual_trend) ** (months / 12)``."""
        return (1.0 + self.annual_trend) ** (months / 12.0)

    def __repr__(self) -> str:
        return (
            f"TrendFit(annual_trend={self.annual_trend:.2%}, R2={self.r_squared:.3f}, "
            f"{self.confidence:.0%} CI [{self.ci_low:.2%}, {self.ci_high:.2%}], n={self.n_periods})"
        )
ci property
ci: tuple[float, float]

The confidence interval as a (low, high) tuple.

factor
factor(months: float) -> float

Trend factor over months at the fitted rate: (1 + annual_trend) ** (months / 12).

Source code in actuarialpy/trend.py
def factor(self, months: float) -> float:
    """Trend factor over ``months`` at the fitted rate: ``(1 + annual_trend) ** (months / 12)``."""
    return (1.0 + self.annual_trend) ** (months / 12.0)

actual_to_expected

actual_to_expected(actual: Any, expected: Any) -> Any

Calculate actual-to-expected: actual divided by expected.

Source code in actuarialpy/metrics.py
def actual_to_expected(actual: Any, expected: Any) -> Any:
    """Calculate actual-to-expected: actual divided by expected."""
    return ratio(actual, expected)

combined_ratio

combined_ratio(
    losses: Any, expenses: Any, revenue: Any
) -> Any

Calculate combined ratio: (losses + expenses) divided by revenue.

Source code in actuarialpy/metrics.py
def combined_ratio(losses: Any, expenses: Any, revenue: Any) -> Any:
    """Calculate combined ratio: (losses + expenses) divided by revenue."""
    return ratio(np.asarray(losses) + np.asarray(expenses), revenue)

expense_ratio

expense_ratio(expenses: Any, revenue: Any) -> Any

Calculate an expense ratio: expenses divided by revenue.

Source code in actuarialpy/metrics.py
def expense_ratio(expenses: Any, revenue: Any) -> Any:
    """Calculate an expense ratio: expenses divided by revenue."""
    return ratio(expenses, revenue)

frequency

frequency(claim_count: Any, exposure: Any) -> Any

Calculate claim frequency: claim count divided by exposure.

Source code in actuarialpy/metrics.py
def frequency(claim_count: Any, exposure: Any) -> Any:
    """Calculate claim frequency: claim count divided by exposure."""
    return ratio(claim_count, exposure)

indicated_change

indicated_change(required: Any, current: Any) -> Any

Indicated change from current to required amount.

Source code in actuarialpy/metrics.py
def indicated_change(required: Any, current: Any) -> Any:
    """Indicated change from current to required amount."""
    return safe_divide(required, current) - 1

loss_ratio

loss_ratio(losses_or_expenses: Any, revenue: Any) -> Any

Calculate a loss ratio: losses or expenses divided by revenue.

Source code in actuarialpy/metrics.py
def loss_ratio(losses_or_expenses: Any, revenue: Any) -> Any:
    """Calculate a loss ratio: losses or expenses divided by revenue."""
    return ratio(losses_or_expenses, revenue)

medical_loss_ratio

medical_loss_ratio(claims: Any, premium: Any) -> Any

Calculate a medical loss ratio: claims divided by premium.

Source code in actuarialpy/metrics.py
def medical_loss_ratio(claims: Any, premium: Any) -> Any:
    """Calculate a medical loss ratio: claims divided by premium."""
    return loss_ratio(claims, premium)

pepm

pepm(amount: Any, employee_months: Any) -> Any

Calculate amount per employee month.

Source code in actuarialpy/metrics.py
def pepm(amount: Any, employee_months: Any) -> Any:
    """Calculate amount per employee month."""
    return per_exposure(amount, employee_months)

per_exposure

per_exposure(amount: Any, exposure: Any) -> Any

Calculate amount per exposure unit.

Source code in actuarialpy/metrics.py
def per_exposure(amount: Any, exposure: Any) -> Any:
    """Calculate amount per exposure unit."""
    return ratio(amount, exposure)

permissible_loss_ratio

permissible_loss_ratio(
    expense_ratio: Any, profit_provision: Any = 0.0
) -> Any

Permissible (target / break-even) loss ratio.

PLR = 1 - expense_ratio - profit_provision where both loadings are expressed as a fraction of premium. Also called the zero-margin or target loss ratio: the loss ratio at which premium exactly covers losses, expenses, and the profit/contingency provision. Works element-wise on scalars or Series. (Shops that load fixed expenses on a loss basis instead use (1 - V - Q) / (1 + G); this implements the premium-basis form.)

Source code in actuarialpy/metrics.py
def permissible_loss_ratio(expense_ratio: Any, profit_provision: Any = 0.0) -> Any:
    """Permissible (target / break-even) loss ratio.

    ``PLR = 1 - expense_ratio - profit_provision`` where both loadings are
    expressed as a fraction of premium. Also called the zero-margin or target
    loss ratio: the loss ratio at which premium exactly covers losses, expenses,
    and the profit/contingency provision. Works element-wise on scalars or
    Series. (Shops that load fixed expenses on a loss basis instead use
    ``(1 - V - Q) / (1 + G)``; this implements the premium-basis form.)
    """
    return 1.0 - expense_ratio - profit_provision

pmpm

pmpm(amount: Any, member_months: Any) -> Any

Calculate amount per member month.

Source code in actuarialpy/metrics.py
def pmpm(amount: Any, member_months: Any) -> Any:
    """Calculate amount per member month."""
    return per_exposure(amount, member_months)

pspm

pspm(amount: Any, subscriber_months: Any) -> Any

Calculate amount per subscriber month.

Source code in actuarialpy/metrics.py
def pspm(amount: Any, subscriber_months: Any) -> Any:
    """Calculate amount per subscriber month."""
    return per_exposure(amount, subscriber_months)

pure_premium

pure_premium(losses: Any, exposure: Any) -> Any

Calculate pure premium: losses divided by exposure.

Source code in actuarialpy/metrics.py
def pure_premium(losses: Any, exposure: Any) -> Any:
    """Calculate pure premium: losses divided by exposure."""
    return per_exposure(losses, exposure)

ratio

ratio(numerator: Any, denominator: Any) -> Any

Calculate a generic ratio as numerator divided by denominator.

Source code in actuarialpy/metrics.py
def ratio(numerator: Any, denominator: Any) -> Any:
    """Calculate a generic ratio as numerator divided by denominator."""
    return safe_divide(numerator, denominator)

required_revenue

required_revenue(expense: Any, target_ratio: Any) -> Any

Revenue needed for an expense amount to hit a target ratio.

Source code in actuarialpy/metrics.py
def required_revenue(expense: Any, target_ratio: Any) -> Any:
    """Revenue needed for an expense amount to hit a target ratio."""
    return safe_divide(expense, target_ratio)

safe_divide

safe_divide(
    numerator: Any,
    denominator: Any,
    *,
    fill_value: float = np.nan
) -> Any

Safely divide numerator by denominator.

Scalars return scalars. Array-like inputs return NumPy arrays. Zero denominators are returned as fill_value.

Source code in actuarialpy/metrics.py
def safe_divide(numerator: Any, denominator: Any, *, fill_value: float = np.nan) -> Any:
    """Safely divide numerator by denominator.

    Scalars return scalars. Array-like inputs return NumPy arrays. Zero denominators
    are returned as ``fill_value``.
    """
    if isinstance(numerator, (int, float, np.number)) and isinstance(denominator, (int, float, np.number)):
        return fill_value if denominator == 0 else numerator / denominator

    numerator_arr = np.asarray(numerator, dtype=float)
    denominator_arr = np.asarray(denominator, dtype=float)
    numerator_b, denominator_b = np.broadcast_arrays(numerator_arr, denominator_arr)
    return np.divide(
        numerator_b,
        denominator_b,
        out=np.full(numerator_b.shape, fill_value, dtype=float),
        where=denominator_b != 0,
    )

severity

severity(losses: Any, claim_count: Any) -> Any

Calculate severity: losses divided by claim count.

Source code in actuarialpy/metrics.py
def severity(losses: Any, claim_count: Any) -> Any:
    """Calculate severity: losses divided by claim count."""
    return ratio(losses, claim_count)

utilization_per_1000

utilization_per_1000(
    claim_count: Any,
    exposure: Any,
    *,
    annualization: float = 12
) -> Any

Annualized utilization per 1,000 members.

Returns claim_count / exposure * annualization * 1000. With monthly member months as exposure the default annualization=12 yields services (admits, visits, scripts, ...) per 1,000 members per year. If exposure is already in member-years, pass annualization=1.

Source code in actuarialpy/metrics.py
def utilization_per_1000(claim_count: Any, exposure: Any, *, annualization: float = 12) -> Any:
    """Annualized utilization per 1,000 members.

    Returns ``claim_count / exposure * annualization * 1000``. With monthly member
    months as ``exposure`` the default ``annualization=12`` yields services (admits,
    visits, scripts, ...) per 1,000 members per year. If ``exposure`` is already in
    member-years, pass ``annualization=1``.
    """
    return ratio(claim_count, exposure) * annualization * 1000

apply_completion

apply_completion(
    df: DataFrame,
    factors: Series | DataFrame,
    *,
    value_col: str,
    date_col: str | None = None,
    valuation_date: Any = None,
    development_col: str | None = None,
    by: str | list[str] | None = None,
    factor_col: str = "completion_factor",
    development_name: str = "development_month",
    out_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Develop a paid amount to estimated ultimate with completion factors.

For each row the development period is taken from development_col if supplied, otherwise computed as development_months(df[date_col], valuation_date) -- the convention :func:make_completion_triangle uses, so factors from :func:completion_factors or :func:completion_factors_by join by construction. The completed amount is paid / factor (the divide convention, factors in (0, 1]).

factors may be either of:

  • a flat Series indexed by development period (one pattern for the whole frame), or
  • a tidy DataFrame of per-segment factors -- grouping column(s), a development-period column (development_name) and a factor column (factor_col), the shape :func:completion_factors_by returns -- joined on by plus development period. The table must be unique on by + [development] (a duplicate would fan out the data); this is checked.

The join is by value, never index alignment, so the frame's own index is irrelevant. A row past its (group's) largest development period is taken as fully complete (factor 1.0); a development period inside the fitted range but absent stays NaN -- a surfaced gap; a row whose group is absent from the factor table stays NaN; a negative development period (incurred after valuation_date) raises. Supply either development_col, or both date_col and valuation_date.

Source code in actuarialpy/reserving.py
def apply_completion(
    df: pd.DataFrame,
    factors: pd.Series | pd.DataFrame,
    *,
    value_col: str,
    date_col: str | None = None,
    valuation_date: Any = None,
    development_col: str | None = None,
    by: str | list[str] | None = None,
    factor_col: str = "completion_factor",
    development_name: str = "development_month",
    out_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Develop a paid amount to estimated ultimate with completion factors.

    For each row the development period is taken from ``development_col`` if supplied,
    otherwise computed as ``development_months(df[date_col], valuation_date)`` -- the
    convention :func:`make_completion_triangle` uses, so factors from
    :func:`completion_factors` or :func:`completion_factors_by` join by construction.
    The completed amount is ``paid / factor`` (the divide convention, factors in
    ``(0, 1]``).

    ``factors`` may be either of:

    - a flat Series indexed by development period (one pattern for the whole frame), or
    - a tidy DataFrame of per-segment factors -- grouping column(s), a development-period
      column (``development_name``) and a factor column (``factor_col``), the shape
      :func:`completion_factors_by` returns -- joined on ``by`` plus development period.
      The table must be unique on ``by + [development]`` (a duplicate would fan out the
      data); this is checked.

    The join is by value, never index alignment, so the frame's own index is irrelevant.
    A row past its (group's) largest development period is taken as fully complete
    (factor ``1.0``); a development period inside the fitted range but absent stays
    ``NaN`` -- a surfaced gap; a row whose group is absent from the factor table stays
    ``NaN``; a negative development period (incurred after ``valuation_date``) raises.
    Supply either ``development_col``, or both ``date_col`` and ``valuation_date``.
    """
    if development_col is None and (date_col is None or valuation_date is None):
        raise ValueError(
            "Provide development_col, or both date_col and valuation_date, to determine each row's development period."
        )
    by_cols = as_list(by)
    needed = [value_col] + ([development_col] if development_col is not None else [date_col]) + by_cols
    validate_columns(df, needed)
    result = df.copy() if copy else df

    factor = _emerged_factor(
        result, factors, date_col=date_col, valuation_date=valuation_date, development_col=development_col,
        by_cols=by_cols, factor_col=factor_col, development_name=development_name,
    )
    result[out_col or f"{value_col}_completed"] = result[value_col].to_numpy() / factor
    return result

chain_ladder_by

chain_ladder_by(
    df: DataFrame,
    *,
    groupby: str | list[str],
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    method: str = "volume",
    tail: float = 1.0,
    on_insufficient: str = "raise",
    warn: bool = True
) -> dict[Any, ChainLadder]

Fit a chain-ladder development pattern per segment of df.

Groups df by groupby, builds a development triangle for each segment (see :func:make_completion_triangle), and fits a :class:ChainLadder to each. Returns {segment_key: ChainLadder} -- the key is a scalar for a single grouping column, or a tuple for several.

Segments too small to fit (fewer than two origins or development periods, a zero cumulative, and so on) are handled by on_insufficient:

  • "raise" (default): raise a ValueError naming the failing segment.
  • "skip": omit those segments from the result.
  • "aggregate": use the pooled pattern fit on the whole frame for them.

When on_insufficient is "skip" or "aggregate" and warn is true, an :class:InsufficientDataWarning naming the affected segments is emitted; warn=False suppresses it (the standard :mod:warnings filters also apply). To ignore thin segments entirely, use on_insufficient="skip", warn=False.

Source code in actuarialpy/reserving.py
def chain_ladder_by(
    df: pd.DataFrame,
    *,
    groupby: str | list[str],
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    method: str = "volume",
    tail: float = 1.0,
    on_insufficient: str = "raise",
    warn: bool = True,
) -> dict[Any, ChainLadder]:
    """Fit a chain-ladder development pattern per segment of ``df``.

    Groups ``df`` by ``groupby``, builds a development triangle for each segment
    (see :func:`make_completion_triangle`), and fits a :class:`ChainLadder` to
    each. Returns ``{segment_key: ChainLadder}`` -- the key is a scalar for a
    single grouping column, or a tuple for several.

    Segments too small to fit (fewer than two origins or development periods, a zero cumulative,
    and so on) are handled by ``on_insufficient``:

    - ``"raise"`` (default): raise a ``ValueError`` naming the failing segment.
    - ``"skip"``: omit those segments from the result.
    - ``"aggregate"``: use the pooled pattern fit on the whole frame for them.

    When ``on_insufficient`` is ``"skip"`` or ``"aggregate"`` and ``warn`` is true,
    an :class:`InsufficientDataWarning` naming the affected segments is emitted;
    ``warn=False`` suppresses it (the standard :mod:`warnings` filters also apply).
    To ignore thin segments entirely, use ``on_insufficient="skip", warn=False``.
    """
    if on_insufficient not in ("raise", "skip", "aggregate"):
        raise ValueError("on_insufficient must be 'raise', 'skip', or 'aggregate'")
    group_cols = as_list(groupby)
    if not group_cols:
        raise ValueError("groupby must name at least one column")
    validate_columns(df, group_cols + [origin_col, valuation_col, amount_col])

    def _fit(frame: pd.DataFrame) -> ChainLadder:
        triangle = make_completion_triangle(
            frame,
            origin_col=origin_col,
            valuation_col=valuation_col,
            amount_col=amount_col,
            cumulative=cumulative,
        )
        return ChainLadder.fit(triangle, method=method, tail=tail)

    aggregate_pattern: ChainLadder | None = None
    if on_insufficient == "aggregate":
        try:
            aggregate_pattern = _fit(df)
        except ValueError as exc:
            raise ValueError(f"cannot fit the aggregate fallback pattern: {exc}") from exc

    by_key = group_cols if len(group_cols) > 1 else group_cols[0]
    patterns: dict[Any, ChainLadder] = {}
    insufficient: list[Any] = []
    for key, part in df.groupby(by_key, sort=True):
        try:
            patterns[key] = _fit(part)
        except ValueError as exc:
            if on_insufficient == "raise":
                raise ValueError(f"segment {key!r} cannot be fit: {exc}") from exc
            insufficient.append(key)
            if on_insufficient == "aggregate" and aggregate_pattern is not None:
                patterns[key] = aggregate_pattern

    if insufficient and warn:
        action = "using the aggregate pattern for" if on_insufficient == "aggregate" else "skipping"
        warnings.warn(
            f"{action} {len(insufficient)} segment(s) with insufficient data: {insufficient}",
            InsufficientDataWarning,
            stacklevel=2,
        )
    return patterns

completion_factors

completion_factors(
    triangle: DataFrame,
    *,
    method: str = "volume",
    tail: float = 1.0
) -> pd.Series

Completion factors by development period, via chain-ladder.

Convenience wrapper around :class:ChainLadder: returns the proportion of ultimate emerged by each development period (1 / cdf) estimated from a cumulative triangle. Divide-convention factors in (0, 1] (completed = paid / factor). See :class:ChainLadder for the full pattern and per-origin ultimate/IBNR.

Source code in actuarialpy/reserving.py
def completion_factors(triangle: pd.DataFrame, *, method: str = "volume", tail: float = 1.0) -> pd.Series:
    """Completion factors by development period, via chain-ladder.

    Convenience wrapper around :class:`ChainLadder`: returns the proportion of
    ultimate emerged by each development period (``1 / cdf``) estimated from a cumulative
    triangle. Divide-convention factors in ``(0, 1]`` (``completed = paid /
    factor``). See :class:`ChainLadder` for the full pattern and per-origin
    ultimate/IBNR.
    """
    return ChainLadder.fit(triangle, method=method, tail=tail).completion_factors

completion_factors_by

completion_factors_by(
    df: DataFrame,
    *,
    groupby: str | list[str],
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    method: str = "volume",
    tail: float = 1.0,
    on_insufficient: str = "raise",
    warn: bool = True,
    development_name: str = "development_month"
) -> pd.DataFrame

Completion factors per segment as a tidy table.

Convenience over :func:chain_ladder_by: one row per (segment, development period) with the completion factor, ready to review, pivot, or join. Columns are the grouping column(s), development_name, and completion_factor. on_insufficient and warn behave as in :func:chain_ladder_by.

Source code in actuarialpy/reserving.py
def completion_factors_by(
    df: pd.DataFrame,
    *,
    groupby: str | list[str],
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    method: str = "volume",
    tail: float = 1.0,
    on_insufficient: str = "raise",
    warn: bool = True,
    development_name: str = "development_month",
) -> pd.DataFrame:
    """Completion factors per segment as a tidy table.

    Convenience over :func:`chain_ladder_by`: one row per (segment, development period) with the
    completion factor, ready to review, pivot, or join. Columns are the grouping
    column(s), ``development_name``, and ``completion_factor``. ``on_insufficient`` and
    ``warn`` behave as in :func:`chain_ladder_by`.
    """
    group_cols = as_list(groupby)
    patterns = chain_ladder_by(
        df,
        groupby=groupby,
        origin_col=origin_col,
        valuation_col=valuation_col,
        amount_col=amount_col,
        cumulative=cumulative,
        method=method,
        tail=tail,
        on_insufficient=on_insufficient,
        warn=warn,
    )
    records: list[dict[str, Any]] = []
    for key, fitted in patterns.items():
        key_tuple = key if isinstance(key, tuple) else (key,)
        key_map = dict(zip(group_cols, key_tuple))
        for development, factor in fitted.completion_factors.items():
            records.append({**key_map, development_name: development, "completion_factor": float(factor)})
    if not records:
        return pd.DataFrame(columns=group_cols + [development_name, "completion_factor"])
    return pd.DataFrame.from_records(records)

develop_ultimate

develop_ultimate(
    df: DataFrame,
    factors: Series | DataFrame,
    *,
    method: str = "bornhuetter_ferguson",
    value_col: str,
    date_col: str | None = None,
    valuation_date: Any = None,
    development_col: str | None = None,
    apriori_col: str | None = None,
    exposure_col: str | None = None,
    by: str | list[str] | None = None,
    factor_col: str = "completion_factor",
    development_name: str = "development_month",
    out_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Develop a paid amount to estimated ultimate by a chosen reserving method.

All methods share one input -- the proportion emerged at each row's development period, joined exactly as :func:apply_completion does (flat Series or per-segment table, beyond-the-triangle rows fully emerged). They differ only in how they combine that with the paid-to-date and an a priori expectation:

  • "chain_ladder" -- paid / emerged. Ignores the a priori; equivalent to :func:apply_completion. Volatile for immature periods (a thin latest diagonal drives the whole tail).
  • "bornhuetter_ferguson" -- paid + apriori * (1 - emerged). Takes the unemerged portion from the a priori rather than from the data, so it is stable for green periods. Requires apriori_col (an expected ultimate per row -- an input, e.g. a plan, budget, or manual times exposure).
  • "benktander" -- one Bornhuetter-Ferguson iteration using the BF ultimate as the a priori: paid + bf * (1 - emerged). A credibility blend sitting between BF and chain ladder (weight emerged on chain ladder). Requires apriori_col.
  • "cape_cod" -- Bornhuetter-Ferguson with the a priori derived from the data: a single expected loss ratio per segment, sum(paid) / sum(exposure * emerged), times each row's exposure. Requires exposure_col (an on-level premium / exposure per row). The loss ratio is mechanical; the exposure base is an input.

The library applies a method; it does not pick the a priori or the exposure base. Supply either development_col or both date_col and valuation_date; pass by with a per-segment factor table (and Cape Cod then derives one loss ratio per segment). Returns df with an out_col (default f"{value_col}_ultimate").

Source code in actuarialpy/reserving.py
def develop_ultimate(
    df: pd.DataFrame,
    factors: pd.Series | pd.DataFrame,
    *,
    method: str = "bornhuetter_ferguson",
    value_col: str,
    date_col: str | None = None,
    valuation_date: Any = None,
    development_col: str | None = None,
    apriori_col: str | None = None,
    exposure_col: str | None = None,
    by: str | list[str] | None = None,
    factor_col: str = "completion_factor",
    development_name: str = "development_month",
    out_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Develop a paid amount to estimated ultimate by a chosen reserving method.

    All methods share one input -- the proportion emerged at each row's development
    period, joined exactly as :func:`apply_completion` does (flat Series or per-segment
    table, beyond-the-triangle rows fully emerged). They differ only in how they combine
    that with the paid-to-date and an *a priori* expectation:

    - ``"chain_ladder"`` -- ``paid / emerged``. Ignores the a priori; equivalent to
      :func:`apply_completion`. Volatile for immature periods (a thin latest diagonal
      drives the whole tail).
    - ``"bornhuetter_ferguson"`` -- ``paid + apriori * (1 - emerged)``. Takes the
      unemerged portion from the a priori rather than from the data, so it is stable for
      green periods. Requires ``apriori_col`` (an expected ultimate per row -- an input,
      e.g. a plan, budget, or manual times exposure).
    - ``"benktander"`` -- one Bornhuetter-Ferguson iteration using the BF ultimate as the
      a priori: ``paid + bf * (1 - emerged)``. A credibility blend sitting between BF and
      chain ladder (weight ``emerged`` on chain ladder). Requires ``apriori_col``.
    - ``"cape_cod"`` -- Bornhuetter-Ferguson with the a priori derived from the data: a
      single expected loss ratio per segment, ``sum(paid) / sum(exposure * emerged)``,
      times each row's exposure. Requires ``exposure_col`` (an on-level premium /
      exposure per row). The loss ratio is mechanical; the exposure base is an input.

    The library applies a method; it does not pick the a priori or the exposure base.
    Supply either ``development_col`` or both ``date_col`` and ``valuation_date``; pass
    ``by`` with a per-segment factor table (and Cape Cod then derives one loss ratio per
    segment). Returns ``df`` with an ``out_col`` (default ``f"{value_col}_ultimate"``).
    """
    methods = {"chain_ladder", "bornhuetter_ferguson", "benktander", "cape_cod"}
    if method not in methods:
        raise ValueError(f"method must be one of {sorted(methods)}; got {method!r}.")
    if development_col is None and (date_col is None or valuation_date is None):
        raise ValueError(
            "Provide development_col, or both date_col and valuation_date, to determine each row's development period."
        )
    by_cols = as_list(by)
    needed = [value_col] + ([development_col] if development_col is not None else [date_col]) + by_cols
    if method in ("bornhuetter_ferguson", "benktander"):
        if apriori_col is None:
            raise ValueError(f"method={method!r} requires apriori_col (an expected ultimate per row).")
        needed.append(apriori_col)
    if method == "cape_cod":
        if exposure_col is None:
            raise ValueError("method='cape_cod' requires exposure_col (an on-level premium / exposure per row).")
        needed.append(exposure_col)
    validate_columns(df, needed)
    result = df.copy() if copy else df

    emerged = _emerged_factor(
        result, factors, date_col=date_col, valuation_date=valuation_date, development_col=development_col,
        by_cols=by_cols, factor_col=factor_col, development_name=development_name,
    )
    paid = result[value_col].to_numpy(dtype="float64")

    if method == "chain_ladder":
        ultimate = paid / emerged
    elif method == "bornhuetter_ferguson":
        apriori = result[apriori_col].to_numpy(dtype="float64")
        ultimate = paid + apriori * (1.0 - emerged)
    elif method == "benktander":
        apriori = result[apriori_col].to_numpy(dtype="float64")
        bf = paid + apriori * (1.0 - emerged)
        ultimate = paid + bf * (1.0 - emerged)
    else:  # cape_cod
        exposure = result[exposure_col].to_numpy(dtype="float64")
        elr = _cape_cod_elr(paid, exposure, emerged, result, by_cols)
        ultimate = paid + exposure * elr * (1.0 - emerged)

    result[out_col or f"{value_col}_ultimate"] = ultimate
    return result

development_months

development_months(incurred_date, valuation_date)

Whole months of development between incurred (origin) and valuation.

Either argument may be a scalar, a Series, or array-like, in any combination (e.g. a column of incurred dates against a single valuation date). The result is a Series when either argument is a Series, otherwise a scalar.

Source code in actuarialpy/reserving.py
def development_months(incurred_date, valuation_date):
    """Whole months of development between incurred (origin) and valuation.

    Either argument may be a scalar, a Series, or array-like, in any combination
    (e.g. a column of incurred dates against a single valuation date). The result is
    a Series when either argument is a Series, otherwise a scalar.
    """
    incurred = pd.to_datetime(incurred_date)
    valuation = pd.to_datetime(valuation_date)

    def year_month(value):
        accessor = value.dt if hasattr(value, "dt") else value  # Series use .dt; Timestamp/Index don't
        return accessor.year, accessor.month

    incurred_year, incurred_month = year_month(incurred)
    valuation_year, valuation_month = year_month(valuation)
    return (valuation_year - incurred_year) * 12 + (valuation_month - incurred_month)

ibnr

ibnr(completed, paid)

IBNR as completed minus paid (the completed/paid identity).

Works element-wise on scalars or Series. completed and paid must be on the same basis; the result is the amount bridging paid-to-date to ultimate.

Source code in actuarialpy/reserving.py
def ibnr(completed, paid):
    """IBNR as completed minus paid (the completed/paid identity).

    Works element-wise on scalars or Series. ``completed`` and ``paid`` must be on
    the same basis; the result is the amount bridging paid-to-date to ultimate.
    """
    return completed - paid

make_completion_triangle

make_completion_triangle(
    df: DataFrame,
    *,
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    index_name: str = "origin_period",
    development_name: str = "development_month"
) -> pd.DataFrame

Build a development (completion) triangle by origin period and development period.

Each cell aggregates amount_col for an origin month at a given valuation development period (whole months between origin and valuation, via :func:development_months). amount_col is treated as the incremental amount in each (origin, development period) cell; with cumulative=True -- the default, and the usual basis for estimating development/completion factors -- the cells are accumulated across development period. Set cumulative=False to return the incremental triangle, or if your input amounts are already cumulative-to-date snapshots.

This consumes a compact development aggregate (one row per origin x valuation, i.e. months x months); it does not require transaction/line-level data.

Source code in actuarialpy/reserving.py
def make_completion_triangle(
    df: pd.DataFrame,
    *,
    origin_col: str,
    valuation_col: str,
    amount_col: str,
    cumulative: bool = True,
    index_name: str = "origin_period",
    development_name: str = "development_month",
) -> pd.DataFrame:
    """Build a development (completion) triangle by origin period and development period.

    Each cell aggregates ``amount_col`` for an origin month at a given valuation
    development period (whole months between origin and valuation, via :func:`development_months`).
    ``amount_col`` is treated as the *incremental* amount in each (origin, development period)
    cell; with ``cumulative=True`` -- the default, and the usual basis for
    estimating development/completion factors -- the cells are accumulated across
    development period. Set ``cumulative=False`` to return the incremental triangle, or if your
    input amounts are already cumulative-to-date snapshots.

    This consumes a compact development aggregate (one row per origin x valuation,
    i.e. months x months); it does not require transaction/line-level data.
    """
    validate_columns(df, [origin_col, valuation_col, amount_col])
    temp = df.copy()
    temp[index_name] = pd.to_datetime(temp[origin_col]).dt.to_period("M")
    temp[development_name] = development_months(temp[origin_col], temp[valuation_col])
    grouped = temp.groupby([index_name, development_name], dropna=False)[amount_col].sum().reset_index()
    triangle = grouped.pivot(index=index_name, columns=development_name, values=amount_col).sort_index(axis=1)
    if cumulative:
        triangle = triangle.cumsum(axis=1)
    return triangle

validate_completion_factors

validate_completion_factors(
    factors: DataFrame,
    factor_col: str = "completion_factor",
    *,
    method: str = "divide"
) -> None

Validate completion-factor values for a selected convention.

divide factors (completed = paid / factor) should satisfy 0 < factor <= 1; multiply factors (completed = paid * factor) should satisfy factor >= 1. Useful as a sanity check on estimated factors before they are applied upstream.

Source code in actuarialpy/reserving.py
def validate_completion_factors(
    factors: pd.DataFrame, factor_col: str = "completion_factor", *, method: str = "divide"
) -> None:
    """Validate completion-factor values for a selected convention.

    ``divide`` factors (completed = paid / factor) should satisfy
    ``0 < factor <= 1``; ``multiply`` factors (completed = paid * factor) should
    satisfy ``factor >= 1``. Useful as a sanity check on estimated factors before
    they are applied upstream.
    """
    validate_columns(factors, [factor_col])
    values = factors[factor_col]
    bad_missing = values.isna()
    if bad_missing.any():
        raise ValueError(f"{bad_missing.sum()} completion factors are missing")
    if method == "divide":
        bad = (values <= 0) | (values > 1)
        if bad.any():
            raise ValueError("divide-method completion factors should generally satisfy 0 < factor <= 1")
    elif method == "multiply":
        bad = values < 1
        if bad.any():
            raise ValueError("multiply-method completion factors should generally satisfy factor >= 1")
    else:
        raise ValueError("method must be either 'divide' or 'multiply'")

credibility_weighted_estimate

credibility_weighted_estimate(
    observed: Any, complement: Any, z: Any
) -> Any

Blend an observed estimate with its complement at credibility z.

Returns z * observed + (1 - z) * complement. Scalar inputs return a native float; pandas.Series inputs return a Series with the index preserved; other array-like inputs return a numpy.ndarray. This is the atomic credibility operation; the z may come from a model below, a filed credibility formula, or any other source.

Source code in actuarialpy/credibility.py
def credibility_weighted_estimate(observed: Any, complement: Any, z: Any) -> Any:
    """Blend an observed estimate with its complement at credibility ``z``.

    Returns ``z * observed + (1 - z) * complement``. Scalar inputs return a
    native ``float``; ``pandas.Series`` inputs return a ``Series`` with the index
    preserved; other array-like inputs return a ``numpy.ndarray``. This is the
    atomic credibility operation; the ``z`` may come from a model below, a filed
    credibility formula, or any other source.
    """
    if isinstance(observed, pd.Series) or isinstance(complement, pd.Series) or isinstance(z, pd.Series):
        return z * observed + (1 - z) * complement
    if isinstance(observed, _SCALAR_TYPES) and isinstance(complement, _SCALAR_TYPES) and isinstance(z, _SCALAR_TYPES):
        return float(z * observed + (1 - z) * complement)
    observed_arr = np.asarray(observed, dtype=float)
    complement_arr = np.asarray(complement, dtype=float)
    z_arr = np.asarray(z, dtype=float)
    return z_arr * observed_arr + (1 - z_arr) * complement_arr

full_credibility_claims

full_credibility_claims(
    *,
    confidence: float = 0.9,
    tolerance: float = 0.05,
    severity_cv: float | None = None
) -> float

Classical full-credibility standard, in expected number of claims.

Returns the expected claim count for full credibility under the limited-fluctuation model: (z / k) ** 2 for claim frequency, where z is the standard-normal quantile for two-sided confidence and k is the tolerance. The classic 90% / 5% choice gives about 1082 claims. Supplying severity_cv (the coefficient of variation of individual claim severity) inflates it to (z / k) ** 2 * (1 + severity_cv ** 2) for aggregate losses rather than pure frequency.

Many shops use a filed standard instead; pass that straight to :func:limited_fluctuation_z.

Source code in actuarialpy/credibility.py
def full_credibility_claims(
    *, confidence: float = 0.90, tolerance: float = 0.05, severity_cv: float | None = None
) -> float:
    """Classical full-credibility standard, in expected number of claims.

    Returns the expected claim count for full credibility under the
    limited-fluctuation model: ``(z / k) ** 2`` for claim frequency, where ``z`` is
    the standard-normal quantile for two-sided ``confidence`` and ``k`` is the
    ``tolerance``. The classic 90% / 5% choice gives about 1082 claims. Supplying
    ``severity_cv`` (the coefficient of variation of individual claim severity)
    inflates it to ``(z / k) ** 2 * (1 + severity_cv ** 2)`` for aggregate losses
    rather than pure frequency.

    Many shops use a filed standard instead; pass that straight to
    :func:`limited_fluctuation_z`.
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError("confidence must be between 0 and 1.")
    if tolerance <= 0.0:
        raise ValueError("tolerance must be positive.")
    z = NormalDist().inv_cdf((1.0 + confidence) / 2.0)
    standard = (z / tolerance) ** 2
    if severity_cv is not None:
        if severity_cv < 0.0:
            raise ValueError("severity_cv must be non-negative.")
        standard *= 1.0 + severity_cv**2
    return standard

limited_fluctuation_z

limited_fluctuation_z(
    exposure: Any, full_credibility_standard: float
) -> Any

Limited-fluctuation (classical) credibility factor -- the square-root rule.

Returns Z = min(1, sqrt(exposure / full_credibility_standard)). exposure is the volume credibility is based on (claim counts, member months, life-years, ...) and full_credibility_standard is the amount of that volume required for full (Z = 1) credibility -- often a filed value. Scalars return a native float; pandas.Series inputs return a Series (index preserved); other array-likes return a numpy.ndarray, so credibility can be computed per group. Feed the result to :func:credibility_weighted_estimate to blend experience with its complement.

Source code in actuarialpy/credibility.py
def limited_fluctuation_z(exposure: Any, full_credibility_standard: float) -> Any:
    """Limited-fluctuation (classical) credibility factor -- the square-root rule.

    Returns ``Z = min(1, sqrt(exposure / full_credibility_standard))``. ``exposure``
    is the volume credibility is based on (claim counts, member months, life-years,
    ...) and ``full_credibility_standard`` is the amount of that volume required for
    full (``Z = 1``) credibility -- often a filed value. Scalars return a native
    ``float``; ``pandas.Series`` inputs return a ``Series`` (index preserved); other
    array-likes return a ``numpy.ndarray``, so credibility can be computed per group.
    Feed the result to :func:`credibility_weighted_estimate` to blend experience with
    its complement.
    """
    if full_credibility_standard <= 0:
        raise ValueError("full_credibility_standard must be positive.")
    if isinstance(exposure, _SCALAR_TYPES):
        return float(min(1.0, np.sqrt(max(float(exposure), 0.0) / full_credibility_standard)))
    ratio_arr = np.asarray(exposure, dtype=float) / full_credibility_standard
    z_arr = np.minimum(np.sqrt(np.clip(ratio_arr, 0.0, None)), 1.0)
    if isinstance(exposure, pd.Series):
        return pd.Series(z_arr, index=exposure.index, name=exposure.name)
    return z_arr

add_months_in_force

add_months_in_force(
    df: DataFrame,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None,
    out_col: str = "months_in_force",
    copy: bool = True
) -> pd.DataFrame

Add whole months of overlap between each entity's in-force window and a period.

The in-force window is [effective, termination] (a missing termination means the period end). The result is clipped to [period_start, period_end] and floored at 0. Month counting is inclusive of both endpoint months, so a full coverage of an N-month period returns N.

Source code in actuarialpy/lifecycle.py
def add_months_in_force(
    df: pd.DataFrame,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None,
    out_col: str = "months_in_force",
    copy: bool = True,
) -> pd.DataFrame:
    """Add whole months of overlap between each entity's in-force window and a period.

    The in-force window is ``[effective, termination]`` (a missing termination
    means the period end). The result is clipped to ``[period_start, period_end]``
    and floored at 0. Month counting is inclusive of both endpoint months, so a
    full coverage of an N-month period returns N.
    """
    cols = [effective_col] + ([termination_col] if termination_col else [])
    validate_columns(df, cols)
    result = df.copy() if copy else df

    start = pd.to_datetime(period_start)
    end = pd.to_datetime(period_end)

    eff = _to_dt(result[effective_col])
    if termination_col:
        term = _to_dt(result[termination_col]).fillna(end)
    else:
        term = pd.Series(end, index=result.index)

    eff_clipped = eff.clip(lower=start)
    term_clipped = term.clip(upper=end)
    months = _months_between_series(eff_clipped, term_clipped) + 1
    result[out_col] = months.clip(lower=0)
    return result

add_tenure

add_tenure(
    df: DataFrame,
    effective_col: str,
    as_of,
    *,
    tenure_col: str = "tenure_months",
    one_based: bool = False,
    copy: bool = True
) -> pd.DataFrame

Add tenure in whole months from each entity's effective date to as_of.

as_of is a single reference date (e.g. the experience as-of date). With one_based=True an entity effective in the as-of month has tenure 1 rather than 0, matching "months of experience" conventions.

Source code in actuarialpy/lifecycle.py
def add_tenure(
    df: pd.DataFrame,
    effective_col: str,
    as_of,
    *,
    tenure_col: str = "tenure_months",
    one_based: bool = False,
    copy: bool = True,
) -> pd.DataFrame:
    """Add tenure in whole months from each entity's effective date to ``as_of``.

    ``as_of`` is a single reference date (e.g. the experience as-of date). With
    ``one_based=True`` an entity effective in the as-of month has tenure 1 rather
    than 0, matching "months of experience" conventions.
    """
    validate_columns(df, [effective_col])
    result = df.copy() if copy else df
    eff = _to_dt(result[effective_col])
    as_of_ts = pd.to_datetime(as_of)
    tenure = (as_of_ts.year - eff.dt.year) * 12 + (as_of_ts.month - eff.dt.month)
    result[tenure_col] = tenure + 1 if one_based else tenure
    return result

derive_status

derive_status(
    df: DataFrame,
    *,
    effective_col: str,
    as_of,
    termination_col: str | None = None,
    first_year_months: int = 12,
    status_col: str = "status",
    labels: dict[str, str] | None = None,
    copy: bool = True
) -> pd.DataFrame

Derive an active / first-year / termed status as of a reference date.

Classification (in precedence order):

  • termed: a termination date is present and on/before as_of.
  • first_year: not termed and tenure (as_of minus effective) is less than first_year_months. The window is a parameter because "first year" means the first 12 months in some shops and the first policy year in others.
  • active: in force beyond the first-year window.

labels optionally remaps the three canonical values, e.g. {"first_year": "First Year Account", "termed": "Term"}.

Source code in actuarialpy/lifecycle.py
def derive_status(
    df: pd.DataFrame,
    *,
    effective_col: str,
    as_of,
    termination_col: str | None = None,
    first_year_months: int = 12,
    status_col: str = "status",
    labels: dict[str, str] | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Derive an active / first-year / termed status as of a reference date.

    Classification (in precedence order):

    - **termed**: a termination date is present and on/before ``as_of``.
    - **first_year**: not termed and tenure (``as_of`` minus effective) is less
      than ``first_year_months``. The window is a parameter because "first year"
      means the first 12 months in some shops and the first policy year in
      others.
    - **active**: in force beyond the first-year window.

    ``labels`` optionally remaps the three canonical values, e.g.
    ``{"first_year": "First Year Account", "termed": "Term"}``.
    """
    cols = [effective_col] + ([termination_col] if termination_col else [])
    validate_columns(df, cols)
    result = df.copy() if copy else df

    eff = _to_dt(result[effective_col])
    as_of_ts = pd.to_datetime(as_of)
    tenure = (as_of_ts.year - eff.dt.year) * 12 + (as_of_ts.month - eff.dt.month)

    if termination_col:
        term = _to_dt(result[termination_col])
        termed = term.notna() & (term <= as_of_ts)
    else:
        termed = pd.Series(False, index=result.index)

    first_year = (~termed) & (tenure < first_year_months)

    status_values = np.where(termed, STATUS_TERMED, np.where(first_year, STATUS_FIRST_YEAR, STATUS_ACTIVE))
    status = pd.Series(status_values, index=result.index)
    if labels:
        status = status.map(lambda s: labels.get(s, s))
    result[status_col] = status
    return result

earned_exposure

earned_exposure(
    df: DataFrame,
    exposure_col: str,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None,
    period_months: int | None = None,
    out_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Prorate a full-period exposure by the fraction of the period in force.

earned = exposure * months_in_force / period_months. Use this when each row carries a full-period exposure (e.g. annualized) that must be reduced for mid-period entry or termination. If your data is already monthly, filtering to in-force months with :func:is_in_force is usually simpler.

Source code in actuarialpy/lifecycle.py
def earned_exposure(
    df: pd.DataFrame,
    exposure_col: str,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None,
    period_months: int | None = None,
    out_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Prorate a full-period exposure by the fraction of the period in force.

    ``earned = exposure * months_in_force / period_months``. Use this when each
    row carries a full-period exposure (e.g. annualized) that must be reduced for
    mid-period entry or termination. If your data is already monthly, filtering
    to in-force months with :func:`is_in_force` is usually simpler.
    """
    validate_columns(df, [exposure_col])
    result = add_months_in_force(
        df,
        effective_col=effective_col,
        termination_col=termination_col,
        period_start=period_start,
        period_end=period_end,
        out_col="_months_in_force_tmp",
        copy=copy,
    )
    if period_months is None:
        start = pd.to_datetime(period_start)
        end = pd.to_datetime(period_end)
        period_months = (end.year - start.year) * 12 + (end.month - start.month) + 1
    name = out_col or f"earned_{exposure_col}"
    fraction = result["_months_in_force_tmp"] / period_months
    result[name] = result[exposure_col] * fraction
    return result.drop(columns="_months_in_force_tmp")

is_in_force

is_in_force(
    df: DataFrame,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None
) -> pd.Series

Boolean Series: in force at any point during [period_start, period_end].

In force when effective on/before period_end and the entity had not terminated before period_start (a missing termination date means still in force).

Source code in actuarialpy/lifecycle.py
def is_in_force(
    df: pd.DataFrame,
    *,
    effective_col: str,
    period_start,
    period_end,
    termination_col: str | None = None,
) -> pd.Series:
    """Boolean Series: in force at any point during ``[period_start, period_end]``.

    In force when effective on/before ``period_end`` and the entity had not
    terminated before ``period_start`` (a missing termination date means still
    in force).
    """
    cols = [effective_col] + ([termination_col] if termination_col else [])
    validate_columns(df, cols)
    eff = _to_dt(df[effective_col])
    start = pd.to_datetime(period_start)
    end = pd.to_datetime(period_end)
    in_force = eff <= end
    if termination_col:
        term = _to_dt(df[termination_col])
        in_force = in_force & (term.isna() | (term >= start))
    return in_force

assign_band

assign_band(
    df: DataFrame,
    value_col: str,
    bands: Sequence[float],
    *,
    labels: Sequence[str] | None = None,
    band_col: str = "band",
    right: bool = False,
    copy: bool = True
) -> pd.DataFrame

Assign each row to an ordered size band based on value_col.

bands are bin edges. For integer counts the natural form is left-closed (right=False), so bands=[0, 51, 76, 151, 251, 501, inf] yields [0, 51), [51, 76), .... A trailing float("inf") captures the open top band. The resulting column is an ordered categorical so downstream group-bys keep band order.

Source code in actuarialpy/banding.py
def assign_band(
    df: pd.DataFrame,
    value_col: str,
    bands: Sequence[float],
    *,
    labels: Sequence[str] | None = None,
    band_col: str = "band",
    right: bool = False,
    copy: bool = True,
) -> pd.DataFrame:
    """Assign each row to an ordered size band based on ``value_col``.

    ``bands`` are bin edges. For integer counts the natural form is left-closed
    (``right=False``), so ``bands=[0, 51, 76, 151, 251, 501, inf]`` yields
    ``[0, 51)``, ``[51, 76)``, .... A trailing ``float("inf")`` captures the open
    top band. The resulting column is an ordered categorical so downstream
    group-bys keep band order.
    """
    validate_columns(df, [value_col])
    edges = list(bands)
    if len(edges) < 2:
        raise ValueError("bands must contain at least two edges (one band).")
    if labels is None:
        labels = _default_labels(edges)
    if len(labels) != len(edges) - 1:
        raise ValueError(f"Expected {len(edges) - 1} labels for {len(edges)} edges, got {len(labels)}.")
    result = df.copy() if copy else df
    result[band_col] = pd.cut(
        result[value_col],
        bins=edges,
        labels=list(labels),
        right=right,
        include_lowest=True,
        ordered=True,
    )
    return result

summarize_by_band

summarize_by_band(
    df: DataFrame,
    value_col: str,
    bands: Sequence[float],
    *,
    labels: Sequence[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    band_col: str = "band",
    ratio_col: str | None = None,
    right: bool = False,
    profile: str | None = None
) -> pd.DataFrame

Assign size bands then summarize experience grouped by band.

Returns one row per band in band order (empty bands included), with the same aggregates, loss ratio, and per-exposure metrics as :func:~actuarialpy.experience.summarize_experience.

Source code in actuarialpy/banding.py
def summarize_by_band(
    df: pd.DataFrame,
    value_col: str,
    bands: Sequence[float],
    *,
    labels: Sequence[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    band_col: str = "band",
    ratio_col: str | None = None,
    right: bool = False,
    profile: str | None = None,
) -> pd.DataFrame:
    """Assign size bands then summarize experience grouped by band.

    Returns one row per band in band order (empty bands included), with the same
    aggregates, loss ratio, and per-exposure metrics as
    :func:`~actuarialpy.experience.summarize_experience`.
    """
    banded = assign_band(
        df,
        value_col,
        bands,
        labels=labels,
        band_col=band_col,
        right=right,
        copy=True,
    )
    summary = summarize_experience(
        banded,
        groupby=band_col,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        ratio_col=ratio_col,
        profile=profile,
    )
    # Preserve band order and surface empty bands explicitly.
    order = list(banded[band_col].cat.categories)
    summary[band_col] = pd.Categorical(summary[band_col], categories=order, ordered=True)
    return summary.sort_values(band_col).reset_index(drop=True)

adjust

adjust(
    df: DataFrame,
    factors: float | int | Series | DataFrame,
    *,
    value_col: str,
    on: str | list[str] | None = None,
    by: str | list[str] | None = None,
    how: str = "multiply",
    factor_col: str = "factor",
    out_col: str | None = None,
    audit_col: str | None = None,
    default: float | None = None,
    copy: bool = True
) -> pd.DataFrame

Multiply or divide a column by a factor joined on a key.

The general factor-application primitive behind trend, benefit / area / demographic relativities, network discounts -- any per-key multiplier. The factor for each row is taken from one of:

  • a scalar factors -- one factor for every row (e.g. a single trend factor);
  • a Series indexed by on -- one key column (e.g. an area factor by region);
  • a tidy DataFrame keyed by by + on with factor_col -- per-segment factors (the shape the *_by estimators return).

and applied to value_col: how="multiply" gives value * factor (loads, trend), how="divide" gives value / factor (backing a factor out).

The join is by value (the frame's index never participates); the factor table must be unique on its keys -- a duplicate would fan out the data -- which is enforced. An absent key gives default (NaN when default is None -- a surfaced gap, never silently filled); pass default=1.0 when a key missing from the table should mean "no adjustment". With audit_col, the cumulative net multiplier applied to value_col is accumulated there (factor for multiply, 1 / factor for divide), so a chain of adjustments leaves a per-row record of total restatement.

Source code in actuarialpy/adjustments.py
def adjust(
    df: pd.DataFrame,
    factors: float | int | pd.Series | pd.DataFrame,
    *,
    value_col: str,
    on: str | list[str] | None = None,
    by: str | list[str] | None = None,
    how: str = "multiply",
    factor_col: str = "factor",
    out_col: str | None = None,
    audit_col: str | None = None,
    default: float | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Multiply or divide a column by a factor joined on a key.

    The general factor-application primitive behind trend, benefit / area / demographic
    relativities, network discounts -- any per-key multiplier. The factor for each row is
    taken from one of:

    - a **scalar** ``factors`` -- one factor for every row (e.g. a single trend factor);
    - a **Series** indexed by ``on`` -- one key column (e.g. an area factor by region);
    - a tidy **DataFrame** keyed by ``by + on`` with ``factor_col`` -- per-segment factors
      (the shape the ``*_by`` estimators return).

    and applied to ``value_col``: ``how="multiply"`` gives ``value * factor`` (loads,
    trend), ``how="divide"`` gives ``value / factor`` (backing a factor out).

    The join is by value (the frame's index never participates); the factor table must be
    unique on its keys -- a duplicate would fan out the data -- which is enforced. An
    absent key gives ``default`` (``NaN`` when ``default`` is ``None`` -- a surfaced gap,
    never silently filled); pass ``default=1.0`` when a key missing from the table should
    mean "no adjustment". With ``audit_col``, the cumulative *net multiplier* applied to
    ``value_col`` is accumulated there (``factor`` for multiply, ``1 / factor`` for
    divide), so a chain of adjustments leaves a per-row record of total restatement.
    """
    if how not in ("multiply", "divide"):
        raise ValueError("how must be 'multiply' or 'divide'")
    on_cols = as_list(on)
    by_cols = as_list(by)
    validate_columns(df, [value_col] + on_cols + by_cols)
    result = df.copy() if copy else df

    if isinstance(factors, pd.DataFrame):
        keys = by_cols + on_cols
        if not keys:
            raise ValueError("Pass on=... (and optionally by=...) naming the key column(s) for a factor table.")
        factor = factor_lookup(result, factors, keys, factor_col=factor_col, default=default)
    elif isinstance(factors, pd.Series):
        if len(on_cols) != 1:
            raise ValueError("Pass on=<column> (one key) when factors is a Series indexed by that key.")
        if by_cols:
            raise ValueError("by= needs a tidy DataFrame of per-segment factors, not a Series.")
        factor = np.array(result[on_cols[0]].map(factors), dtype="float64")
        if default is not None:
            factor = np.where(np.isnan(factor), float(default), factor)
    elif isinstance(factors, bool):
        raise TypeError("factors must be a number, a Series keyed by `on`, or a tidy DataFrame.")
    elif isinstance(factors, (int, float)):
        factor = np.full(len(result), float(factors))
    else:
        raise TypeError("factors must be a number, a Series keyed by `on`, or a tidy DataFrame.")

    applied = factor if how == "multiply" else 1.0 / factor
    result[out_col or value_col] = result[value_col].to_numpy() * applied
    if audit_col is not None:
        prior = result[audit_col].to_numpy() if audit_col in result.columns else np.ones(len(result))
        result[audit_col] = prior * applied
    return result

factor_lookup

factor_lookup(
    df: DataFrame,
    factors: DataFrame,
    keys: str | Iterable[str],
    *,
    factor_col: str,
    default: float | None = None
) -> np.ndarray

Join a factor onto df by value on one or more existing key columns.

The single factor-join primitive behind grouped completion, seasonality, and :func:adjust. factors is a tidy table containing keys and factor_col; each row of df is matched on its keys values. The factor table must be unique on keys -- a duplicate would fan rows out on the join -- so this raises otherwise. Returns a float array aligned to df's row order (the frame's own index never participates). An absent key gives default (NaN when default is None -- a surfaced gap, never silently filled).

Source code in actuarialpy/columns.py
def factor_lookup(
    df: pd.DataFrame,
    factors: pd.DataFrame,
    keys: str | Iterable[str],
    *,
    factor_col: str,
    default: float | None = None,
) -> np.ndarray:
    """Join a factor onto ``df`` by value on one or more existing key columns.

    The single factor-join primitive behind grouped completion, seasonality, and
    :func:`adjust`. ``factors`` is a tidy table containing ``keys`` and ``factor_col``;
    each row of ``df`` is matched on its ``keys`` values. The factor table must be unique
    on ``keys`` -- a duplicate would fan rows out on the join -- so this raises otherwise.
    Returns a float array aligned to ``df``'s row order (the frame's own index never
    participates). An absent key gives ``default`` (``NaN`` when ``default`` is ``None``
    -- a surfaced gap, never silently filled).
    """
    key_cols = as_list(keys)
    if not key_cols:
        raise ValueError("keys must name at least one column")
    validate_columns(factors, key_cols + [factor_col])
    validate_columns(df, key_cols)
    ensure_unique_keys(factors, key_cols, name="factor table")
    if len(key_cols) == 1:
        lookup = factors.set_index(key_cols[0])[factor_col]
        factor = np.array(df[key_cols[0]].map(lookup), dtype="float64")
    else:
        lookup = factors.set_index(key_cols)[factor_col]
        row_keys = pd.MultiIndex.from_frame(df[key_cols])
        factor = np.array(lookup.reindex(row_keys), dtype="float64")
    if default is not None:
        factor = np.where(np.isnan(factor), float(default), factor)
    return factor

add_margin

add_margin(
    df: DataFrame,
    *,
    premium_col: str,
    expense_cols: str | Iterable[str],
    out_col: str = "margin",
    ratio_col: str | None = None,
    exposure_col: str | None = None,
    per_exposure_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Add an underwriting-margin column (premium minus summed expense columns).

expense_cols is summed row-wise and may mix losses and loadings (e.g. medical/claims, retention, commission, allocated overhead). Optionally also add the margin ratio (ratio_col) and a per-exposure margin (per_exposure_col, requires exposure_col) such as margin PMPM.

Source code in actuarialpy/margins.py
def add_margin(
    df: pd.DataFrame,
    *,
    premium_col: str,
    expense_cols: str | Iterable[str],
    out_col: str = "margin",
    ratio_col: str | None = None,
    exposure_col: str | None = None,
    per_exposure_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Add an underwriting-margin column (premium minus summed expense columns).

    ``expense_cols`` is summed row-wise and may mix losses and loadings (e.g.
    medical/claims, retention, commission, allocated overhead). Optionally also
    add the margin ratio (``ratio_col``) and a per-exposure margin
    (``per_exposure_col``, requires ``exposure_col``) such as margin PMPM.
    """
    validate_columns(df, [premium_col])
    result = df.copy() if copy else df
    total_expense = sum_columns(result, expense_cols)
    result[out_col] = result[premium_col] - total_expense
    if ratio_col is not None:
        result[ratio_col] = safe_divide(result[out_col], result[premium_col])
    if per_exposure_col is not None:
        if exposure_col is None:
            raise ValueError("exposure_col is required when per_exposure_col is set.")
        validate_columns(result, [exposure_col])
        result[per_exposure_col] = per_exposure(result[out_col], result[exposure_col])
    return result

margin

margin(premium: Any, expenses: Any) -> Any

Margin = premium - expenses, element-wise.

expenses should already be the total of losses plus any loadings.

Source code in actuarialpy/margins.py
def margin(premium: Any, expenses: Any) -> Any:
    """Margin = premium - expenses, element-wise.

    ``expenses`` should already be the total of losses plus any loadings.
    """
    return premium - expenses

margin_ratio

margin_ratio(margin_amount: Any, premium: Any) -> Any

Margin as a fraction of premium = margin / premium.

Source code in actuarialpy/margins.py
def margin_ratio(margin_amount: Any, premium: Any) -> Any:
    """Margin as a fraction of premium = margin / premium."""
    return safe_divide(margin_amount, premium)

excess_over_threshold

excess_over_threshold(
    df: DataFrame,
    loss_col: str,
    threshold: float,
    *,
    keep_cols: str | Iterable[str] | None = None,
    excess_col: str = "excess"
) -> pd.DataFrame

Return losses strictly above threshold with their excess amount.

excess = loss - threshold for rows where loss > threshold. This is the excess-over-threshold sample used to fit a tail (e.g. a generalized Pareto distribution in extremeloss) or a severity distribution in lossmodels; the threshold is the EVT exceedance threshold / pooling point. keep_cols carries identifier or covariate columns through.

Source code in actuarialpy/pooling.py
def excess_over_threshold(
    df: pd.DataFrame,
    loss_col: str,
    threshold: float,
    *,
    keep_cols: str | Iterable[str] | None = None,
    excess_col: str = "excess",
) -> pd.DataFrame:
    """Return losses strictly above ``threshold`` with their excess amount.

    ``excess = loss - threshold`` for rows where ``loss > threshold``. This is
    the excess-over-threshold sample used to fit a tail (e.g. a generalized
    Pareto distribution in ``extremeloss``) or a severity distribution in
    ``lossmodels``; the threshold is the EVT exceedance threshold / pooling
    point. ``keep_cols`` carries identifier or covariate columns through.
    """
    keep = as_list(keep_cols)
    validate_columns(df, [loss_col] + keep)
    above = df[df[loss_col] > threshold].copy()
    result = above[keep + [loss_col]].copy()
    result[excess_col] = above[loss_col] - threshold
    return result.reset_index(drop=True)

pool_losses

pool_losses(
    df: DataFrame,
    loss_col: str,
    pooling_point: float,
    *,
    pooled_col: str = "pooled_loss",
    excess_col: str = "excess_loss",
    copy: bool = True
) -> pd.DataFrame

Split each loss into a pooled (capped) portion and an excess portion.

pooled = min(loss, pooling_point) is the retained amount used in the group's experience; excess = max(loss - pooling_point, 0) is the portion pooled across the block. Summing pooled_col by group gives capped experience; summing excess_col gives the pooled excess. The input is typically one row per claimant (e.g. the output of summarize_claimants).

Source code in actuarialpy/pooling.py
def pool_losses(
    df: pd.DataFrame,
    loss_col: str,
    pooling_point: float,
    *,
    pooled_col: str = "pooled_loss",
    excess_col: str = "excess_loss",
    copy: bool = True,
) -> pd.DataFrame:
    """Split each loss into a pooled (capped) portion and an excess portion.

    ``pooled = min(loss, pooling_point)`` is the retained amount used in the
    group's experience; ``excess = max(loss - pooling_point, 0)`` is the portion
    pooled across the block. Summing ``pooled_col`` by group gives capped
    experience; summing ``excess_col`` gives the pooled excess. The input is
    typically one row per claimant (e.g. the output of ``summarize_claimants``).
    """
    validate_columns(df, [loss_col])
    result = df.copy() if copy else df
    result[pooled_col] = result[loss_col].clip(upper=pooling_point)
    result[excess_col] = (result[loss_col] - pooling_point).clip(lower=0)
    return result

retained_cv

retained_cv(outcomes, retention, *, n_units=1)

Coefficient of variation of the retained aggregate of n_units iid units.

Each unit's outcome is retained (capped) at retention -- min(outcome, retention) -- and n_units such units are summed. For independent units this CV is cv(min(X, retention)) / sqrt(n_units), where X is drawn from the per-unit outcome sample outcomes (array-like). Capping discards everything above retention, so only the body of outcomes matters.

Parameters

outcomes : array-like Per-unit outcome sample (e.g. one value per member-year, claim, or risk). retention : float or array-like Cap applied to each unit. Scalar returns a float; an array returns the CV at each retention. n_units : int, default 1 Number of independent units in the aggregate.

Returns

float or numpy.ndarray Coefficient of variation of the retained aggregate.

Source code in actuarialpy/pooling.py
def retained_cv(outcomes, retention, *, n_units=1):
    """Coefficient of variation of the retained aggregate of ``n_units`` iid units.

    Each unit's outcome is retained (capped) at ``retention`` -- ``min(outcome,
    retention)`` -- and ``n_units`` such units are summed. For independent units
    this CV is ``cv(min(X, retention)) / sqrt(n_units)``, where ``X`` is drawn from
    the per-unit outcome sample ``outcomes`` (array-like). Capping discards
    everything above ``retention``, so only the body of ``outcomes`` matters.

    Parameters
    ----------
    outcomes : array-like
        Per-unit outcome sample (e.g. one value per member-year, claim, or risk).
    retention : float or array-like
        Cap applied to each unit. Scalar returns a float; an array returns the CV
        at each retention.
    n_units : int, default 1
        Number of independent units in the aggregate.

    Returns
    -------
    float or numpy.ndarray
        Coefficient of variation of the retained aggregate.
    """
    x, csum, csum2 = _retention_moments(outcomes)
    cv = _retained_cv_at(x, csum, csum2, retention, n_units)
    return float(cv) if np.ndim(retention) == 0 else cv

retention_for_target_cv

retention_for_target_cv(
    outcomes, n_units, target_cv, *, bounds=None, n_grid=256
)

Retention at which the retained aggregate of n_units units hits a target CV.

Inverts :func:retained_cv. The single-unit retained CV increases with the retention, so this solves retained_cv(outcomes, u, n_units=n_units) == target_cv for the retention u by interpolation over a grid spanning bounds (default min..max of outcomes). Targets below or above the achievable range clamp to the lower or upper bound. Holding target_cv fixed, a larger n_units yields a higher retention (more independent units stabilize the aggregate, so less needs to be capped) -- i.e. the basis for a size-graded retention rule.

Parameters

outcomes : array-like Per-unit outcome sample. n_units : int Number of independent units in the aggregate. target_cv : float Desired coefficient of variation of the retained aggregate. bounds : tuple(float, float), optional (lo, hi) retention search bounds. Defaults to the min and max of outcomes. n_grid : int, default 256 Number of grid points spanning bounds.

Returns

float The retention level, clamped to bounds.

Source code in actuarialpy/pooling.py
def retention_for_target_cv(outcomes, n_units, target_cv, *, bounds=None, n_grid=256):
    """Retention at which the retained aggregate of ``n_units`` units hits a target CV.

    Inverts :func:`retained_cv`. The single-unit retained CV increases with the
    retention, so this solves ``retained_cv(outcomes, u, n_units=n_units) ==
    target_cv`` for the retention ``u`` by interpolation over a grid spanning
    ``bounds`` (default ``min..max`` of ``outcomes``). Targets below or above the
    achievable range clamp to the lower or upper bound. Holding ``target_cv`` fixed,
    a larger ``n_units`` yields a higher retention (more independent units stabilize
    the aggregate, so less needs to be capped) -- i.e. the basis for a size-graded
    retention rule.

    Parameters
    ----------
    outcomes : array-like
        Per-unit outcome sample.
    n_units : int
        Number of independent units in the aggregate.
    target_cv : float
        Desired coefficient of variation of the retained aggregate.
    bounds : tuple(float, float), optional
        ``(lo, hi)`` retention search bounds. Defaults to the min and max of
        ``outcomes``.
    n_grid : int, default 256
        Number of grid points spanning ``bounds``.

    Returns
    -------
    float
        The retention level, clamped to ``bounds``.
    """
    x, csum, csum2 = _retention_moments(outcomes)
    lo = float(bounds[0]) if bounds is not None else float(x[0])
    hi = float(bounds[1]) if bounds is not None else float(x[-1])
    grid = np.linspace(lo, hi, int(n_grid))
    cvx = np.maximum.accumulate(_retained_cv_at(x, csum, csum2, grid, n_units=1))
    target_cvx = float(target_cv) * np.sqrt(n_units)
    u = float(np.interp(target_cvx, cvx, grid))
    return float(np.clip(u, lo, hi))

status_summary

status_summary(
    df: DataFrame,
    *,
    status_col: str,
    entity_col: str | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    profile: str | None = None
) -> pd.DataFrame

Summarize experience by status, optionally adding entity counts.

Source code in actuarialpy/experience.py
def status_summary(
    df: pd.DataFrame,
    *,
    status_col: str,
    entity_col: str | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    profile: str | None = None,
) -> pd.DataFrame:
    """Summarize experience by status, optionally adding entity counts."""
    validate_columns(df, [status_col] + ([entity_col] if entity_col else []))
    summary = summarize_experience(
        df,
        groupby=status_col,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        profile=profile,
    )
    if entity_col:
        counts = df.groupby(status_col, dropna=False)[entity_col].nunique().reset_index(name="entity_count")
        summary = counts.merge(summary, on=status_col, how="right")
    return summary

summarize_experience

summarize_experience(
    df: DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    ratio_col: str | None = None,
    ratio_name: str | None = None,
    total_expense_name: str = "total_expense",
    total_revenue_name: str = "total_revenue",
    profile: str | None = None,
    labels: dict[str, str] | None = None
) -> pd.DataFrame

Summarize experience by grouping columns.

Amounts and exposures are aggregated first. Ratios and per-exposure metrics are calculated after aggregation, which avoids averaging row-level ratios.

By default the ratio column is named loss_ratio (general across lines of business); the health profile names it mlr and life benefit_ratio. profile only supplies light defaults and does not rename total expense or total revenue.

Source code in actuarialpy/experience.py
def summarize_experience(
    df: pd.DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    ratio_col: str | None = None,
    ratio_name: str | None = None,
    total_expense_name: str = "total_expense",
    total_revenue_name: str = "total_revenue",
    profile: str | None = None,
    labels: dict[str, str] | None = None,
) -> pd.DataFrame:
    """Summarize experience by grouping columns.

    Amounts and exposures are aggregated first. Ratios and per-exposure metrics
    are calculated after aggregation, which avoids averaging row-level ratios.

    By default the ratio column is named ``loss_ratio`` (general across lines of
    business); the ``health`` profile names it ``mlr`` and ``life``
    ``benefit_ratio``. ``profile`` only supplies light defaults and does not
    rename total expense or total revenue.
    """
    groups = as_list(groupby)
    expenses = as_list(expense_cols)
    revenues = as_list(revenue_cols)
    exposures = as_list(exposure_cols)
    _validate_exposures(exposures)
    validate_columns(df, groups + expenses + revenues + exposures)

    if ratio_col is not None and ratio_name is not None:
        raise ValueError("Specify only one of ratio_col or ratio_name.")
    if ratio_name is not None:
        ratio_col = ratio_name
    if ratio_col is None:
        ratio_col = get_profile_defaults(profile).get("ratio_col", "loss_ratio")

    amount_cols = list(dict.fromkeys(expenses + revenues + exposures))
    if groups:
        summary = df[groups + amount_cols].groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
    else:
        summary = pd.DataFrame({col: [df[col].sum()] for col in amount_cols})

    summary[total_expense_name] = sum_columns(summary, expenses)
    summary[total_revenue_name] = sum_columns(summary, revenues)
    summary[ratio_col] = loss_ratio(summary[total_expense_name], summary[total_revenue_name])

    expense_per_names: list[str] = []
    revenue_per_names: list[str] = []
    for exposure in exposures:
        expense_per, revenue_per = _per_exposure_column_names(total_expense_name, total_revenue_name, exposure)
        summary[expense_per] = per_exposure(summary[total_expense_name], summary[exposure])
        summary[revenue_per] = per_exposure(summary[total_revenue_name], summary[exposure])
        expense_per_names.append(expense_per)
        revenue_per_names.append(revenue_per)

    summary = _order_summary_columns(
        summary,
        groups=groups,
        expenses=expenses,
        revenues=revenues,
        exposures=exposures,
        total_expense_name=total_expense_name,
        total_revenue_name=total_revenue_name,
        ratio_col=ratio_col,
        expense_per_names=expense_per_names,
        revenue_per_names=revenue_per_names,
    )
    return apply_profile_labels(summary, profile=profile, labels=labels)

summarize_views

summarize_views(
    df: DataFrame,
    *,
    views: dict[str, str | Iterable[str] | None],
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    ratio_col: str | None = None,
    ratio_name: str | None = None,
    total_expense_name: str = "total_expense",
    total_revenue_name: str = "total_revenue",
    profile: str | None = None
) -> dict[str, pd.DataFrame]

Create multiple experience summary views from the same input data.

Source code in actuarialpy/experience.py
def summarize_views(
    df: pd.DataFrame,
    *,
    views: dict[str, str | Iterable[str] | None],
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    ratio_col: str | None = None,
    ratio_name: str | None = None,
    total_expense_name: str = "total_expense",
    total_revenue_name: str = "total_revenue",
    profile: str | None = None,
) -> dict[str, pd.DataFrame]:
    """Create multiple experience summary views from the same input data."""
    return {
        name: summarize_experience(
            df,
            groupby=groupby,
            expense_cols=expense_cols,
            revenue_cols=revenue_cols,
            exposure_cols=exposure_cols,
            ratio_col=ratio_col,
            ratio_name=ratio_name,
            total_expense_name=total_expense_name,
            total_revenue_name=total_revenue_name,
            profile=profile,
        )
        for name, groupby in views.items()
    }

summarize_actual_vs_expected

summarize_actual_vs_expected(
    df: DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    actual_cols: str | Iterable[str],
    expected_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    actual_name: str = "actual",
    expected_name: str = "expected",
    ae_name: str = "actual_to_expected",
    variance_name: str = "variance",
    variance_pct_name: str = "variance_pct"
) -> pd.DataFrame

Summarize actual-versus-expected results by optional grouping columns.

Actual and expected amounts are aggregated before ratios are calculated. This makes the function suitable for claim costs, benefits, expenses, revenue, or any other actual-versus-expected measure.

Source code in actuarialpy/expected.py
def summarize_actual_vs_expected(
    df: pd.DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    actual_cols: str | Iterable[str],
    expected_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    actual_name: str = "actual",
    expected_name: str = "expected",
    ae_name: str = "actual_to_expected",
    variance_name: str = "variance",
    variance_pct_name: str = "variance_pct",
) -> pd.DataFrame:
    """Summarize actual-versus-expected results by optional grouping columns.

    Actual and expected amounts are aggregated before ratios are calculated.
    This makes the function suitable for claim costs, benefits, expenses,
    revenue, or any other actual-versus-expected measure.
    """
    groups = as_list(groupby)
    actuals = as_list(actual_cols)
    expecteds = as_list(expected_cols)
    exposures = as_list(exposure_cols)
    validate_columns(df, groups + actuals + expecteds + exposures)

    amount_cols = list(dict.fromkeys(actuals + expecteds + exposures))
    if groups:
        out = df[groups + amount_cols].groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
    else:
        out = pd.DataFrame({col: [df[col].sum()] for col in amount_cols})

    out[actual_name] = sum_columns(out, actuals)
    out[expected_name] = sum_columns(out, expecteds)
    out[ae_name] = actual_to_expected_ratio(out[actual_name], out[expected_name])
    out[variance_name] = out[actual_name] - out[expected_name]
    out[variance_pct_name] = safe_divide(out[variance_name], out[expected_name])

    for exposure in exposures:
        out[_per_exposure_name(actual_name, exposure)] = per_exposure(out[actual_name], out[exposure])
        out[_per_exposure_name(expected_name, exposure)] = per_exposure(out[expected_name], out[exposure])
        out[_per_exposure_name(variance_name, exposure)] = per_exposure(out[variance_name], out[exposure])

    return _order_ave_columns(
        out,
        groups=groups,
        actuals=actuals,
        expecteds=expecteds,
        exposures=exposures,
        actual_name=actual_name,
        expected_name=expected_name,
        ae_name=ae_name,
        variance_name=variance_name,
        variance_pct_name=variance_pct_name,
    )

summarize_claimants

summarize_claimants(
    df: DataFrame,
    *,
    claimant_col: str,
    amount_cols: str | Iterable[str],
    groupby: str | Iterable[str] | None = None,
    exposure_col: str | None = None,
    amount_name: str = "total_expense"
) -> pd.DataFrame

Aggregate experience to claimant/member/risk level.

claimant_col can be a member ID, policy ID, claim group ID, or another entity identifier. The function is descriptive; it does not cap, pool, or otherwise adjust the underlying amounts.

Source code in actuarialpy/claimants.py
def summarize_claimants(
    df: pd.DataFrame,
    *,
    claimant_col: str,
    amount_cols: str | Iterable[str],
    groupby: str | Iterable[str] | None = None,
    exposure_col: str | None = None,
    amount_name: str = "total_expense",
) -> pd.DataFrame:
    """Aggregate experience to claimant/member/risk level.

    ``claimant_col`` can be a member ID, policy ID, claim group ID, or another
    entity identifier. The function is descriptive; it does not cap, pool, or
    otherwise adjust the underlying amounts.
    """
    groups = as_list(groupby)
    amounts = as_list(amount_cols)
    required = groups + [claimant_col] + amounts + ([exposure_col] if exposure_col else [])
    validate_columns(df, required)

    agg_cols = list(dict.fromkeys(amounts + ([exposure_col] if exposure_col else [])))
    out = df[groups + [claimant_col] + agg_cols].groupby(
        groups + [claimant_col], dropna=False, as_index=False
    ).sum(numeric_only=True)
    out[amount_name] = sum_columns(out, amounts)
    if exposure_col:
        out[_per_exposure_name(amount_name, exposure_col)] = per_exposure(out[amount_name], out[exposure_col])
    return out

top_claimants

top_claimants(
    df: DataFrame,
    *,
    claimant_col: str,
    amount_cols: str | Iterable[str] | None = None,
    amount_col: str | None = None,
    groupby: str | Iterable[str] | None = None,
    n: int = 25,
    amount_name: str = "total_expense"
) -> pd.DataFrame

Return the top claimants by amount, optionally within each group.

Source code in actuarialpy/claimants.py
def top_claimants(
    df: pd.DataFrame,
    *,
    claimant_col: str,
    amount_cols: str | Iterable[str] | None = None,
    amount_col: str | None = None,
    groupby: str | Iterable[str] | None = None,
    n: int = 25,
    amount_name: str = "total_expense",
) -> pd.DataFrame:
    """Return the top claimants by amount, optionally within each group."""
    if n <= 0:
        raise ValueError("n must be positive")
    groups = as_list(groupby)

    if amount_col is None:
        if amount_cols is None:
            raise ValueError("Pass either amount_col or amount_cols.")
        base = summarize_claimants(
            df,
            claimant_col=claimant_col,
            amount_cols=amount_cols,
            groupby=groups,
            amount_name=amount_name,
        )
        amount_col = amount_name
    else:
        validate_columns(df, groups + [claimant_col, amount_col])
        base = df[groups + [claimant_col, amount_col]].copy()

    sort_cols = groups + [amount_col] if groups else [amount_col]
    ascending = [True] * len(groups) + [False]
    base = base.sort_values(sort_cols, ascending=ascending).copy()
    if groups:
        base["rank"] = base.groupby(groups, dropna=False)[amount_col].rank(method="first", ascending=False).astype(int)
        totals = base.groupby(groups, dropna=False)[amount_col].sum().reset_index(name="_group_total")
        base = base.merge(totals, on=groups, how="left")
        base = base[base["rank"] <= n].copy()
        base["share_of_total"] = safe_divide(base[amount_col], base["_group_total"])
        base["cumulative_share"] = base.groupby(groups, dropna=False)["share_of_total"].cumsum()
        return base.drop(columns=["_group_total"])

    base["rank"] = range(1, len(base) + 1)
    total = base[amount_col].sum()
    base = base[base["rank"] <= n].copy()
    base["share_of_total"] = safe_divide(base[amount_col], total)
    base["cumulative_share"] = base["share_of_total"].cumsum()
    return base

large_claimant_flags

large_claimant_flags(
    df: DataFrame,
    *,
    amount_col: str = "total_expense",
    thresholds: Sequence[float] = (50000, 100000, 250000)
) -> pd.DataFrame

Add boolean flags for claimants above one or more amount thresholds.

Source code in actuarialpy/claimants.py
def large_claimant_flags(
    df: pd.DataFrame,
    *,
    amount_col: str = "total_expense",
    thresholds: Sequence[float] = (50_000, 100_000, 250_000),
) -> pd.DataFrame:
    """Add boolean flags for claimants above one or more amount thresholds."""
    validate_columns(df, [amount_col])
    out = df.copy()
    for threshold in thresholds:
        label = str(int(threshold)) if float(threshold).is_integer() else str(threshold).replace(".", "_")
        out[f"is_over_{label}"] = out[amount_col] >= threshold
    return out

claim_concentration

claim_concentration(
    df: DataFrame,
    *,
    amount_col: str = "total_expense",
    groupby: str | Iterable[str] | None = None,
    top_n: Sequence[int] = (10, 25),
    thresholds: Sequence[float] = (50000, 100000, 250000)
) -> pd.DataFrame

Summarize how concentrated total amounts are among top claimants.

The input should generally be one row per claimant within the requested grouping level, such as the output of summarize_claimants.

Source code in actuarialpy/claimants.py
def claim_concentration(
    df: pd.DataFrame,
    *,
    amount_col: str = "total_expense",
    groupby: str | Iterable[str] | None = None,
    top_n: Sequence[int] = (10, 25),
    thresholds: Sequence[float] = (50_000, 100_000, 250_000),
) -> pd.DataFrame:
    """Summarize how concentrated total amounts are among top claimants.

    The input should generally be one row per claimant within the requested
    grouping level, such as the output of ``summarize_claimants``.
    """
    groups = as_list(groupby)
    validate_columns(df, groups + [amount_col])

    def summarize(part: pd.DataFrame) -> dict[str, float]:
        sorted_part = part.sort_values(amount_col, ascending=False)
        total = sorted_part[amount_col].sum()
        row: dict[str, float] = {
            "claimant_count": len(sorted_part),
            "total_amount": total,
        }
        for n in top_n:
            top_amount = sorted_part.head(n)[amount_col].sum()
            row[f"top_{n}_amount"] = top_amount
            row[f"top_{n}_share"] = safe_divide(top_amount, total)
        for threshold in thresholds:
            label = str(int(threshold)) if float(threshold).is_integer() else str(threshold).replace(".", "_")
            mask = sorted_part[amount_col] >= threshold
            threshold_amount = sorted_part.loc[mask, amount_col].sum()
            row[f"count_over_{label}"] = int(mask.sum())
            row[f"amount_over_{label}"] = threshold_amount
            row[f"share_over_{label}"] = safe_divide(threshold_amount, total)
        return row

    if groups:
        rows = []
        for keys, part in df.groupby(groups, dropna=False, sort=False):
            if not isinstance(keys, tuple):
                keys = (keys,)
            rows.append({**dict(zip(groups, keys)), **summarize(part)})
        return pd.DataFrame(rows)
    return pd.DataFrame([summarize(df)])

rolling_summary

rolling_summary(
    df: DataFrame,
    *,
    date_col: str,
    window: int = 12,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    min_periods: int | None = None,
    drop_incomplete: bool = True,
    ratio_col: str = "loss_ratio"
) -> pd.DataFrame

Calculate rolling sums and ratios by period and optional grouping.

The output includes period_start and period_end. By default only complete rolling windows are returned; for a 12-month window, the first output row appears after 12 months of data are available.

Source code in actuarialpy/rolling.py
def rolling_summary(
    df: pd.DataFrame,
    *,
    date_col: str,
    window: int = 12,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    min_periods: int | None = None,
    drop_incomplete: bool = True,
    ratio_col: str = "loss_ratio",
) -> pd.DataFrame:
    """Calculate rolling sums and ratios by period and optional grouping.

    The output includes ``period_start`` and ``period_end``. By default only
    complete rolling windows are returned; for a 12-month window, the first
    output row appears after 12 months of data are available.
    """
    if window <= 0:
        raise ValueError("window must be positive")
    groups = as_list(groupby)
    expenses = as_list(expense_cols)
    revenues = as_list(revenue_cols)
    exposures = as_list(exposure_cols)
    validate_columns(df, groups + [date_col] + expenses + revenues + exposures)
    min_periods = window if min_periods is None else min_periods

    base = summarize_experience(
        df,
        groupby=groups + [date_col],
        expense_cols=expenses,
        revenue_cols=revenues,
        exposure_cols=exposures,
        ratio_col="period_ratio",
    ).sort_values(groups + [date_col] if groups else [date_col])

    amount_cols = ["total_expense", "total_revenue"] + exposures
    pieces = []
    iterator = base.groupby(groups, dropna=False, sort=False) if groups else [((), base)]

    for _, part in iterator:
        part = part.sort_values(date_col).copy().reset_index(drop=True)
        rolled = part[amount_cols].rolling(window=window, min_periods=min_periods).sum()
        months_available = part["total_expense"].rolling(window=window, min_periods=1).count().astype(int)

        out = part[groups].copy() if groups else pd.DataFrame(index=part.index)
        dates = pd.to_datetime(part[date_col])
        starts = []
        for i in range(len(part)):
            start_i = max(0, i - window + 1)
            starts.append(dates.iloc[start_i])
        out["period_start"] = starts
        out["period_end"] = dates
        out["months_available"] = months_available.values

        for col in amount_cols:
            out[col] = rolled[col].values
        out[ratio_col] = loss_ratio(out["total_expense"], out["total_revenue"])
        for exposure in exposures:
            expense_per, revenue_per = _per_exposure_column_names(exposure)
            out[expense_per] = per_exposure(out["total_expense"], out[exposure])
            out[revenue_per] = per_exposure(out["total_revenue"], out[exposure])

        if drop_incomplete:
            out = out[out["months_available"] >= window].copy()
        pieces.append(out)

    if not pieces:
        return pd.DataFrame()
    result = pd.concat(pieces, ignore_index=True)
    if drop_incomplete:
        result = result.drop(columns=["months_available"])
    return result

annualized_trend

annualized_trend(
    current: Any, prior: Any, months_between: float
) -> Any

Annualize change between two values separated by a number of months.

Source code in actuarialpy/trend.py
def annualized_trend(current: Any, prior: Any, months_between: float) -> Any:
    """Annualize change between two values separated by a number of months."""
    if months_between <= 0:
        raise ValueError("months_between must be positive")
    return safe_divide(current, prior) ** (12 / months_between) - 1

fit_trend

fit_trend(
    df: DataFrame,
    *,
    value_col: str,
    date_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    min_periods: int = 3,
    confidence: float = 0.95
) -> TrendFit

Fit an exponential trend to a rate series by log-linear regression.

Aggregates df to the freq grain (summing value_col and, if given, exposure_col), forms the rate -- value / exposure (e.g. PMPM) when exposure_col is supplied, otherwise value itself -- and fits log(rate) = intercept + slope * t by ordinary least squares, with t in years from the first period. The fitted annual trend is exp(slope) - 1.

Unlike :func:annualized_trend (a two-point CAGR between a single current and prior value), this uses every period, so one noisy month does not swing the estimate, and it returns goodness of fit and a confidence interval -- what a developed (rather than received) trend is judged on. It does not select the trend: the window, the rate basis (allowed vs paid), any benefit leveraging, and the blend with external trends remain judgment. Run it on completed, deseasonalized history (complete -> deseasonalize -> fit_trend) so runout and seasonality do not contaminate the slope; apply the result with :func:trend_factor/:meth:TrendFit.factor or :func:adjust.

Time is measured from actual period dates, so an occasional missing period is handled correctly. Requires at least min_periods distinct periods with strictly positive rates (non-positive values, which cannot be logged, raise). Returns a :class:TrendFit.

Source code in actuarialpy/trend.py
def fit_trend(
    df: pd.DataFrame,
    *,
    value_col: str,
    date_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    min_periods: int = 3,
    confidence: float = 0.95,
) -> TrendFit:
    """Fit an exponential trend to a rate series by log-linear regression.

    Aggregates ``df`` to the ``freq`` grain (summing ``value_col`` and, if given,
    ``exposure_col``), forms the rate -- ``value / exposure`` (e.g. PMPM) when
    ``exposure_col`` is supplied, otherwise ``value`` itself -- and fits
    ``log(rate) = intercept + slope * t`` by ordinary least squares, with ``t`` in years
    from the first period. The fitted annual trend is ``exp(slope) - 1``.

    Unlike :func:`annualized_trend` (a two-point CAGR between a single current and prior
    value), this uses every period, so one noisy month does not swing the estimate, and it
    returns goodness of fit and a confidence interval -- what a *developed* (rather than
    received) trend is judged on. It does not select the trend: the window, the rate basis
    (allowed vs paid), any benefit leveraging, and the blend with external trends remain
    judgment. Run it on completed, deseasonalized history (``complete -> deseasonalize ->
    fit_trend``) so runout and seasonality do not contaminate the slope; apply the result
    with :func:`trend_factor`/:meth:`TrendFit.factor` or :func:`adjust`.

    Time is measured from actual period dates, so an occasional missing period is handled
    correctly. Requires at least ``min_periods`` distinct periods with strictly positive
    rates (non-positive values, which cannot be logged, raise). Returns a :class:`TrendFit`.
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError("confidence must be between 0 and 1.")
    cols = [value_col, date_col] + ([exposure_col] if exposure_col else [])
    validate_columns(df, cols)

    period = pd.PeriodIndex(pd.to_datetime(df[date_col]), freq=freq)
    work = pd.DataFrame({"_value": pd.to_numeric(df[value_col]).to_numpy()}, index=period)
    if exposure_col:
        work["_exposure"] = pd.to_numeric(df[exposure_col]).to_numpy()
    grouped = work.groupby(level=0).sum().sort_index()

    rate = grouped["_value"] / grouped["_exposure"] if exposure_col else grouped["_value"]
    rate = rate.to_numpy(dtype="float64")
    if len(rate) < max(min_periods, 3):
        raise ValueError(f"fit_trend needs at least {max(min_periods, 3)} periods; got {len(rate)}.")
    if np.any(rate <= 0):
        raise ValueError("fit_trend requires strictly positive rates (cannot take the log of <= 0).")

    timestamps = grouped.index.to_timestamp()
    t = (timestamps - timestamps[0]).days.to_numpy(dtype="float64") / 365.25
    if np.ptp(t) == 0:
        raise ValueError("fit_trend needs at least two distinct periods.")
    y = np.log(rate)
    n = len(y)

    t_mean, y_mean = t.mean(), y.mean()
    sxx = float(np.sum((t - t_mean) ** 2))
    sxy = float(np.sum((t - t_mean) * (y - y_mean)))
    slope = sxy / sxx
    intercept = y_mean - slope * t_mean

    residuals = y - (intercept + slope * t)
    sse = float(np.sum(residuals**2))
    sst = float(np.sum((y - y_mean) ** 2))
    # a flat series has no variance to explain (sst ~ 0 up to rounding); a constant fits it
    # perfectly, so R^2 is 1.0 there rather than the unstable 0/0 of 1 - sse/sst.
    r_squared = 1.0 if sst <= 1e-12 * max(1.0, abs(y_mean)) else 1.0 - sse / sst
    resid_var = sse / (n - 2)
    slope_se = float(np.sqrt(resid_var / sxx))

    annual_trend = float(np.exp(slope) - 1.0)
    std_error = float(np.exp(slope) * slope_se)  # delta method
    t_crit = _student_t_ppf((1.0 + confidence) / 2.0, n - 2)
    ci_low = float(np.exp(slope - t_crit * slope_se) - 1.0)
    ci_high = float(np.exp(slope + t_crit * slope_se) - 1.0)

    return TrendFit(
        annual_trend=annual_trend, r_squared=r_squared, std_error=std_error,
        ci_low=ci_low, ci_high=ci_high, confidence=confidence, n_periods=n,
        slope=float(slope), intercept=float(intercept),
    )

midpoint_trend_factor

midpoint_trend_factor(
    base_midpoint, projection_midpoint, annual_trend: Any
) -> Any

Trend factor between base and projection midpoints.

Source code in actuarialpy/trend.py
def midpoint_trend_factor(base_midpoint, projection_midpoint, annual_trend: Any) -> Any:
    """Trend factor between base and projection midpoints."""
    base = pd.to_datetime(base_midpoint)
    projection = pd.to_datetime(projection_midpoint)
    months = (projection.year - base.year) * 12 + (projection.month - base.month)
    return trend_factor(annual_trend, months)

period_change

period_change(current: Any, prior: Any) -> Any

Calculate period-over-period change: current / prior - 1.

Source code in actuarialpy/trend.py
def period_change(current: Any, prior: Any) -> Any:
    """Calculate period-over-period change: current / prior - 1."""
    return safe_divide(current, prior) - 1

project_forward

project_forward(
    value: Any, annual_trend: Any, months: float
) -> Any

Project a value forward using an annual trend rate.

Source code in actuarialpy/trend.py
def project_forward(value: Any, annual_trend: Any, months: float) -> Any:
    """Project a value forward using an annual trend rate."""
    return value * trend_factor(annual_trend, months)

trend_factor

trend_factor(annual_trend: Any, months: float) -> Any

Convert an annual trend rate into a trend factor over a number of months.

Source code in actuarialpy/trend.py
def trend_factor(annual_trend: Any, months: float) -> Any:
    """Convert an annual trend rate into a trend factor over a number of months."""
    return (1 + annual_trend) ** (months / 12)

trend_summary

trend_summary(
    df: DataFrame,
    *,
    period_col: str | None = None,
    prior_period=None,
    current_period=None,
    date_col: str | None = None,
    prior_start=None,
    prior_end=None,
    current_start=None,
    current_end=None,
    groupby=None,
    amount_col: str,
    exposure_col: str | None = None,
    prior_filter=None,
    current_filter=None,
    prior_label: str = "prior",
    current_label: str = "current"
) -> pd.DataFrame

Summarize current vs prior trend by optional grouping.

Supported comparison modes: - period_col='year', prior_period=2025, current_period=2026 - date_col='incurred_date' with prior/current start and end dates - explicit boolean prior_filter and current_filter masks

Source code in actuarialpy/trend.py
def trend_summary(
    df: pd.DataFrame,
    *,
    period_col: str | None = None,
    prior_period=None,
    current_period=None,
    date_col: str | None = None,
    prior_start=None,
    prior_end=None,
    current_start=None,
    current_end=None,
    groupby=None,
    amount_col: str,
    exposure_col: str | None = None,
    prior_filter=None,
    current_filter=None,
    prior_label: str = "prior",
    current_label: str = "current",
) -> pd.DataFrame:
    """Summarize current vs prior trend by optional grouping.

    Supported comparison modes:
    - ``period_col='year', prior_period=2025, current_period=2026``
    - ``date_col='incurred_date'`` with prior/current start and end dates
    - explicit boolean ``prior_filter`` and ``current_filter`` masks
    """
    groups = as_list(groupby)
    required = groups + [amount_col] + ([exposure_col] if exposure_col else [])
    if period_col is not None:
        required.append(period_col)
    if date_col is not None:
        required.append(date_col)
    validate_columns(df, required)

    prior_filter, current_filter, mode = _comparison_masks(
        df,
        period_col=period_col,
        prior_period=prior_period,
        current_period=current_period,
        date_col=date_col,
        prior_start=prior_start,
        prior_end=prior_end,
        current_start=current_start,
        current_end=current_end,
        prior_filter=prior_filter,
        current_filter=current_filter,
    )

    def summarize(mask, label):
        # Aggregate only grouping, amount, and exposure columns. The comparison
        # column (for example, ``year``) is used only to select records and must
        # not leak into the final output as a summed numeric column such as
        # ``year_x`` / ``year_y``.
        summary_cols = groups + [amount_col] + ([exposure_col] if exposure_col else [])
        temp = df.loc[mask, summary_cols].copy()
        if groups:
            out = temp.groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
        else:
            out = pd.DataFrame({amount_col: [temp[amount_col].sum()]})
            if exposure_col:
                out[exposure_col] = temp[exposure_col].sum()
        out = out.rename(columns={amount_col: f"{label}_{amount_col}"})
        if exposure_col:
            out = out.rename(columns={exposure_col: f"{label}_{exposure_col}"})
            out[f"{label}_{amount_col}_per_{exposure_col}"] = safe_divide(
                out[f"{label}_{amount_col}"], out[f"{label}_{exposure_col}"]
            )
        return out

    prior = summarize(prior_filter, prior_label)
    current = summarize(current_filter, current_label)
    out = prior.merge(current, on=groups, how="outer") if groups else pd.concat([prior, current], axis=1)
    prior_metric = f"{prior_label}_{amount_col}_per_{exposure_col}" if exposure_col else f"{prior_label}_{amount_col}"
    current_metric = f"{current_label}_{amount_col}_per_{exposure_col}" if exposure_col else f"{current_label}_{amount_col}"
    out["trend"] = period_change(out[current_metric], out[prior_metric])
    if mode == "period":
        out.insert(len(groups), "prior_period", prior_period)
        out.insert(len(groups) + 1, "current_period", current_period)
    elif mode == "date":
        out.insert(len(groups), "prior_start", pd.to_datetime(prior_start))
        out.insert(len(groups) + 1, "prior_end", pd.to_datetime(prior_end))
        out.insert(len(groups) + 2, "current_start", pd.to_datetime(current_start))
        out.insert(len(groups) + 3, "current_end", pd.to_datetime(current_end))
    return out

component_driver_analysis

component_driver_analysis(
    df: DataFrame,
    *,
    period_col: str | None = None,
    prior_period=None,
    current_period=None,
    date_col: str | None = None,
    prior_start=None,
    prior_end=None,
    current_start=None,
    current_end=None,
    prior_filter=None,
    current_filter=None,
    component_cols: str | Iterable[str],
    exposure_col: str | None = None,
    groupby: str | Iterable[str] | None = None
) -> pd.DataFrame

Explain component drivers of change between two periods.

The primary comparison is based on component totals, or component amount per exposure when exposure_col is supplied. The API matches trend_summary and supports period-column, date-range, or explicit-filter comparisons.

Source code in actuarialpy/components.py
def component_driver_analysis(
    df: pd.DataFrame,
    *,
    period_col: str | None = None,
    prior_period=None,
    current_period=None,
    date_col: str | None = None,
    prior_start=None,
    prior_end=None,
    current_start=None,
    current_end=None,
    prior_filter=None,
    current_filter=None,
    component_cols: str | Iterable[str],
    exposure_col: str | None = None,
    groupby: str | Iterable[str] | None = None,
) -> pd.DataFrame:
    """Explain component drivers of change between two periods.

    The primary comparison is based on component totals, or component amount per
    exposure when ``exposure_col`` is supplied. The API matches ``trend_summary``
    and supports period-column, date-range, or explicit-filter comparisons.
    """
    groups = as_list(groupby)
    components = as_list(component_cols)
    required = groups + components + ([exposure_col] if exposure_col else [])
    if period_col is not None:
        required.append(period_col)
    if date_col is not None:
        required.append(date_col)
    validate_columns(df, required)

    prior_filter, current_filter, mode = _comparison_masks(
        df,
        period_col=period_col,
        prior_period=prior_period,
        current_period=current_period,
        date_col=date_col,
        prior_start=prior_start,
        prior_end=prior_end,
        current_start=current_start,
        current_end=current_end,
        prior_filter=prior_filter,
        current_filter=current_filter,
    )

    prior_df = df.loc[prior_filter]
    current_df = df.loc[current_filter]

    prior_sum = summarize_components(
        prior_df,
        groupby=groups,
        component_cols=components,
        exposure_col=exposure_col,
        include_shares=False,
    )
    current_sum = summarize_components(
        current_df,
        groupby=groups,
        component_cols=components,
        exposure_col=exposure_col,
        include_shares=False,
    )

    if groups:
        merged = prior_sum.merge(current_sum, on=groups, how="outer", suffixes=("_prior", "_current"))
    else:
        merged = pd.concat([prior_sum.add_suffix("_prior"), current_sum.add_suffix("_current")], axis=1)

    rows = []
    for _, row in merged.iterrows():
        key_data = {g: row[g] for g in groups} if groups else {}
        changes = {}
        total_change = 0
        for comp in components:
            metric = _per_exposure_name(comp, exposure_col) if exposure_col else comp
            prior_val = row.get(f"{metric}_prior", 0)
            current_val = row.get(f"{metric}_current", 0)
            prior_val = 0 if pd.isna(prior_val) else prior_val
            current_val = 0 if pd.isna(current_val) else current_val
            changes[comp] = current_val - prior_val
            total_change += changes[comp]

        for comp in components:
            metric = _per_exposure_name(comp, exposure_col) if exposure_col else comp
            prior_val = row.get(f"{metric}_prior", 0)
            current_val = row.get(f"{metric}_current", 0)
            prior_val = 0 if pd.isna(prior_val) else prior_val
            current_val = 0 if pd.isna(current_val) else current_val
            period_data = {}
            if mode == "period":
                period_data = {"prior_period": prior_period, "current_period": current_period}
            elif mode == "date":
                period_data = {
                    "prior_start": pd.to_datetime(prior_start),
                    "prior_end": pd.to_datetime(prior_end),
                    "current_start": pd.to_datetime(current_start),
                    "current_end": pd.to_datetime(current_end),
                }
            rows.append(
                {
                    **key_data,
                    **period_data,
                    "component": comp,
                    "prior": prior_val,
                    "current": current_val,
                    "change": current_val - prior_val,
                    "trend": safe_divide(current_val, prior_val) - 1,
                    "contribution_to_change": safe_divide(changes[comp], total_change),
                }
            )
    return pd.DataFrame(rows)

component_trend

component_trend(*args, **kwargs) -> pd.DataFrame

Alias for component_driver_analysis.

The preferred name is component_driver_analysis because the function explains drivers of total component change, not just component-specific trend.

Source code in actuarialpy/components.py
def component_trend(*args, **kwargs) -> pd.DataFrame:
    """Alias for ``component_driver_analysis``.

    The preferred name is ``component_driver_analysis`` because the function
    explains drivers of total component change, not just component-specific trend.
    """
    return component_driver_analysis(*args, **kwargs)

summarize_components

summarize_components(
    df: DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    component_cols: str | Iterable[str],
    exposure_col: str | None = None,
    total_col: str = "total_expense",
    include_shares: bool = True
) -> pd.DataFrame

Summarize component/category amounts, per-exposure values, and shares.

Source code in actuarialpy/components.py
def summarize_components(
    df: pd.DataFrame,
    *,
    groupby: str | Iterable[str] | None = None,
    component_cols: str | Iterable[str],
    exposure_col: str | None = None,
    total_col: str = "total_expense",
    include_shares: bool = True,
) -> pd.DataFrame:
    """Summarize component/category amounts, per-exposure values, and shares."""
    groups = as_list(groupby)
    components = as_list(component_cols)
    required = groups + components + ([exposure_col] if exposure_col else [])
    validate_columns(df, required)

    amount_cols = components + ([exposure_col] if exposure_col else [])
    if groups:
        summary = df[groups + amount_cols].groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
    else:
        summary = pd.DataFrame({col: [df[col].sum()] for col in amount_cols})

    summary[total_col] = summary[components].sum(axis=1)
    if exposure_col:
        for component in components:
            summary[_per_exposure_name(component, exposure_col)] = per_exposure(summary[component], summary[exposure_col])
        summary[_per_exposure_name(total_col, exposure_col)] = per_exposure(summary[total_col], summary[exposure_col])
    if include_shares:
        for component in components:
            summary[f"{component}_share"] = safe_divide(summary[component], summary[total_col])
    return summary

cohort_summary

cohort_summary(
    df: DataFrame,
    *,
    entity_col: str,
    date_col: str,
    start_date_col: str,
    duration_months: int = 12,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    profile: str | None = None
) -> pd.DataFrame

Summarize each entity's first N months or cohort-duration window.

Each entity is clipped to its own first duration_months months of duration (month 1 is the entity's start month), aligning entities by tenure rather than calendar time. The output also reports how much of that window is actually present, so partial (not-yet-mature) cohorts can be spotted and excluded:

  • months_observed: count of distinct duration months present (1..N).
  • last_month: latest experience month observed; with first_month this gives the available range.
  • complete: whether the full window is present, i.e. months_observed == duration_months.

For example, to keep only cohorts with a full first year::

cohorts = exp.cohort(entity_col="group", start_date_col="effective_date")
mature = cohorts[cohorts["complete"]]
Source code in actuarialpy/cohorts.py
def cohort_summary(
    df: pd.DataFrame,
    *,
    entity_col: str,
    date_col: str,
    start_date_col: str,
    duration_months: int = 12,
    groupby: str | Iterable[str] | None = None,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    profile: str | None = None,
) -> pd.DataFrame:
    """Summarize each entity's first N months or cohort-duration window.

    Each entity is clipped to its own first ``duration_months`` months of duration
    (month 1 is the entity's start month), aligning entities by tenure rather than
    calendar time. The output also reports how much of that window is actually
    present, so partial (not-yet-mature) cohorts can be spotted and excluded:

    - ``months_observed``: count of distinct duration months present (1..N).
    - ``last_month``: latest experience month observed; with ``first_month`` this
      gives the available range.
    - ``complete``: whether the full window is present, i.e.
      ``months_observed == duration_months``.

    For example, to keep only cohorts with a full first year::

        cohorts = exp.cohort(entity_col="group", start_date_col="effective_date")
        mature = cohorts[cohorts["complete"]]
    """
    groups = as_list(groupby)
    validate_columns(df, [entity_col, date_col, start_date_col] + groups)
    temp = add_duration_column(df, start_date_col, date_col, duration_col="duration_month", one_based=True)
    temp = temp[(temp["duration_month"] >= 1) & (temp["duration_month"] <= duration_months)].copy()
    temp["first_month"] = pd.to_datetime(temp[start_date_col]).dt.to_period("M")
    temp["cohort_year"] = pd.to_datetime(temp[start_date_col]).dt.year

    group_keys = [entity_col, "first_month", "cohort_year"] + groups
    summary = summarize_experience(
        temp,
        groupby=group_keys,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        profile=profile,
    )

    coverage = (
        temp.groupby(group_keys, dropna=False)
        .agg(months_observed=("duration_month", "nunique"), last_month=(date_col, "max"))
        .reset_index()
    )
    coverage["last_month"] = pd.to_datetime(coverage["last_month"]).dt.to_period("M")
    coverage["complete"] = coverage["months_observed"] == duration_months
    summary = summary.merge(coverage, on=group_keys, how="left")

    coverage_cols = ["months_observed", "last_month", "complete"]
    metric_cols = [c for c in summary.columns if c not in group_keys and c not in coverage_cols]
    return summary[group_keys + coverage_cols + metric_cols]

cohort_summary_by_period

cohort_summary_by_period(
    cohort_df: DataFrame,
    *,
    cohort_date_col: str = "first_month",
    freq: str = "Q",
    entity_col: str | None = None,
    expense_col: str = "total_expense",
    revenue_col: str = "total_revenue",
    exposure_cols: str | Iterable[str] | None = None
) -> pd.DataFrame

Roll entity-level cohort summaries into cohort month/quarter/year buckets.

Source code in actuarialpy/cohorts.py
def cohort_summary_by_period(
    cohort_df: pd.DataFrame,
    *,
    cohort_date_col: str = "first_month",
    freq: str = "Q",
    entity_col: str | None = None,
    expense_col: str = "total_expense",
    revenue_col: str = "total_revenue",
    exposure_cols: str | Iterable[str] | None = None,
) -> pd.DataFrame:
    """Roll entity-level cohort summaries into cohort month/quarter/year buckets."""
    temp = cohort_df.copy()
    if cohort_date_col not in temp.columns:
        raise ValueError(f"Missing required column: {cohort_date_col}")
    if isinstance(temp[cohort_date_col].iloc[0], pd.Period):
        temp["cohort_period"] = temp[cohort_date_col].dt.asfreq(freq)
    else:
        temp = add_period_column(temp, cohort_date_col, freq, "cohort_period", copy=False)
    exposures = as_list(exposure_cols)
    summary = summarize_experience(
        temp,
        groupby="cohort_period",
        expense_cols=expense_col,
        revenue_cols=revenue_col,
        exposure_cols=exposures,
    )
    if entity_col:
        counts = temp.groupby("cohort_period", dropna=False)[entity_col].nunique().reset_index(name="entity_count")
        summary = counts.merge(summary, on="cohort_period", how="right")
    return summary

duration_summary

duration_summary(
    df: DataFrame,
    *,
    entity_col: str,
    date_col: str,
    start_date_col: str,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    max_duration_month: int | None = None
) -> pd.DataFrame

Summarize experience by duration month since entity start.

Source code in actuarialpy/cohorts.py
def duration_summary(
    df: pd.DataFrame,
    *,
    entity_col: str,
    date_col: str,
    start_date_col: str,
    expense_cols: str | Iterable[str],
    revenue_cols: str | Iterable[str],
    exposure_cols: str | Iterable[str] | None = None,
    max_duration_month: int | None = None,
) -> pd.DataFrame:
    """Summarize experience by duration month since entity start."""
    temp = add_duration_column(df, start_date_col, date_col, duration_col="duration_month", one_based=True)
    temp = temp[temp["duration_month"] >= 1].copy()
    if max_duration_month is not None:
        temp = temp[temp["duration_month"] <= max_duration_month]
    return summarize_experience(
        temp,
        groupby="duration_month",
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
    )

decompose_pmpm_trend

decompose_pmpm_trend(
    prior: DataFrame,
    current: DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    on: str | Iterable[str] | None = None,
    mix_by: str | Iterable[str] | None = None,
    annualization: float = 12
) -> pd.DataFrame

Decompose the PMPM change from prior to current.

With mix_by omitted this is the two-way split: both frames are summarized with :func:frequency_severity_summary (optionally by the on keys), aligned, and the change reported two exact ways:

  • Multiplicative trend: pmpm_trend == util_trend * cost_trend, where util_trend is the frequency ratio and cost_trend the severity ratio.
  • Additive dollars: pmpm_change == util_effect + cost_effect via a symmetric (midpoint) split, so the contributions sum exactly to the PMPM change.

Pass mix_by (a column or list of columns) to add a third mix component. PMPM is then decomposed into utilization, unit cost, and the effect of the membership composition shifting across the mix_by cells. Utilization and unit cost are measured within each cell (free of composition), and mix captures the aggregate movement that comes purely from the cell weights changing -- the piece the two-way otherwise misattributes to utilization and unit cost. The split uses the LMDI (logarithmic mean Divisia index) convention, which is order-free and reconciles exactly: pmpm_trend == util_trend * cost_trend * mix_trend and pmpm_change == util_effect + cost_effect + mix_effect.

A list of columns in mix_by defines the cells as their cross -- one blended mix term, not a per-column attribution; to attribute mix to each dimension separately, run the decomposition once per dimension. on and mix_by are orthogonal: on groups the output rows, mix_by defines the mix cells within each group. Every cell must have positive count, loss, and exposure in both periods.

Source code in actuarialpy/decomposition.py
def decompose_pmpm_trend(
    prior: pd.DataFrame,
    current: pd.DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    on: str | Iterable[str] | None = None,
    mix_by: str | Iterable[str] | None = None,
    annualization: float = 12,
) -> pd.DataFrame:
    """Decompose the PMPM change from ``prior`` to ``current``.

    With ``mix_by`` omitted this is the two-way split: both frames are summarized with
    :func:`frequency_severity_summary` (optionally by the ``on`` keys), aligned, and the
    change reported two exact ways:

    - **Multiplicative trend**: ``pmpm_trend == util_trend * cost_trend``, where
      ``util_trend`` is the frequency ratio and ``cost_trend`` the severity ratio.
    - **Additive dollars**: ``pmpm_change == util_effect + cost_effect`` via a symmetric
      (midpoint) split, so the contributions sum exactly to the PMPM change.

    Pass ``mix_by`` (a column or list of columns) to add a third **mix** component. PMPM
    is then decomposed into utilization, unit cost, and the effect of the membership
    composition shifting across the ``mix_by`` cells. Utilization and unit cost are
    measured *within* each cell (free of composition), and mix captures the aggregate
    movement that comes purely from the cell weights changing -- the piece the two-way
    otherwise misattributes to utilization and unit cost. The split uses the LMDI
    (logarithmic mean Divisia index) convention, which is order-free and reconciles
    exactly: ``pmpm_trend == util_trend * cost_trend * mix_trend`` and
    ``pmpm_change == util_effect + cost_effect + mix_effect``.

    A list of columns in ``mix_by`` defines the cells as their cross -- one blended mix
    term, not a per-column attribution; to attribute mix to each dimension separately,
    run the decomposition once per dimension. ``on`` and ``mix_by`` are orthogonal:
    ``on`` groups the output rows, ``mix_by`` defines the mix cells within each group.
    Every cell must have positive count, loss, and exposure in both periods.
    """
    keys = as_list(on)
    if mix_by is not None:
        return _decompose_pmpm_trend_mix(
            prior, current,
            count_col=count_col, loss_col=loss_col, exposure_col=exposure_col,
            on=keys, mix_by=mix_by,
        )

    p = frequency_severity_summary(
        prior, count_col=count_col, loss_col=loss_col, exposure_col=exposure_col,
        groupby=on, annualization=annualization,
    )
    c = frequency_severity_summary(
        current, count_col=count_col, loss_col=loss_col, exposure_col=exposure_col,
        groupby=on, annualization=annualization,
    )
    keep = ["frequency", "severity", "pmpm"]
    if keys:
        merged = p[keys + keep].merge(c[keys + keep], on=keys, how="outer", suffixes=("_prior", "_current"))
    else:
        merged = pd.concat(
            [p[keep].add_suffix("_prior").reset_index(drop=True),
             c[keep].add_suffix("_current").reset_index(drop=True)],
            axis=1,
        )

    merged["util_trend"] = safe_divide(merged["frequency_current"], merged["frequency_prior"])
    merged["cost_trend"] = safe_divide(merged["severity_current"], merged["severity_prior"])
    merged["pmpm_trend"] = safe_divide(merged["pmpm_current"], merged["pmpm_prior"])

    freq_mean = (merged["frequency_prior"] + merged["frequency_current"]) / 2
    sev_mean = (merged["severity_prior"] + merged["severity_current"]) / 2
    merged["pmpm_change"] = merged["pmpm_current"] - merged["pmpm_prior"]
    merged["util_effect"] = (merged["frequency_current"] - merged["frequency_prior"]) * sev_mean
    merged["cost_effect"] = (merged["severity_current"] - merged["severity_prior"]) * freq_mean

    ordered = keys + [
        "pmpm_prior", "pmpm_current", "pmpm_trend", "util_trend", "cost_trend",
        "pmpm_change", "util_effect", "cost_effect",
        "frequency_prior", "frequency_current", "severity_prior", "severity_current",
    ]
    return merged[[col for col in ordered if col in merged.columns]]

frequency_severity_summary

frequency_severity_summary(
    df: DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    groupby: str | Iterable[str] | None = None,
    annualization: float = 12
) -> pd.DataFrame

Per-group claim frequency, severity, and PMPM.

Counts, losses, and exposure are aggregated first, then the rates are derived after aggregation (avoiding averaging row-level rates). The identity pmpm == frequency * severity holds for every row. frequency is claims per exposure unit (per member month for monthly data), severity is loss per claim, util_per_1000 is annualized claims per 1,000 members, and pmpm is loss per exposure unit.

Source code in actuarialpy/decomposition.py
def frequency_severity_summary(
    df: pd.DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    groupby: str | Iterable[str] | None = None,
    annualization: float = 12,
) -> pd.DataFrame:
    """Per-group claim frequency, severity, and PMPM.

    Counts, losses, and exposure are aggregated first, then the rates are derived
    after aggregation (avoiding averaging row-level rates). The identity
    ``pmpm == frequency * severity`` holds for every row. ``frequency`` is claims per
    exposure unit (per member month for monthly data), ``severity`` is loss per claim,
    ``util_per_1000`` is annualized claims per 1,000 members, and ``pmpm`` is loss per
    exposure unit.
    """
    groups = as_list(groupby)
    validate_columns(df, groups + [count_col, loss_col, exposure_col])
    amount_cols = [count_col, loss_col, exposure_col]
    if groups:
        summary = df[groups + amount_cols].groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
    else:
        summary = pd.DataFrame({col: [df[col].sum()] for col in amount_cols})

    summary["frequency"] = frequency(summary[count_col], summary[exposure_col])
    summary["severity"] = severity(summary[loss_col], summary[count_col])
    summary["util_per_1000"] = utilization_per_1000(summary[count_col], summary[exposure_col], annualization=annualization)
    summary["pmpm"] = per_exposure(summary[loss_col], summary[exposure_col])

    ordered = groups + [exposure_col, count_col, loss_col, "frequency", "severity", "util_per_1000", "pmpm"]
    return summary[[col for col in ordered if col in summary.columns]]

add_business_days

add_business_days(
    df: DataFrame,
    date_col: str,
    *,
    freq: str = "M",
    out_col: str = "business_days",
    holidays: Any = "us_federal",
    weekmask: str = "Mon Tue Wed Thu Fri",
    copy: bool = True
) -> pd.DataFrame

Add a column with the number of business days in each row's period.

Divide a paid-amount column by this to get an amount-per-business-day series that is comparable across short and long months.

Source code in actuarialpy/seasonality.py
def add_business_days(
    df: pd.DataFrame,
    date_col: str,
    *,
    freq: str = "M",
    out_col: str = "business_days",
    holidays: Any = "us_federal",
    weekmask: str = "Mon Tue Wed Thu Fri",
    copy: bool = True,
) -> pd.DataFrame:
    """Add a column with the number of business days in each row's period.

    Divide a paid-amount column by this to get an amount-per-business-day series that
    is comparable across short and long months.
    """
    validate_columns(df, [date_col])
    result = df.copy() if copy else df
    bdays = business_days_in_period(result[date_col], freq=freq, holidays=holidays, weekmask=weekmask)
    period_start = pd.PeriodIndex(pd.to_datetime(result[date_col]), freq=freq).start_time
    result[out_col] = bdays.reindex(period_start).to_numpy()
    return result

apply_seasonality

apply_seasonality(
    df: DataFrame,
    factors: Series | DataFrame,
    *,
    date_col: str,
    value_col: str,
    freq: str = "M",
    by: str | list[str] | None = None,
    factor_col: str = "seasonal_factor",
    season_name: str = "season",
    out_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Multiply value_col by each row's seasonal factor, adding the pattern back.

factors may be flat (Series indexed by season) or a tidy per-segment table joined on by plus season; see :func:deseasonalize for the grouped-table contract.

Source code in actuarialpy/seasonality.py
def apply_seasonality(
    df: pd.DataFrame,
    factors: pd.Series | pd.DataFrame,
    *,
    date_col: str,
    value_col: str,
    freq: str = "M",
    by: str | list[str] | None = None,
    factor_col: str = "seasonal_factor",
    season_name: str = "season",
    out_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Multiply ``value_col`` by each row's seasonal factor, adding the pattern back.

    ``factors`` may be flat (Series indexed by season) or a tidy per-segment table joined
    on ``by`` plus season; see :func:`deseasonalize` for the grouped-table contract.
    """
    validate_columns(df, [date_col, value_col] + as_list(by))
    result = df.copy() if copy else df
    factor = _factor_for_rows(result, factors, date_col, freq, by=by, factor_col=factor_col, season_name=season_name)
    result[out_col or f"{value_col}_seasonalized"] = result[value_col] * factor
    return result

business_days_in_period

business_days_in_period(
    periods: Any,
    *,
    freq: str = "M",
    holidays: Any = "us_federal",
    weekmask: str = "Mon Tue Wed Thu Fri"
) -> pd.Series

Count business days (weekdays minus holidays) in each distinct period.

periods is any set of dates; they are mapped to their period (month or quarter) and de-duplicated. holidays is "us_federal" (pandas' built-in US federal calendar), None (weekdays only), or a list of holiday dates. weekmask controls which weekdays count. Returns a Series indexed by period start timestamp.

Source code in actuarialpy/seasonality.py
def business_days_in_period(
    periods: Any,
    *,
    freq: str = "M",
    holidays: Any = "us_federal",
    weekmask: str = "Mon Tue Wed Thu Fri",
) -> pd.Series:
    """Count business days (weekdays minus holidays) in each distinct period.

    ``periods`` is any set of dates; they are mapped to their period (month or
    quarter) and de-duplicated. ``holidays`` is ``"us_federal"`` (pandas' built-in
    US federal calendar), ``None`` (weekdays only), or a list of holiday dates.
    ``weekmask`` controls which weekdays count. Returns a Series indexed by period
    start timestamp.
    """
    _periods_per_year(freq)
    pidx = pd.PeriodIndex(pd.to_datetime(periods), freq=freq).unique().sort_values()
    starts = pidx.start_time.normalize()
    ends = pidx.end_time.normalize()
    hol = _resolve_holidays(holidays, starts.min(), ends.max())
    counts = np.busday_count(
        starts.values.astype("datetime64[D]"),
        (ends + pd.Timedelta(days=1)).values.astype("datetime64[D]"),
        weekmask=weekmask,
        holidays=hol,
    )
    return pd.Series(counts, index=pidx.start_time, name="business_days")

deseasonalize

deseasonalize(
    df: DataFrame,
    factors: Series | DataFrame,
    *,
    date_col: str,
    value_col: str,
    freq: str = "M",
    by: str | list[str] | None = None,
    factor_col: str = "seasonal_factor",
    season_name: str = "season",
    out_col: str | None = None,
    copy: bool = True
) -> pd.DataFrame

Divide value_col by each row's seasonal factor, removing the pattern.

factors is either a flat Series indexed by season (one pattern for the frame) or a tidy per-segment DataFrame -- grouping column(s), a season column (season_name) and a factor column (factor_col), the shape :func:seasonality_factors_by returns -- joined on by plus season. The grouped join is by value (index irrelevant), the factor table must be unique on by + [season], and a row whose (group, season) is absent yields NaN.

Source code in actuarialpy/seasonality.py
def deseasonalize(
    df: pd.DataFrame,
    factors: pd.Series | pd.DataFrame,
    *,
    date_col: str,
    value_col: str,
    freq: str = "M",
    by: str | list[str] | None = None,
    factor_col: str = "seasonal_factor",
    season_name: str = "season",
    out_col: str | None = None,
    copy: bool = True,
) -> pd.DataFrame:
    """Divide ``value_col`` by each row's seasonal factor, removing the pattern.

    ``factors`` is either a flat Series indexed by season (one pattern for the frame) or
    a tidy per-segment DataFrame -- grouping column(s), a season column (``season_name``)
    and a factor column (``factor_col``), the shape :func:`seasonality_factors_by`
    returns -- joined on ``by`` plus season. The grouped join is by value (index
    irrelevant), the factor table must be unique on ``by + [season]``, and a row whose
    ``(group, season)`` is absent yields ``NaN``.
    """
    validate_columns(df, [date_col, value_col] + as_list(by))
    result = df.copy() if copy else df
    factor = _factor_for_rows(result, factors, date_col, freq, by=by, factor_col=factor_col, season_name=season_name)
    result[out_col or f"{value_col}_deseasonalized"] = result[value_col] / factor
    return result

seasonality_factors

seasonality_factors(
    df: DataFrame,
    *,
    date_col: str,
    value_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    method: str = "ratio_to_moving_average",
    aggregate: str = "mean",
    exclude: Iterable[int] | None = None,
    min_years: int = 2
) -> pd.Series

Estimate seasonal factors -- one multiplier per calendar period, mean 1.0.

The series is first aggregated to the period grain (summing value_col and, if given, exposure_col). With exposure_col the factors are computed on the rate value / exposure (e.g. PMPM), which is the right basis for health seasonality; without it they are computed on the value directly.

Methods:

  • "ratio_to_moving_average" (default): classical multiplicative decomposition. Each period is divided by a centered moving average (which removes trend and level), and the seasonal factor for a calendar period is the average of those ratios across years. Robust to trend and membership growth.
  • "period_share": each period expressed as a share of its own year's average, then averaged by calendar period. Simpler, but assumes little within-year trend.

aggregate is "mean" or "median" (median is more robust to outlier months). exclude drops whole years from the estimate -- e.g. exclude=[2020, 2021] to keep COVID-distorted years out of the factors. A warning is raised when fewer than min_years years inform any period. Factors are normalized to average exactly 1.0.

Source code in actuarialpy/seasonality.py
def seasonality_factors(
    df: pd.DataFrame,
    *,
    date_col: str,
    value_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    method: str = "ratio_to_moving_average",
    aggregate: str = "mean",
    exclude: Iterable[int] | None = None,
    min_years: int = 2,
) -> pd.Series:
    """Estimate seasonal factors -- one multiplier per calendar period, mean 1.0.

    The series is first aggregated to the period grain (summing ``value_col`` and, if
    given, ``exposure_col``). With ``exposure_col`` the factors are computed on the
    rate ``value / exposure`` (e.g. PMPM), which is the right basis for health
    seasonality; without it they are computed on the value directly.

    Methods:

    - ``"ratio_to_moving_average"`` (default): classical multiplicative decomposition.
      Each period is divided by a centered moving average (which removes trend and
      level), and the seasonal factor for a calendar period is the average of those
      ratios across years. Robust to trend and membership growth.
    - ``"period_share"``: each period expressed as a share of its own year's average,
      then averaged by calendar period. Simpler, but assumes little within-year trend.

    ``aggregate`` is ``"mean"`` or ``"median"`` (median is more robust to outlier
    months). ``exclude`` drops whole years from the estimate -- e.g.
    ``exclude=[2020, 2021]`` to keep COVID-distorted years out of the factors. A
    warning is raised when fewer than ``min_years`` years inform any period. Factors
    are normalized to average exactly 1.0.
    """
    cols = [date_col, value_col] + ([exposure_col] if exposure_col else [])
    validate_columns(df, cols)
    pp = _periods_per_year(freq)

    work = df[cols].copy()
    work["_period"] = pd.PeriodIndex(pd.to_datetime(work[date_col]), freq=freq)
    agg = {value_col: "sum"}
    if exposure_col:
        agg[exposure_col] = "sum"
    grouped = work.groupby("_period").agg(agg).sort_index()

    rate = grouped[value_col] / grouped[exposure_col] if exposure_col else grouped[value_col].astype(float)
    full = pd.period_range(grouped.index.min(), grouped.index.max(), freq=freq)
    rate = rate.reindex(full)
    series = pd.Series(rate.to_numpy(), index=full.to_timestamp())

    if method == "ratio_to_moving_average":
        ratio = series / _centered_moving_average(series, pp)
    elif method == "period_share":
        year_mean = series.groupby(series.index.year).transform("mean")
        ratio = series / year_mean
    else:
        raise ValueError("method must be 'ratio_to_moving_average' or 'period_share'.")

    rdf = pd.DataFrame(
        {"season": _season_values(ratio.index, freq), "year": ratio.index.year, "ratio": ratio.to_numpy()}
    ).dropna(subset=["ratio"])
    if exclude is not None:
        rdf = rdf[~rdf["year"].isin(set(as_list(exclude)))]

    per_season_years = rdf.groupby("season")["year"].nunique()
    if rdf.empty or per_season_years.min() < min_years:
        warnings.warn(
            f"Seasonal factors rest on fewer than {min_years} years for some periods; "
            "factors may be unstable. Supply more history or raise min_years.",
            InsufficientDataWarning,
            stacklevel=2,
        )

    if aggregate not in ("mean", "median"):
        raise ValueError("aggregate must be 'mean' or 'median'.")
    factors = rdf.groupby("season")["ratio"].agg(aggregate).reindex(range(1, pp + 1))
    factors = factors / factors.mean()
    factors.index.name = "season"
    factors.name = "seasonal_factor"
    return factors

seasonality_factors_by

seasonality_factors_by(
    df: DataFrame,
    *,
    groupby: str | list[str],
    date_col: str,
    value_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    method: str = "ratio_to_moving_average",
    aggregate: str = "mean",
    exclude: Iterable[int] | None = None,
    min_years: int = 2,
    season_name: str = "season",
    warn: bool = True
) -> pd.DataFrame

Seasonal factors per segment as a tidy table.

Fits :func:seasonality_factors within each segment of groupby and stacks the results into one row per (segment, season) -- columns are the grouping column(s), season_name, and seasonal_factor -- the shape :func:deseasonalize and :func:apply_seasonality consume via by=. Seasons absent from a segment's history are omitted for that segment (they surface as NaN on join). Set warn=False to silence the thin-history :class:InsufficientDataWarning per segment.

Source code in actuarialpy/seasonality.py
def seasonality_factors_by(
    df: pd.DataFrame,
    *,
    groupby: str | list[str],
    date_col: str,
    value_col: str,
    exposure_col: str | None = None,
    freq: str = "M",
    method: str = "ratio_to_moving_average",
    aggregate: str = "mean",
    exclude: Iterable[int] | None = None,
    min_years: int = 2,
    season_name: str = "season",
    warn: bool = True,
) -> pd.DataFrame:
    """Seasonal factors per segment as a tidy table.

    Fits :func:`seasonality_factors` within each segment of ``groupby`` and stacks the
    results into one row per ``(segment, season)`` -- columns are the grouping column(s),
    ``season_name``, and ``seasonal_factor`` -- the shape :func:`deseasonalize` and
    :func:`apply_seasonality` consume via ``by=``. Seasons absent from a segment's history
    are omitted for that segment (they surface as ``NaN`` on join). Set ``warn=False`` to
    silence the thin-history :class:`InsufficientDataWarning` per segment.
    """
    group_cols = as_list(groupby)
    if not group_cols:
        raise ValueError("groupby must name at least one column")
    cols = group_cols + [date_col, value_col] + ([exposure_col] if exposure_col else [])
    validate_columns(df, cols)
    by_key = group_cols[0] if len(group_cols) == 1 else group_cols

    records: list[dict[str, Any]] = []
    for key, part in df.groupby(by_key, sort=True):
        key_tuple = key if isinstance(key, tuple) else (key,)
        key_map = dict(zip(group_cols, key_tuple))
        with warnings.catch_warnings():
            if not warn:
                warnings.simplefilter("ignore", InsufficientDataWarning)
            factors = seasonality_factors(
                part,
                date_col=date_col,
                value_col=value_col,
                exposure_col=exposure_col,
                freq=freq,
                method=method,
                aggregate=aggregate,
                exclude=exclude,
                min_years=min_years,
            )
        for season, factor in factors.items():
            if pd.notna(factor):
                records.append({**key_map, season_name: int(season), "seasonal_factor": float(factor)})
    if not records:
        return pd.DataFrame(columns=group_cols + [season_name, "seasonal_factor"])
    return pd.DataFrame(records)