from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, lit, log, col
from pyspark.ml.feature import StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
This was run on a big machine where I could allocate 100GB RAM to both the executor and driver. You'll want to modify for your system specs.
I changed from the defaults to avoid an issue I was having where the Java runtime only had access to a few GB RAM by default.
I am running on a single big machine instead of clusters due to available resources (not really the idea of Pyspark, I know...but its the hardware I have).
spark = SparkSession \
.builder \
.appName("covid weather") \
.config('spark.executor.cores', '20') \
.config("spark.executor.memory", '100g') \
.config("spark.driver.memory", '100g') \
.getOrCreate()
Source data is created in r_data_acquisition.ipynb
. See that notebook for details.
df = spark.read.csv('cov_weather_mobility.csv', inferSchema = True, header = True)
df.limit(5).toPandas()
df.count()
df.printSchema()
We end up with a row for each county for each day with columns containing COVID-19 metrics, weather features, and mobility data.
Subsetting to date range of interest:
df = df.filter(df.YEARMODA >= '2020-06-01')
df = df.filter(df.YEARMODA < '2021-03-01')
Some large outliers in mobility metric and infectious probaility. Infectious probability outliers come from COVID data reporting issues. From what I can see in the Google mobility project, there are similar strange outliers that come from the data collection methodology.
df = df.filter(df.m50_index < 500)
df = df.filter(df.infect_prob < 100)
df = df.filter(df.infect_prob > 0)
We also want to drop any rows without infectious probability data since this is our dependent variable. Again, these errors come from excentricities in the COVID-19 reporting data.
df = df.na.drop(subset = ['infect_prob'])
Down to ~660k rows after basic filtering.
df.count()
If there is a relationship between weather/mobility and COVID, we would expect a lag roughly equal to the incubation period of the virus. We estimate this to be 7 days, which appears to be the median incubation period of SARS-COV-2.
We need to window by county (FIPS code) and maintain the date order.
df = df.withColumn("infect_prob_7day_lag",lead(df.infect_prob,7).over(Window.partitionBy("FIPS").orderBy("YEARMODA")))
#first few rows now null for infectprob after the lag
df = df.na.drop(subset = ['infect_prob_7day_lag'])
df.sort(['FIPS', 'YEARMODA']).select(['YEARMODA','FIPS','infect_prob','infect_prob_7day_lag']).limit(15).toPandas()
As you can see here, our dependent variable (infect_prob_7day_lag
) represents what the infectious probability will be 7 days in the future in a given county. We are going to be predicting what happens in the future (7 days) given what we know now.
Now we need to select features and assemble them into a single vector column.
features = ['TEMP','PRCP','RH','VISIB','MXSPD','GUST','m50_index']
assembler = VectorAssembler(inputCols = features, outputCol = 'features', handleInvalid = 'skip')
df_feat = assembler.transform(df)
df_feat.select('features').limit(5).toPandas()
Next, we scale the features:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)
scale_mod = scaler.fit(df_feat)
df_scaled = scale_mod.transform(df_feat)
df_scaled.select('scaled_features').limit(5).toPandas()
df_scaled = df_scaled.withColumn("infect_prob_7day_lag", log(10.0,"infect_prob_7day_lag"))
Due to the size of the data, we can downsample by half and still have ~210k rows. In practice this sped up our grid search for tuning hyperparameters.
We need to stratify the downsampling by county to make sure we don't miss or overrepresent any areas.
fractions = df_scaled.select("FIPS").distinct().withColumn("fraction", lit(0.5)).rdd.collectAsMap()
df_scaled_sample = df_scaled.sampleBy("FIPS", fractions)
df_scaled_sample.count()
Splitting to train/test sets:
train, test = df_scaled_sample.randomSplit([0.7, 0.3])
train.limit(5).toPandas()
df.describe().toPandas().transpose()
Here we look at the (non-scaled) variables to see what our ranges are. There is clearly some strange data in the COVID variables (see seven_day_percap), but we are not using these. The weather variables all seem to represent reasonable ranges of values.
Looking at some correlations, it looks like temperature has some negative correlation with COVID-19 infection, but it isn't particularly strong. Mobility also interestingly has a negative correlation. This could be due to the fact that restrictions on movement tend to correspond with increased infections. There is the potential for a causal loop with that variable - but we won't worry about that in this first modeling effort.
In general, it appears there is relatively low correlation across the board between the independent and dependent variables. This suggests there isn't a simple linear relationship that we are missing.
features = ['TEMP','PRCP','RH','VISIB','MXSPD','GUST','m50_index']
cor_df = pd.DataFrame(columns = ['variable', 'correlation_with_infectprob'])
for fe in features:
cor_df = cor_df.append({'variable': fe, 'correlation_with_infectprob': df.stat.corr('infect_prob', fe)}, ignore_index=True)
cor_df
fractions = df.select("FIPS").distinct().withColumn("fraction", lit(0.5)).rdd.collectAsMap()
df_local = df.sampleBy("FIPS", fractions).toPandas()
fig, ax = plt.subplots(3, 3, figsize = (16, 8), sharey = True, sharex = False)
ax[0,0].scatter(df_local.TEMP, df_local.infect_prob, s = 1)
ax[0,0].set_title('Temperature')
ax[0,1].scatter(df_local.PRCP, df_local.infect_prob, s = 1)
ax[0,1].set_title('Precipitation')
ax[0,2].scatter(df_local.RH, df_local.infect_prob, s = 1)
ax[0,2].set_title('Humidity')
ax[1,0].scatter(df_local.VISIB, df_local.infect_prob, s = 1)
ax[1,0].set_title('Visibility')
ax[1,1].scatter(df_local.MXSPD, df_local.infect_prob, s = 1)
ax[1,1].set_title('Max Windspeed')
ax[1,2].scatter(df_local.GUST, df_local.infect_prob, s = 1)
ax[1,2].set_title('Wind Gust')
ax[2,0].scatter(df_local.m50_index, df_local.infect_prob, s = 1)
ax[2,0].set_title('Mobility')
fig.delaxes(ax[2,1])
fig.delaxes(ax[2,2])
fig.text(0, 0.5, 'Infectious Probability', va='center', rotation='vertical', fontsize = 12)
fig.suptitle("Relationship between Features and Infectious Probability", fontsize=16)
plt.tight_layout()
plt.savefig('corr_plots.png', dpi=300, facecolor = 'w', transparent = False)
Because there is a time series component to the data, it is interesting to examine each variable over time.
Infectious probability has an initial summer peak before rising to an even higher plateau in the winter. Temperature generally falls as we move from summer to winder. The rest of the variables have some oscillations and it is possible there are some trends here, but nothing jumps out as immediately obvious.
It is important to keep in mind that these represent averages for all of these variables across the entire United States - which has a significant amount of climate variation.
df_avg = df.groupBy('YEARMODA').agg(F.avg('TEMP'), F.avg('infect_prob'), F.avg('PRCP'),
F.avg('VISIB'), F.avg('MXSPD'), F.avg('GUST'),
F.avg('RH'), F.avg('m50_index')).toPandas()
df_avg.columns = ['date','TEMP','infect_prob', 'PRCP', 'VISIB', 'MXSPD','GUST','RH', 'm50_index']
df_avg.date = pd.to_datetime(df_avg.date)
df_avg = df_avg.sort_values(by = 'date')
fig, ax = plt.subplots(3, 3, figsize = (16, 8), sharey = False, sharex = True)
fig.autofmt_xdate(bottom=0.2, rotation=45, ha='center')
ax[0,0].plot(df_avg.date, df_avg.infect_prob)
ax[0,0].set_title('infect_prob')
ax[0,1].plot(df_avg.date, df_avg.TEMP)
ax[0,1].set_title('TEMP')
ax[0,2].plot(df_avg.date, df_avg.PRCP)
ax[0,2].set_title('PRCP')
ax[1,0].plot(df_avg.date, df_avg.RH)
ax[1,0].set_title('RH')
ax[1,1].plot(df_avg.date, df_avg.VISIB)
ax[1,1].set_title('VISIB')
ax[1,2].plot(df_avg.date, df_avg.MXSPD)
ax[1,2].set_title('MXSPD')
ax[2,0].plot(df_avg.date, df_avg.GUST)
ax[2,0].set_title('GUST')
ax[2,1].plot(df_avg.date, df_avg.m50_index)
ax[2,1].set_title('m50_index')
ax[0,0].xaxis.set_major_locator(mdates.MonthLocator())
ax[0,0].xaxis.set_major_formatter(mdates.DateFormatter('%b %y'))
fig.delaxes(ax[2,2])
fig.suptitle("Timeseries Averages of all Features (and Infectious Probability)", fontsize=16)
plt.tight_layout()
plt.savefig('ts_plots.png', dpi=300, facecolor = 'w', transparent = False)
Our first task is to determine what class of models gives us the best results. We'll use Root Mean Squared Error (RMSE) and $R^2$ to compare performance because we are using all regression techniques (as we have a continuous dependent variable).
model_df = pd.DataFrame()
lr_mod = GeneralizedLinearRegression(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features')
lr = lr_mod.fit(train)
pred = lr.transform(test)
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
rmse = evaluator.evaluate(pred)
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
R2 = evaluator2.evaluate(pred)
model_df['model'] = ['Linear Regression']
model_df['rmse'] = [rmse]
model_df['R2'] = [R2]
actual = np.array(pred.select("infect_prob_7day_lag").collect())
predicted = np.array(pred.select("prediction").collect())
plt.figure(figsize=(8,8))
plt.plot([0,predicted.max()],[0, predicted.max()], color = 'k', ls='--') # Diag. Line
plt.scatter(actual, predicted )
plt.xlabel("Actual")
plt.ylabel("Predicted")
plt.title("Predicted vs Actual: Generalized Linear Regression")
plt.show()
plt.figure(figsize=(8,8))
plt.scatter(predicted, actual - predicted )
plt.axhline(0)
plt.ylabel("Residuals")
plt.xlabel("Predicted")
plt.title("Predicted vs Residuals: Generalized Linear Regression")
plt.show()
mod = RandomForestRegressor(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features')
rf = mod.fit(train)
pred = rf.transform(test)
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
rmse = evaluator.evaluate(pred)
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
R2 = evaluator2.evaluate(pred)
model_df = model_df.append({'model': 'Random Forest', 'rmse': rmse , 'R2' : R2}, ignore_index=True)
actual = np.array(pred.select("infect_prob_7day_lag").collect())
predicted = np.array(pred.select("prediction").collect())
plt.figure(figsize=(8,8))
plt.scatter(actual, predicted )
plt.plot([0,predicted.max()],[0, predicted.max()], color = 'k', ls='--') # Diag. Line
plt.xlabel("Actual")
plt.ylabel("Predicted")
plt.title("Predicted vs Actual: Random Forest Regression")
plt.show()
plt.figure(figsize=(8,8))
plt.scatter(predicted, actual - predicted )
plt.axhline(0)
plt.ylabel("Residuals")
plt.xlabel("Predicted")
plt.title("Predicted vs Residuals: Random Forest Regression")
plt.show()
mod = GBTRegressor(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features')
lr = mod.fit(train)
pred = lr.transform(test)
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
rmse = evaluator.evaluate(pred)
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
R2 = evaluator2.evaluate(pred)
model_df = model_df.append({'model': 'GBT', 'rmse': rmse, 'R2' : R2}, ignore_index=True)
actual = np.array(pred.select("infect_prob_7day_lag").collect())
predicted = np.array(pred.select("prediction").collect())
plt.figure(figsize=(8,8))
plt.scatter(actual, predicted )
plt.plot([0,predicted.max()],[0, predicted.max()], color = 'k', ls='--') # Diag. Line
plt.xlabel("Actual")
plt.ylabel("Predicted")
plt.title("Predicted vs Actual: Gradient Boosted Trees")
plt.show()
plt.figure(figsize=(8,8))
plt.scatter(predicted, actual - predicted )
plt.axhline(0)
plt.ylabel("Residuals")
plt.xlabel("Predicted")
plt.title("Predicted vs Residuals: Gradient Boosted Trees")
plt.show()
mod = DecisionTreeRegressor(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features')
lr = mod.fit(train)
pred = lr.transform(test)
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
rmse = evaluator.evaluate(pred)
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
R2 = evaluator2.evaluate(pred)
model_df = model_df.append({'model': 'Decision Tree Regressor', 'rmse': rmse, 'R2' : R2}, ignore_index=True)
actual = np.array(pred.select("infect_prob_7day_lag").collect())
predicted = np.array(pred.select("prediction").collect())
plt.figure(figsize=(8,8))
plt.scatter(actual, predicted )
plt.plot([0,predicted.max()],[0, predicted.max()], color = 'k', ls='--') # Diag. Line
plt.xlabel("Actual")
plt.ylabel("Predicted")
plt.title("Predicted vs Actual: Decision Tree Regressor")
plt.show()
plt.figure(figsize=(8,8))
plt.scatter(predicted, actual - predicted )
plt.axhline(0)
plt.ylabel("Residuals")
plt.xlabel("Predicted")
plt.title("Predicted vs Residuals: Decision Tree Regressor")
plt.show()
model_df
The GBT model had the best performance in terms of RMSE and $R^2$. We expected the tree models to outperform linear regression given the large number of variables and lack of clear linear relationships in our initial evaluation. It was surprising that GBT outperformed the other tree-based models to the degree that it did.
It is clear from the above plots and the RMSE/$R^2$ results that there is significant variation in infectious probability that is not accounted for by our models. We expected this to be the case given the large number of omitted variables that likely affect COVID-19 spread. We were, however, satisfied that our models have some predictive ability even when just including weather and mobility data.
The best peformance seems to be from GBT. Given additional time, we would probably want to tune hyperparemeters for all of these models to see if we can get improvements, but for something with the scope of this project, we will continue with GBT as our champion model.
The hyperparameters we will look at are the maximum iterations and max depth of the trees. Again, we would probably explore a larger grid if given access to more computing power. We will focus on a robust set of parameters (3 different max iterations and 2 different max depths) that is small enough to run in a reasonable amount of time with our available resources.
source: https://docs.azuredatabricks.net/_static/notebooks/gbt-regression.html
Note, the below code takes a long time to run. You may need to modify setParallelism()
based on your own hardware.
mod = GBTRegressor(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features')
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
paramGrid = ParamGridBuilder() \
.addGrid(mod.maxIter, [10, 50, 100]) \
.addGrid(mod.maxDepth, [2, 5]) \
.build()
crossval = CrossValidator(estimator=mod,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=4)
cvModel = crossval.setParallelism(20).fit(train)
predictions = cvModel.transform(test)
best_mod = cvModel.bestModel
best_mod.getOrDefault('maxIter')
best_mod.getOrDefault('maxDepth')
rmse = evaluator.evaluate(predictions)
rmse
r2 = evaluator2.evaluate(predictions)
r2
100 max iterations and 5 depth seem to produce the best results. As mentioned above, we might want to explore even more iterations or depths. We ran a few tests with larger values in separate notebooks. Some of these runs took multiple days to complete and resulted in modest gains to RMSE and $R^2$. The values we found here seem to balance computation time and quality well.
Because we're down to a single model with set parameters, we can now run over the full dataset without creating too much computational burden.
train, test = df_scaled.randomSplit([0.7, 0.3])
mod = GBTRegressor(labelCol = 'infect_prob_7day_lag', featuresCol = 'scaled_features',
maxIter = best_mod.getOrDefault('maxIter'), maxDepth = best_mod.getOrDefault('maxDepth'))
lr = mod.fit(train)
pred = lr.transform(test)
evaluator = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'rmse')
rmse = evaluator.evaluate(pred)
evaluator2 = RegressionEvaluator(labelCol = 'infect_prob_7day_lag', predictionCol = 'prediction', metricName = 'r2')
r2 = evaluator2.evaluate(pred)
rmse
r2
We find the RMSE and $R^2$ values to be similar to those we found when tuning hyperparameters and using cross validation.
imp_df = pd.DataFrame({'importance': lr.featureImportances, 'features': features})
imp_df = imp_df.sort_values(by = 'importance', ascending=False)
x_values = list(range(len(imp_df.importance)))
plt.figure(figsize = (10, 5))
plt.bar(x_values, imp_df.importance, orientation = 'vertical')
plt.xticks(x_values, imp_df.features, rotation=40)
plt.ylabel('Importance', fontsize = 14)
plt.xlabel('Feature', fontsize = 14)
plt.title('Feature Importances', fontsize = 16);
plt.savefig('feature_importance.png', dpi=300, facecolor = 'w', transparent = False)
It looks like tempurature is by far the most important features in our GPT model. We expected that mobility (MXSPD) would have a much greater effect. It seems like we have some evidence that temperature is important variable in the spread of COVID, even beyond the fact that mobility is affected by temperature.
chicago = pred.filter(pred.FIPS == '17031')
chicago = chicago.toPandas()
chicago.YEARMODA = pd.to_datetime(chicago.YEARMODA)
fig, ax = plt.subplots(figsize = (10, 5))
ax.plot(chicago.YEARMODA, chicago.infect_prob_7day_lag, label = 'COVID')
ax.plot(chicago.YEARMODA, chicago.prediction, label = 'Prediction')
ax.xaxis.set_major_locator(mdates.MonthLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %y'))
ax.legend()
ax.set_title(label = 'Lagged COVID-19 Infectious Probability vs. Prediction for Cook County (Chicago)', fontsize = 14)
plt.savefig('chicago_predict.png', dpi=300, facecolor = 'w', transparent = False)
It is clear that our method is causing predictions that jump around quite a bit. We also notably lag the spike by a bit, but we do everntually catch up. This is likely due to the fact that our model doesn't consider time in any meaningful way. There are some timeseries approaches that I've been reading about, but many of them do not appear to be implemented in Pyspark. This is definitely in the "future work" category, in my opinion. Our model makes predictions that are somewhat erratic, but still interesting. Moreover, we were more interested in using this as a kind of screening experiment ot see if weather features were related to COVID-19 spread - accurate prediction wasn't ever really the goal. I think we have more than enough here to suggest COVID and temperature might be related in a way that goes beyond people tending to move around more in warmer temperatures.
Also in "future work", I think this problem would benefit from a hierarchical modeling framework like the ones that they teach in the Bayesian ML course in this department. The idea would be to have some commonality in the parameters that cause COVID to spread, but allow those to vary across the different counties to some degree to account for things like different attitudes toward the pandemic across the country. It is important to note that the Bayesian methods are very computationally intensive and data of this size may not work well there either. I'm not aware of hierarchical methods available in Pyspark, but it could be worth looking into.
Another thing I would want to explore is deeper trees. We ran into some computational hurdles with the timeline of this project that limited the depth of tree we were able to explore. It seems like we're getting pretty marginal returns to increases in max tree depth that are not worth the extra computation time, but for a true study, it would be worth exploring this further. With access to a better clustering system (e.g., AWS EMR), it might be more practical to explore these hyperparameter spaces.