Advanced Topics

Deep dive into the mathematical foundations, advanced configurations, and production deployment of GeoLift.

Mathematical Background

Synthetic Control Method

The synthetic control method creates a weighted combination of control units to serve as a counterfactual for treated units. For a treated unit \(j\), the synthetic control is:

\[\hat{Y}_{jt}^{synthetic} = \sum_{i \in \mathcal{D}} w_i Y_{it}\]

where \(\mathcal{D}\) denotes the donor pool (control units), and \(w_i\) are donor weights that minimize the pre-treatment prediction error:

\[\min_{w} \sum_{t=1}^{T_0} \left(Y_{jt} - \sum_{i \in \mathcal{D}} w_i Y_{it}\right)^2\]

subject to the constraints:

  • \(\sum_{i \in \mathcal{D}} w_i = 1\) (weights sum to one)

  • \(w_i \geq 0\) for all \(i \in \mathcal{D}\) (non-negative weights)

where \(T_0\) denotes the last pre-treatment period.

SparseSC Enhancement

SparseSC improves traditional synthetic control by jointly optimizing:

  1. Feature weights (V-matrix): Diagonal matrix determining which pre-treatment features matter most

  2. Unit weights (W-matrix): Vector of weights for combining control units

The optimization problem becomes:

\[\min_{V,W} \|X_1 - X_0 W\|_V^2 + \lambda \|W\|_1\]

where:

  • \(X_1 \in \mathbb{R}^{K \times 1}\) represents \(K\) pre-treatment features for the treated unit

  • \(X_0 \in \mathbb{R}^{K \times N}\) represents \(K\) pre-treatment features for \(N\) control units

  • \(W \in \mathbb{R}^{N \times 1}\) is the vector of unit weights

  • \(V \in \mathbb{R}^{K \times K}\) is a diagonal matrix of feature weights

  • \(\lambda > 0\) is the regularization parameter controlling sparsity

  • \(\|x\|_V^2 = x^T V x\) is the V-weighted squared norm

  • \(\|W\|_1 = \sum_{i=1}^{N} |w_i|\) is the L1 norm promoting sparsity

Statistical Inference

Bootstrap Inference

Resample the data with replacement to create empirical distribution of treatment effects:

# Bootstrap procedure for statistical inference
bootstrap_effects = []
n_bootstrap = 1000  # Number of bootstrap iterations

for b in range(n_bootstrap):
    # Resample control units with replacement
    boot_indices = np.random.choice(len(control_units), 
                                   size=len(control_units), 
                                   replace=True)
    boot_data = original_data[boot_indices]
    
    # Refit synthetic control model
    boot_effect = fit_and_estimate(boot_data, treated_unit)
    bootstrap_effects.append(boot_effect)

# Calculate 95% confidence intervals (percentile method)
ci_lower = np.percentile(bootstrap_effects, 2.5)
ci_upper = np.percentile(bootstrap_effects, 97.5)

# Standard error estimation
se_bootstrap = np.std(bootstrap_effects)

Placebo Inference (Permutation Test)

Test significance by applying the same method to untreated units:

# Placebo test procedure for inference
placebo_effects = []
true_effect = estimate_effect_for_unit(treated_unit)

for control_unit in control_units:
    # Pretend control unit received treatment at same time
    # Use remaining controls as donor pool
    remaining_controls = [c for c in control_units if c != control_unit]
    placebo_effect = fit_synthetic_control(
        treated=control_unit,
        controls=remaining_controls,
        pre_period=pre_period,
        post_period=post_period
    )
    placebo_effects.append(placebo_effect)

# Calculate two-sided p-value using Fisher's exact test logic
# Proportion of placebo effects as extreme as true effect
p_value = np.mean(np.abs(placebo_effects) >= np.abs(true_effect))

# Alternative: one-sided test if direction is hypothesized
p_value_right = np.mean(placebo_effects >= true_effect)
p_value_left = np.mean(placebo_effects <= true_effect)

Advanced Configuration

Custom Donor Selection

# Manual donor specification
analyzer.set_custom_donors(
    donor_markets=[501, 505, 506, 507, 508, 509, 510],
    donor_weights=[0.25, 0.20, 0.15, 0.15, 0.10, 0.10, 0.05]
)

# Geographic constraints
analyzer.set_donor_constraints(
    exclude_regions=['West Coast'],  # Exclude specific regions
    min_distance_km=500,            # Minimum distance from treatment
    max_correlation=0.95            # Avoid overly similar markets
)

# Time-varying donor weights
analyzer.set_time_varying_donors(
    pre_period_donors=[501, 502, 503],
    post_period_donors=[504, 505, 506]  # Different donors for different periods
)

Multiple Treatment Cohorts

# Staggered adoption design
cohort_config = {
    'cohort_1': {
        'markets': [502, 503],
        'start_date': '2023-06-01',
        'end_date': '2023-08-31'
    },
    'cohort_2': {
        'markets': [504, 505],
        'start_date': '2023-07-01', 
        'end_date': '2023-09-30'
    },
    'cohort_3': {
        'markets': [506, 507],
        'start_date': '2023-08-01',
        'end_date': '2023-10-31'
    }
}

results = analyzer.analyze_staggered_adoption(cohort_config)

Regularization and Model Selection

# Cross-validation for optimal regularization
from sparsesc.cross_validation import CV_score

# Grid search over lambda values
lambda_grid = np.logspace(-4, 2, 20)
cv_scores = []

for lam in lambda_grid:
    score = CV_score(
        X=features,
        Y=outcomes, 
        treated_units=treated,
        control_units=controls,
        lambda_=lam,
        cv_folds=5
    )
    cv_scores.append(score)

optimal_lambda = lambda_grid[np.argmin(cv_scores)]

Performance Optimization

Large-Scale Analysis

# Parallel processing for multiple markets
from multiprocessing import Pool
import numpy as np

def analyze_market(market_config):
    analyzer = GeoLiftAnalyzer()
    return analyzer.run_analysis(**market_config)

# Process multiple markets in parallel
market_configs = [config1, config2, config3, ...]
with Pool(processes=4) as pool:
    results = pool.map(analyze_market, market_configs)

Memory Optimization

# Chunked processing for large datasets
def process_large_dataset(data_path, chunk_size=10000):
    results = []
    for chunk in pd.read_csv(data_path, chunksize=chunk_size):
        # Process chunk
        chunk_result = process_chunk(chunk)
        results.append(chunk_result)
    
    return combine_results(results)

# Sparse matrix operations for efficiency
from scipy.sparse import csr_matrix

# Convert to sparse format for large feature matrices
X_sparse = csr_matrix(X_dense)
results = fit_sparse(X_sparse, Y, treated_units, control_units)

Caching and Persistence

# Cache intermediate results
import joblib

# Save fitted model
joblib.dump(fitted_model, 'models/geolift_model.pkl')

# Load cached model
cached_model = joblib.load('models/geolift_model.pkl')

# Incremental updates
def incremental_update(existing_model, new_data):
    # Update model with new data without full refit
    updated_model = existing_model.update(new_data)
    return updated_model

Production Deployment

Azure Batch Processing

# Azure Batch configuration
batch_config = {
    'pool_id': 'geolift-pool',
    'vm_size': 'Standard_D4s_v3',
    'node_count': 10,
    'container_image': 'geolift:latest'
}

# Submit batch job
from geolift.azure_batch import BatchProcessor

processor = BatchProcessor(batch_config)
job_id = processor.submit_analysis_job(
    campaigns=campaign_list,
    output_container='results'
)

# Monitor progress
status = processor.get_job_status(job_id)

Docker Deployment

# Dockerfile for GeoLift
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

CMD ["python", "-m", "geolift.cli", "analyze", "--config", "configs/production.yaml"]

API Service

# FastAPI service for GeoLift analysis
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class AnalysisRequest(BaseModel):
    data_path: str
    treatment_markets: list
    treatment_start_date: str
    treatment_end_date: str

@app.post("/analyze")
async def run_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks):
    # Queue analysis job
    job_id = queue_analysis_job(request)
    return {"job_id": job_id, "status": "queued"}

@app.get("/results/{job_id}")
async def get_results(job_id: str):
    results = fetch_results(job_id)
    return results

Custom Extensions

Custom Inference Methods

class BayesianInference:
    """
    Bayesian inference for treatment effects using conjugate normal prior.
    Assumes: Effect ~ N(μ, σ²) with prior μ ~ N(μ₀, τ²)
    """
    def __init__(self, prior_mean=0, prior_var=1):
        self.prior_mean = prior_mean  # μ₀
        self.prior_var = prior_var    # τ²
    
    def calculate_posterior(self, observed_effect, se):
        """
        Calculate posterior distribution using Bayes' theorem.
        Posterior precision = prior precision + data precision
        """
        # Precision-weighted updating
        prior_precision = 1 / self.prior_var
        data_precision = 1 / (se ** 2)
        
        # Posterior variance (inverse of summed precisions)
        posterior_var = 1 / (prior_precision + data_precision)
        
        # Posterior mean (precision-weighted average)
        posterior_mean = posterior_var * (
            self.prior_mean * prior_precision + 
            observed_effect * data_precision
        )
        return posterior_mean, posterior_var
    
    def credible_interval(self, posterior_mean, posterior_var, level=0.95):
        """
        Calculate Bayesian credible interval (highest posterior density).
        For normal posterior, this equals the equal-tailed interval.
        """
        from scipy.stats import norm
        alpha = 1 - level
        z_critical = norm.ppf(1 - alpha/2)
        margin = z_critical * np.sqrt(posterior_var)
        return (posterior_mean - margin, posterior_mean + margin)

# Use custom inference with informative prior
analyzer.set_inference_method(
    BayesianInference(prior_mean=0.05, prior_var=0.01)
)

Custom Outcome Models

class HierarchicalModel:
    def __init__(self, group_effects=True):
        self.group_effects = group_effects
    
    def fit(self, X, Y, groups=None):
        # Hierarchical modeling with group effects
        if self.group_effects and groups is not None:
            # Fit mixed effects model
            model = self.fit_mixed_effects(X, Y, groups)
        else:
            # Standard model
            model = self.fit_standard(X, Y)
        return model
    
    def predict(self, model, X_new, groups_new=None):
        # Generate predictions with uncertainty
        predictions = model.predict(X_new)
        prediction_intervals = model.prediction_interval(X_new)
        return predictions, prediction_intervals

# Use custom model
analyzer.set_outcome_model(HierarchicalModel(group_effects=True))

Debugging and Diagnostics

Advanced Diagnostics

# Comprehensive diagnostic suite
def run_full_diagnostics(analyzer, results):
    diagnostics = {}
    
    # Pre-period fit quality
    diagnostics['pre_fit'] = analyzer.evaluate_pre_fit()
    
    # Residual analysis
    diagnostics['residuals'] = analyzer.analyze_residuals()
    
    # Influence analysis
    diagnostics['influence'] = analyzer.donor_influence_analysis()
    
    # Robustness checks
    diagnostics['robustness'] = analyzer.robustness_checks()
    
    # Cross-validation
    diagnostics['cv_performance'] = analyzer.cross_validate()
    
    return diagnostics

# Generate diagnostic report
diagnostics = run_full_diagnostics(analyzer, results)
analyzer.export_diagnostic_report(diagnostics, 'diagnostic_report.html')

Sensitivity Analysis

# Systematic sensitivity testing
sensitivity_tests = {
    'pre_period_length': [12, 16, 20, 24],
    'donor_threshold': [0.6, 0.7, 0.8, 0.9],
    'regularization': [0.001, 0.01, 0.1, 1.0],
    'inference_method': ['bootstrap', 'placebo', 'jackknife']
}

sensitivity_results = {}
for param, values in sensitivity_tests.items():
    param_results = []
    for value in values:
        # Update configuration
        config_copy = config.copy()
        config_copy[param] = value
        
        # Run analysis
        result = analyzer.run_analysis(**config_copy)
        param_results.append({
            'value': value,
            'effect': result.relative_lift,
            'p_value': result.p_value
        })
    
    sensitivity_results[param] = param_results

# Visualize sensitivity
analyzer.plot_sensitivity_analysis(sensitivity_results)

Research Extensions

Difference-in-Differences Integration

# Combine synthetic control with DiD for robust causal inference
class SyntheticDiD:
    """
    Synthetic Difference-in-Differences (SDID) estimator.
    Combines SC weights with DiD time weights for double robustness.
    """
    def __init__(self, sc_weight=0.7, did_weight=0.3):
        self.sc_weight = sc_weight
        self.did_weight = did_weight
    
    def fit(self, data, treatment_markets, control_markets):
        # Fit synthetic control weights
        sc_weights = self.compute_sc_weights(
            data, treatment_markets, control_markets
        )
        
        # Compute DiD estimator
        # ATT = E[Y₁(1) - Y₀(1)|D=1] where D is treatment indicator
        did_effect = self.compute_did(
            data, treatment_markets, control_markets
        )
        
        # Compute SC estimator with optimal weights
        sc_effect = self.compute_sc_effect(
            data, treatment_markets, control_markets, sc_weights
        )
        
        # Weighted average of estimators (ensemble approach)
        combined_effect = (
            self.sc_weight * sc_effect + 
            self.did_weight * did_effect
        )
        
        # Compute variance using Delta method
        combined_variance = (
            self.sc_weight**2 * sc_effect.variance +
            self.did_weight**2 * did_effect.variance
        )
        
        return combined_effect, np.sqrt(combined_variance)
    
    def compute_did(self, data, treated, controls):
        """
        Classic 2x2 DiD: (Ȳ₁ᵗʳᵉᵃᵗ - Ȳ₀ᵗʳᵉᵃᵗ) - (Ȳ₁ᶜᵒⁿᵗʳᵒˡ - Ȳ₀ᶜᵒⁿᵗʳᵒˡ)
        """
        # Pre and post period means
        y_treat_post = data[data.unit.isin(treated) & data.post].y.mean()
        y_treat_pre = data[data.unit.isin(treated) & ~data.post].y.mean()
        y_control_post = data[data.unit.isin(controls) & data.post].y.mean()
        y_control_pre = data[data.unit.isin(controls) & ~data.post].y.mean()
        
        # DiD estimate
        did = (y_treat_post - y_treat_pre) - (y_control_post - y_control_pre)
        return did

# Use hybrid approach with optimal weighting
hybrid_analyzer = SyntheticDiD(sc_weight=0.8, did_weight=0.2)

Machine Learning Integration

# ML-enhanced donor selection
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

class MLDonorSelector:
    def __init__(self, model=None):
        self.model = model or RandomForestRegressor(n_estimators=100)
    
    def select_donors(self, features, outcomes, treatment_markets):
        # Use ML to predict which donors will work best
        X_train, y_train = self.prepare_training_data(features, outcomes)
        self.model.fit(X_train, y_train)
        
        # Score potential donors
        donor_scores = self.model.predict(donor_features)
        best_donors = np.argsort(donor_scores)[-10:]  # Top 10
        
        return best_donors

# Use ML-enhanced selection
ml_selector = MLDonorSelector()
analyzer.set_donor_selector(ml_selector)

This advanced guide covers the mathematical foundations, production deployment options, and research extensions for GeoLift. For practical usage, start with the Quick Start Guide and User Guide.