Source code for ecodynelec.preprocessing.auxiliary

"""
Module containing a collection of functions to load side-datasets
that may be required during the execution of `ecodynelec` processes.
"""

import os
import warnings

import numpy as np
import pandas as pd
import tabula as tabula
from datetime import datetime

################# Local functions
from ecodynelec.checking import check_frequency


# +

###########################
# ##########################
# Load SwissGrid
# ##########################
# ##########################

# -

def load_swissGrid(path_sg, start=None, end=None, freq='H'):
    """
    Load production and cross-border flows information from Swiss Grid.

    Parameters
    ----------
    path_sg: str
        path to the file with Swiss Grid information. If None, the file
        `SwissGrid_total.csv` is looked up with `get_default_file`.
    start: str, default None
        starting date, as datetime or str. None leaves the period open
        on this side.
    end: str, default None
        ending date, as datetime or str. None leaves the period open
        on this side.
    freq: str, default to 'H'
        frequency to resample the SwissGrid data to

    Returns
    -------
    pandas.DataFrame
        table of SwissGrid information restricted to [start, end], summed
        per `freq` period and divided by 1000 (kWh -> MWh).

    Raises
    ------
    KeyError
        if the column 'Production_CH' is missing from the file.
    """
    ### Default path
    if path_sg is None:
        path_sg = get_default_file(name='SwissGrid_total.csv')

    ### Date safety
    if start is not None:
        start = pd.to_datetime(start)
    if end is not None:
        end = pd.to_datetime(end)

    ### Import SwissGrid data
    sg = pd.read_csv(path_sg, index_col=0, parse_dates=True, dtype="float32")
    sg = sg.drop(columns=["Consommation_CH", "Consommation_Brut_CH"])  # Remove unused columns

    ### Check info availability
    if 'Production_CH' not in sg.columns:
        raise KeyError("Missing information 'Production_CH' in SwissGrid Data.")
    if (start is None) or (end is None):
        covered = sg.loc[start:end].index
        msg = " /!\\ Some date limits are None. SwissGrid is on period {} - {}. It may not match the Generation and Exchange."
        warnings.warn(msg.format(covered[0], covered[-1]))
    elif (start < sg.index[0]) or (end > sg.index[-1]):  # information only
        covered = sg.loc[start:end].index
        msg = " /!\\ Resudual computed only during {} - {}. SwissGrid Data not available on the rest of the period."
        warnings.warn(msg.format(covered[0], covered[-1]))

    ### Rename the columns
    sg.columns = ["Production_CH", "Mix_CH_AT", "Mix_AT_CH", "Mix_CH_DE", "Mix_DE_CH",
                  "Mix_CH_FR", "Mix_FR_CH", "Mix_CH_IT", "Mix_IT_CH"]

    ### Select the interesting data.
    # Only the limits that were actually given are applied: comparing a
    # DatetimeIndex with None raises a TypeError.
    selected = np.ones(len(sg.index), dtype=bool)
    if start is not None:
        selected &= sg.index >= start
    if end is not None:
        selected &= sg.index <= end

    ### Resample to right frequency and convert kWh -> MWh
    return sg.loc[selected].resample(freq).sum() / 1000
# + ########################### # ######################### # Clear Ambiguous Dates # ######################### ########################### # -
def clear_ambiguous_dates(sg):
    """
    Clear ambiguous dates in SwissGrid raw data.

    A date is treated as ambiguous when it appears exactly twice in the index
    and falls on a full hour. For each such date the first occurrence is
    re-dated: it is kept as is when its hour is 2, otherwise it is moved one
    hour earlier. The second occurrence is left untouched.

    Parameters
    ----------
    sg: pandas.DataFrame
        SwissGrid table whose index is named "Date" (the name is needed to
        rebuild the index after the correction).

    Returns
    -------
    pandas.DataFrame
        copy of `sg` with the corrected "Date" index.
    """
    # Gather ambiguous dates
    stamps, counts = np.unique(sg.index, return_counts=True)
    ambiguous = pd.Series(counts, index=stamps, name='Occurrence').reset_index()
    dates = ambiguous.loc[:, 'index']
    # `.dt.round` is the datetime rounder; `Series.round` expects a number of
    # decimals and is not guaranteed to round timestamps.
    on_full_hour = dates == dates.dt.round("h")
    # Explicit copy: a column is added below
    ambiguous = ambiguous[(ambiguous.loc[:, 'Occurrence'] == 2) & on_full_hour].copy()

    # Create the new date for ambiguous dates
    one_hour = pd.Timedelta(hours=1)
    ambiguous['replace'] = ambiguous.loc[:, 'index'].apply(
        lambda x: x if x.hour == 2 else x - one_hour)

    # Find the row position of the first occurrence: each ambiguous date is
    # matched twice, hence one position out of two is kept.
    positions = pd.Series(np.arange(sg.shape[0]), index=sg.index)
    ambiguous.index = positions.loc[ambiguous.loc[:, 'index']].values[::2]

    # Clear SG dates
    sg_cleared = sg.reset_index()
    sg_cleared.loc[ambiguous.index, "Date"] = ambiguous.loc[:, 'replace']
    return sg_cleared.set_index("Date")
# + ########################### # ########################## # Load useful countries # ########################## # ########################## # -
def load_useful_countries(path_neighbour, ctry):
    """
    Build the list of countries directly or indirectly involved in the computation.

    Directly involved countries are the ones passed in `ctry`. Indirectly
    involved countries are their neighbours, as read from the neighbourhood
    table; they help building the import from 'other' countries.

    Parameters
    ----------
    path_neighbour: str
        path to the neighbourhood csv file (one row per country, read with
        the first column as index). If None, the file `Neighbourhood_EU.csv`
        is looked up with `get_default_file`.
    ctry: iterable of str
        countries directly involved.

    Returns
    -------
    list
        every involved country exactly once, in the sorted order given by
        `numpy.unique`.
    """
    ### Default path
    if path_neighbour is None:
        path_neighbour = get_default_file(name='Neighbourhood_EU.csv')

    ### Read the neighbourhood table
    neighbours = pd.read_csv(path_neighbour, index_col=0)

    ### Countries considered, followed by the neighbours of each of them
    involved = list(ctry)
    for country in ctry:
        involved.extend(neighbours.loc[country].dropna().values)

    ### Keep each country one time only
    return list(np.unique(involved))
# + ########################### # ########################## # Load GridLosses # ########################## # ########################## # -
def load_grid_losses(network_loss_path, start=None, end=None):
    """
    Load the network grid losses as a loss rate per month.

    The rate is computed as ``1 + Pertes / Conso_CH`` for each row of the file.

    Parameters
    ----------
    network_loss_path: str
        path to the csv file with columns 'annee', 'mois', 'Pertes' and
        'Conso_CH'. If None, the file `SFOE_data.csv` is looked up with
        `get_default_file`.
    start: default None
        starting date, as datetime or str. Only its year is used; None
        leaves the period open on this side.
    end: default None
        ending date, as datetime or str. Only its year is used; None
        leaves the period open on this side.

    Returns
    -------
    pandas.DataFrame
        table with columns 'year', 'month' and 'Rate', re-indexed from 0.
    """
    ### Default path
    if network_loss_path is None:
        network_loss_path = get_default_file(name='SFOE_data.csv')

    # Get and calculate new power demand for the FU vector
    losses = pd.read_csv(network_loss_path)
    losses['Rate'] = 1 + (losses.loc[:, 'Pertes'] / losses.loc[:, 'Conso_CH'])

    # Keep the years between both limits; a missing limit filters nothing
    keep = np.ones(len(losses), dtype=bool)
    if start is not None:
        first_year = pd.to_datetime(start).year  # Safety, redefine as datetime
        keep &= (losses['annee'] >= first_year).to_numpy()
    if end is not None:
        last_year = pd.to_datetime(end).year  # Safety, redefine as datetime
        keep &= (losses['annee'] <= last_year).to_numpy()

    renaming = {'annee': 'year', 'mois': 'month'}
    output = losses.loc[keep, ['annee', 'mois', 'Rate']].rename(columns=renaming)
    return output.reset_index(drop=True)
# + ########################### # ########################## # Load gap content # ########################## # ########################## # -
def load_gap_content(path_gap, start=None, end=None, freq='H', enr_prod_residual_ch=None):
    """
    Define the relative composition of the swiss residual production.
    The function is very file format specific.

    If enr_prod_residual_ch is not None, it will be subtracted from the "other" residual production before
    computing the relative composition of each category.

    Parameters
    ----------
    path_gap: str
        Path to the file containing residual content information. The file must contain absolute values
        for each category, for each time step (if not, use updating.update_residual_share to update the file).
        If None, or if the path is not an existing file, the default file `Share_residual.csv` is
        looked up with `get_default_file`. A '.csv' file is read as csv, anything else as an Excel file.
    start: default to None
        starting date, as datetime or str. None leaves the period open on this side.
    end: default to None
        ending date, as datetime or str (rounded to the end of its month).
        None leaves the period open on this side.
    freq: str, default to "H"
        frequency to resample the data to
    enr_prod_residual_ch: default to None
        Delta between the renewable electricity production of EcoDynElec-EnrModel and the production given
        by the ENTSO-E data. If not None, the total will be subtracted from the "other" residual category
        (the result is floored at 0).

    Returns
    -------
    pandas.DataFrame
        table (float values) with relative residual production composition for each time step.

    Raises
    ------
    ValueError
        if the first row of the file sums to less than 2, i.e. the file looks like the old
        relative-share format.
    """
    ### Load the raw data
    if path_gap is None or not os.path.isfile(path_gap):
        # No usable user file: load default from software files
        path_gap = get_default_file(name="Share_residual.csv")
        df = pd.read_csv(path_gap, index_col=0, parse_dates=True)
    elif os.path.splitext(path_gap)[1] == '.csv':
        df = pd.read_csv(path_gap, index_col=0, parse_dates=True)  # Load csv file from user
    else:
        # cannot use the function in update, due to a circular import
        interest = {r'Centrales au fil de l\ eau': "Hydro_Run-of-river_and_poundage_Res",
                    'Centrales à accumulation': "Hydro_Water_Reservoir_Res",
                    'Centrales therm. classiques et renouvelables': "Other_Res"}
        df = pd.read_excel(path_gap, header=59, index_col=0).loc[list(interest)].rename(index=interest)
        df = df.T
        df.index = pd.to_datetime(df.index, yearfirst=True)  # time data

    # Check if the file contains relative (old format) or absolute values
    if df.iloc[0].sum() < 2:
        raise ValueError(
            "You should update the residual share file with updating.update_residual_share. It must now contain absolute production values (not relatives)")

    ###########################
    ##### Adapt the time resolution of raw data
    #####
    localFreq = freq  # copy frequency
    if freq[0] in ["M", "Y"]:
        # Year or month -> aggregate the raw data at the start ('S') of the month/year
        localFreq = freq[0] + "S"
        df = df.resample(localFreq).sum()
    elif freq in ['W', 'w']:
        # Week -> work in days first, weeks are aggregated at the end
        localFreq = 'D'

    ###############################
    ##### Select information
    #####
    res_start, res_end = None, None
    if start is not None:
        start = pd.to_datetime(start)  # Safety, redefine as datetime
        res_start = start
    if end is not None:
        end = pd.to_datetime(end)  # Safety, redefine as datetime
        res_end = end + pd.offsets.MonthEnd(0)  # Round at the end of the last month

    cols = ['Hydro_Run-of-river_and_poundage_Res', 'Hydro_Water_Reservoir_Res', 'Other_Res']
    df.index = pd.to_datetime(df.index)

    # Only the limits that were given are applied: comparing a DatetimeIndex
    # with None raises a TypeError.
    selected = np.ones(len(df.index), dtype=bool)
    if res_start is not None:
        selected &= df.index >= res_start
    if res_end is not None:
        selected &= df.index <= res_end
    df = df.loc[selected, cols]  # Select information only for good duration

    if start is None:
        res_start = df.index[0]
    if end is None:
        res_end = df.index[-1]

    ################################
    ##### Build the adapted time series with right time step
    #####
    gap = pd.DataFrame(None, columns=df.columns,
                       index=pd.date_range(start=res_start, end=max(res_end, df.index[-1]), freq=localFreq))

    def remove_enr_prod_residual(dt):
        """Return the raw values at `dt`, with the EnrModel delta removed from 'Other_Res'."""
        # astype returns a new Series: df is never modified, and the float
        # result of the subtraction always fits.
        values = df.loc[dt, :].astype(float)
        if enr_prod_residual_ch is not None and dt in enr_prod_residual_ch.index:
            enr = enr_prod_residual_ch.loc[dt, :].sum(axis=0)
            delta = values['Other_Res'] - enr.sum()
            values['Other_Res'] = delta.clip(min=0)
        return values.values

    if localFreq[0] == 'Y':
        for dt in df.index:
            localize = (gap.index.year == dt.year)
            gap.loc[localize, :] = remove_enr_prod_residual(dt)
    elif localFreq[0] == "M":
        for dt in df.index:
            localize = ((gap.index.year == dt.year) & (gap.index.month == dt.month))
            gap.loc[localize, :] = remove_enr_prod_residual(dt)
    else:
        for dt in df.index:  # everything from (week, ) day to 15 minutes
            same_month = (gap.index.year == dt.year) & (gap.index.month == dt.month)
            if dt.dayofweek <= 4:  # week day: applies to all week days of the month
                localize = same_month & (gap.index.dayofweek <= 4)
            else:  # saturday or sunday: applies to the same day of week only
                localize = same_month & (gap.index.dayofweek == dt.dayofweek)
            gap.loc[localize, :] = remove_enr_prod_residual(dt)

    # The frame was created empty (object dtype): convert to numbers so the
    # aggregations below operate on numeric columns.
    gap = gap.dropna(axis=0).astype(float)

    if freq in ["W", "w"]:  # Aggregate into weeks
        gap = gap.ffill().resample(freq).mean()

    # get the relative shares of each technology from the total gap in kWh
    gap = gap.divide(gap.sum(axis=1), axis=0)
    return gap.dropna(axis=0)
# + ########################### # ########################## # Load raw Entso # ########################## # ########################## # -
def load_rawEntso(mix_data, freq='H'):
    """
    Return an existing production and exchange matrix.

    Parameters
    ----------
    mix_data: str or pandas.DataFrame
        either a DataFrame, returned as is, or the location of the saved
        files as a str. The file name ``ProdExchange_<resolution>.csv`` is
        appended directly to that str, so a directory must end with its
        path separator.
    freq: str, default to 'H'
        frequency selecting which file to read. It is validated with
        `check_frequency` and only used when `mix_data` is a str.

    Returns
    -------
    pandas.DataFrame
        the production and exchange matrix.

    Raises
    ------
    KeyError
        if `mix_data` is neither a str nor a DataFrame.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of DataFrame and str.
    if isinstance(mix_data, pd.DataFrame):  # data passed directly as argument
        return mix_data

    if isinstance(mix_data, str):  # Import from file
        check_frequency(freq)  # Check the frequency
        tPass = {'15min': '15min', '30min': '30min', "H": "hour", "D": "day", 'd': 'day', 'W': "week",
                 "w": "week", "MS": "month", "M": "month", "YS": "year", "Y": "year"}
        return pd.read_csv(mix_data + f"ProdExchange_{tPass[freq]}.csv",
                           index_col=0, parse_dates=True)

    raise KeyError(f"Data type {type(mix_data)} for raw_prodExch is not supported.")
# + ########################### # ########################## # Get default file # ########################## # ########################## # -
def get_default_file(name, level=0, max_level=3):
    """
    Return the absolute path of a default support file.

    The search starts from the location of the current file and assumes no
    structure in EcoDynElec; it only moves upward. At each level, the
    directory itself is checked first, then each of its direct
    sub-directories.

    Parameters
    ----------
    name: str
        name of the file to find.
    level: int, default to 0
        first level to search: 0 is the directory containing this file,
        1 its parent, and so on.
    max_level: int, default to 3
        first level that is NOT searched anymore.

    Returns
    -------
    str
        path of the first match.

    Raises
    ------
    FileNotFoundError
        if no level in [level, max_level) contains the file.
    """
    here = os.path.abspath(__file__)

    # Explicit loop over the levels, so that `max_level` applies to the whole
    # search and not only to its first step.
    for depth in range(level, max_level):
        ### Directory `depth` levels above the one containing this file
        current_dir = here
        for _ in range(depth + 1):
            current_dir = os.path.dirname(current_dir)

        ### Check for file in current directory
        search = os.path.join(current_dir, name)
        if os.path.isfile(search):
            return search

        ### Otherwise, search 1 level in sub-directory
        for f in os.listdir(current_dir):
            if os.path.isdir(os.path.join(current_dir, f)):
                search = os.path.join(current_dir, f, name)
                if os.path.isfile(search):
                    return search

    ### Limit reached
    raise FileNotFoundError(f"Default support file {name} not found.")
# + ########################### # ########################## # Load CH Enr Model data # ########################## # ########################## # -
def load_ch_enr_model(ch_enr_model_path, start, end, freq):
    """
    Load the CH energy production data from the given path and return a dataframe with the same format
    as the processed ENTSO-E production and exchange data.

    Parameters
    ----------
    ch_enr_model_path : str
        Path to the CH energy production data, generated with EcoDynElec-EnrModel
    start: str
        Start date of the data to load, as datetime or str. None leaves the period open on this side.
    end: str
        End date of the data to load, as datetime or str. None leaves the period open on this side.
    freq: str
        Frequency of the data to return (from H to Y)

    Returns
    -------
    pd.DataFrame
        A dataframe with columns 'Wind_Onshore_CH', 'Solar_CH', 'Waste_CH', 'Hydro_Pumped_Storage_CH',
        'Hydro_Pumpage_CH' (sign inverted) and 'Biomass_CH' (sum of both biomass columns, biogas and
        sewage gas), divided by 1000 (kWh -> MWh) and summed per `freq` period.

    Raises
    ------
    KeyError
        if one of the expected columns is missing from the file.
    """
    enr_prod_ch = pd.read_csv(ch_enr_model_path, index_col=0, parse_dates=[0]).astype(float)

    # Verify that the dataframe contains the right columns. An explicit raise
    # (not an assert) so the check also runs under `python -O`.
    required = ['Wind', 'Solar', 'Waste', 'Biogas', 'Sewage_gas', 'Biomass_1_crops',
                'Biomass_2_waste', 'Hydro_Pumped_Storage', 'Hydro_Pumpage']
    missing = [c for c in required if c not in enr_prod_ch.columns]
    if missing:
        raise KeyError(f"Missing columns {missing} in the CH EnrModel data.")

    # The rows are selected one hour later than requested, and shifted back
    # below. Limits are converted to datetime first, so str limits are
    # accepted as documented.
    one_hour = pd.Timedelta(hours=1)
    first = None if start is None else pd.to_datetime(start) + one_hour
    last = None if end is None else pd.to_datetime(end) + one_hour
    enr_prod_ch = enr_prod_ch.loc[first:last] / 1000  # Convert from kWh to MWh

    # Adapt the dataframe to the right format
    name_map = {
        'Wind': 'Wind_Onshore_CH',
        'Solar': 'Solar_CH',
        'Waste': 'Waste_CH',
        'Hydro_Pumped_Storage': 'Hydro_Pumped_Storage_CH',
        'Hydro_Pumpage': 'Hydro_Pumpage_CH'
    }
    enr_prod_ch['Biomass_CH'] = (enr_prod_ch['Biomass_1_crops'] + enr_prod_ch['Biomass_2_waste']
                                 + enr_prod_ch['Biogas'] + enr_prod_ch['Sewage_gas'])
    enr_prod_ch = enr_prod_ch.drop(columns=['Biomass_1_crops', 'Biomass_2_waste', 'Biogas', 'Sewage_gas'])
    enr_prod_ch = enr_prod_ch.rename(columns=name_map)
    enr_prod_ch['Hydro_Pumpage_CH'] = -enr_prod_ch['Hydro_Pumpage_CH']  # Negative values -> consumption
    enr_prod_ch.index = enr_prod_ch.index - one_hour  # Shift the index to the left

    # Resample the dataframe to the right frequency (and sum the production values)
    return enr_prod_ch.resample(freq).sum()
# + ########################### # ########################## # Read OFEN pdf files # ########################## # ########################## # -
def split_cell(cell, index):
    """
    Utility function to split a cell of a table read from tabula.

    Parameters
    ----------
    cell :
        content of the cell. Only text cells are split; anything else
        (numbers, NaN, None) yields 0.
    index : int
        position of the wanted part, once the text is split on single spaces.

    Returns
    -------
    str or int
        the requested part of the cell, or 0 when the cell is not text or
        has no part at this position.
    """
    # Explicit text check: `np.isreal` is documented as unreliable on strings.
    if not isinstance(cell, str):
        return 0
    parts = cell.split(' ')
    if len(parts) <= index:
        return 0
    return parts[index]
def post_process_2017(columns):
    """
    Helper to fix the 2017 data read from the OFEN pdf file.

    Parameters
    ----------
    columns : list of list
        columns rebuilt from the pdf table; the first 12 are modified in place.

    Returns
    -------
    list of list
        the same `columns` object, each of the first 12 columns starting
        with its date.
    """
    # Dates of the first line of the table, which is missing after extraction
    first_line_dates = ['18.1.2017', '21.1.2017', '22.1.2017', '15.2.2017', '18.2.2017', '19.2.2017',
                        '15.3.2017', '18.3.2017', '19.3.2017', '19.4.2017', '22.4.2017', '23.4.2017']
    for position, date in enumerate(first_line_dates):
        columns[position].insert(0, date)
    return columns
def post_process_2022(columns):
    """
    Helper to fix the 2022 data read from the OFEN pdf file.

    Parameters
    ----------
    columns : list of list
        columns rebuilt from the pdf table; they are modified in place.

    Returns
    -------
    list of list
        the same `columns` object, once the entry at position 19 is removed
        from every column, the first-line date is inserted at the start of
        the first 12 columns, and the last two lines are appended to every
        column.
    """
    # Remove an empty line
    for column in columns:
        column.pop(19)

    # Add missing first line
    first_line_dates = ['19.1.2022', '22.1.2022', '23.1.2022', '16.2.2022', '19.2.2022', '20.2.2022',
                        '16.3.2022', '19.3.2022', '20.3.2022', '20.4.2022', '23.4.2022', '24.4.2022']
    for position, date in enumerate(first_line_dates):
        columns[position].insert(0, date)

    # Last two lines are missing in 2022
    second_to_last = ['9.1', '-', '-', '14.5', '-', '-', '9.5', '-', '-', '18.7', '-', '-']
    last = ['160.1', '-', '-', '162.9', '-', '-', '179.4', '-', '-', '193.9', '-', '-']
    for position, column in enumerate(columns):
        column.append(second_to_last[position])
        column.append(last[position])
    return columns
def post_process_2023(columns):
    """
    Helper to fix the 2023 data read from the OFEN pdf file.

    Parameters
    ----------
    columns : list of list
        columns rebuilt from the pdf table; they are modified in place.

    Returns
    -------
    list of list
        the same `columns` object, each column extended with its value of
        the last line.
    """
    # Last line is missing in 2023
    last_line = ['157,6', '-', '-', '174,9', '-', '-', '183,8', '-', '-', '195,4', '-', '-']
    for position, column in enumerate(columns):
        column.append(last_line[position])
    return columns
def post_process_2024(columns):
    """
    Helper to fix the 2024 data read from the OFEN pdf file.

    Parameters
    ----------
    columns : list of list
        columns rebuilt from the pdf table. The first 12 are replaced by
        cleaned copies in the list.

    Returns
    -------
    list of list
        the same `columns` object. Each of the first 12 columns gets its two
        missing dates (at positions 0 and 11) and loses its missing values
        and its zeros; every column is then extended with the last two lines.
    """
    # Add missing first and second lines date
    first_dates = ['17.1.2024', '20.1.2024', '21.1.2024', '21.2.2024', '24.2.2024', '25.2.2024',
                   '20.3.2024', '23.3.2024', '24.3.2024', '17.4.2024', '20.4.2024', '21.4.2024']
    second_dates = ['15.5.2024', '18.5.2024', '19.5.2024', '19.6.2024', '22.6.2024', '23.6.2024',
                    '17.7.2024', '20.7.2024', '21.7.2024', '21.8.2024', '24.8.2024', '25.8.2024']
    for position, (first, second) in enumerate(zip(first_dates, second_dates)):
        column = columns[position]
        column.insert(0, first)
        column.insert(11, second)
        # Drop the missing values, then the zeros
        present = pd.Series(column).dropna()
        columns[position] = [value for value in present if value != 0]

    # Add missing last 2 lines
    second_to_last = ['4,8', '–', '–', '26,3', '–', '–', '7,8', '–', '–', '4,5', '–', '–']
    last = ['161,3', '–', '–', '165,9', '–', '–', '198,1', '–', '–', '197,2', '–', '–']
    for position, column in enumerate(columns):
        column.append(second_to_last[position])
        column.append(last[position])
    return columns
def read_ofen_pdf_file(file, post_process_fun, page=31):
    """
    Read an ofen pdf file and extract a dictionary of typical days with their electricity mix.
    Supports years from 2017 to 2024. Not tested after.

    A post-processing function should be provided to fix the data read from the pdf file.
    This function depends on the year of the data because the format of the pdf file changes between years.
    Four post-processing functions are provided above for 2017, 2022, 2023 and 2024.

    Parameters
    ----------
    file : str
        Path to the pdf file to read. Files ending with '23.pdf' or '24.pdf' get a dedicated treatment.
    post_process_fun : function
        Function to apply to the data read from the pdf file
        Takes a list of columns as input and returns the modified list of columns
    page : int
        Page of the pdf file to read (default: 31). For files ending with '24.pdf', page 30 is
        always read, whatever the value passed.

    Returns
    -------
    dict
        maps each day (str, formatted '%Y-%m-%d') to the list of the 10 cells following its date
        in its column.
    """
    print('Reading', file)

    def _without_rows(cells, positions):
        """Pop the given positions from `cells`, one after the other, and return `cells`."""
        for position in positions:
            cells.pop(position)
        return cells

    def _rebuild(table, split_positions, dropped_rows):
        """Turn every column of `table` into one list (or two for merged columns), without `dropped_rows`."""
        rebuilt = []
        for position, label in enumerate(table.columns):
            cells = table[label].tolist()
            if position in split_positions:  ### Columns to split
                left = [split_cell(cell, 0) for cell in cells]
                right = [split_cell(cell, 1) for cell in cells]
                rebuilt.append(_without_rows(left, dropped_rows))
                rebuilt.append(_without_rows(right, dropped_rows))
            else:  ### Columns already usable
                rebuilt.append(_without_rows(cells, dropped_rows))
        return rebuilt

    # Reconstruct all columns (some of them are merged by tabula)
    # Tested with 2017, 2022, 2023 and 2024
    # This should work for 2018 and following years
    if file.endswith('23.pdf'):
        table = tabula.read_pdf(file, pages=page, stream=True)[0]
        table = table.drop(columns=['2023: Monat', 'Unnamed: 0', 'Unnamed: 2', 'Unnamed: 4',
                                    'Unnamed: 6', 'Unnamed: 8', '2023: Mois'])
        table = table.iloc[1:].reset_index(drop=True)
        columns = _rebuild(table, [1, 3, 5, 7], [0, 11, 11, 11, 22, 22, 22])
    elif file.endswith('24.pdf'):
        page = 30
        table = tabula.read_pdf(file, pages=page, stream=True)[0]
        # The column labels are put back into the table as its first row
        header_row = pd.DataFrame([table.columns], columns=table.columns)
        table = pd.concat([header_row, table], ignore_index=True)
        table.columns = range(table.shape[1])
        table = table.drop(columns=[0, 3, 6, 9, 12, 13])
        columns = _rebuild(table, [0, 2, 4, 6], [11, 11, 11, 10, 21, 21, 21])
    else:
        table = tabula.read_pdf(file, pages=page, stream=True)[0]
        columns = []
        for position, label in enumerate(table.columns):
            if position in [1, 4, 7, 10]:  ### Columns to split
                # The column label is put back as the first cell
                cells = [label] + table[label].tolist()
                columns.append([split_cell(cell, 0) for cell in cells])
                columns.append([split_cell(cell, 1) for cell in cells])
            elif position in [2, 5, 8, 11]:  ### Columns already usable
                columns.append([label] + table[label].tolist())

    # Apply custom post-processing depending on the year
    columns = post_process_fun(columns)

    # Complete all days from the table data: each column holds three days,
    # each one being a date cell followed by its values.
    if file.endswith(('23.pdf', '24.pdf')):
        date_positions = (0, 11, 22)
    else:
        date_positions = (0, 14, 28)

    days = {}
    for start in date_positions:
        for column in columns:
            day = datetime.strptime(column[start], "%d.%m.%Y").strftime("%Y-%m-%d")
            days[day] = column[start + 1:start + 11]

    # return the data
    return days