rep_payout_history, plan_parameters, quota_assignments, attainment_monthly. Grain: one row per rep per measurement period. Contains actual payout, quota, attainment%, plan ID, and all curve parameters.opportunity_history, activity_log, pipeline_snapshot. Grain: one row per opportunity per stage-date. Contains deal size, stage velocity, win rate, product mix, customer segment. Critical for behavioral features.employee_profile, territory_assignment, tenure_history. Grain: one row per employee per period. Contains tenure, role, segment, manager, quota-to-market-size ratio. Used for fairness testing and cohort analysis.-- Base table: one row per rep per fiscal quarter
-- Grain: rep_id × fiscal_quarter
-- Source: Snowflake (Microsoft internal DW on Azure Synapse / Fabric)
WITH rep_base AS (
SELECT
r.rep_id,
r.fiscal_quarter,
r.segment, -- Enterprise / SMC / Partner
r.solution_area, -- Azure / M365 / Security
r.quota_amount,
r.actual_revenue,
r.actual_revenue / r.quota_amount AS attainment_pct,
r.plan_id,
p.ti_amount,
p.accel_rate,
p.threshold_pct,
p.cap_multiplier,
p.base_salary,
r.actual_revenue / r.quota_amount * p.ti_amount AS raw_incentive_earned
FROM attainment_monthly r
JOIN plan_parameters p ON r.plan_id = p.plan_id
AND r.fiscal_quarter BETWEEN p.effective_start AND p.effective_end
),
rep_attributes AS (
SELECT
e.rep_id,
e.tenure_months,
e.manager_id,
e.territory_id,
t.tam_usd, -- total addressable market
t.current_penetration_pct,
e.quota_amount / NULLIF(t.tam_usd, 0) AS quota_to_tam_ratio
FROM employee_profile e
JOIN territory_assignment t ON e.territory_id = t.territory_id
),
rep_behavior AS (
SELECT
rep_id,
fiscal_quarter,
COUNT(DISTINCT opportunity_id) AS deal_count,
AVG(deal_size_usd) AS avg_deal_size,
SUM(CASE WHEN product_family = 'Azure'
THEN deal_size_usd ELSE 0 END)
/ NULLIF(SUM(deal_size_usd), 0) AS azure_mix_pct,
AVG(days_to_close) AS avg_cycle_days,
SUM(CASE WHEN close_quarter = fiscal_quarter
THEN deal_size_usd ELSE 0 END)
/ NULLIF(SUM(deal_size_usd), 0) AS q_end_concentration -- sandbagging signal
FROM opportunity_history
GROUP BY rep_id, fiscal_quarter
)
SELECT
b.*,
a.tenure_months,
a.tam_usd,
a.quota_to_tam_ratio,
a.current_penetration_pct,
bh.deal_count,
bh.avg_deal_size,
bh.azure_mix_pct,
bh.avg_cycle_days,
bh.q_end_concentration
FROM rep_base b
LEFT JOIN rep_attributes a ON b.rep_id = a.rep_id
LEFT JOIN rep_behavior bh ON b.rep_id = bh.rep_id
AND b.fiscal_quarter = bh.fiscal_quarter
ORDER BY b.fiscal_quarter, b.rep_id;
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
# Load analytical base table
df = spark.table("comp_analytics.rep_base_table")
# 1. Marginal incentive rate — the behavioral lever
# dPayout/dAttain at current attainment
def marginal_incentive(attain, ti, accel, floor, cap):
if attain < floor:
return 0
elif attain <= 1.0:
return ti / (1 - floor) # slope of linear zone
elif attain <= cap:
return ti * accel # accelerator zone slope
else:
return 0 # above cap — no marginal incentive
df = df.withColumn("marginal_incentive_rate",
F.udf(marginal_incentive)(
F.col("attainment_pct"),
F.col("ti_amount"),
F.col("accel_rate"),
F.col("threshold_pct"),
F.col("cap_multiplier")
))
# 2. Distance to next kink point (accelerator step)
# How far is the rep from the next behavioral inflection?
df = df.withColumn("distance_to_kink",
F.when(F.col("attainment_pct") < 1.0,
F.lit(1.0) - F.col("attainment_pct")) # distance to quota
.when(F.col("attainment_pct") < 1.2,
F.lit(1.2) - F.col("attainment_pct")) # distance to step 2
.when(F.col("attainment_pct") < 1.5,
F.lit(1.5) - F.col("attainment_pct")) # distance to step 3
.otherwise(F.lit(0)))
# 3. Quota difficulty index — controls for structural territory hardness
df = df.withColumn("quota_difficulty_index",
F.col("quota_amount") /
(F.col("tam_usd") * F.col("market_growth_rate") + F.lit(1e-6)))
# 4. Trailing 3-quarter attainment — controls for rep ability
window = Window.partitionBy("rep_id").orderBy("fiscal_quarter_int")
.rowsBetween(-3, -1)
df = df.withColumn("trailing_attainment_3q",
F.avg("attainment_pct").over(window))
# 5. Sandbagging signal — Q-end deal concentration
df = df.withColumn("sandbagging_flag",
(F.col("q_end_concentration") > 1.5).cast("int"))
# 6. Interaction term: marginal rate × distance to kink
# Captures: high incentive + close to step = maximum behavioral pull
df = df.withColumn("incentive_proximity_interaction",
F.col("marginal_incentive_rate") * (1 / (F.col("distance_to_kink") + 0.01)))
Payout_i = β₀ + β₁(attainment_i) + β₂(TI_i) + β₃(accel_rate_i) + β₄(attainment_i²) + β₅(attainment_i × accel_rate_i) + β₆(tenure_i) + β₇(quota_difficulty_i) + ε_i
The attainment² term: Captures the nonlinearity of the accelerator zone. Without it, OLS underestimates payout for high attainers.Attainment_it = α + β₁(Treated_i) + β₂(Post_t) + β₃(Treated_i × Post_t) + γX_it + ε_it
β₃ is the DiD estimator — the causal effect of the plan change. This is the behavioral elasticity.Attainment_it = α_i + β₁(QuotaDifficulty_it) + β₂(TAM_growth_it) + β₃(MarginalRate_it) + ε_it
α_i = rep fixed effect. If QuotaDifficulty_it has a large negative coefficient after controlling for fixed effects, quota is structurally harder for certain territory types — a fairness finding.
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import KFold, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import statsmodels.api as sm
import statsmodels.formula.api as smf
# ── 1. TRAIN / VALIDATION / TEST SPLIT ──────────────────────────────
# Temporal split — NEVER random split on time-series data
# Train: FY2021–FY2022 | Val: FY2023 Q1-Q2 | Test: FY2023 Q3-Q4
train = df[df['fiscal_year'] <= 2022]
val = df[(df['fiscal_year'] == 2023) & (df['fiscal_quarter'].isin(['Q1','Q2']))]
test = df[(df['fiscal_year'] == 2023) & (df['fiscal_quarter'].isin(['Q3','Q4']))]
# ── 2. BASELINE OLS (Iteration 1) ────────────────────────────────────
# Start simple — establish baseline R²
model_v1 = smf.ols(
'payout ~ attainment_pct + ti_amount + accel_rate',
data=train
).fit()
print(f"Baseline R²: {model_v1.rsquared:.3f}") # expect ~0.55–0.65
# ── 3. ADD NONLINEAR TERM (Iteration 2) ──────────────────────────────
# attainment² captures accelerator zone nonlinearity
train['attainment_sq'] = train['attainment_pct'] ** 2
model_v2 = smf.ols(
'payout ~ attainment_pct + attainment_sq + ti_amount + accel_rate',
data=train
).fit()
print(f"R² with quadratic term: {model_v2.rsquared:.3f}") # expect +0.05–0.08
# ── 4. ADD INTERACTION TERM (Iteration 3) ────────────────────────────
# attainment × accel_rate: does the accelerator effect grow with attainment?
model_v3 = smf.ols(
'payout ~ attainment_pct + attainment_sq + ti_amount + accel_rate '
'+ attainment_pct:accel_rate',
data=train
).fit()
print(f"R² with interaction: {model_v3.rsquared:.3f}") # expect +0.02–0.04
# ── 5. ADD BEHAVIORAL AND CONTEXT FEATURES (Iteration 4) ─────────────
model_v4 = smf.ols(
'payout ~ attainment_pct + attainment_sq + ti_amount + accel_rate '
'+ attainment_pct:accel_rate '
'+ trailing_attainment_3q + quota_difficulty_index '
'+ tenure_months + marginal_incentive_rate',
data=train
).fit()
print(f"R² with full features: {model_v4.rsquared:.3f}") # expect ~0.78–0.85
# ── 6. SIGNIFICANCE TESTING ──────────────────────────────────────────
# Keep features only if p < 0.05 AND theoretically justified
print(model_v4.summary())
# Drop features with p > 0.10 unless theoretically critical
# Check VIF for multicollinearity
from statsmodels.stats.outliers_influence import variance_inflation_factor
vif = pd.DataFrame({
'feature': train[features].columns,
'VIF': [variance_inflation_factor(train[features].values, i)
for i in range(len(features))]
})
# Flag VIF > 10 as multicollinearity risk
# ── 7. CROSS-VALIDATION ──────────────────────────────────────────────
# 5-fold CV on training data — ensure R² is not overfit
pipeline = Pipeline([('scaler', StandardScaler()),
('model', LinearRegression())])
cv_scores = cross_val_score(pipeline, train[features], train['payout'],
cv=KFold(n_splits=5, shuffle=False),
scoring='r2')
print(f"CV R² mean: {cv_scores.mean():.3f} ± {cv_scores.std():.3f}")
# ── 8. RESIDUAL DIAGNOSTICS ──────────────────────────────────────────
# Check: normality, homoscedasticity, no autocorrelation
residuals = model_v4.resid
# Durbin-Watson test for autocorrelation (want ~2.0)
from statsmodels.stats.stattools import durbin_watson
dw = durbin_watson(residuals)
# Breusch-Pagan for heteroscedasticity
from statsmodels.stats.diagnostic import het_breuschpagan
bp_test = het_breuschpagan(residuals, model_v4.model.exog)
import numpy as np
from scipy import stats
def run_monte_carlo(rep_df, new_plan_params, n_sims=2000, seed=42):
"""
rep_df: DataFrame with predicted_attain_mean and residual_std per rep
new_plan_params: dict with ti, accel_rate, threshold, cap, base
"""
np.random.seed(seed)
N = len(rep_df)
total_costs = np.zeros(n_sims)
for sim in range(n_sims):
# Draw attainment for every rep from model-calibrated Normal
attainments = np.random.normal(
loc=rep_df['predicted_attain_mean'].values,
scale=rep_df['residual_std'].values
).clip(min=0)
# Apply new payout formula to each rep
payouts = vectorized_payout(
attainments,
ti=new_plan_params['ti'],
accel=new_plan_params['accel_rate'],
floor=new_plan_params['threshold'],
cap=new_plan_params['cap'],
base=new_plan_params['base_salary']
)
total_costs[sim] = payouts.sum()
return {
'p10': np.percentile(total_costs, 10),
'p50': np.percentile(total_costs, 50),
'p90': np.percentile(total_costs, 90),
'prob_over_budget': (total_costs > budget).mean(),
'distribution': total_costs
}
def sensitivity_analysis(rep_df, base_params, param_perturbations, budget):
"""
Runs Monte Carlo for each parameter perturbation.
Returns tornado chart data: which parameter moves P90 most.
"""
results = {}
base = run_monte_carlo(rep_df, base_params)
base_p90 = base['p90']
for param, delta in param_perturbations.items():
perturbed = base_params.copy()
perturbed[param] += delta
result = run_monte_carlo(rep_df, perturbed)
results[param] = result['p90'] - base_p90
return pd.Series(results).sort_values(ascending=False)