Source code for causalpy.data.simulate_data

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Functions that generate data sets used in examples
"""

import numpy as np
import pandas as pd
from scipy.stats import dirichlet, gamma, norm, uniform
from statsmodels.nonparametric.smoothers_lowess import lowess

default_lowess_kwargs: dict[str, float | int] = {"frac": 0.2, "it": 0}
RANDOM_SEED: int = 8927
rng: np.random.Generator = np.random.default_rng(RANDOM_SEED)


def _smoothed_gaussian_random_walk(
    gaussian_random_walk_mu: float,
    gaussian_random_walk_sigma: float,
    N: int,
    lowess_kwargs: dict,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Generates Gaussian random walk data and applies LOWESS.

    :param gaussian_random_walk_mu:
        Mean of the random walk
    :param gaussian_random_walk_sigma:
        Standard deviation of the random walk
    :param N:
        Length of the random walk
    :param lowess_kwargs:
        Keyword argument dictionary passed to statsmodels lowess
    """
    x = np.arange(N)
    y = norm(gaussian_random_walk_mu, gaussian_random_walk_sigma).rvs(N).cumsum()
    filtered = lowess(y, x, **lowess_kwargs)
    y = filtered[:, 1]
    return (x, y)
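

# A minimal usage sketch for the helper above (illustrative only; this is a private
# helper, not part of the public API):
#
#   >>> x, y = _smoothed_gaussian_random_walk(
#   ...     gaussian_random_walk_mu=0.25,
#   ...     gaussian_random_walk_sigma=1.0,
#   ...     N=100,
#   ...     lowess_kwargs=default_lowess_kwargs,
#   ... )
#   >>> len(x) == len(y) == 100
#   True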


def generate_synthetic_control_data(
    N: int = 100,
    treatment_time: int = 70,
    grw_mu: float = 0.25,
    grw_sigma: float = 1,
    lowess_kwargs: dict = default_lowess_kwargs,
) -> tuple[pd.DataFrame, np.ndarray]:
    """
    Generates data for synthetic control example.

    :param N:
        Number of data points
    :param treatment_time:
        Index where treatment begins in the generated dataframe
    :param grw_mu:
        Mean of Gaussian Random Walk
    :param grw_sigma:
        Standard deviation of Gaussian Random Walk
    :param lowess_kwargs:
        Keyword argument dictionary passed to statsmodels lowess

    Example
    --------
    >>> from causalpy.data.simulate_data import generate_synthetic_control_data
    >>> df, weightings_true = generate_synthetic_control_data(treatment_time=70)
    """
    # 1. Generate non-treated variables
    df = pd.DataFrame(
        {
            "a": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "b": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "c": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "d": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "e": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "f": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
            "g": _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1],
        }
    )

    # 2. Generate counterfactual, based on weighted sum of non-treated variables.
    # This is the counterfactual with NO treatment.
    weightings_true = dirichlet(np.ones(7)).rvs(1)
    df["counterfactual"] = np.dot(df.to_numpy(), weightings_true.T)

    # 3. Generate the causal effect
    causal_effect = gamma(10).pdf(np.arange(0, N, 1) - treatment_time)
    df["causal effect"] = causal_effect * -50

    # 4. Generate the actually observed data, i.e. the treated series with the
    # causal effect applied
    df["actual"] = df["counterfactual"] + df["causal effect"]

    # 5. Apply observation noise to all relevant variables
    for var in ["actual", "a", "b", "c", "d", "e", "f", "g"]:
        df[var] += norm(0, 0.25).rvs(N)

    return df, weightings_true
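

# Illustrative sketch (not part of the module): the returned weights lie on the
# 7-simplex, one weight per untreated series "a"-"g":
#
#   >>> df, w = generate_synthetic_control_data(N=50, treatment_time=30)
#   >>> w.shape
#   (1, 7)
#   >>> np.isclose(w.sum(), 1.0)  # Dirichlet weights sum to one
#   True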
def generate_time_series_data(
    N: int = 100,
    treatment_time: int = 70,
    beta_temp: float = -1,
    beta_linear: float = 0.5,
    beta_intercept: float = 3,
) -> pd.DataFrame:
    """
    Generates interrupted time series example data

    :param N:
        Length of the time series
    :param treatment_time:
        Index of when treatment begins
    :param beta_temp:
        The temperature coefficient
    :param beta_linear:
        The linear coefficient
    :param beta_intercept:
        The intercept
    """
    x = np.arange(0, N, 1)
    df = pd.DataFrame(
        {
            "temperature": np.sin(x * 0.5) + 1,
            "linear": np.linspace(0, 1, N),
            "causal effect": 10 * gamma(10).pdf(np.arange(0, N, 1) - treatment_time),
        }
    )

    df["deaths_counterfactual"] = (
        beta_intercept + beta_temp * df["temperature"] + beta_linear * df["linear"]
    )

    # generate the actually observed data,
    # i.e. the counterfactual with the causal effect applied
    df["deaths_actual"] = df["deaths_counterfactual"] + df["causal effect"]

    # apply observation noise to all relevant variables
    # NOTE: no observation noise on the linear trend component
    for var in ["deaths_actual", "temperature"]:
        df[var] += norm(0, 0.1).rvs(N)

    # add intercept column of ones (for modeling purposes)
    # This is correctly a column of ones, not beta_intercept, as beta_intercept
    # is already incorporated in the data generation above
    df["intercept"] = np.ones(df.shape[0])

    return df
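

# Usage sketch (illustrative; plotting assumes matplotlib is installed). The actual
# and counterfactual series coincide before `treatment_time` and diverge after it:
#
#   >>> df = generate_time_series_data(N=100, treatment_time=70)
#   >>> df[["deaths_counterfactual", "deaths_actual"]].plot()  # doctest: +SKIP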
def generate_time_series_data_seasonal(
    treatment_time: pd.Timestamp,
) -> pd.DataFrame:
    """
    Generates 10 years of monthly data with seasonality
    """
    dates = pd.date_range(
        start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="M"
    )
    df = pd.DataFrame(data={"date": dates})
    df = df.assign(
        year=lambda x: x["date"].dt.year,
        month=lambda x: x["date"].dt.month,
        t=df.index,
    ).set_index("date", drop=True)
    month_effect = np.array([11, 13, 12, 15, 19, 23, 21, 28, 20, 17, 15, 12])
    df["y"] = 0.2 * df["t"] + 2 * month_effect[np.asarray(df.month.values) - 1]
    N = df.shape[0]
    idx = np.arange(N)[df.index > treatment_time]
    df["causal effect"] = 100 * gamma(10).pdf(
        np.array(np.arange(0, N, 1)) - int(np.min(idx))
    )
    df["y"] += df["causal effect"]
    df["y"] += norm(0, 2).rvs(N)
    # add intercept
    df["intercept"] = np.ones(df.shape[0])
    return df
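

# Usage sketch (illustrative). The index is monthly over 2010-2019, so pass a
# treatment time inside that range for the effect to appear in-sample:
#
#   >>> df = generate_time_series_data_seasonal(pd.to_datetime("2017-01-01"))
#   >>> {"y", "causal effect", "intercept"}.issubset(df.columns)
#   True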
def generate_time_series_data_simple(
    treatment_time: pd.Timestamp, slope: float = 0.0
) -> pd.DataFrame:
    """
    Generate simple interrupted time series data, with no seasonality or temporal
    structure.
    """
    dates = pd.date_range(
        start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="M"
    )
    df = pd.DataFrame(data={"date": dates})
    df = df.assign(
        linear_trend=df.index,
    ).set_index("date", drop=True)
    df["timeseries"] = slope * df["linear_trend"]
    N = df.shape[0]
    df["causal effect"] = (df.index > treatment_time) * 2
    df["timeseries"] += df["causal effect"]
    # add intercept
    df["intercept"] = np.ones(df.shape[0])
    # add observation noise
    df["timeseries"] += norm(0, 0.25).rvs(N)
    return df
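

# Usage sketch (illustrative): a linear trend with a constant +2 step after the
# treatment time:
#
#   >>> df = generate_time_series_data_simple(pd.to_datetime("2015-01-01"), slope=0.0)
#   >>> float(df["causal effect"].max())
#   2.0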
def generate_did() -> pd.DataFrame:
    """
    Generate Difference in Differences data

    Example
    --------
    >>> from causalpy.data.simulate_data import generate_did
    >>> df = generate_did()
    """
    # true parameters
    control_intercept = 1
    treat_intercept_delta = 0.25
    trend = 1
    Δ = 0.5
    intervention_time = 0.5

    # local functions
    def outcome(
        t: np.ndarray,
        control_intercept: float,
        treat_intercept_delta: float,
        trend: float,
        Δ: float,
        group: np.ndarray,
        post_treatment: np.ndarray,
    ) -> np.ndarray:
        """Compute the outcome of each unit"""
        return (
            control_intercept
            + (treat_intercept_delta * group)
            + (t * trend)
            + (Δ * post_treatment * group)
        )

    df = pd.DataFrame(
        {
            "group": [0, 0, 1, 1] * 10,
            "t": [0.0, 1.0, 0.0, 1.0] * 10,
            "unit": np.concatenate([[i] * 2 for i in range(20)]),
        }
    )
    df["post_treatment"] = df["t"] > intervention_time
    df["y"] = outcome(
        np.asarray(df["t"]),
        control_intercept,
        treat_intercept_delta,
        trend,
        Δ,
        np.asarray(df["group"]),
        np.asarray(df["post_treatment"]),
    )
    df["y"] += rng.normal(0, 0.1, df.shape[0])
    return df
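

# Sketch of the classic 2x2 difference-in-differences estimate on the generated
# data (illustrative; should land near the true Δ = 0.5, up to the 0.1-sd noise):
#
#   >>> df = generate_did()
#   >>> m = df.groupby(["group", "post_treatment"])["y"].mean()
#   >>> did = (m[(1, True)] - m[(1, False)]) - (m[(0, True)] - m[(0, False)])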
def generate_regression_discontinuity_data(
    N: int = 100, true_causal_impact: float = 0.5, true_treatment_threshold: float = 0.0
) -> pd.DataFrame:
    """
    Generate regression discontinuity example data

    Example
    --------
    >>> import pathlib
    >>> from causalpy.data.simulate_data import generate_regression_discontinuity_data
    >>> df = generate_regression_discontinuity_data(true_treatment_threshold=0.5)
    >>> df.to_csv(
    ...     pathlib.Path.cwd() / "regression_discontinuity.csv", index=False
    ... )  # doctest: +SKIP
    """

    def is_treated(x: np.ndarray) -> np.ndarray:
        """Check if x was treated"""
        return np.greater_equal(x, true_treatment_threshold)

    def impact(x: np.ndarray) -> np.ndarray:
        """Assign true_causal_impact to all treated entries"""
        y = np.zeros(len(x))
        y[is_treated(x)] = true_causal_impact
        return y

    x = np.sort((uniform.rvs(size=N) - 0.5) * 2)
    y = np.sin(x * 3) + impact(x) + norm.rvs(scale=0.1, size=N)
    return pd.DataFrame({"x": x, "y": y, "treated": is_treated(x)})
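

# Quick property check (illustrative): treatment status is fully determined by the
# running variable crossing the threshold, which is the defining RD assumption:
#
#   >>> df = generate_regression_discontinuity_data(true_treatment_threshold=0.0)
#   >>> bool((df["treated"] == (df["x"] >= 0.0)).all())
#   True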
def generate_ancova_data(
    N: int = 200,
    pre_treatment_means: np.ndarray | None = None,
    treatment_effect: int = 2,
    sigma: int = 1,
) -> pd.DataFrame:
    """
    Generate ANCOVA example data

    Example
    --------
    >>> import pathlib
    >>> import numpy as np
    >>> from causalpy.data.simulate_data import generate_ancova_data
    >>> df = generate_ancova_data(
    ...     N=200, pre_treatment_means=np.array([10, 12]), treatment_effect=2, sigma=1
    ... )
    >>> df.to_csv(pathlib.Path.cwd() / "ancova_data.csv", index=False)  # doctest: +SKIP
    """
    if pre_treatment_means is None:
        pre_treatment_means = np.array([10, 12])
    group = np.random.choice(2, size=N)
    pre = np.random.normal(loc=pre_treatment_means[group])
    post = pre + treatment_effect * group + np.random.normal(size=N) * sigma
    df = pd.DataFrame({"group": group, "pre": pre, "post": post})
    return df
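

# Interpretation sketch (illustrative): the raw post-treatment gap between groups
# mixes the baseline difference in pre-treatment means (12 - 10) with the treatment
# effect, which is why ANCOVA conditions on `pre`:
#
#   >>> df = generate_ancova_data()
#   >>> gap = df[df.group == 1]["post"].mean() - df[df.group == 0]["post"].mean()
#   >>> # gap ≈ (12 - 10) + treatment_effect, i.e. about 4, not 2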
def generate_geolift_data() -> pd.DataFrame:
    """Generate synthetic data for a geolift example. This consists of 6 untreated
    countries. The treated unit `Denmark` is a weighted combination of the untreated
    units. We additionally specify a treatment effect which takes effect after the
    `treatment_time`. The timeseries data is observed at weekly resolution and has
    annual seasonality, with this seasonality being drawn from a Gaussian Process
    with a periodic kernel."""
    n_years = 4
    treatment_time = pd.to_datetime("2022-01-01")
    causal_impact = 0.2
    time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")

    untreated = [
        "Austria",
        "Belgium",
        "Bulgaria",
        "Croatia",
        "Cyprus",
        "Czech_Republic",
    ]

    df = (
        pd.DataFrame(
            {
                country: create_series(n_years=n_years, intercept=3)
                for country in untreated
            }
        )
        .assign(time=time)
        .set_index("time")
    )

    # create treated unit as a weighted sum of the untreated units
    weights = np.random.dirichlet(np.ones(len(untreated)), size=1)[0]
    df = df.assign(Denmark=np.dot(df[untreated].values, weights))

    # add observation noise
    for col in untreated + ["Denmark"]:
        df[col] += np.random.normal(size=len(df), scale=0.1)

    # add treatment effect
    df["Denmark"] += np.where(df.index < treatment_time, 0, causal_impact)

    # ensure we never see any negative sales
    df = df.clip(lower=0)

    return df
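

# Usage sketch (illustrative): four years of weekly data for six untreated geos plus
# the treated unit, Denmark:
#
#   >>> df = generate_geolift_data()
#   >>> df.shape
#   (208, 7)
#   >>> "Denmark" in df.columns
#   True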
def generate_multicell_geolift_data() -> pd.DataFrame:
    """Generate synthetic multi-cell data for a geolift example. This consists of 12
    untreated units. Each of the treated units `t1`-`t4` is a weighted combination of
    the untreated units. We additionally specify a treatment effect which takes
    effect after the `treatment_time`. The timeseries data is observed at weekly
    resolution and has annual seasonality, with this seasonality being drawn from a
    Gaussian Process with a periodic kernel."""
    n_years = 4
    treatment_time = pd.to_datetime("2022-01-01")
    causal_impact = 0.2
    time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")

    untreated = [
        "u1",
        "u2",
        "u3",
        "u4",
        "u5",
        "u6",
        "u7",
        "u8",
        "u9",
        "u10",
        "u11",
        "u12",
    ]

    df = (
        pd.DataFrame(
            {
                country: create_series(n_years=n_years, intercept=3)
                for country in untreated
            }
        )
        .assign(time=time)
        .set_index("time")
    )

    treated = ["t1", "t2", "t3", "t4"]

    for treated_geo in treated:
        # create treated unit as a weighted sum of the untreated units
        weights = np.random.dirichlet(np.ones(len(untreated)), size=1)[0]
        df[treated_geo] = np.dot(df[untreated].values, weights)
        # add treatment effect
        df[treated_geo] += np.where(df.index < treatment_time, 0, causal_impact)

    # add observation noise to all geos
    for col in untreated + treated:
        df[col] += np.random.normal(size=len(df), scale=0.1)

    # ensure we never see any negative sales
    df = df.clip(lower=0)

    return df
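

# Usage sketch (illustrative): twelve untreated geos u1-u12 plus four treated geos
# t1-t4, each treated geo a different Dirichlet-weighted blend of the untreated ones:
#
#   >>> df = generate_multicell_geolift_data()
#   >>> df.shape
#   (208, 16)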
def generate_event_study_data(
    n_units: int = 20,
    n_time: int = 20,
    treatment_time: int = 10,
    treated_fraction: float = 0.5,
    event_window: tuple[int, int] = (-5, 5),
    treatment_effects: dict[int, float] | None = None,
    unit_fe_sigma: float = 1.0,
    time_fe_sigma: float = 0.5,
    noise_sigma: float = 0.2,
    predictor_effects: dict[str, float] | None = None,
    ar_phi: float = 0.9,
    ar_scale: float = 1.0,
    seed: int | None = None,
) -> pd.DataFrame:
    """
    Generate synthetic panel data for event study / dynamic DiD analysis.

    Creates panel data with unit and time fixed effects, where a fraction of
    units receive treatment at a common treatment time. Treatment effects can
    vary by event time (time relative to treatment). Optionally includes
    time-varying predictor variables generated via AR(1) processes.

    Parameters
    ----------
    n_units : int
        Total number of units (treated + control). Default 20.
    n_time : int
        Number of time periods. Default 20.
    treatment_time : int
        Time period when treatment occurs (0-indexed). Default 10.
    treated_fraction : float
        Fraction of units that are treated. Default 0.5.
    event_window : tuple[int, int]
        Range of event times (K_min, K_max) for which treatment effects are
        defined. Default (-5, 5).
    treatment_effects : dict[int, float], optional
        Dictionary mapping event time k to treatment effect beta_k. Default
        creates effects that are 0 for k < 0 (pre-treatment) and gradually
        increase post-treatment.
    unit_fe_sigma : float
        Standard deviation for unit fixed effects. Default 1.0.
    time_fe_sigma : float
        Standard deviation for time fixed effects. Default 0.5.
    noise_sigma : float
        Standard deviation for observation noise. Default 0.2.
    predictor_effects : dict[str, float], optional
        Dictionary mapping predictor names to their true coefficients. Each
        predictor is generated as an AR(1) time series that varies over time
        but is the same for all units at a given time. For example,
        ``{'temperature': 0.3, 'humidity': -0.1}`` creates two predictors.
        Default None (no predictors).
    ar_phi : float
        AR(1) autoregressive coefficient controlling persistence of
        predictors. Values closer to 1 produce smoother, more persistent
        series. Default 0.9.
    ar_scale : float
        Standard deviation of the AR(1) innovation noise for predictors.
        Default 1.0.
    seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    pd.DataFrame
        Panel data with columns:

        - unit: Unit identifier
        - time: Time period
        - y: Outcome variable
        - treat_time: Treatment time for unit (NaN if never treated)
        - treated: Whether unit is in treated group (0 or 1)
        - <predictor_name>: One column per predictor (if predictor_effects
          provided)

    Example
    --------
    >>> from causalpy.data.simulate_data import generate_event_study_data
    >>> df = generate_event_study_data(
    ...     n_units=20, n_time=20, treatment_time=10, seed=42
    ... )
    >>> df.shape
    (400, 5)
    >>> df.columns.tolist()
    ['unit', 'time', 'y', 'treat_time', 'treated']

    With predictors:

    >>> df = generate_event_study_data(
    ...     n_units=10,
    ...     n_time=10,
    ...     treatment_time=5,
    ...     seed=42,
    ...     predictor_effects={"temperature": 0.3, "humidity": -0.1},
    ... )
    >>> df.shape
    (100, 7)
    >>> "temperature" in df.columns and "humidity" in df.columns
    True
    """
    if seed is not None:
        np.random.seed(seed)

    # Default treatment effects: zero pre-treatment, gradual increase post-treatment
    if treatment_effects is None:
        treatment_effects = {}
        for k in range(event_window[0], event_window[1] + 1):
            if k < 0:
                treatment_effects[k] = 0.0  # No anticipation
            else:
                # Gradual treatment effect that increases post-treatment
                treatment_effects[k] = 0.5 + 0.1 * k

    # Determine treated units
    n_treated = int(n_units * treated_fraction)
    treated_units = set(range(n_treated))

    # Generate unit fixed effects
    unit_fe = np.random.normal(0, unit_fe_sigma, n_units)

    # Generate time fixed effects
    time_fe = np.random.normal(0, time_fe_sigma, n_time)

    # Generate predictor time series (if any). Each predictor is an AR(1) series
    # that varies over time but is the same for all units at a given time.
    predictors: dict[str, np.ndarray] = {}
    if predictor_effects is not None:
        for predictor_name in predictor_effects:
            predictors[predictor_name] = generate_ar1_series(
                n=n_time, phi=ar_phi, scale=ar_scale
            )

    # Build panel data
    data = []
    for unit in range(n_units):
        is_treated = unit in treated_units
        unit_treat_time = treatment_time if is_treated else np.nan

        for t in range(n_time):
            # Base outcome: unit FE + time FE + noise
            y = unit_fe[unit] + time_fe[t] + np.random.normal(0, noise_sigma)

            # Add predictor contributions to outcome
            if predictor_effects is not None:
                for predictor_name, coef in predictor_effects.items():
                    y += coef * predictors[predictor_name][t]

            # Add treatment effect for treated units in event window
            if is_treated:
                event_time = t - treatment_time
                if (
                    event_window[0] <= event_time <= event_window[1]
                    and event_time in treatment_effects
                ):
                    y += treatment_effects[event_time]

            row = {
                "unit": unit,
                "time": t,
                "y": y,
                "treat_time": unit_treat_time,
                "treated": 1 if is_treated else 0,
            }
            # Add predictor values to the row
            for predictor_name, series in predictors.items():
                row[predictor_name] = series[t]
            data.append(row)

    return pd.DataFrame(data)
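

# Sketch of recovering the dynamic effect profile from the generated panel
# (illustrative). The treated/control gap at each period carries a random offset
# from the unit fixed effects, so net out the pre-treatment gap first:
#
#   >>> df = generate_event_study_data(seed=42)
#   >>> gap = (
#   ...     df[df.treated == 1].groupby("time")["y"].mean()
#   ...     - df[df.treated == 0].groupby("time")["y"].mean()
#   ... )
#   >>> effect = gap - gap[gap.index < 10].mean()  # ≈ beta_k profile, up to noise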


# -----------------
# UTILITY FUNCTIONS
# -----------------


def generate_ar1_series(
    n: int,
    phi: float = 0.9,
    scale: float = 1.0,
    initial: float = 0.0,
) -> np.ndarray:
    """
    Generate an AR(1) autoregressive time series.

    The AR(1) process is defined as:
        x_{t+1} = phi * x_t + eta_t, where eta_t ~ N(0, scale^2)

    Parameters
    ----------
    n : int
        Length of the time series to generate.
    phi : float
        Autoregressive coefficient controlling persistence. Values closer to 1
        produce smoother, more persistent series. Must be in (-1, 1) for
        stationarity. Default 0.9.
    scale : float
        Standard deviation of the innovation noise. Default 1.0.
    initial : float
        Initial value of the series. Default 0.0.

    Returns
    -------
    np.ndarray
        Array of length n containing the AR(1) time series.

    Example
    -------
    >>> import numpy as np
    >>> from causalpy.data.simulate_data import generate_ar1_series
    >>> np.random.seed(42)
    >>> series = generate_ar1_series(n=10, phi=0.9, scale=0.5)
    >>> len(series)
    10
    """
    series = np.zeros(n)
    series[0] = initial
    innovations = np.random.normal(0, scale, n - 1)
    for t in range(1, n):
        series[t] = phi * series[t - 1] + innovations[t - 1]
    return series
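

# Orientation note (a standard AR(1) property, not specific to this module): for
# |phi| < 1 the process is stationary with long-run variance scale**2 / (1 - phi**2),
# so the defaults phi=0.9, scale=1.0 imply a long-run standard deviation of roughly
# 2.29. Short series started at `initial` will not yet have reached that spread.
#
#   >>> sd_long_run = (1.0**2 / (1 - 0.9**2)) ** 0.5  # ≈ 2.294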


def generate_seasonality(
    n: int = 12, amplitude: int = 1, length_scale: float = 0.5
) -> np.ndarray:
    """Generate monthly seasonality by sampling from a Gaussian process with a
    periodic kernel, using numpy code"""
    # Generate the covariance matrix
    x = np.linspace(0, 1, n)
    x1, x2 = np.meshgrid(x, x)
    cov = periodic_kernel(
        x1, x2, period=1, length_scale=length_scale, amplitude=amplitude
    )
    # Generate the seasonality
    seasonality = np.random.multivariate_normal(np.zeros(n), cov)
    return seasonality
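

# Usage sketch (illustrative): one draw of a smooth 12-point seasonal profile:
#
#   >>> s = generate_seasonality(n=12, amplitude=1, length_scale=0.5)
#   >>> s.shape
#   (12,)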


def periodic_kernel(
    x1: np.ndarray,
    x2: np.ndarray,
    period: int = 1,
    length_scale: float = 1.0,
    amplitude: int = 1,
) -> np.ndarray:
    """Generate a periodic kernel for a Gaussian process"""
    return amplitude**2 * np.exp(
        -2 * np.sin(np.pi * np.abs(x1 - x2) / period) ** 2 / length_scale**2
    )
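

# Quick property check (illustrative): the kernel attains its maximum amplitude**2
# on the diagonal and repeats in |x1 - x2| with the given period:
#
#   >>> float(periodic_kernel(np.array(0.3), np.array(0.3)))
#   1.0
#   >>> float(periodic_kernel(np.array(0.0), np.array(1.0), period=1))
#   1.0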


def create_series(
    n: int = 52,
    amplitude: int = 1,
    length_scale: int = 2,
    n_years: int = 4,
    intercept: int = 3,
) -> np.ndarray:
    """
    Returns a numpy tile with generated seasonality data repeated over
    multiple years
    """
    return np.tile(
        generate_seasonality(n=n, amplitude=amplitude, length_scale=length_scale)
        + intercept,
        n_years,
    )
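

# Usage sketch (illustrative): a weekly seasonal pattern tiled over four years,
# as used by the geolift data generators above:
#
#   >>> weekly = create_series(n=52, n_years=4, intercept=3)
#   >>> weekly.shape
#   (208,)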