"""
Module to load production and cross-border flows from ENTSO-E
"""
import os
from time import time
import numpy as np
import pandas as pd
#################### Local functions
from ecodynelec.checking import check_frequency, check_regularity_frequency, check_residual_availability
from ecodynelec.preprocessing.autocomplete import get_steps_per_hour
from ecodynelec.preprocessing.auxiliary import load_gap_content
from ecodynelec.preprocessing.extracting import extract
from ecodynelec.preprocessing.residual import include_global_residual
from ecodynelec.progress_info import ProgressInfo
# +
# Module to load production and cross-border flows from Entso-E
# +
#####################################
# ####################################
# IMPORT DATA
# ####################################
# ####################################
# -
[docs]
def import_data(ctry, start=None, end=None, freq="H", involved_countries=None, path_gap=None, sg_data=None,
                enr_prod_ch=None, net_exchange=False,
                path_gen=None, gen_preprocessed=None, path_imp=None, imp_preprocessed=None, savedir=None,
                residual_global=False, correct_imp=True,
                clean_data=True, n_hours=2, days_around=7, limit=.4, is_verbose=True,
                progress_bar: ProgressInfo = None):
    """
    Import and pre-process Entso-e generation and cross-border flow data.

    The function chains five steps: import generation, adjust generation,
    import exchanges, adjust exchanges, and join everything in one table.

    Parameters
    ----------
    ctry: list
        countries to include in the computing
    start:
        starting date, as str or datetime
    end:
        ending date, as str or datetime
    freq: str, default to 'H'
        frequency of the time steps to consider
    involved_countries: list, default to None
        all countries involved: the countries included in the computing and
        their neighbours (used to build the exchanges with 'Other' countries)
    path_gap: str or None, default to None
        path to the file containing the information about the nature of the residual
        refer to parameter.path.gap for more information
    sg_data: pandas.DataFrame, default to None
        information from Swiss Grid
    enr_prod_ch: pandas.DataFrame, default to None
        Wind and solar production in Switzerland, as modeled with EcoDynElec-Enr-Model
        See Parameter.ch_enr_model_path for more information
    net_exchange: bool, default to False
        to simplify cross-border flows to net after resampling
    path_gen: str, default to None
        directory where raw Entso-e generation files are stored
    gen_preprocessed: str, default to None
        directory where preprocessed Entso-e generation files are stored
    path_imp: str, default to None
        directory where raw Entso-e files for cross-border flow data are stored
    imp_preprocessed: str, default to None
        directory containing the files for preprocessed cross-border flow data
    savedir: str, default to None
        directory to save information
    residual_global: bool, default to False
        to consider the production residual as produced electricity that can be
        exchanged with neighbour countries
    correct_imp: bool, default to True
        to replace cross-border flows of Entso-e for Switzerland with data from Swiss Grid
        (`sg_data` is only forwarded to the exchange adjustment when this is True)
    clean_data: bool, default to True
        to enable automatic data cleaning / filling
    n_hours: int, default to 2
        max number of successive missing hours to be considered as occasional event
    days_around: int, default to 7
        number of days after and before a gap to consider to create a 'typical mean day'
    limit: float, default to 0.4
        max relative length of a gap to fill the data. Longer gaps are filled with zeros.
    is_verbose: bool, default to True
        to display information
    progress_bar: ProgressInfo, default to None
        to display a progress bar

    Returns
    -------
    pandas.DataFrame
        pandas DataFrame with all productions and all exchanges from all included countries.
    """
    started_at = time()

    if progress_bar:
        # NOTE(review): a truthy `progress_bar` is replaced by a freshly built
        # ProgressInfo, so the object passed by the caller is not reused -- confirm intent.
        progress_bar = ProgressInfo("Import generation data...", 6, width='40%')

    def announce(label):
        """Move the progress bar (if any) to the next step, showing `label`."""
        if progress_bar:
            progress_bar.progress(label)

    ### GENERATION DATA
    generation = import_generation(ctry=ctry, start=start, end=end,
                                   path_gen=path_gen, path_prep=gen_preprocessed, savedir=savedir,
                                   n_hours=n_hours, days_around=days_around, limit=limit,
                                   clean_generation=clean_data, is_verbose=is_verbose,
                                   progress_bar=progress_bar)
    announce('Adjust generation data...')
    generation = adjust_generation(generation, freq=freq, residual_global=residual_global,
                                   start=start, end=end, sg_data=sg_data, path_gap=path_gap,
                                   enr_prod_ch=enr_prod_ch, is_verbose=is_verbose)

    ### EXCHANGE DATA
    announce('Import exchanges data...')
    exchanges = import_exchanges(ctry=ctry, start=start, end=end,
                                 path_imp=path_imp, path_prep=imp_preprocessed, savedir=savedir,
                                 freq=freq, n_hours=n_hours, days_around=days_around, limit=limit,
                                 clean_imports=clean_data, is_verbose=is_verbose,
                                 progress_bar=progress_bar)
    announce('Adjust exchanges data...')
    # Swiss Grid data only overrides the Entso-e flows when the correction is requested
    swissgrid_flows = sg_data if correct_imp else None
    exchanges = adjust_exchanges(Cross=exchanges, neighbourhood=involved_countries,
                                 net_exchange=net_exchange, freq=freq,
                                 sg_data=swissgrid_flows, is_verbose=is_verbose)

    ### GATHER GENERATION AND EXCHANGE
    announce('Join data...')
    electric_mix = _join_generation_exchanges(Gen=generation, Cross=exchanges, is_verbose=is_verbose)

    if progress_bar:
        progress_bar.hide()
    if is_verbose:
        print("Import of data: {:.1f} sec".format(time() - started_at))
    return electric_mix
# +
#####################################
# ####################################
# Import Generation
# ####################################
# ####################################
# -
[docs]
def import_generation(ctry, start, end, path_gen=None, path_prep=None, savedir=None,
                      n_hours: int = 2, days_around: int = 7, limit: float = .4, clean_generation: bool = True,
                      is_verbose=False, progress_bar: ProgressInfo = None):
    """
    Function to import generation data from Entso-e information source.

    Parameters
    ----------
    ctry: list
        countries to include in the study
    start:
        starting date, as str or datetime
    end:
        ending date, as str or datetime
    path_gen: str, default to None
        directory where raw Entso-e generation files are stored [secondary path]
    path_prep: str, default to None
        directory where preprocessed Entso-e generation files are stored [prioritary path]
        Where preprocessed files are saved if both `path_prep` and `path_gen` are passed and different.
    savedir: str, default to None
        directory path to save results
    n_hours: int, default to 2
        max number of successive missing hours to be considered as occasional event
    days_around: int, default to 7
        number of days after and before a gap to consider to create a 'typical mean day'
    limit: float, default to 0.4
        max relative length of a gap to fill the data. Longer gaps are filled with zeros.
    clean_generation: bool, default to True
        to enable automatic data cleaning / filling
    is_verbose: bool, default to False
        to display information
    progress_bar: ProgressInfo, default to None
        progress bar forwarded to the extraction of raw files

    Returns
    -------
    dict
        processed generation data per country

    Raises
    ------
    KeyError
        if no path is given at all, or if the preprocessed directory cannot be
        listed or holds no '<country>...MW.csv' file for one of the countries.
    """
    path, savegen = _infer_paths(path_prep, path_gen, case='Generation')

    if is_verbose:
        print("Load generation data...")

    # `path` is always one of the two given paths; evaluate the comparison only once
    from_preprocessed = path == path_prep

    if from_preprocessed:
        # Find the prepared file of each country
        files = {}
        for c in ctry:
            try:
                files[c] = [f for f in os.listdir(path) if f.startswith(c) and f.endswith('MW.csv')][0]
            except Exception as e:
                # Keep the KeyError callers expect, but chain the root cause explicitly
                raise KeyError(f"No pre-processed generation data for {c}: {e}") from e
        Gen = {}  # Filled per country in the loop below
    else:
        # Raw files: the extraction directly returns the generation of each country
        Gen = extract(ctry=ctry, start=start, end=end, dir_gen=path, savedir_gen=savegen, save_resolution=savedir,
                      n_hours=n_hours, days_around=days_around, limit=limit, correct_gen=clean_generation,
                      is_verbose=is_verbose, progress_bar=progress_bar)

    for c in ctry:
        if from_preprocessed:
            Gen[c] = pd.read_csv(os.path.join(path, files[c]), index_col=0)

        # Normalise labels: strip trailing blanks, then end every label with exactly one ' '
        Gen[c].columns = Gen[c].columns.str.rstrip() + " "

        # Use time data as index and keep only the requested period
        Gen[c].index = pd.to_datetime(Gen[c].index, yearfirst=True)
        Gen[c] = Gen[c].loc[start:end]

        source = list(Gen[c].columns)  # production plant types
        if "Other " in source:  # Label expected for "Other fossil" in ENTSO-E data
            source[source.index("Other ")] = "Other fossil "

        # 'Some Source ' -> 'Some_Source_<country>'
        Gen[c].columns = [s.replace(" ", "_") + c for s in source]

    return Gen
# +
#####################################
# ####################################
# Adjust Generation
# ####################################
# ####################################
# -
[docs]
def adjust_generation(Gen, freq='h', residual_global=False, start=None, end=None,
                      sg_data=None, path_gap=None, enr_prod_ch=None, is_verbose=False):
    """Function that organizes the adjustment of the generation data.

    It resamples the data, optionally replaces part of the Swiss production with
    the modeled renewable production, and optionally adds a residual as global production.

    Parameters
    ----------
    Gen: dict
        dict of DataFrames containing the generation for each country
    freq: str, default to 'h'
        time step duration
    residual_global: bool, default to False
        whether to include the residual or not
    start: str, datetime or None, default to None
        starting date of the study. When None, the first time step of the
        resampled Swiss generation is used wherever a starting year is needed.
    end: str or None, default to None
        ending date of the study
    sg_data: pandas.DataFrame, default to None
        information from Swiss Grid
    path_gap: str or None, default to None
        path to the file containing the information about the nature of the residual
        refer to parameter.path.gap for more information
    enr_prod_ch: pandas.DataFrame, default to None
        Renewable energy production in Switzerland, as modeled with EcoDynElec-Enr-Model
        See parameter.ch_enr_model_path for more information
    is_verbose: bool, default to False
        whether to display information or not.

    Returns
    -------
    dict
        dict of pandas DataFrames containing modified Gen dict.
    """
    ### Resample data to the right frequency
    if is_verbose:
        print(f"\t4/{4 + int(residual_global)} - Resample exchanges to {freq} steps...")
    Gen = resample_data(Gen, freq=freq)

    ### Load gap data -> if Residual
    prod_gap = None
    if residual_global:
        if is_verbose:
            print('Loading gap data')
        delta = None
        if enr_prod_ch is not None:
            # Modeled production exceeding the Entso-e production (negative differences set to 0)
            delta = enr_prod_ch - Gen['CH'].loc[:, enr_prod_ch.columns]
            delta[delta < 0] = 0
        prod_gap = load_gap_content(path_gap=path_gap, start=start, end=end, freq=freq, enr_prod_residual_ch=delta)

    ### Include the enr production as modeled with EcoDynElec-Enr-Model
    if enr_prod_ch is not None:
        # Check the availability of enr production data
        check_residual_availability(prod=Gen['CH'], residual=enr_prod_ch, freq=freq)

        # `start` is documented as str, datetime or None, whereas `.year` only exists on
        # datetime-like objects: normalise it first instead of failing on a str or None.
        first_step = pd.Timestamp(start) if start is not None else Gen['CH'].index.min()
        if first_step.year < 2023:
            # Keep the ENTSO-E pumped-storage columns for studies starting before 2023.
            # NOTE(review): the original inline comment said "2023 and under" while the
            # test is strictly `< 2023` -- confirm which one is intended.
            enr_prod_ch = enr_prod_ch.drop(columns=['Hydro_Pumped_Storage_CH', 'Hydro_Pumpage_CH'])

        # And include it
        Gen['CH'].loc[:, enr_prod_ch.columns] = enr_prod_ch

    ### Then include residual production
    if residual_global:
        Gen = include_global_residual(Gen=Gen, freq=freq, sg_data=sg_data, prod_gap=prod_gap,
                                      is_verbose=is_verbose)
    return Gen
# +
#####################################
# ####################################
# Import Exchanges
# ####################################
# ####################################
# -
[docs]
def import_exchanges(ctry, start, end, path_imp=None, path_prep=None, savedir=None, freq='H',
                     n_hours: int = 2, days_around: int = 7, limit: float = .4, clean_imports: bool = True,
                     is_verbose=False, progress_bar: ProgressInfo = None):
    """
    Function to import the cross-border flows.
    Finds the useful files to load, loads the data and keeps the period of interest.

    Parameters
    ----------
    ctry: list
        countries to include in the study
    start:
        starting date, as str or datetime
    end:
        ending date, as str or datetime
    path_imp: str, default to None
        directory where raw Entso-e exchange files are stored [secondary path]
    path_prep: str, default to None
        directory where preprocessed Entso-e exchange files are stored [prioritary path]
        Where preprocessed files are saved if both `path_prep` and `path_imp` are passed and different.
    savedir: str, default to None
        directory path to save results
    freq: str, default to 'H'
        the frequency to consider (not used within this function itself)
    n_hours: int, default to 2
        max number of successive missing hours to be considered as occasional event
    days_around: int, default to 7
        number of days after and before a gap to consider to create a 'typical mean day'
    limit: float, default to 0.4
        max relative length of a gap to fill the data. Longer gaps are filled with zeros.
    clean_imports: bool, default to True
        to enable automatic data cleaning / filling
    is_verbose: bool, default to False
        to display information
    progress_bar: ProgressInfo, default to None
        progress bar forwarded to the extraction of raw files

    Returns
    -------
    dict
        dict of pandas.DataFrame containing cross-border flows.

    Raises
    ------
    KeyError
        if no path is given at all, or if the preprocessed directory cannot be
        listed or holds no '<country>...MW.csv' file for one of the countries.
    """
    path, saveimp = _infer_paths(path_prep, path_imp, case='Exchanges')

    if is_verbose:
        print("Get and reduce importation data...")

    # `path` is always one of the two given paths; evaluate the comparison only once
    from_preprocessed = path == path_prep

    ### Files to consider
    if from_preprocessed:
        files = {}
        for c in ctry:
            try:
                files[c] = [f for f in os.listdir(path) if f.startswith(c) and f.endswith('MW.csv')][0]
            except Exception as e:
                # Keep the KeyError callers expect, but chain the root cause explicitly
                raise KeyError(f'No pre-processed exchange data for "{c}": {e}') from e
        Cross = {}  # Cross-border flows per country, filled in the loop below
    else:
        # Raw files: the extraction directly returns the flows of each country
        Cross = extract(ctry=ctry, start=start, end=end, dir_imp=path, savedir_imp=saveimp, save_resolution=savedir,
                        n_hours=n_hours, days_around=days_around, limit=limit, correct_imp=clean_imports,
                        is_verbose=is_verbose, progress_bar=progress_bar)

    for i, c in enumerate(ctry):
        if from_preprocessed:
            if is_verbose:
                print("\t{}/{} - {}...".format(i + 1, len(files), c))
            Cross[c] = pd.read_csv(os.path.join(path, files[c]), index_col=0)

        # Use time data as index, then keep only the period of interest
        Cross[c].index = pd.to_datetime(Cross[c].index, yearfirst=True)
        Cross[c] = Cross[c].loc[start:end]

    return Cross
# +
#####################################
# ####################################
# Adjust exchanges
# ####################################
# ####################################
# -
[docs]
def adjust_exchanges(Cross, neighbourhood, net_exchange=False, freq='H', sg_data=None, is_verbose=False):
    """
    Bring adjustments to the exchange data: adjust frequency, add SwissGrid data,
    aggregate non-selected neighbours as 'Other' and optionally set exchanges to net.

    Parameters
    ----------
    Cross: dict
        the Cross-border flow data, as dict of pandas DataFrame
    neighbourhood: list
        list of involved countries, as main countries or neighbours
    net_exchange: bool, default to False
        to consider net cross-border flows
    freq: str, default to 'H'
        time step
    sg_data: pandas.DataFrame, default to None
        information from Swiss Grid. Swiss borders are only overridden when given.
    is_verbose: bool, default to False
        to display information

    Returns
    -------
    dict
        dict of pandas DataFrame with adjusted cross-border flow data. Columns are
        renamed 'Mix_<origin>_<country>', with 'Other' as one of the origins.
    """
    ### ADJUST THE FREQUENCY AND CONVERT TO MWh
    if is_verbose:
        print(f"Resample exchanges to {freq} steps...")
    Cross = resample_data(Cross, freq=freq)

    ### ADJUST WITH SWISSGRID DATA (AT SWISS BORDER ONLY)
    if sg_data is not None:
        Cross = set_swissGrid(Cross, sg_data)

    ### CREATE THE 'OTHER' AND REMOVE UNUSED
    if Cross:  # `neighbourhood` is only read when there is at least one country to process
        # Both lists only depend on the set of main countries: build them once for all countries
        other = [k for k in neighbourhood if k not in Cross]  # non-main countries, merged as 'Other'
        involved = [k for k in neighbourhood if k in Cross] + ['Other']  # origins kept in the computation

        for c in Cross:
            Cross[c]['Other'] = Cross[c].loc[:, other].sum(axis=1)  # Add the aggregated 'Other'
            Cross[c] = Cross[c].loc[:, involved]  # Select only relevant information
            Cross[c] = Cross[c].rename(columns=lambda s: f"Mix_{s}_{c}")  # Rename columns

    ### DEAL WITH NET-RAW EXCHANGES
    if net_exchange:
        Cross = create_net_exchange(Cross)

    return Cross
#
###############################################################################
# ###########################
# # Set SwissGrid at Swiss border
# ###########################
# ###########################
#
[docs]
def set_swissGrid(Cross, sg_data):
    """
    Overwrite the ENTSO-E flows at the Swiss borders with the flows reported by SwissGrid.

    The original documentation states that the data passed must be in 15min.

    Parameters
    ----------
    Cross: dict
        the Cross-border flow data, as dict of pandas DataFrame. Must contain the key 'CH'.
        The DataFrames are modified in place.
    sg_data: pandas.DataFrame
        information from Swiss Grid, read through its 'Mix_<origin>_<destination>' columns

    Returns
    -------
    dict
        the same dict, with Swiss imports and exports replaced wherever the
        corresponding neighbour is available.
    """
    swiss_flows = Cross['CH']

    # Only the four neighbours of Switzerland are concerned
    for neighbour in ("AT", "DE", "FR", "IT"):
        # Swiss imports: only when the Swiss table has a column for that neighbour
        if neighbour in swiss_flows.columns:
            swiss_flows.loc[:, neighbour] = sg_data.loc[:, f"Mix_{neighbour}_CH"]

        # Swiss exports: only when the neighbour is one of the loaded countries
        if neighbour in Cross:
            Cross[neighbour].loc[:, 'CH'] = sg_data.loc[:, f"Mix_CH_{neighbour}"]

    return Cross
#
###############################################################################
# ###########################
# # Create net exchanges
# ###########################
# ###########################
#
[docs]
def create_net_exchange(Cross):
    """
    Turn the cross-border flows into net flows at each border and time step.

    Net exchange means that electricity can only go from A to B or from B to A,
    but not in both directions at the same time.

    Parameters
    ----------
    Cross: dict
        dict of pandas DataFrame per country, with columns named
        'Mix_<origin>_<country>' for every other country of the dict.
        The DataFrames are modified in place.

    Returns
    -------
    dict
        the same dict, where for each pair of countries only the dominant
        direction keeps a (non-negative) value and the other one is set to zero.
    """
    countries = list(Cross.keys())

    # Visit every unordered pair of countries exactly once
    for position, first in enumerate(countries):
        for second in countries[position + 1:]:
            to_second = f"Mix_{first}_{second}"  # column of `second`: flow first -> second
            to_first = f"Mix_{second}_{first}"  # column of `first`: flow second -> first

            flow_to_second = Cross[second].loc[:, to_second]
            flow_to_first = Cross[first].loc[:, to_first]

            # Both are evaluated before any column is overwritten
            dominant_to_second = flow_to_second >= flow_to_first  # direction
            net_flow = flow_to_second - flow_to_first  # exchange difference

            # Boolean arithmetic: keep +net where first -> second dominates, else 0 ...
            Cross[second].loc[:, to_second] = dominant_to_second * net_flow
            # ... and keep -net (a positive value) where second -> first dominates, else 0
            Cross[first].loc[:, to_first] = (dominant_to_second - 1) * net_flow

    return Cross
# +
#####################################
# ####################################
# Join Generation Exchanges
# ####################################
# ####################################
# -
def _join_generation_exchanges(Gen, Cross, is_verbose=False):
    """Join generation and cross-border flow information in one single table.

    For every country of `Gen`, the generation columns come first and are
    followed by the exchange columns of the same country in `Cross`. The
    per-country tables are then put side by side, in the order of `Gen`.
    """
    if is_verbose:
        print("Gather generation and importation...")

    # One table per country: generation next to its exchanges
    per_country = [pd.concat([production, Cross[country]], axis=1)
                   for country, production in Gen.items()]

    # All countries side by side
    return pd.concat(per_country, axis=1)
# +
#####################################
# ####################################
# Infer paths
# ####################################
# ####################################
# -
def _infer_paths(path_prep, path_raw, case='Generation'):
    """Select the directory to read files from and the directory to save into.

    Returns a tuple `(path, save_path)`:

    - only `path_raw` given: read the raw files, nothing to save -> `(path_raw, None)`
    - only `path_prep` given: read the prepared files -> `(path_prep, None)`
    - both given: read the raw files and save in `path_prep` -> `(path_raw, path_prep)`

    Raises a KeyError mentioning `case` when no path is given at all.
    """
    has_prep = path_prep is not None
    has_raw = path_raw is not None

    if not (has_prep or has_raw):
        raise KeyError(f"No path is given for {case} data.")

    if has_prep and has_raw:
        return path_raw, path_prep  # Use raw files and save in path_prep

    # Exactly one path is available: use it, without any saving directory
    return (path_raw if has_raw else path_prep), None
# +
#####################################
# ####################################
# Resample Data
# ####################################
# ####################################
# -
[docs]
def resample_data(Data, freq):
    """
    Function that turns data from MW to MWh and adapts its frequency.
    The data is assumed to be in MW, in a table with 15min indexes.

    Parameters
    ----------
    Data: dict
        dict of DataFrames containing the data to resample. The dict is updated in place.
    freq: str
        the frequency (length of time step)

    Returns
    -------
    dict
        the same dict, with resampled and converted pandas DataFrames
    """
    ### VERIFY THE FREQUENCY
    check_frequency(freq)
    is_regular = check_regularity_frequency(freq)  # Whether pandas handles it as a regular frequency

    for key in Data:
        table = Data[key]

        if is_regular:
            ### Direct resampling, then MW -> MWh conversion
            steps_per_hour = get_steps_per_hour(freq, dtype=float)
            # The mean keeps the values as power; interpolation fills the steps created by upsampling
            power = table.resample(freq).mean().interpolate().fillna(0)
            Data[key] = power / steps_per_hour
        else:
            ### Frequency is month or year: convert hourly first, then aggregate
            steps_per_hour = get_steps_per_hour('H')
            hourly_power = table.resample('H').mean().interpolate().fillna(0)  # Still power
            hourly_energy = hourly_power / steps_per_hour  # MW -> MWh
            Data[key] = hourly_energy.resample(freq).sum()  # Sum, as it is energy now

    return Data