"""
Module, whose only objective is to update and copy the data necessary for the software to work correctly.
The source can be anywhere and is defaulted to the support_files (assuming the file is used from within
the repository and not after an install. If installed, the files must be specified each time). Updating
the information from SwissGrid uses source codes from https://swissgrid.ch. It is much more intensive and
requires a specific parametrization of the updater function.
"""
import os
import shutil
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta
from time import time
import numpy as np
import pandas as pd
from pandas import DataFrame
from ecodynelec.preprocessing.auxiliary import get_default_file, read_ofen_pdf_file
from ecodynelec.preprocessing.enr_residual_utils import get_enr_data_from_pronovo_ec
from ecodynelec.preprocessing.loading import adjust_generation, import_generation
from ecodynelec.preprocessing.sfoe_extracting import updating_ofen_data
[docs]
def update_all(path_dir=None, path_swissGrid=None, is_verbose=False):
"""
Updates all possible software files at once.
Parameter
---------
path_dir: str, optional
path to the directory containing updated files. Typically, this is the `support_files/`
directory of the cloned git repository of EcoDynElec. The directory must contain ALL
the files of interest, otherwise the execution is aboarded.
If None, an attempt to use a default path is made, with no promises.
path_swissGrid: str, optional
path to a directory containting files downloaded from https://swissgrid.ch. If not given,
the swiss-grid information is not updated. This will have no impact if no path_dir is given.
To solely update the swiss-grid data, please use the `update_sg` function.
is_verbose: bool, optional
to display information. Default to False.
"""
### Verify the path_dir
if path_dir is None:
### Try to reach a default directory
path_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'support_files')
if not os.path.isdir(path_dir): # Verify if path is valid
raise FileNotFoundError(
f"Need to specify a directory containing updated files to save them into software files.")
### Verify the names if using path_dir
expected = ["Neighbourhood_EU.csv", "Unit_Impact_Vector.csv", "SFOE_data.csv", "Residual_model.xlsx"]
files = os.listdir(path_dir)
if not all(exp in files for exp in expected):
missing = [f for f in expected if f not in files]
raise FileNotFoundError(
f"The following files are missing. Please use individual update functions for individual updates. {missing}")
### Process all common updates
update_neighbours(os.path.join(path_dir, "Neighbourhood_EU.csv"))
if is_verbose: print(f"Updated Neighbourhood file")
update_UIVector(os.path.join(path_dir, "Unit_Impact_Vector.csv"))
if is_verbose: print(f"Updated UI vector file")
update_UVEKVector(path_dir, is_verbose)
if is_verbose: print(f"Updated UVEK UI vector file")
update_SFOE(os.path.join(path_dir, "ofen_data"))
update_Losses(os.path.join(path_dir, "SFOE_data.csv"))
if is_verbose: print(f"Updated Losses file")
update_residual_share(os.path.join(path_dir, "Residual_model.xlsx"), save=True)
if is_verbose: print(f"Updated Residual share file")
### Go on with SwissGrid
if path_swissGrid is not None:
if os.path.isdir(path_swissGrid):
content = os.listdir(path_swissGrid)
if len(content) > 0:
if all(os.path.splitext(f)[1].startswith(".xls") for f in content):
update_sg(path_dir=os.path.abspath(path_swissGrid), save=True, is_verbose=is_verbose)
else:
raise KeyError(f"Not all files are source .xls or .xlsx in directory {path_swissGrid}")
else:
raise FileNotFoundError(f"{path_swissGrid} is an empty directory...")
else:
raise FileNotFoundError(f"{path_swissGrid} is no directory.")
elif is_verbose:
print("SwissGrid files were not updated.")
### Go on with ENR data
update_enr_data_from_pronovo(path_dir, verbose=is_verbose)
if is_verbose: print(f"Updated ENR data files")
return
###################################
####### GENERAL UPDATES ###########
###################################
[docs]
def update_copy(path, name):
### Verify
if not os.path.isfile(path): raise FileNotFoundError(f"Could not find {path}")
### Where to save
savepath = get_default_file(name)
### Copy
shutil.copy(path, savepath)
[docs]
def update_neighbours(path):
update_copy(path, "Neighbourhood_EU.csv")
[docs]
def update_UIVector(path):
update_copy(path, "Unit_Impact_Vector.csv")
[docs]
def update_UVEKVector(path, is_verbose):
mapping_path = os.path.join(path, "Mapping_UVEK.csv")
UI_vector_path = os.path.join(path, "Unit_Impact_Vector.csv")
UVEK_path = os.path.join(path, "UVEK_LCA.xlsx")
vector = impact_mapping_matching(UVEK_path, UI_vector_path, mapping_path, is_verbose=is_verbose)
vector.to_csv(os.path.join(path, "UVEK_Unit_Impact_Vector.csv"), index=True)
update_copy(os.path.join(path, "UVEK_Unit_Impact_Vector.csv"), "UVEK_Unit_Impact_Vector.csv")
[docs]
def update_SFOE(path):
DF = []
keys = ['annee', 'mois']
for file in os.listdir(path):
if "2023" in file:
continue # skip 2023
if not file.lower().endswith(".pdf"):
continue
pdf_path = os.path.join(path, file)
DF.append(updating_ofen_data(pdf_path))
df = DF[0].merge(DF[1][keys].drop_duplicates(), on=keys, how="left", indicator=True) \
.query("_merge == 'left_only'") \
.drop(columns="_merge")
df_all = pd.concat([df, DF[1]], ignore_index=True)
df_all.to_csv(os.path.join(os.path.dirname(path), 'SFOE_data.csv'), index=False, encoding="utf-8")
[docs]
def update_Losses(path):
update_copy(path, "SFOE_data.csv")
[docs]
def update_residual_share(path, save=True):
"""Extracts the data relative to residual share estimate and save it in software files"""
### Verification
# Error will be raised by pandas if needed
### Extraction
interest = {'Centrales au fil de l’eau': "Hydro_Run-of-river_and_poundage_Res",
'Centrales à accumulation': "Hydro_Water_Reservoir_Res",
'Centrales therm. classiques et renouvelables': "Other_Res"}
df = pd.read_excel(path, header=60, index_col=0).loc[interest.keys()].rename(index=interest)
df = df.T
df.index = pd.to_datetime(df.index, dayfirst=True, errors='coerce')
### Saving
if save:
savepath = get_default_file("Share_residual.csv")
df.to_csv(savepath, index=True)
return df
###################################
##### SPECIFIC TO SWISS-GRID ######
###################################
[docs]
def update_sg(path_dir=None, path_files=None, save=True, is_verbose=False):
"""
Function to update the SwissGrid values from source files.
It requires the source files from swissgrid.ch to be downloaded manually.
The files are downloaded in parallel to save time, as .xlsx are
particularly long to load.
The data is returned and automatically overwrites previous version in the
software files if save=True.
Parameters
----------
path_dir: str, optional
path do directory containing EXCLUSIVELY the files from swissgrid.ch
Either path_dir or path_files must be specified.
path_files: list-like, optional
list of paths to the files downloaded from swissgrid.ch on local computer
Either path_dir or path_files must be specified.
save: bool, optional
to decide whether to overwrite the software files with the new extracted data
default is True.
is_verbose: to display information
Returns
-------
pandas.DataFrame
"""
### List the elements / files
if (path_dir is None) & (path_files is None):
raise FileNotFoundError("Needs to specify a directory or a list of files")
elif path_dir is not None:
files = [os.path.abspath(os.path.join(path_dir, f))
for f in sorted(os.listdir(path_dir))]
else:
files = path_files
### Verification
faulty = [f for f in files if not os.path.isfile(f)]
if faulty: raise FileNotFoundError(f"Following files were not found: {faulty}")
### Extract data
if is_verbose: print("Extracting SwissGrid files...")
t0 = time()
whole_sg = []
with ProcessPoolExecutor() as pool:
for table in pool.map(_prepare_sg_year, files):
whole_sg.append(table)
whole_sg = pd.concat(whole_sg, axis=0).sort_index()
if is_verbose:
print(f"\tLoaded {len(files)} tables: {time() - t0:.2f} sec")
print(f"\tMemory usage: {whole_sg.memory_usage().sum() / 1024 ** 2:.1f} MB")
### Save the data
if save:
target = get_default_file("SwissGrid_total.csv")
if is_verbose: print(f"Re-writing {target}...")
## Build the path to file
whole_sg.to_csv(target, index=True)
### Return
if is_verbose: print(f"Updating SG total: {time() - t0:.2f} sec")
return whole_sg
def _rename_columns_sg(columns):
new_cols = {}
for c in columns:
if 'energy consumed by end users' in c:
new_cols[c] = "Consommation_CH"
elif 'energy production' in c:
new_cols[c] = 'Production_CH'
elif 'energy consumption' in c:
new_cols[c] = 'Consommation_Brut_CH'
elif "->" in c:
new_cols[c] = c.strip()[-6:]
return new_cols
def _set_index_sg(idx):
start = pd.to_datetime(idx[0])
new_idx = pd.date_range(start=start, freq='15T', periods=len(idx))
return new_idx - pd.Timedelta('15min')
def _prepare_sg_year(path, year=None):
if os.path.isfile(path):
sg_file = path
else:
if year is None:
raise ValueError("If path does not point at a file, a year value is needed")
elif any(isinstance(year, k) for k in (int, float, np.number)):
year = str(int(year))
elif not isinstance(year, str):
raise TypeError(f"year must be a string or a number. Not {year}")
sg_file = os.path.join(path, [f for f in os.listdir(path) if year in f][0])
### Import data
col_selection = 'A:D,K:R'
data = pd.read_excel(sg_file, sheet_name='Zeitreihen0h15', header=0, index_col=0,
parse_dates=False, usecols=col_selection).drop(index='Zeitstempel', errors='ignore')
### Clean data
data.index = _set_index_sg(data.index)
return data.rename(columns=_rename_columns_sg(data.columns)).astype("int32")
##############################################
##### SPECIFIC TO ENERGY CHARTS-PRONOVO ######
##############################################
[docs]
def update_enr_data_from_pronovo(path_dir=None, output_file=None, verbose=False):
"""
| Function to update the renewable electricity data from Pronovo and Energy Charts.
| This updates the `enr_prod_2016-2022_completed.csv` file in the `support_files` directory.
| The source files should be manually downloaded following the procedure described in the documentation of the
:ref:`ecodynelec.preprocessing.enr_residual_utils <pronovo-and-energycharts-data-downloading>` module.
| The source files should be placed in the `path_dir` directory :
- a 'pronovo_data' sub-directory should contain the files from Pronovo ('prod_year' directories containing the .csv
files downloaded on the Pronovo's website, AND a 'EC_Solar_year.csv' EnergyCharts solar production file to scale the
hourly Pronovo data to the real daily production given by EnergyCharts
(see :ref:`ecodynelec.preprocessing.enr_residual_utils <pronovo-and-energycharts-data-downloading>` module documentation)
- a 'energy_charts_data' sub-directory should contain the files from Energy Charts
- a 'enr_prod_2016-2019.csv' file should be in the `path_dir` directory
| This file is generated using the Ecd-EnrModel project. It contains predicted solar and wind electricity production
from 2016 to 2019.
Parameters
----------
path_dir : str, optional
path to the directory containing the source files. Typically, this is the `support_files/`
directory of the cloned git repository of EcoDynElec. The directory must contain ALL
the files of interest, otherwise the execution is aborted.
If None, an attempt to use a default path is made, with no promises.
output_file : str, optional
path to the file to save the updated data. If None, the file is saved in a 'enr_prod_2016-2022_completed.csv'
file in the `path_dir` directory.
verbose : bool, optional
to display information. Default to False.
Returns
-------
None
"""
### Verify the path_dir
if path_dir is None:
### Try to reach a default directory
path_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'support_files')
if not os.path.isdir(path_dir): # Verify if path is valid
raise FileNotFoundError(
f"Need to specify a directory containing updated files to save them into software files.")
predicted_data = os.path.join(path_dir, 'enr_prod_2016-2019.csv')
if not os.path.isfile(predicted_data):
raise FileNotFoundError(
f"The file {predicted_data} is missing in {path_dir}. Please generate it using Ecd-EnrModel project and save it in {path_dir}.")
predicted_data = pd.read_csv(predicted_data, index_col=0, parse_dates=[0])
mapped_data = get_enr_data_from_pronovo_ec(path_dir, verbose)
# Merge the predicted data (ending in 2019) with the mapped data (starting in 2020) and ensure they don't overlap
ndf = pd.concat([predicted_data.loc[:(mapped_data.index[0] - timedelta(hours=1))], mapped_data], axis=0)
ndf.fillna(0, inplace=True)
if output_file is None:
output_file = os.path.join(path_dir, 'enr_prod_2016-2024_completed.csv')
if verbose:
print(f"Saving {output_file}...")
ndf.to_csv(output_file)
[docs]
def impact_mapping_matching(uvek_path, ui_vector_path, mapping_path, is_verbose=False):
"""
Matches technologies with their environmental impact values by mapping data from
various sources (UVEK, UI impact vector, and mapping details). Generates a new
dataframe aligning UI technologies to their corresponding impact values.
Parameters
----------
uvek_path : str
File path to the UVEK impact data file.
ui_vector_path : str
File path to the UI impact vector data file.
mapping_path : str
File path to the mapping file linking EcoInvent and UVEK.
is_verbose : bool, optional
Flag to enable verbose output during the process, default is False.
Returns
-------
pandas.DataFrame
A new impact vector with combined environmental data for technologies from
the UI impact vector and the mapping to UVEK impact data.
"""
# Loading UVEK impact data
ctry = ['CH','AT','DE','FR','CZ','IT']
uvek_impact = extract_uvek_impact(ctry, vector_path=uvek_path, is_verbose=is_verbose)
# Loading Mapping data for EcoInvent and UVEK
mapping = pd.read_csv(mapping_path, sep=';', encoding='latin-1')
mapping['Pourcentage'] = mapping['Pourcentage'].astype(str).str.replace(',', '.', regex=False)
mapping['Pourcentage'] = pd.to_numeric(mapping['Pourcentage'], errors='coerce')
# Loading UI impact vector
ui_impact = pd.read_csv(ui_vector_path)
new_impact_vector = pd.DataFrame(columns=uvek_impact.columns)
for techno in ui_impact.iloc[:, 0]:
techno_map = mapping[mapping['EcoDynElec'] == techno]
if not techno_map.empty:
new_impact_vector.loc[techno] = [0,0,0,0]
for index, row in techno_map.iterrows():
if pd.isna(row['UVEK']): pass
else:
new_impact_vector.loc[techno] += row['Pourcentage']*uvek_impact.loc[row['UVEK']]
else:
if is_verbose:
print(f"No mapping found for : {techno}")
return new_impact_vector