logo资料库

新冠数据整理和简单分析(二)——SIR及其变种.pdf

第1页 / 共8页
第2页 / 共8页
第3页 / 共8页
第4页 / 共8页
第5页 / 共8页
第6页 / 共8页
第7页 / 共8页
第8页 / 共8页
资料共8页,全文预览结束
新冠数据整理和简单分析(二)——SIR及其变种 新冠数据整理和简单分析(二) 及其变种 新冠数据整理和简单分析(二)——使用使用SIR及其变种 新冠数据整理和简单分析(二) 及其变种 这篇文章主要是想介绍一下使用SIR模型对新冠病毒传播建模。在数据分析方面的研究目前绝大多数都是基于SIR模型变种来模拟病毒传播的过程。所以,我准备 以新冠病毒的数据为例,简单介绍一下SIR以及其变种的应用。 准备工作 准备工作 数据来源和参考 数据来源和参考 SIR模型是一个简单的传染病模型,它将人群分为三类,分别是易感染者(Susceptibles)、感染者(Infectives)、移除者(Removed)。为了得到相应这三类人 群的数据,我通过Kaggle的开源数据集对当前的数据进行了补充。以下是我的数据链接。 病例数据 人口 人口结构 管控措施 在本文的前半部分,我主要参考了Lisphilar的notebook。而后半部分我主要参考了几篇不错的COVID19传播建模论文。跟大家分享一下我的收获。 使用的工具和包 使用的工具和包 from collections import defaultdict from datetime import timedelta, datetime from dateutil.relativedelta import relativedelta from pprint import pprint import warnings from fbprophet import Prophet from fbprophet.plot import add_changepoints_to_plot import pystan.misc # in model.fit(): AttributeError: module 'pystan' has no attribute 'misc' import matplotlib.pyplot as plt import matplotlib.cm as cm import matplotlib from matplotlib.ticker import ScalarFormatter %matplotlib inline import numpy as np import optuna optuna.logging.disable_default_handler() import pandas as pd import dask.dataframe as dd pd.plotting.register_matplotlib_converters() import seaborn as sns from scipy.integrate import solve_ivp from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.cluster import KMeans 定义函数和方法 定义函数和方法 为了简化介绍和缩短文章长度,我只将主要的函数放在这里。 SIR模型模型 class SIR(ModelBase): NAME = "SIR" VARIABLES = ["x", "y", "z"] PRIORITIES = np.array([1, 1, 1]) MONOTONIC = ["z"] def __init__(self, rho, sigma): super().__init__() self.rho = rho self.sigma = sigma def __call__(self, t, X): # x, y, z = [X[i] for i in range(len(self.VARIABLES))] # dxdt = - self.rho * x * y # dydt = self.rho * x * y - self.sigma * y # dzdt = self.sigma * y dxdt = - self.rho * X[0] * X[1] dydt = self.rho * X[0] * X[1] - self.sigma * X[1] dzdt = self.sigma * X[1] return np.array([dxdt, dydt, dzdt]) @classmethod def param_dict(cls, train_df_divided=None, q_range=None): param_dict = super().param_dict() q_range = super().QUANTILE_RANGE[:] if q_range is None else q_range if train_df_divided is not None: df = train_df_divided.copy() # rho = - (dx/dt) / x / y rho_series = 0 - df["x"].diff() / df["t"].diff() / df["x"] / df["y"] param_dict["rho"] = rho_series.quantile(q_range) # sigma = (dz/dt) / y sigma_series = df["z"].diff() / df["t"].diff() / df["y"] param_dict["sigma"] = sigma_series.quantile(q_range) return param_dict param_dict["rho"] = (0, 1) param_dict["sigma"] = (0, 1) return param_dict @staticmethod def calc_variables(df): df["X"] = df["Susceptible"] df["Y"] = df["Infected"] df["Z"] = df["Recovered"] + df["Fatal"] return df.loc[:, ["T", "X", "Y", "Z"]] @staticmethod def calc_variables_reverse(df): df["Susceptible"] = df["X"] df["Infected"] = df["Y"] df["Recovered/Deaths"] = df["Z"] return df def calc_r0(self): if self.sigma == 0: return np.nan r0 = self.rho / self.sigma return round(r0, 2) def calc_days_dict(self, tau): _dict = dict() _dict["1/beta [day]"] = int(tau / 24 / 60 / self.rho) _dict["1/gamma [day]"] = int(tau / 24 / 60 / self.sigma) return _dict SIRD模型模型
class SIRD(ModelBase): NAME = "SIR-D" VARIABLES = ["x", "y", "z", "w"] PRIORITIES = np.array([1, 10, 10, 2]) MONOTONIC = ["z", "w"] def __init__(self, kappa, rho, sigma): super().__init__() self.kappa = kappa self.rho = rho self.sigma = sigma def __call__(self, t, X): # x, y, z, w = [X[i] for i in range(len(self.VARIABLES))] # dxdt = - self.rho * x * y # dydt = self.rho * x * y - (self.sigma + self.kappa) * y # dzdt = self.sigma * y # dwdt = self.kappa * y dxdt = - self.rho * X[0] * X[1] dydt = self.rho * X[0] * X[1] - (self.sigma + self.kappa) * X[1] dzdt = self.sigma * X[1] dwdt = self.kappa * X[1] return np.array([dxdt, dydt, dzdt, dwdt]) @classmethod def param_dict(cls, train_df_divided=None, q_range=None): param_dict = super().param_dict() q_range = super().QUANTILE_RANGE[:] if q_range is None else q_range if train_df_divided is not None: df = train_df_divided.copy() # kappa = (dw/dt) / y kappa_series = df["w"].diff() / df["t"].diff() / df["y"] param_dict["kappa"] = kappa_series.quantile(q_range) # rho = - (dx/dt) / x / y rho_series = 0 - df["x"].diff() / df["t"].diff() / df["x"] / df["y"] param_dict["rho"] = rho_series.quantile(q_range) # sigma = (dz/dt) / y sigma_series = df["z"].diff() / df["t"].diff() / df["y"] param_dict["sigma"] = sigma_series.quantile(q_range) return param_dict param_dict["kappa"] = (0, 1) param_dict["rho"] = (0, 1) param_dict["sigma"] = (0, 1) return param_dict @staticmethod def calc_variables(df): df["X"] = df["Susceptible"] df["Y"] = df["Infected"] df["Z"] = df["Recovered"] df["W"] = df["Fatal"] return df.loc[:, ["T", "X", "Y", "Z", "W"]] @staticmethod def calc_variables_reverse(df): df["Susceptible"] = df["X"] df["Infected"] = df["Y"] df["Recovered"] = df["Z"] df["Deaths"] = df["W"] return df def calc_r0(self): try: r0 = self.rho / (self.sigma + self.kappa) except ZeroDivisionError: return np.nan return round(r0, 2) def calc_days_dict(self, tau): _dict = dict() if self.kappa == 0: _dict["1/alpha2 [day]"] = 0 else: _dict["1/alpha2 [day]"] = int(tau / 24 / 60 / self.kappa) _dict["1/beta [day]"] = int(tau / 24 / 60 / self.rho) if self.sigma == 0: _dict["1/gamma [day]"] = 0 else: _dict["1/gamma [day]"] = int(tau / 24 / 60 / self.sigma) return _dict SIRF模型模型 class SIRF(ModelBase): NAME = "SIR-F" VARIABLES = ["x", "y", "z", "w"] PRIORITIES = np.array([1, 10, 10, 2]) MONOTONIC = ["z", "w"] def __init__(self, theta, kappa, rho, sigma): super().__init__() self.theta = theta self.kappa = kappa self.rho = rho self.sigma = sigma def __call__(self, t, X): # x, y, z, w = [X[i] for i in range(len(self.VARIABLES))] # dxdt = - self.rho * x * y # dydt = self.rho * (1 - self.theta) * x * y - (self.sigma + self.kappa) * y # dzdt = self.sigma * y # dwdt = self.rho * self.theta * x * y + self.kappa * y dxdt = - self.rho * X[0] * X[1] dydt = self.rho * (1 - self.theta) * X[0] * X[1] - (self.sigma + self.kappa) * X[1] dzdt = self.sigma * X[1] dwdt = self.rho * self.theta * X[0] * X[1] + self.kappa * X[1] return np.array([dxdt, dydt, dzdt, dwdt]) @classmethod def param_dict(cls, train_df_divided=None, q_range=None): param_dict = super().param_dict() q_range = super().QUANTILE_RANGE[:] if q_range is None else q_range param_dict["theta"] = (0, 1) param_dict["kappa"] = (0, 1) if train_df_divided is not None: df = train_df_divided.copy() # rho = - (dx/dt) / x / y rho_series = 0 - df["x"].diff() / df["t"].diff() / df["x"] / df["y"] param_dict["rho"] = rho_series.quantile(q_range) # sigma = (dz/dt) / y sigma_series = df["z"].diff() / df["t"].diff() / df["y"] param_dict["sigma"] = sigma_series.quantile(q_range) return param_dict param_dict["rho"] = (0, 1) param_dict["sigma"] = (0, 1) return param_dict @staticmethod def calc_variables(df): df["X"] = df["Susceptible"] df["Y"] = df["Infected"] df["Z"] = df["Recovered"] df["W"] = df["Fatal"] return df.loc[:, ["T", "X", "Y", "Z", "W"]]
@staticmethod def calc_variables_reverse(df): df["Susceptible"] = df["X"] df["Infected"] = df["Y"] df["Recovered"] = df["Z"] df["Fatal"] = df["W"] return df def calc_r0(self): try: r0 = self.rho * (1 - self.theta) / (self.sigma + self.kappa) except ZeroDivisionError: return np.nan return round(r0, 2) def calc_days_dict(self, tau): _dict = dict() _dict["alpha1 [-]"] = round(self.theta, 3) if self.kappa == 0: _dict["1/alpha2 [day]"] = 0 else: _dict["1/alpha2 [day]"] = int(tau / 24 / 60 / self.kappa) _dict["1/beta [day]"] = int(tau / 24 / 60 / self.rho) if self.sigma == 0: _dict["1/gamma [day]"] = 0 else: _dict["1/gamma [day]"] = int(tau / 24 / 60 / self.sigma) return _dict SEWIRF模型模型 class SEWIRF(ModelBase): NAME = "SEWIR-F" VARIABLES = ["x1", "x2", "x3", "y", "z", "w"] PRIORITIES = np.array([0, 0, 0, 10, 10, 2]) MONOTONIC = ["z", "w"] def __init__(self, theta, kappa, rho1, rho2, rho3, sigma): super().__init__() self.theta = theta self.kappa = kappa self.rho1 = rho1 self.rho2 = rho2 self.rho3 = rho3 self.sigma = sigma def __call__(self, t, X): # x1, x2, x3, y, z, w = [X[i] for i in range(len(self.VARIABLES))] # dx1dt = - self.rho1 * x1 * (x3 + y) # dx2dt = self.rho1 * x1 * (x3 + y) - self.rho2 * x2 # dx3dt = self.rho2 * x2 - self.rho3 * x3 # dydt = self.rho3 * (1 - self.theta) * x3 - (self.sigma + self.kappa) * y # dzdt = self.sigma * y # dwdt = self.rho3 * self.theta * x3 + self.kappa * y dx1dt = - self.rho1 * X[0] * (X[2] + X[3]) dx2dt = self.rho1 * X[0] * (X[2] + X[3]) - self.rho2 * X[1] dx3dt = self.rho2 * X[1] - self.rho3 * X[2] dydt = self.rho3 * (1 - self.theta) * X[2] - (self.sigma + self.kappa) * X[3] dzdt = self.sigma * X[3] dwdt = self.rho3 * self.theta * X[2] + self.kappa * X[3] return np.array([dx1dt, dx2dt, dx3dt, dydt, dzdt, dwdt]) @classmethod def param_dict(cls, train_df_divided=None, q_range=None): param_dict = super().param_dict() q_range = super().QUANTILE_RANGE[:] if q_range is None else q_range param_dict["theta"] = (0, 1) param_dict["kappa"] = (0, 1) param_dict["rho1"] = (0, 1) param_dict["rho2"] = (0, 1) param_dict["rho3"] = (0, 1) if train_df_divided is not None: df = train_df_divided.copy() # sigma = (dz/dt) / y sigma_series = df["z"].diff() / df["t"].diff() / df["y"] param_dict["sigma"] = sigma_series.quantile(q_range) return param_dict param_dict["sigma"] = (0, 1) return param_dict @staticmethod def calc_variables(df): df["X1"] = df["Susceptible"] df["X2"] = 0 df["X3"] = 0 df["Y"] = df["Infected"] df["Z"] = df["Recovered"] df["W"] = df["Fatal"] return df.loc[:, ["T", "X1", "X2", "X3", "Y", "Z", "W"]] @staticmethod def calc_variables_reverse(df): df["Susceptible"] = df["X1"] df["Infected"] = df["Y"] df["Recovered"] = df["Z"] df["Fatal"] = df["W"] df["Exposed"] = df["X2"] df["Waiting"] = df["X3"] return df def calc_r0(self): try: r0 = self.rho1 * (1 - self.theta) / (self.sigma + self.kappa) except ZeroDivisionError: return np.nan return round(r0, 2) def calc_days_dict(self, tau): _dict = dict() _dict["alpha1 [-]"] = round(self.theta, 3) if self.kappa == 0: _dict["1/alpha2 [day]"] = 0 else: _dict["1/alpha2 [day]"] = int(tau / 24 / 60 / self.kappa) _dict["1/beta1 [day]"] = int(tau / 24 / 60 / self.rho1) _dict["1/beta2 [day]"] = int(tau / 24 / 60 / self.rho2) _dict["1/beta3 [day]"] = int(tau / 24 / 60 / self.rho3) if self.sigma == 0: _dict["1/gamma [day]"] = 0 else: _dict["1/gamma [day]"] = int(tau / 24 / 60 / self.sigma) return _dict SIRFV模型模型 class SIRFV(ModelBase): NAME = "SIR-FV"
VARIABLES = ["x", "y", "z", "w"] PRIORITIES = np.array([1, 10, 10, 2]) MONOTONIC = ["z", "w"] def __init__(self, theta, kappa, rho, sigma, omega=None, n=None, v_per_day=None): """ (n and v_per_day) or omega must be applied. @n : total population @v_par_day : vacctinated persons per day """ super().__init__() self.theta = theta self.kappa = kappa self.rho = rho self.sigma = sigma if omega is None: try: self.omega = float(v_per_day) / float(n) except TypeError: s = "Neither (n and va_per_day) nor omega must be applied!" raise TypeError(s) else: self.omega = float(omega) def __call__(self, t, X): # x, y, z, w = [X[i] for i in range(len(self.VARIABLES))] # x with vacctination dxdt = - self.rho * X[0] * X[1] - self.omega dxdt = 0 - X[0] if X[0] + dxdt < 0 else dxdt # y, z, w dydt = self.rho * (1 - self.theta) * X[0] * X[1] - (self.sigma + self.kappa) * X[1] dzdt = self.sigma * X[1] dwdt = self.rho * self.theta * X[0] * X[1] + self.kappa * X[1] return np.array([dxdt, dydt, dzdt, dwdt]) @classmethod def param_dict(cls, train_df_divided=None, q_range=None): param_dict = super().param_dict() q_range = super().QUANTILE_RANGE[:] if q_range is None else q_range param_dict["theta"] = (0, 1) param_dict["kappa"] = (0, 1) param_dict["omega"] = (0, 1) if train_df_divided is not None: df = train_df_divided.copy() # rho = - (dx/dt) / x / y rho_series = 0 - df["x"].diff() / df["t"].diff() / df["x"] / df["y"] param_dict["rho"] = rho_series.quantile(q_range) # sigma = (dz/dt) / y sigma_series = df["z"].diff() / df["t"].diff() / df["y"] param_dict["sigma"] = sigma_series.quantile(q_range) return param_dict param_dict["rho"] = (0, 1) param_dict["sigma"] = (0, 1) return param_dict @staticmethod def calc_variables(df): df["X"] = df["Susceptible"] df["Y"] = df["Infected"] df["Z"] = df["Recovered"] df["W"] = df["Fatal"] return df.loc[:, ["T", "X", "Y", "Z", "W"]] @staticmethod def calc_variables_reverse(df): df["Susceptible"] = df["X"] df["Infected"] = df["Y"] df["Recovered"] = df["Z"] df["Fatal"] = df["W"] df["Immuned"] = 1 - df[["X", "Y", "Z", "W"]].sum(axis=1) return df def calc_r0(self): try: r0 = self.rho * (1 - self.theta) / (self.sigma + self.kappa) except ZeroDivisionError: return np.nan return round(r0, 2) def calc_days_dict(self, tau): _dict = dict() _dict["alpha1 [-]"] = round(self.theta, 3) if self.kappa == 0: _dict["1/alpha2 [day]"] = 0 else: _dict["1/alpha2 [day]"] = int(tau / 24 / 60 / self.kappa) _dict["1/beta [day]"] = int(tau / 24 / 60 / self.rho) if self.sigma == 0: _dict["1/gamma [day]"] = 0 else: _dict["1/gamma [day]"] = int(tau / 24 / 60 / self.sigma) return _dict 模型简介和数据拟合结果展示 模型简介和数据拟合结果展示 接下里,我将分别使用上面定义的SIR模型及其变种来拟合美国的数据。 SIR模型模型 SIR模型是最基础的感染病模型。 %%time sir_estimator = Estimator( SIR, ncov_df, population_dict[critical_country], name=critical_country, excluded_places=[(critical_country, None)], start_date=critical_country_start ) sir_dict = sir_estimator.run() 比较由SIR计算出的感染人数与真实感染人数之间的区别。 sir_estimator.compare_graph()
预测未来几个月的情况。 sir_estimator.predict_graph(step_n=500000)
SIR-D模型模型 在原始的SIR模型中,死亡和痊愈都被纳入移除组(R)。但是在我们的数据中,我们有能力将死亡(D)和痊愈(R)区分开来,因此建立了SIR-D模型。 %%time sird_estimator = Estimator( SIRD, ncov_df, population_dict[critical_country], name=critical_country, excluded_places=[(critical_country, None)], start_date=critical_country_start ) sird_dict = sird_estimator.run() 比较由SIR-D计算出的感染人数与真实感染人数之间的区别。 sird_estimator.compare_graph()
预测未来几个月的情况。 sird_estimator.predict_graph(step_n=400) SIR-F模型模型 有些病例在临床诊断之前就被确定为致死病例了。所以,在这个模型里加入了从易感者到感染但未被收治者到致死病例这一条通路。 %%time sirf_estimator = Estimator( SIRF, ncov_df, population_dict[critical_country], name=critical_country, places=[(critical_country, None)], start_date=critical_country_start
) sirf_dict = sirf_estimator.run() 比较由SIR-F计算出的感染人数与真实感染人数之间的区别。 sirf_estimator.compare_graph() 效果非常不好,可能是这个模型的设计还存在缺陷,就先不预测了。 SEWIR-F模型模型 进一步描述感染的过程,感染的过程可以被分为,暴露期(S)、潜伏期(E)、等待期(W)和确诊期(I)。潜伏期的人群被认为是无传染能力的,而等待期的 人群已经度过潜伏期,但并未获得确诊而且带有感染能力。这个模型是基于泛化SEIR模型的变种。 模型的考虑因素(未完待续) 模型的考虑因素(未完待续) 作者:Shiyang_
分享到:
收藏