Advanced Topics
Deep dive into the mathematical foundations, advanced configurations, and production deployment of GeoLift.
Mathematical Background
Synthetic Control Method
The synthetic control method creates a weighted combination of control units to serve as a counterfactual for treated units. For a treated unit \(j\), the synthetic control prediction at time \(t\) is \(\hat{Y}_j(t) = \sum_{i \in \mathcal{D}} w_i Y_i(t)\),
where \(\mathcal{D}\) denotes the donor pool (control units), and \(w_i\) are donor weights that minimize the pre-treatment prediction error \(\min_{w} \sum_{t=1}^{T_0} \bigl( Y_j(t) - \sum_{i \in \mathcal{D}} w_i Y_i(t) \bigr)^2\),
subject to the constraints:
\(\sum_{i \in \mathcal{D}} w_i = 1\) (weights sum to one)
\(w_i \geq 0\) for all \(i \in \mathcal{D}\) (non-negative weights)
where \(T_0\) denotes the last pre-treatment period.
SparseSC Enhancement
SparseSC improves traditional synthetic control by jointly optimizing:
Feature weights (V-matrix): Diagonal matrix determining which pre-treatment features matter most
Unit weights (W-matrix): Vector of weights for combining control units
The optimization problem becomes \(\min_{V,\, W} \; \| X_1 - X_0 W \|_V^2 + \lambda \| W \|_1\),
where:
\(X_1 \in \mathbb{R}^{K \times 1}\) represents \(K\) pre-treatment features for the treated unit
\(X_0 \in \mathbb{R}^{K \times N}\) represents \(K\) pre-treatment features for \(N\) control units
\(W \in \mathbb{R}^{N \times 1}\) is the vector of unit weights
\(V \in \mathbb{R}^{K \times K}\) is a diagonal matrix of feature weights
\(\lambda > 0\) is the regularization parameter controlling sparsity
\(\|x\|_V^2 = x^T V x\) is the V-weighted squared norm
\(\|W\|_1 = \sum_{i=1}^{N} |w_i|\) is the L1 norm promoting sparsity
Statistical Inference
Bootstrap Inference
Resample the data with replacement to create an empirical distribution of treatment effects:
# Bootstrap procedure: resample the control units with replacement to build
# an empirical distribution of the treatment effect.
n_bootstrap = 1000  # number of bootstrap replicates

bootstrap_effects = []
for _ in range(n_bootstrap):
    # Draw a same-sized sample of control-unit indices, with replacement.
    sampled = np.random.choice(
        len(control_units), size=len(control_units), replace=True
    )
    # Refit the synthetic control model on the resampled donor pool.
    bootstrap_effects.append(fit_and_estimate(original_data[sampled], treated_unit))

# 95% confidence interval via the percentile method.
ci_lower = np.percentile(bootstrap_effects, 2.5)
ci_upper = np.percentile(bootstrap_effects, 97.5)

# Bootstrap standard error = standard deviation of the replicate effects.
se_bootstrap = np.std(bootstrap_effects)
Placebo Inference (Permutation Test)
Test significance by applying the same method to untreated units:
# Placebo (permutation) test: apply the same estimator to each untreated
# unit and compare the true effect against the resulting null distribution.
placebo_effects = []
true_effect = estimate_effect_for_unit(treated_unit)

for control_unit in control_units:
    # Pretend this control unit received treatment at the same time,
    # using the remaining controls as its donor pool.
    remaining_controls = [c for c in control_units if c != control_unit]
    placebo_effect = fit_synthetic_control(
        treated=control_unit,
        controls=remaining_controls,
        pre_period=pre_period,
        post_period=post_period,
    )
    placebo_effects.append(placebo_effect)

# Convert to an array so the elementwise comparisons below are valid —
# a plain `list >= float` comparison raises TypeError in Python 3.
placebo_effects = np.asarray(placebo_effects)

# Two-sided permutation p-value: proportion of placebo effects at least
# as extreme (in absolute value) as the observed effect.
p_value = np.mean(np.abs(placebo_effects) >= np.abs(true_effect))

# One-sided alternatives when the direction of the effect is hypothesized.
p_value_right = np.mean(placebo_effects >= true_effect)
p_value_left = np.mean(placebo_effects <= true_effect)
Advanced Configuration
Custom Donor Selection
# Pin the donor pool and its weights by hand instead of letting the
# optimizer choose them.
manual_donors = [501, 505, 506, 507, 508, 509, 510]
manual_weights = [0.25, 0.20, 0.15, 0.15, 0.10, 0.10, 0.05]
analyzer.set_custom_donors(
    donor_markets=manual_donors,
    donor_weights=manual_weights,
)

# Constrain which markets are eligible to enter the donor pool.
analyzer.set_donor_constraints(
    exclude_regions=['West Coast'],  # drop specific regions entirely
    min_distance_km=500,             # keep donors geographically distant from treatment
    max_correlation=0.95,            # avoid near-duplicate markets
)

# Allow the donor pool to differ between the pre and post periods.
analyzer.set_time_varying_donors(
    pre_period_donors=[501, 502, 503],
    post_period_donors=[504, 505, 506],
)
Multiple Treatment Cohorts
# Staggered adoption design: three cohorts entering treatment one month apart.
_cohort_specs = [
    ('cohort_1', [502, 503], '2023-06-01', '2023-08-31'),
    ('cohort_2', [504, 505], '2023-07-01', '2023-09-30'),
    ('cohort_3', [506, 507], '2023-08-01', '2023-10-31'),
]
cohort_config = {
    name: {'markets': markets, 'start_date': start, 'end_date': end}
    for name, markets, start, end in _cohort_specs
}
results = analyzer.analyze_staggered_adoption(cohort_config)
Regularization and Model Selection
# Cross-validate the SparseSC penalty: grid-search lambda and keep the
# value with the lowest cross-validated error.
from sparsesc.cross_validation import CV_score

# 20 candidate penalties, log-spaced from 1e-4 to 1e2.
lambda_grid = np.logspace(-4, 2, 20)
cv_scores = [
    CV_score(
        X=features,
        Y=outcomes,
        treated_units=treated,
        control_units=controls,
        lambda_=lam,
        cv_folds=5,
    )
    for lam in lambda_grid
]
optimal_lambda = lambda_grid[np.argmin(cv_scores)]
Performance Optimization
Large-Scale Analysis
# Fan analyses for several markets out across worker processes.
from multiprocessing import Pool
import numpy as np


def analyze_market(market_config):
    """Run a full GeoLift analysis for one market configuration."""
    return GeoLiftAnalyzer().run_analysis(**market_config)


# Each config describes one market's analysis; run four workers at a time.
market_configs = [config1, config2, config3, ...]
with Pool(processes=4) as pool:
    results = pool.map(analyze_market, market_configs)
Memory Optimization
def process_large_dataset(data_path, chunk_size=10000):
    """Stream a large CSV in chunks so the full dataset never sits in memory."""
    partials = [
        process_chunk(chunk)
        for chunk in pd.read_csv(data_path, chunksize=chunk_size)
    ]
    return combine_results(partials)


# Sparse matrices keep large, mostly-zero feature matrices cheap to store
# and operate on.
from scipy.sparse import csr_matrix

X_sparse = csr_matrix(X_dense)
results = fit_sparse(X_sparse, Y, treated_units, control_units)
Caching and Persistence
# Persist fitted models with joblib so expensive fits can be reused
# across runs.
import joblib

MODEL_PATH = 'models/geolift_model.pkl'
joblib.dump(fitted_model, MODEL_PATH)   # save the fitted model
cached_model = joblib.load(MODEL_PATH)  # reload it later
def incremental_update(existing_model, new_data):
    """Fold new observations into an already-fitted model without a full refit."""
    return existing_model.update(new_data)
Production Deployment
Azure Batch Processing
# Azure Batch pool definition for distributed GeoLift runs.
batch_config = {
    'pool_id': 'geolift-pool',
    'vm_size': 'Standard_D4s_v3',
    'node_count': 10,
    'container_image': 'geolift:latest',
}

# Submit the campaign list as a single batch job, then poll its status.
from geolift.azure_batch import BatchProcessor

processor = BatchProcessor(batch_config)
job_id = processor.submit_analysis_job(
    campaigns=campaign_list,
    output_container='results',
)
status = processor.get_job_status(job_id)
Docker Deployment
# Dockerfile for GeoLift
# Slim Python base keeps the image small.
FROM python:3.9-slim
WORKDIR /app
# Copy and install requirements first so Docker's layer cache skips the
# reinstall when only source files change.
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
COPY configs/ ./configs/
# Default command: run the CLI against the production config.
CMD ["python", "-m", "geolift.cli", "analyze", "--config", "configs/production.yaml"]
API Service
# FastAPI service exposing GeoLift analysis as an async job API.
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()


class AnalysisRequest(BaseModel):
    """Payload describing one analysis job."""
    data_path: str
    treatment_markets: list
    treatment_start_date: str
    treatment_end_date: str


@app.post("/analyze")
async def run_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks):
    """Queue an analysis job and return its id immediately."""
    job_id = queue_analysis_job(request)
    return {"job_id": job_id, "status": "queued"}


@app.get("/results/{job_id}")
async def get_results(job_id: str):
    """Fetch the stored results for a previously queued job."""
    return fetch_results(job_id)
Custom Extensions
Custom Inference Methods
class BayesianInference:
    """
    Bayesian inference for treatment effects using a conjugate normal prior.

    Model: observed effect ~ N(mu, se^2) with prior mu ~ N(mu0, tau^2).
    """

    def __init__(self, prior_mean=0, prior_var=1):
        self.prior_mean = prior_mean  # mu0, prior mean of the effect
        self.prior_var = prior_var    # tau^2, prior variance of the effect

    def calculate_posterior(self, observed_effect, se):
        """
        Return (posterior_mean, posterior_var) for the treatment effect.

        Conjugate-normal updating: precisions (inverse variances) add, and
        the posterior mean is the precision-weighted average of the prior
        mean and the observed effect.
        """
        prior_prec = 1 / self.prior_var  # prior precision
        data_prec = 1 / (se ** 2)        # likelihood precision
        post_var = 1 / (prior_prec + data_prec)
        post_mean = post_var * (
            self.prior_mean * prior_prec + observed_effect * data_prec
        )
        return post_mean, post_var

    def credible_interval(self, posterior_mean, posterior_var, level=0.95):
        """
        Equal-tailed credible interval at the given level.

        For a normal posterior the equal-tailed interval coincides with the
        highest-posterior-density interval.
        """
        from scipy.stats import norm

        half_width = norm.ppf(1 - (1 - level) / 2) * np.sqrt(posterior_var)
        return (posterior_mean - half_width, posterior_mean + half_width)
# Register the custom inference method with an informative prior
# (5% expected lift with a tight variance).
informative_prior = BayesianInference(prior_mean=0.05, prior_var=0.01)
analyzer.set_inference_method(informative_prior)
Custom Outcome Models
class HierarchicalModel:
    """Outcome model with optional group-level (mixed) effects."""

    def __init__(self, group_effects=True):
        # Whether to model group-level effects when group labels are supplied.
        self.group_effects = group_effects

    def fit(self, X, Y, groups=None):
        """Fit a mixed-effects model when groups are supplied, else a standard one."""
        use_mixed = self.group_effects and groups is not None
        if use_mixed:
            return self.fit_mixed_effects(X, Y, groups)
        return self.fit_standard(X, Y)

    def predict(self, model, X_new, groups_new=None):
        """Return point predictions together with their prediction intervals."""
        point_estimates = model.predict(X_new)
        intervals = model.prediction_interval(X_new)
        return point_estimates, intervals
# Plug the hierarchical outcome model into the analyzer.
outcome_model = HierarchicalModel(group_effects=True)
analyzer.set_outcome_model(outcome_model)
Debugging and Diagnostics
Advanced Diagnostics
# Comprehensive diagnostic suite
def run_full_diagnostics(analyzer, results):
    """Collect the full diagnostic suite for a fitted analysis into one dict."""
    return {
        'pre_fit': analyzer.evaluate_pre_fit(),            # pre-period fit quality
        'residuals': analyzer.analyze_residuals(),         # residual analysis
        'influence': analyzer.donor_influence_analysis(),  # donor influence
        'robustness': analyzer.robustness_checks(),        # robustness checks
        'cv_performance': analyzer.cross_validate(),       # cross-validation
    }
# Collect diagnostics and export them as a standalone HTML report.
diagnostics = run_full_diagnostics(analyzer, results)
analyzer.export_diagnostic_report(diagnostics, 'diagnostic_report.html')
Sensitivity Analysis
# Sweep key configuration parameters one at a time and record how the
# estimated effect and p-value respond.
sensitivity_tests = {
    'pre_period_length': [12, 16, 20, 24],
    'donor_threshold': [0.6, 0.7, 0.8, 0.9],
    'regularization': [0.001, 0.01, 0.1, 1.0],
    'inference_method': ['bootstrap', 'placebo', 'jackknife'],
}

sensitivity_results = {}
for param, values in sensitivity_tests.items():
    param_rows = []
    for value in values:
        # Re-run the analysis with exactly one parameter overridden.
        trial_config = dict(config, **{param: value})
        result = analyzer.run_analysis(**trial_config)
        param_rows.append({
            'value': value,
            'effect': result.relative_lift,
            'p_value': result.p_value,
        })
    sensitivity_results[param] = param_rows

# Visualize how sensitive the conclusions are to each parameter.
analyzer.plot_sensitivity_analysis(sensitivity_results)
Research Extensions
Difference-in-Differences Integration
# Combine synthetic control with DiD for robust causal inference
class SyntheticDiD:
    """
    Synthetic Difference-in-Differences (SDID) estimator.

    Ensembles the synthetic-control and difference-in-differences estimates
    with fixed mixing weights for double robustness.
    """

    def __init__(self, sc_weight=0.7, did_weight=0.3):
        self.sc_weight = sc_weight    # weight on the SC component
        self.did_weight = did_weight  # weight on the DiD component

    def fit(self, data, treatment_markets, control_markets):
        """Return (combined_effect, combined_se) for the treatment markets."""
        # Synthetic-control donor weights, then the two component estimates.
        sc_weights = self.compute_sc_weights(
            data, treatment_markets, control_markets
        )
        # ATT = E[Y1(1) - Y0(1) | D=1], where D is the treatment indicator.
        did_effect = self.compute_did(
            data, treatment_markets, control_markets
        )
        sc_effect = self.compute_sc_effect(
            data, treatment_markets, control_markets, sc_weights
        )

        # Fixed-weight ensemble of the two estimators.
        combined_effect = (
            self.sc_weight * sc_effect + self.did_weight * did_effect
        )

        # Delta-method variance, treating the component estimates as
        # independent — NOTE(review): confirm independence is reasonable here.
        combined_variance = (
            self.sc_weight ** 2 * sc_effect.variance
            + self.did_weight ** 2 * did_effect.variance
        )
        return combined_effect, np.sqrt(combined_variance)

    def compute_did(self, data, treated, controls):
        """
        Classic 2x2 DiD:
        (treated post - treated pre) - (control post - control pre).
        """
        in_treated = data.unit.isin(treated)
        in_controls = data.unit.isin(controls)
        treated_change = (
            data[in_treated & data.post].y.mean()
            - data[in_treated & ~data.post].y.mean()
        )
        control_change = (
            data[in_controls & data.post].y.mean()
            - data[in_controls & ~data.post].y.mean()
        )
        return treated_change - control_change
# Favor the synthetic-control component in the ensemble (80/20 split).
hybrid_analyzer = SyntheticDiD(sc_weight=0.8, did_weight=0.2)
Machine Learning Integration
# ML-enhanced donor selection
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
class MLDonorSelector:
    """Rank candidate donor markets with a supervised scoring model."""

    def __init__(self, model=None):
        # Any sklearn-style regressor with fit/predict; default is a
        # 100-tree random forest.
        self.model = model or RandomForestRegressor(n_estimators=100)

    def select_donors(self, features, outcomes, treatment_markets,
                      donor_features=None, n_donors=10):
        """
        Return the indices of the top-scoring candidate donors.

        Parameters:
            features, outcomes: training data for the scoring model.
            treatment_markets: treated markets (excluded from the donor pool).
            donor_features: feature matrix of candidate donors; defaults to
                ``features``. (Bug fix: this was previously read from an
                undefined module-level name.)
            n_donors: how many top donors to return (default 10, as before).
        """
        X_train, y_train = self.prepare_training_data(features, outcomes)
        self.model.fit(X_train, y_train)
        # Score each candidate donor and keep the highest-scoring ones.
        if donor_features is None:
            donor_features = features
        donor_scores = self.model.predict(donor_features)
        return np.argsort(donor_scores)[-n_donors:]
# Swap in the ML-based donor selector.
ml_selector = MLDonorSelector()
analyzer.set_donor_selector(ml_selector)
This advanced guide covers the mathematical foundations, production deployment options, and research extensions for GeoLift. For practical usage, start with the Quick Start Guide and User Guide.