# Advanced Topics

Deep dive into the mathematical foundations, advanced configurations, and production deployment of GeoLift.

## Mathematical Background

### Synthetic Control Method

The synthetic control method creates a weighted combination of control units to serve as a counterfactual for the treated unit. For a treated unit $j$, the synthetic control is:

$$\hat{Y}_{jt}^{synthetic} = \sum_{i \in \mathcal{D}} w_i Y_{it}$$

where $\mathcal{D}$ denotes the donor pool (control units) and $w_i$ are donor weights that minimize the pre-treatment prediction error:

$$\min_{w} \sum_{t=1}^{T_0} \left(Y_{jt} - \sum_{i \in \mathcal{D}} w_i Y_{it}\right)^2$$

subject to the constraints:

- $\sum_{i \in \mathcal{D}} w_i = 1$ (weights sum to one)
- $w_i \geq 0$ for all $i \in \mathcal{D}$ (non-negative weights)

where $T_0$ denotes the last pre-treatment period.

### SparseSC Enhancement

SparseSC improves on traditional synthetic control by jointly optimizing:

1. **Feature weights (V-matrix)**: Diagonal matrix determining which pre-treatment features matter most
2. **Unit weights (W-matrix)**: Vector of weights for combining control units

The optimization problem becomes:

$$\min_{V,W} \|X_1 - X_0 W\|_V^2 + \lambda \|W\|_1$$

where:

- $X_1 \in \mathbb{R}^{K \times 1}$ represents $K$ pre-treatment features for the treated unit
- $X_0 \in \mathbb{R}^{K \times N}$ represents $K$ pre-treatment features for $N$ control units
- $W \in \mathbb{R}^{N \times 1}$ is the vector of unit weights
- $V \in \mathbb{R}^{K \times K}$ is a diagonal matrix of feature weights
- $\lambda > 0$ is the regularization parameter controlling sparsity
- $\|x\|_V^2 = x^T V x$ is the V-weighted squared norm
- $\|W\|_1 = \sum_{i=1}^{N} |w_i|$ is the L1 norm promoting sparsity

### Statistical Inference

#### Bootstrap Inference

Resample the data with replacement to build an empirical distribution of treatment effects:

```python
import numpy as np

# Bootstrap procedure for statistical inference
bootstrap_effects = []
n_bootstrap = 1000  # Number of bootstrap iterations

for b in range(n_bootstrap):
    # Resample control units with replacement
    boot_indices = np.random.choice(
        len(control_units), size=len(control_units), replace=True
    )
    boot_data = original_data[boot_indices]

    # Refit the synthetic control model
    boot_effect = fit_and_estimate(boot_data, treated_unit)
    bootstrap_effects.append(boot_effect)

# Calculate 95% confidence intervals (percentile method)
ci_lower = np.percentile(bootstrap_effects, 2.5)
ci_upper = np.percentile(bootstrap_effects, 97.5)

# Standard error estimation
se_bootstrap = np.std(bootstrap_effects)
```

#### Placebo Inference (Permutation Test)

Test significance by applying the same method to untreated units:

```python
# Placebo test procedure for inference
placebo_effects = []
true_effect = estimate_effect_for_unit(treated_unit)

for control_unit in control_units:
    # Pretend the control unit received treatment at the same time,
    # using the remaining controls as the donor pool
    remaining_controls = [c for c in control_units if c != control_unit]
    placebo_effect = fit_synthetic_control(
        treated=control_unit,
        controls=remaining_controls,
        pre_period=pre_period,
        post_period=post_period
    )
    placebo_effects.append(placebo_effect)

# Two-sided p-value following Fisher randomization (permutation) logic:
# the proportion of placebo effects at least as extreme as the true effect
p_value = np.mean(np.abs(placebo_effects) >= np.abs(true_effect))

# Alternative: one-sided tests if a direction is hypothesized
p_value_right = np.mean(placebo_effects >= true_effect)
p_value_left = np.mean(placebo_effects <= true_effect)
```

## Advanced Configuration

### Custom Donor Selection

```python
# Manual donor specification
analyzer.set_custom_donors(
    donor_markets=[501, 505, 506, 507, 508, 509, 510],
    donor_weights=[0.25, 0.20, 0.15, 0.15, 0.10, 0.10, 0.05]
)

# Geographic constraints
analyzer.set_donor_constraints(
    exclude_regions=['West Coast'],  # Exclude specific regions
    min_distance_km=500,             # Minimum distance from treatment markets
    max_correlation=0.95             # Avoid overly similar markets
)

# Time-varying donor weights
analyzer.set_time_varying_donors(
    pre_period_donors=[501, 502, 503],
    post_period_donors=[504, 505, 506]  # Different donors for different periods
)
```

### Multiple Treatment Cohorts

```python
# Staggered adoption design
cohort_config = {
    'cohort_1': {
        'markets': [502, 503],
        'start_date': '2023-06-01',
        'end_date': '2023-08-31'
    },
    'cohort_2': {
        'markets': [504, 505],
        'start_date': '2023-07-01',
        'end_date': '2023-09-30'
    },
    'cohort_3': {
        'markets': [506, 507],
        'start_date': '2023-08-01',
        'end_date': '2023-10-31'
    }
}

results = analyzer.analyze_staggered_adoption(cohort_config)
```

### Regularization and Model Selection

```python
# Cross-validation for optimal regularization
from sparsesc.cross_validation import CV_score

# Grid search over lambda values
lambda_grid = np.logspace(-4, 2, 20)
cv_scores = []

for lam in lambda_grid:
    score = CV_score(
        X=features,
        Y=outcomes,
        treated_units=treated,
        control_units=controls,
        lambda_=lam,
        cv_folds=5
    )
    cv_scores.append(score)

optimal_lambda = lambda_grid[np.argmin(cv_scores)]
```

## Performance Optimization

### Large-Scale Analysis

```python
# Parallel processing for multiple markets
from multiprocessing import Pool
import numpy as np

def analyze_market(market_config):
    analyzer = GeoLiftAnalyzer()
    return analyzer.run_analysis(**market_config)

# Process multiple markets in parallel
market_configs = [config1, config2, config3, ...]

with Pool(processes=4) as pool:
    results = pool.map(analyze_market, market_configs)
```

### Memory Optimization

```python
import pandas as pd

# Chunked processing for large datasets
def process_large_dataset(data_path, chunk_size=10000):
    results = []
    for chunk in pd.read_csv(data_path, chunksize=chunk_size):
        # Process each chunk independently
        chunk_result = process_chunk(chunk)
        results.append(chunk_result)
    return combine_results(results)

# Sparse matrix operations for efficiency
from scipy.sparse import csr_matrix

# Convert to sparse format for large feature matrices
X_sparse = csr_matrix(X_dense)
results = fit_sparse(X_sparse, Y, treated_units, control_units)
```

### Caching and Persistence

```python
# Cache intermediate results
import joblib

# Save the fitted model
joblib.dump(fitted_model, 'models/geolift_model.pkl')

# Load the cached model
cached_model = joblib.load('models/geolift_model.pkl')

# Incremental updates
def incremental_update(existing_model, new_data):
    # Update the model with new data without a full refit
    updated_model = existing_model.update(new_data)
    return updated_model
```

## Production Deployment

### Azure Batch Processing

```python
# Azure Batch configuration
batch_config = {
    'pool_id': 'geolift-pool',
    'vm_size': 'Standard_D4s_v3',
    'node_count': 10,
    'container_image': 'geolift:latest'
}

# Submit a batch job
from geolift.azure_batch import BatchProcessor

processor = BatchProcessor(batch_config)
job_id = processor.submit_analysis_job(
    campaigns=campaign_list,
    output_container='results'
)

# Monitor progress
status = processor.get_job_status(job_id)
```

### Docker Deployment

```dockerfile
# Dockerfile for GeoLift
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

CMD ["python", "-m", "geolift.cli", "analyze", "--config", "configs/production.yaml"]
```

### API Service

```python
# FastAPI service for GeoLift analysis
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class AnalysisRequest(BaseModel):
    data_path: str
    treatment_markets: list
    treatment_start_date: str
    treatment_end_date: str

@app.post("/analyze")
async def run_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks):
    # Queue the analysis job
    job_id = queue_analysis_job(request)
    return {"job_id": job_id, "status": "queued"}

@app.get("/results/{job_id}")
async def get_results(job_id: str):
    results = fetch_results(job_id)
    return results
```

## Custom Extensions

### Custom Inference Methods

```python
import numpy as np

class BayesianInference:
    """
    Bayesian inference for treatment effects using a conjugate normal prior.

    Assumes: Effect ~ N(μ, σ²) with prior μ ~ N(μ₀, τ²)
    """

    def __init__(self, prior_mean=0, prior_var=1):
        self.prior_mean = prior_mean  # μ₀
        self.prior_var = prior_var    # τ²

    def calculate_posterior(self, observed_effect, se):
        """
        Calculate the posterior distribution using Bayes' theorem.

        Posterior precision = prior precision + data precision
        """
        # Precision-weighted updating
        prior_precision = 1 / self.prior_var
        data_precision = 1 / (se ** 2)

        # Posterior variance (inverse of the summed precisions)
        posterior_var = 1 / (prior_precision + data_precision)

        # Posterior mean (precision-weighted average)
        posterior_mean = posterior_var * (
            self.prior_mean * prior_precision
            + observed_effect * data_precision
        )

        return posterior_mean, posterior_var

    def credible_interval(self, posterior_mean, posterior_var, level=0.95):
        """
        Calculate the Bayesian credible interval (highest posterior density).

        For a normal posterior, this equals the equal-tailed interval.
        """
        from scipy.stats import norm

        alpha = 1 - level
        z_critical = norm.ppf(1 - alpha / 2)
        margin = z_critical * np.sqrt(posterior_var)
        return (posterior_mean - margin, posterior_mean + margin)

# Use custom inference with an informative prior
analyzer.set_inference_method(
    BayesianInference(prior_mean=0.05, prior_var=0.01)
)
```

### Custom Outcome Models

```python
class HierarchicalModel:
    def __init__(self, group_effects=True):
        self.group_effects = group_effects

    def fit(self, X, Y, groups=None):
        # Hierarchical modeling with group effects
        if self.group_effects and groups is not None:
            # Fit a mixed-effects model
            model = self.fit_mixed_effects(X, Y, groups)
        else:
            # Standard model
            model = self.fit_standard(X, Y)
        return model

    def predict(self, model, X_new, groups_new=None):
        # Generate predictions with uncertainty
        predictions = model.predict(X_new)
        prediction_intervals = model.prediction_interval(X_new)
        return predictions, prediction_intervals

# Use the custom model
analyzer.set_outcome_model(HierarchicalModel(group_effects=True))
```

## Debugging and Diagnostics

### Advanced Diagnostics

```python
# Comprehensive diagnostic suite
def run_full_diagnostics(analyzer, results):
    diagnostics = {}

    # Pre-period fit quality
    diagnostics['pre_fit'] = analyzer.evaluate_pre_fit()

    # Residual analysis
    diagnostics['residuals'] = analyzer.analyze_residuals()

    # Influence analysis
    diagnostics['influence'] = analyzer.donor_influence_analysis()

    # Robustness checks
    diagnostics['robustness'] = analyzer.robustness_checks()

    # Cross-validation
    diagnostics['cv_performance'] = analyzer.cross_validate()

    return diagnostics

# Generate a diagnostic report
diagnostics = run_full_diagnostics(analyzer, results)
analyzer.export_diagnostic_report(diagnostics, 'diagnostic_report.html')
```

### Sensitivity Analysis

```python
# Systematic sensitivity testing
sensitivity_tests = {
    'pre_period_length': [12, 16, 20, 24],
    'donor_threshold': [0.6, 0.7, 0.8, 0.9],
    'regularization': [0.001, 0.01, 0.1, 1.0],
    'inference_method':
        ['bootstrap', 'placebo', 'jackknife']
}

sensitivity_results = {}
for param, values in sensitivity_tests.items():
    param_results = []
    for value in values:
        # Update the configuration
        config_copy = config.copy()
        config_copy[param] = value

        # Run the analysis
        result = analyzer.run_analysis(**config_copy)
        param_results.append({
            'value': value,
            'effect': result.relative_lift,
            'p_value': result.p_value
        })
    sensitivity_results[param] = param_results

# Visualize sensitivity
analyzer.plot_sensitivity_analysis(sensitivity_results)
```

## Research Extensions

### Difference-in-Differences Integration

```python
# Combine synthetic control with DiD for robust causal inference
class SyntheticDiD:
    """
    Simplified Synthetic Difference-in-Differences (SDID)-style estimator.

    Combines the SC and DiD estimates as a fixed-weight ensemble;
    full SDID estimates unit and time weights jointly.
    """

    def __init__(self, sc_weight=0.7, did_weight=0.3):
        self.sc_weight = sc_weight
        self.did_weight = did_weight

    def fit(self, data, treatment_markets, control_markets):
        # Fit synthetic control weights
        sc_weights = self.compute_sc_weights(
            data, treatment_markets, control_markets
        )

        # Compute the DiD estimator
        # ATT = E[Y₁(1) - Y₀(1) | D = 1], where D is the treatment indicator
        did_effect = self.compute_did(
            data, treatment_markets, control_markets
        )

        # Compute the SC estimator with the fitted weights
        sc_effect = self.compute_sc_effect(
            data, treatment_markets, control_markets, sc_weights
        )

        # Weighted average of the estimators (ensemble approach)
        combined_effect = (
            self.sc_weight * sc_effect
            + self.did_weight * did_effect
        )

        # Variance via the delta method
        # (treating the two estimates as uncorrelated)
        combined_variance = (
            self.sc_weight ** 2 * sc_effect.variance
            + self.did_weight ** 2 * did_effect.variance
        )

        return combined_effect, np.sqrt(combined_variance)

    def compute_did(self, data, treated, controls):
        """
        Classic 2x2 DiD:
        (Ȳ₁ᵗʳᵉᵃᵗ - Ȳ₀ᵗʳᵉᵃᵗ) - (Ȳ₁ᶜᵒⁿᵗʳᵒˡ - Ȳ₀ᶜᵒⁿᵗʳᵒˡ)
        """
        # Pre- and post-period means
        y_treat_post = data[data.unit.isin(treated) & data.post].y.mean()
        y_treat_pre = data[data.unit.isin(treated) & ~data.post].y.mean()
        y_control_post = data[data.unit.isin(controls) & data.post].y.mean()
        y_control_pre = data[data.unit.isin(controls) & ~data.post].y.mean()

        # DiD estimate
        did = (y_treat_post - y_treat_pre) - (y_control_post - y_control_pre)
        return did

# Use the hybrid approach with explicit ensemble weights
hybrid_analyzer = SyntheticDiD(sc_weight=0.8, did_weight=0.2)
```

### Machine Learning Integration

```python
# ML-enhanced donor selection
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

class MLDonorSelector:
    def __init__(self, model=None):
        self.model = model or RandomForestRegressor(n_estimators=100)

    def select_donors(self, features, outcomes, treatment_markets):
        # Use ML to predict which donors will work best
        X_train, y_train = self.prepare_training_data(features, outcomes)
        self.model.fit(X_train, y_train)

        # Score potential donors
        donor_scores = self.model.predict(donor_features)
        best_donors = np.argsort(donor_scores)[-10:]  # Top 10
        return best_donors

# Use ML-enhanced selection
ml_selector = MLDonorSelector()
analyzer.set_donor_selector(ml_selector)
```

This advanced guide covers the mathematical foundations, production deployment options, and research extensions for GeoLift. For practical usage, start with the [Quick Start Guide](QUICK_START.md) and [User Guide](USER_GUIDE.md).
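As a closing worked example of the constrained weight optimization from the Mathematical Background section, the sketch below fits simplex-constrained donor weights on invented toy data with `scipy.optimize.minimize`. It is a standalone illustration, not part of the GeoLift API: all data, the true weights, and the variable names are made up for the demo.

```python
import numpy as np
from scipy.optimize import minimize

rng = np.random.default_rng(0)

# Toy pre-treatment outcomes: T0 = 30 periods, N = 4 donor units
T0, N = 30, 4
Y0 = rng.normal(size=(T0, N))
true_w = np.array([0.5, 0.3, 0.2, 0.0])          # assumed "true" donor weights
y_treated = Y0 @ true_w + rng.normal(scale=0.01, size=T0)

def pre_period_loss(w):
    # Pre-treatment prediction error from the SCM objective
    return np.sum((y_treated - Y0 @ w) ** 2)

# Minimize subject to w_i >= 0 and sum(w) = 1 (SLSQP handles both)
result = minimize(
    pre_period_loss,
    x0=np.full(N, 1.0 / N),                       # start from uniform weights
    bounds=[(0.0, 1.0)] * N,                      # non-negativity
    constraints=[{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}],
)
w_hat = result.x
print(np.round(w_hat, 2))  # should land close to true_w
```

With a long enough pre-period and low noise, the recovered weights track `true_w` closely, which is the intuition behind judging a synthetic control by its pre-period fit.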
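To make the precision-weighted posterior update from Custom Inference Methods concrete, here is a quick standalone numeric check. The prior and observed values are invented for illustration; the arithmetic follows the `calculate_posterior` formulas directly.

```python
import numpy as np
from scipy.stats import norm

prior_mean, prior_var = 0.05, 0.01   # μ₀ = 5% prior lift, τ² = 0.01 (illustrative)
observed_effect, se = 0.12, 0.04     # estimated lift and its standard error

prior_precision = 1 / prior_var      # 100
data_precision = 1 / se ** 2         # 625

# Posterior variance: inverse of the summed precisions
posterior_var = 1 / (prior_precision + data_precision)   # 1/725

# Posterior mean: precision-weighted average of prior and data
posterior_mean = posterior_var * (
    prior_mean * prior_precision + observed_effect * data_precision
)
# (0.05 * 100 + 0.12 * 625) / 725 = 80 / 725 ≈ 0.1103

# 95% equal-tailed credible interval
margin = norm.ppf(0.975) * np.sqrt(posterior_var)
ci = (posterior_mean - margin, posterior_mean + margin)
print(round(posterior_mean, 4), tuple(round(x, 4) for x in ci))
```

Note that the posterior mean is pulled from the observed 0.12 toward the prior 0.05, and the posterior variance is smaller than both the prior variance and the data variance, as the precision-addition formula guarantees.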