# Source code for ecodynelec.dynamic_impact

"""
This module defines functions to compute the dynamic environmental impact of Hydro Pumped
Storage energy, as well as the dynamic shares of storage inflows, outflows, and losses.

The `dynamic_impact` function calculates production and mix-related impact at an hourly
granularity, adjusting for the dynamic usage of storage systems across multiple countries.
The function also updates the impact for global and individual country levels.

The `dynamic_storage_shares` function determines the dynamics of storage utilization
(e.g., natural and pumped inflows, production, losses), based on storage data and energy
flow information.

"""
import pandas as pd
import os

def dynamic_impact(prod_imp: dict, mix_imp: dict, flows: pd.DataFrame, prod_mix: dict,
                   mix_dict: dict, impact_matrix: pd.DataFrame, network_impact: dict,
                   step_imp_memory: dict, parameter, is_verbose=False) -> (dict, dict, dict):
    """
    Compute the hourly dynamic impact of pumped hydro storage (STEP) and apply it
    to the production and mix impacts.

    For every impact category found in ``prod_imp['CH']`` (``'Global'`` excluded),
    an hourly series of STEP impacts is built recursively, written into the
    ``'Hydro_Pumped_Storage_CH'`` column of the production impacts (CH only) and
    of the mix impacts (every country), and the ``'Global'`` totals are then
    recomputed. ``prod_imp`` and ``mix_imp`` are modified in place and returned.

    Parameters
    ----------
    prod_imp : dict
        Production impacts, read as ``prod_imp[country][category]`` (a DataFrame
        with one column per technology) plus a ``'Global'`` entry per country.
    mix_imp : dict
        Mix impacts, with the same layout as ``prod_imp``.
    flows : pd.DataFrame
        Raw production and consumption data; forwarded to
        ``dynamic_storage_shares``.
    prod_mix : dict
        Production mix per country; only ``prod_mix['CH']['Hydro_Pumped_Storage_CH']``
        is read here.
    mix_dict : dict
        Consumption mix per country; ``mix_dict[country]['Hydro_Pumped_Storage_CH']``
        is read for every country.
    impact_matrix : pd.DataFrame
        Impact values per technology. Its ``'Hydro_Pumped_Storage_CH'`` row seeds
        the recursion when ``step_imp_memory`` is empty.
    network_impact : dict
        Read as ``network_impact['CH']['Infra PHS'][category]`` and added, scaled
        by the pumped-storage share, on top of the dynamic impact
        (presumably the storage infrastructure impact -- confirm with the caller).
    step_imp_memory : dict
        Last STEP impact of previous years, keyed by year. When not empty, the
        entry for ``parameter.start.year - 1`` seeds the recursion.
    parameter : class
        Settings object. The attributes read here are ``interpolated_stock``,
        ``storage_path``, ``start`` and ``end``.
    is_verbose : bool, optional
        Forwarded to the storage loading and share computation helpers.
        Default is False.

    Returns
    -------
    prod_imp : dict
        The production impacts, updated in place.
    mix_imp : dict
        The mix impacts, updated in place.
    step_imp : dict
        For each processed category, the STEP impact of the last time step, to
        be kept in memory for the following year.

    Raises
    ------
    ValueError
        If interpolated storage is requested but ``parameter.storage_path`` is
        None.
    """
    if parameter.interpolated_stock:
        # Load storage data from Energy Charts
        if parameter.storage_path is None:
            raise ValueError("Storage path is not set. Please set the storage directory path in the config file.")
        storage = load_concat_files(parameter.storage_path, is_verbose=is_verbose)
        storage = interpolate_storage_hourly(storage)
        storage = storage.loc[parameter.start:parameter.end]
        storage_mode = 'interpolated'
    else:
        # No storage data interpolated -> no storage data
        storage = None
        storage_mode = 'flow-based'

    # Determine storage shares : Natural inflow % / Pumpage inflow % in the stock
    storage_shares = dynamic_storage_shares(storage, flows, storage_mode, is_verbose=is_verbose)
    # Positional arrays: avoids one pandas lookup per hour and per category below.
    pumped_inflow = storage_shares['turbine consumption'].to_numpy()
    pumped_stock = storage_shares['SP'].to_numpy()

    # Initial conditions
    if not step_imp_memory:
        initial_imp = impact_matrix.loc['Hydro_Pumped_Storage_CH'].to_dict()
    else:
        # e.g. use the 2023 value when the computation starts in 2024
        initial_imp = step_imp_memory[parameter.start.year - 1]
    step_imp = {k: [v] for k, v in initial_imp.items()}

    # Determine the dynamic impact of STEP and apply it to all the countries
    for k in prod_imp['CH']:
        if k == 'Global':
            continue

        # Remove Hydro_Pumped_Storage_CH from the columns to avoid counting it twice.
        mix = mix_imp['CH'][k].drop(columns=['Hydro_Pumped_Storage_CH'])
        mix_total = mix.sum(axis=1).to_numpy()  # hourly impact of the rest of the CH mix

        # Hourly dynamic impact computation (index 0 holds the initial condition)
        history = step_imp[k]
        for i in range(1, len(mix_total)):
            if storage_mode == 'interpolated':
                # Pumping factor: share of the pumped stock that was pumped this hour.
                stock = pumped_stock[i]
                # An empty pumped stock would give inf/NaN and poison every later
                # hour; in that case the previous impact is carried forward.
                fp = pumped_inflow[i] / stock if stock != 0 else 0.0
                history.append(fp * mix_total[i] + (1 - fp) * history[i - 1])
            else:
                # Without storage data the impact simply follows the current mix.
                history.append(mix_total[i])

        infra_imp = network_impact['CH']['Infra PHS'][k]

        # Apply dynamic impact to all countries
        for country in prod_imp:
            if country == 'CH':
                # prod_imp only concerns the production technologies of the country
                # in question, so the production side is updated for CH alone.
                # NOTE(review): scope of this condition reconstructed from the
                # original inline comment -- confirm.
                prod_share = prod_mix[country]['Hydro_Pumped_Storage_CH']
                prod_imp[country][k]['Hydro_Pumped_Storage_CH'] = prod_share * history + prod_share * infra_imp

            mix_share = mix_dict[country]['Hydro_Pumped_Storage_CH']
            mix_imp[country][k]['Hydro_Pumped_Storage_CH'] = mix_share * history + mix_share * infra_imp

            # Recalculate 'Global' impact with STEP impact added
            prod_imp[country]['Global'][k] = prod_imp[country][k].sum(axis=1)
            mix_imp[country]['Global'][k] = mix_imp[country][k].sum(axis=1)

        step_imp[k] = history[-1]  # keep in memory the last impact of each category

    return prod_imp, mix_imp, step_imp
def dynamic_storage_shares(storage: pd.DataFrame, flows: pd.DataFrame, storage_mode: str,
                           is_verbose: bool = False) -> pd.DataFrame:
    """
    Compute, for each time step, how the stored water splits between natural and
    pumped inflows.

    The stock level, turbine flows, overflow and a residual "natural pumping"
    term are derived first. Starting from an initial split of the stock, the
    natural (``SN``) and pumped (``SP``) parts are then updated step by step:
    inflows are added to their respective part and the losses are removed from
    both parts proportionally to their share.

    Parameters
    ----------
    storage : pd.DataFrame or None
        Storage levels, with a ``'storage max'`` column and one or more level
        columns that are summed. Required in ``'interpolated'`` mode. If None,
        the index of ``flows`` is used for the result.
    flows : pd.DataFrame
        Raw production and consumption data. The columns ``'Hydro_Pumpage_CH'``
        (absolute value taken) and ``'Hydro_Pumped_Storage_CH'`` are read.
    storage_mode : str
        - 'interpolated': stock level taken from ``storage``, capped at
          ``'storage max'``; the excess is counted as overflow.
        - 'flow-based': constant stock level of 1.0 and no capacity overflow.
    is_verbose : bool, optional
        Currently unused by this function. Default is False.

    Returns
    -------
    pd.DataFrame
        One row per time step with the columns:
        - 'sj': current storage level
        - 'sjj': previous storage level
        - 'turbine production': production from storage
        - 'turbine consumption': consumption into storage (pumping)
        - 'overflow': capacity excess plus the negative part of the residual balance
        - 'natural pumping': positive part of the residual balance
        - 'losses': turbine production plus overflow
        - 'SN': part of the stock coming from natural inflows
        - 'SP': part of the stock coming from pumping

    Raises
    ------
    ValueError
        If ``storage_mode`` is not supported, or if ``'interpolated'`` mode is
        requested without storage data.
    """
    if storage_mode not in ('interpolated', 'flow-based'):
        raise ValueError(
            f"Unknown storage_mode '{storage_mode}': expected 'interpolated' or 'flow-based'."
        )
    if storage_mode == 'interpolated' and storage is None:
        raise ValueError("Storage data is required when storage_mode is 'interpolated'.")

    # Define the index to use for storage shares
    index_to_use = flows.index if storage is None else storage.index
    storage_shares = pd.DataFrame(index=index_to_use)

    # --- Storage level : adapted method for storage_mode ---
    if storage_mode == 'interpolated':
        level_cols = [c for c in storage.columns if c.lower() != 'storage max']
        raw_level = storage[level_cols].sum(axis=1)
        storage_max = storage['storage max']
        overflow_mech = (raw_level - storage_max).clip(lower=0)
        storage_shares['sj'] = raw_level.clip(upper=storage_max)  # storage level at j
        storage_shares['sjj'] = storage_shares['sj'].shift(1)     # storage level at j-1
    else:
        overflow_mech = pd.Series(0.0, index=index_to_use)    # hypothesis : no overflow, infinite storage
        constant_stock = pd.Series(1.0, index=index_to_use)   # hypothesis : constant stock
        storage_shares['sj'] = constant_stock
        storage_shares['sjj'] = constant_stock

    # --- Turbine flows ---
    storage_shares['turbine production'] = flows['Hydro_Pumped_Storage_CH']     # STEP production
    storage_shares['turbine consumption'] = flows['Hydro_Pumpage_CH'].abs()     # STEP consumption, made positive

    # --- Natural pumping (residual balance term) ---
    natural_pumping = (storage_shares['sj'] - storage_shares['sjj']
                       + storage_shares['turbine production']
                       - storage_shares['turbine consumption'])
    storage_shares['overflow'] = overflow_mech + natural_pumping.clip(upper=0).abs()
    storage_shares['natural pumping'] = natural_pumping.clip(lower=0)

    # --- Retirements ---
    storage_shares['losses'] = storage_shares['turbine production'] + storage_shares['overflow']

    # --- Initial conditions ---
    share_pump = 0.24  # initial pumped share of the stock
    n_steps = len(storage_shares)
    SN, SP = [], []
    if n_steps:  # an empty time range simply yields empty SN / SP columns
        initial_stock = storage_shares['sj'].iloc[0]
        SN.append((1 - share_pump) * initial_stock)  # natural part at j = 0
        SP.append(share_pump * initial_stock)        # pumped part at j = 0

    # Positional arrays: avoids three pandas lookups per time step in the loop.
    natural_in = storage_shares['natural pumping'].to_numpy()
    pumped_in = storage_shares['turbine consumption'].to_numpy()
    losses = storage_shares['losses'].to_numpy()

    # --- Share of storage without losses, then losses removed proportionally ---
    for k in range(1, n_steps):
        natural_before_losses = SN[k - 1] + natural_in[k]
        pumped_before_losses = SP[k - 1] + pumped_in[k]
        stock_before_losses = SN[k - 1] + SP[k - 1] + natural_in[k] + pumped_in[k]

        # Natural share of the stock; an empty stock charges all losses to the pumped part.
        if stock_before_losses <= 0:
            natural_share = 0
        else:
            natural_share = natural_before_losses / stock_before_losses

        SN.append(natural_before_losses - natural_share * losses[k])
        SP.append(pumped_before_losses - (1 - natural_share) * losses[k])

    storage_shares['SN'] = SN
    storage_shares['SP'] = SP
    return storage_shares
def interpolate_storage_hourly(df):
    """
    Put storage data on an hourly grid and fill the gaps by interpolation.

    Parameters
    ----------
    df : DataFrame
        Data with a DatetimeIndex and numeric columns.

    Returns
    -------
    DataFrame
        Hourly-frequency data whose missing values are interpolated with the
        'time' method. An input without rows is returned unchanged.
    """
    # Nothing to resample when there are no rows.
    if not len(df):
        return df

    # Insert the missing hourly timestamps (as NaN rows), then fill them
    # proportionally to the elapsed time between known values.
    hourly_grid = df.resample('h').asfreq()
    return hourly_grid.interpolate(method='time')
def load_concat_files(folder_path, is_verbose=False):
    """
    Load and concatenate the storage CSV files found in a folder.

    Every entry of ``folder_path`` whose name starts with ``'Storage'`` is read
    (in alphabetical order), skipping the second line of each file. The rows are
    concatenated, indexed by the ``'Date (TC+1)'`` column, sorted by date, the
    remaining columns are renamed and all values are multiplied by 1e6.

    Parameters
    ----------
    folder_path : str
        Path to the directory containing the yearly CSV files.
    is_verbose : bool, optional
        If True, prints the number of files found, each file being read and the
        number of rows loaded. Default is False.

    Returns
    -------
    df : DataFrame
        Data with a timezone-naive DatetimeIndex (timestamps are parsed as UTC
        before the timezone information is dropped) and the columns 'Valais',
        'Grisons', 'Tessin', 'Reste de la Suisse' and 'storage max'.

    Raises
    ------
    FileNotFoundError
        If no file starting with ``'Storage'`` exists in ``folder_path``
        (raised whatever the value of ``is_verbose``).
    """
    # Settings of the storage dataset
    config = {
        'prefix': 'Storage',
        'skiprows': [1],
        'date_col': 'Date (TC+1)',
        'rename_cols': ['Valais', 'Grisons', 'Tessin', 'Reste de la Suisse', 'storage max'],
        'convert': 1e6,  # NOTE(review): unit conversion factor -- source/target units to confirm
    }

    # Get the list of files matching the prefix
    files = sorted(f for f in os.listdir(folder_path) if f.startswith(config['prefix']))
    if not files:
        raise FileNotFoundError(f"No files found starting with '{config['prefix']}' in directory: {folder_path}")

    if is_verbose:
        print(f"Loading storage: {len(files)} files found")

    # Load and concatenate files
    frames = []
    for name in files:
        if is_verbose:
            print(f" Reading: {name}")
        frames.append(pd.read_csv(os.path.join(folder_path, name), skiprows=config['skiprows']))
    df_concat = pd.concat(frames, ignore_index=True)

    # Process datetime index: parse as UTC, then drop the timezone information
    date_col = config['date_col']
    df_concat[date_col] = pd.to_datetime(df_concat[date_col], utc=True).dt.tz_localize(None)
    df_concat = df_concat.set_index(date_col).sort_index()

    # Rename the data columns (positional: the file must have exactly these columns)
    df_concat.columns = config['rename_cols']

    # Apply the conversion factor
    df_concat = df_concat * config['convert']

    if is_verbose:
        print(f" Loaded {len(df_concat)} rows")

    return df_concat