Code source de teachcompute.datasets.eurostat

import os
import gzip
import urllib.request
from typing import Optional
import numpy
import pandas
from .. import log


[docs] def mortality_table( to: str = ".", stop_at: Optional[int] = None, verbose: bool = False ) -> pandas.DataFrame: """ This function retrieves mortality table from EuroStat or INSEE. A copy is provided. The link is changing. :param to: data needs to be downloaded, location of this place :param stop_at: the overall process is quite long, if not None, it only keeps the first rows :return: data_frame The function checks the file final_name exists. If it is the case, the data is not downloaded twice. The header contains a weird format as coordinates are separated by a comma:: indic_de,sex,age,geo\\time 2013 2012 2011 2010 2009 We need to preprocess the data to split this information into columns. The overall process takes 4-5 minutes, 10 seconds to download (< 10 Mb), 4-5 minutes to preprocess the data (it could be improved). The processed data contains the following columns:: ['annee', 'valeur', 'age', 'age_num', 'indicateur', 'genre', 'pays'] Columns *age* and *age_num* look alike. *age_num* is numeric and is equal to *age* except when *age_num* is 85. Everybody above that age fall into the same category. The table contains many indicators: * PROBSURV: Probabilité de survie entre deux âges exacts (px) * LIFEXP: Esperance de vie à l'âge exact (ex) * SURVIVORS: Nombre des survivants à l'âge exact (lx) * PYLIVED: Nombre d'années personnes vécues entre deux âges exacts (Lx) * DEATHRATE: Taux de mortalité à l'âge x (Mx) * PROBDEATH: Probabilité de décès entre deux âges exacts (qx) * TOTPYLIVED: Nombre total d'années personne vécues après l'âge exact (Tx) """ final_name = os.path.join(to, "mortality.txt") if os.path.exists(final_name) and os.stat(final_name).st_size > 1e7: return final_name dest = os.path.join(to, "demo_mlifetable.tsv.gz") if not os.path.exists(dest) or os.stat(dest).st_size < 1e7: url = ( "https://github.com/sdpython/data/raw/main/mortality/demo_mlifetable.tsv.gz" ) with urllib.request.urlopen(url) as u: content = u.read() with open(dest, "wb") as f: f.write(content) with gzip.open(dest, "rb") as f: file_content = f.read() content = str(file_content, encoding="utf8") with open(final_name, "w", encoding="utf8") as f: f.write(content) def format_age(s): "local function" if s.startswith("Y_"): if s.startswith("Y_LT"): return "YLT" + s[4:] if s.startswith("Y_GE"): return "YGE" + s[4:] raise SyntaxError(s) # pragma: no cover i = int(s.strip("Y")) return "Y%02d" % i def format_age_num(s): "local function" if s.startswith("Y_"): if s.startswith("Y_LT"): return float(s.replace("Y_LT", "")) if s.startswith("Y_GE"): return float(s.replace("Y_GE", "")) raise SyntaxError(s) # pragma: no cover i = int(s.strip("Y")) return float(i) def format_value(s): "local function" if s.strip() == ":": return numpy.nan return float(s.strip(" ebp")) log(verbose, lambda: "[mortality_table] read") dff = pandas.read_csv(final_name, sep="\t", encoding="utf8") if stop_at is not None: if verbose: print(f"[mortality_table] read only {stop_at} rows") dfsmall = dff.head(n=stop_at) df = dfsmall else: df = dff log(verbose, lambda: f"[mortality] step 1, shape is {df.shape}") dfi = df.reset_index().set_index("indic_de,sex,age,geo\\time") dfi = dfi.drop("index", axis=1) dfs = dfi.stack() dfs = pandas.DataFrame({"valeur": dfs}) log(verbose, lambda: f"[mortality] step 2, shape is {dfs.shape}") dfs["valeur"] = dfs["valeur"].astype(str) dfs["valeur"] = dfs["valeur"].apply(format_value) dfs = dfs[dfs.valeur >= 0].copy() dfs = dfs.reset_index() dfs.columns = ["index", "annee", "valeur"] log(verbose, lambda: f"[mortality] step 3, shape is {dfs.shape}") dfs["age"] = dfs["index"].apply(lambda i: format_age(i.split(",")[2])) dfs["age_num"] = dfs["index"].apply(lambda i: format_age_num(i.split(",")[2])) dfs["indicateur"] = dfs["index"].apply(lambda i: i.split(",")[0]) dfs["genre"] = dfs["index"].apply(lambda i: i.split(",")[1]) dfs["pays"] = dfs["index"].apply(lambda i: i.split(",")[3]) log(verbose, lambda: f"[mortality] step 4, shape is {dfs.shape}") dfy = dfs.drop("index", axis=1) dfy.to_csv(final_name, sep="\t", encoding="utf8", index=False) return final_name