"""
Collection of functions to perform a data autocomplete
"""
###############################
###############################
### IMPORTS
###
from itertools import groupby
import numpy as np
import pandas as pd
###############################
###############################
### PILOTE FUNCTION
###
[docs]
def autocomplete(data:dict, n_hours:int=2, days_around:int=7, daytype_only:bool=False,
limit:float=.3, ignore:bool=False, is_verbose:bool=False):
"""
Main function to auto-complete the data. Works with generation and import.
Parameters
----------
data: dict
the dict of data to auto-complete.
n_hours: int, default to 2
max number of hours missing in a row to consider a
short gap and use linear interpolation.
days_around: int, default to 7
number of days before and after a long gap to be used
when creating an average day to complete the gap.
daytype_only: bool, default is False
fills long gap using an average day build only with
days of similar type (weekday, Saturday, Sunday)
limit: float, default to 0.3
max relative size of gap to allow an autocomplete. If a
gap is longer than this fraction of the data, it will be
filled with zeros.
ignore: bool, default is False
the missing data is flagged but not auto-completed. Displays
a report if `is_verbose` is set to True.
is_verbose: bool, default is False
to display information during the process.
Returns
-------
dict
dict of data with autocompleted information
pandas.DataFrame
pandas DataFrame with resolutions
"""
if is_verbose: print(f'Autocomplete...'+" "*15)
### ESTIMATE RESOLUTION
resolution = infer_resolution(data)
### RESHAPE THE DATA
new_data = {c: {field: to_original_series(data[c].loc[:,field],
freq=resolution.loc[field,c])
for field in data[c].columns}
for c in data}
### IDENTIFY DATA GAPS
all_gaps = find_missing(new_data)
if ignore:
datasize = {c: {k: new_data[c][k].shape[0] for k in new_data[c]} for c in new_data}
if is_verbose: report_missing(all_gaps, datasize)
### SET DATA BACK TO THEIR ORIGINAL FORMAT
new_data = {c: pd.DataFrame({field: new_data[c][field]
for field in new_data[c]})
for c in new_data}
return new_data, resolution # return 'new_data'
### REDUCE SELECTION TO LONG GAPS
long_thresholds = set_thresholds(new_data, resolution, n_hours=n_hours)
lengths = set_lengths(new_data) # Compute lengths
excess_thresholds = {c :{k: int(limit*lengths[c][k])
for k in lengths[c]}
for c in lengths}
long_gaps = sort_gaps(all_gaps, lower=long_thresholds,
upper=excess_thresholds, lengths=lengths) # Sort gaps for long
excess_gaps = sort_gaps(all_gaps, lower=excess_thresholds,
lengths=lengths) # Sort gaps for excess
### SPLIT LONG GAPS INTO DAYS
if daytype_only: #Modify the condition later
long_gaps = {c: {field: longs_into_days(long_gaps[c][field], indexes=data[c].loc[:,field].index, n_hours=n_hours)
for field in long_gaps[c]} for c in long_gaps}
### FILL LONG GAPS
deltas = set_deltas(new_data, resolution, days_around=days_around)
new_data = fill_all_periods(new_data, period_indexes=long_gaps,
deltas=deltas, daytype_only=daytype_only, is_verbose=is_verbose)
### FILL GAPS TO SKIP WITH ZEROS
new_data = fill_all_excess(new_data, period_indexes=excess_gaps)
### SET DATA BACK TO THEIR ORIGINAL FORMAT
new_data = {c: pd.DataFrame({field: new_data[c][field]
for field in new_data[c]})
for c in new_data}
### FILL SHORT GAPS AND RETURN
new_data = fill_occasional(new_data)
return new_data, resolution
###############################
###############################
### HELPER FUNCTIONS
###
[docs]
def infer_resolution(data:dict):
"""Infers the resolution of all fields for all countries"""
resolution = pd.DataFrame({c: {k: infer_one( data[c].loc[:,k].dropna(axis=0).index)
for k in data[c].columns} # All still dataframes
for c in data})
if not all(resolution.index.str.len()==2): # Generation
return resolution.fillna(method='ffill').fillna(method='bfill')
else: return resolution
[docs]
def infer_one(obj):
"""Infer frequency for one single time Series"""
if len(obj)>3:
freq = pd.infer_freq(obj) # Use built-in pandas
else: # Avoid issue when not enough data, just assume a 15min step.
freq='15T'
if freq is not None: # but function is not robust
return freq # at all...
### Back-up plan is to infer manually (smallest delta)
components = {'15T':lambda x: getattr(x,'minutes')==15,
'30T':lambda x: getattr(x,'minutes')==30,
'H':lambda x: getattr(x,'hours')==1,
'D':lambda x: getattr(x,'days')==1,} # Possible frequencies
tdelta = pd.Timedelta( np.diff(obj).min() ) # Shortest time delta between indexes
# Identify the corresponding frequency (day, hour, 30min, 15min)
freqs = pd.Series({k:components[k](tdelta.components) for k in components})
return freqs.idxmax() # Get the index (frequency) of max (1st True or 15T)
[docs]
def get_steps_per_hour(freq, dtype=int):
"""Retrieve resolution for a specific country and field.
Parameters
----------
freq: str
the base frequency of the time series
dtype: data-type, default to `int`
the type of return. Default behavior returns an integer,
i.e. zero when the frequency is lower than an hour. It may
be convenient to sometimes return a fraction instead, using float.
Returns
-------
dtype
the number of time steps per hour to expect in a time series.
"""
### Make sure it starts with a number
if not np.any([freq.startswith(k) for k in '0123456789']): # If starts with a letter
frequency = f"1{freq}"
else: frequency = freq
return dtype( pd.Timedelta('1H') / pd.Timedelta(frequency) ) # Return nb of steps per hour
[docs]
def set_lengths(data:dict):
"""
Compute the length of each subcategory for each country
"""
return {c: {k: data[c][k].shape[0]
for k in data[c]}
for c in data}
[docs]
def set_deltas(data:dict, resolution, days_around:int):
"""
Compute the deltas of each subcategory for each country
for the creation of typical days.
"""
return {c: {k: (days_around*24)*get_steps_per_hour(freq=resolution.loc[k,c])
for k in data[c]}
for c in data}
[docs]
def set_thresholds(data:dict, resolution, n_hours:int):
"""
Compute the thresholds of each subcategory for each country
for the flagging of long gaps.
"""
return {c: {k: n_hours*get_steps_per_hour(freq=resolution.loc[k,c])
for k in data[c]}
for c in data}
[docs]
def to_original_series(obj, freq):
"""
Scale data back to original resolution. Applicable to pandas Series only.
"""
if not isinstance(obj, pd.core.series.Series): # Test on type
raise TypeError(f"Only series are expected. {type(obj)} object was passed.")
### CONVERT DATA MW -> MWH BEFORE AUTO-COMPLETING (as the frequency is infered)
return obj.resample(freq).asfreq() # Resample with original frequency
###############################
###############################
### IDENTIFICATION OF GAPS
###
[docs]
def find_missing(data:dict):
"""
Identifies the missing values for the entire set of data.
Parameters
-----------
data: dict of pandas DataFrames
the data to process
Returns
--------
dict (keys are countries) of dicts (keys are former columns)
of matrices. Final matrix has one identified gap per row and
three columns (length of gap, first..., and last index of gap)
"""
return {c: {k: find_missing_one( series=data[c][k] ) # Find missing values
for k in data[c]} # Iterate for every sub-category
for c in data} # Iterate for every country
[docs]
def find_missing_one(series):
"""
Identifies all missing values for one single series.
Parameters
-----------
series: pandas Series
the data to process
Returns
--------
Matrix (n x 3). Final matrix has one identified gap
per row (n rows) and three columns (length of gap,
first..., and last index of gap)
"""
### Identify if data point is NaN or not
vecNan = np.isnan(series.to_numpy())
### Count the isna() similar values in a row (either False or True)
count_series = np.array([(x,len(list(y))) for x,y in groupby(vecNan)])
if count_series[0][0]==1: # Correction if first data is missing
count_series = np.concatenate([[[0,0]],count_series])
if count_series[-1][0]==0: # Correcti0n if last data is available
count_series = count_series[:-1]
### Accumulating the sum gives you pairs of
### (1st idx - last idx) for missing values
cum_series = count_series[:,1].cumsum()
### Compute the length of each gap
len_gaps = cum_series[1::2] - cum_series[::2]
### Gather results in a table (col1: length, col2: start index, col3: end index)
return np.concatenate([[len_gaps], cum_series.reshape(cum_series.shape[0]//2,2).T, ], axis=0).T.astype('int32')
###############################
###############################
### SORTING OF GAPS
###
[docs]
def sort_gaps(gaps:dict, lower:dict, lengths:dict, upper:dict=None):
"""
Identify long gaps (above threshold).
Needs the length of data for specific processes
"""
if upper is None:
upper = {c: {k: None for k in gaps[c]} for c in gaps}
return {c: {k: select_long_gaps(gaps[c][k], name=k,
lower=lower[c][k],
upper=upper[c][k],
length=lengths[c][k])
for k in gaps[c]}
for c in gaps}
[docs]
def select_long_gaps(gaps, name, lower, upper, length):
"""
Identify long gaps for one subcategory of one country
with one unique threshold. Can make exception with
some cases, e.g. solar at the extremes of dataset.
"""
### Select all longer than threshold
if upper is not None:
long_gaps = gaps[np.logical_and(upper>gaps[:,0], gaps[:,0]>lower)]
else:
long_gaps = gaps[gaps[:,0]>lower]
### Handle specific gaps
if ((gaps.shape[0]>0)&(upper is not None)):
long_gaps = add_specific_gaps(gaps, name, length, long_gaps)
return long_gaps
[docs]
def add_specific_gaps(all_gaps, name, length, long_gaps):
### Specific to solar data
# At the start
if np.logical_and.reduce([name == 'Solar', # If solar
all_gaps[0,1]==0, # If gap at the start
all_gaps[0].tolist() not in long_gaps.tolist()]): # If not already long gap
long_gaps = np.concatenate( [all_gaps[[0]],long_gaps], axis=0 )
# At the end
if np.logical_and.reduce([name == 'Solar', # If solar
all_gaps[-1,2]==length, # If gap at the end
all_gaps[-1].tolist() not in long_gaps.tolist()]): # If not already long gap
long_gaps = np.concatenate( [long_gaps, all_gaps[[-1]] ], axis=0 )
return long_gaps
[docs]
def longs_into_days(gaps, indexes, n_hours=2):
new_set = []
for gap in gaps:
idx = indexes[gap[1]:gap[2]] # Get the dates
incl = np.unique(idx.dayofyear) # Days in that gap
if len(incl)==1: # Single day
new_set.append(gap)
else: # Multiple days
for d in incl: # add subsection per day if more than n_hours steps (or sure it is "short")
subidx = np.argwhere(idx.dayofyear==d).ravel()+gap[1]
if len(subidx)>n_hours: new_set.append([len(subidx), subidx[0], subidx[-1]])
return np.array(new_set)
###############################
###############################
### COMPLETE GAPS
###
[docs]
def fill_all_periods(data:dict, period_indexes:np.ndarray, deltas:dict, daytype_only:bool=False, is_verbose:bool=False):
"""Fills all long gaps.
Parameters
----------
data: dict
collection of data, with structure being `{country: { unit: pandas.Series } }`
period_indexes: numpy.ndarray
matrix indicating the location and length of long gaps
deltas: dict
collection of number of time steps to create the average days around gaps.
Structure is `{country: {unit: {gap_id: int} } }`.
daytype_only: bool
uses an average day build only with days of similar type (weekday, Saturday, Sunday)
is_verbose: bool, default to False
to display information.
"""
for i,c in enumerate(data): # For all countries
### Fill the periods
for j,k in enumerate(data[c]): # For all elements of each country
if is_verbose: print(f"\t{c} ({i+1:02d}/{len(data):02d}); field {j+1:02d}/{len(data[c]):02d})"+" "*10, end='\r')
data[c][k] = fill_one_series(data[c][k], period_indexes[c][k], delta=deltas[c][k],
daytype_only=daytype_only) # Fill the data
if is_verbose: print("\tCompleted."+" "*30)
return data
[docs]
def fill_one_series(data, period_indexes, delta, daytype_only=False):
"""Fills all long gaps for one single series in one country"""
filled = data.copy()
for gap in period_indexes:
### Create Average Day
if daytype_only:
avg_day = (reduce_to_daytype(filled.iloc[max(0, gap[1]-delta) : min(gap[2]+delta, filled.shape[0])],
weekday=filled.index[gap[1]].dayofweek)
.groupby(lambda x: x.strftime('%H:%M')).mean())
else:
avg_day = filled.iloc[max(0, gap[1]-delta) : min(gap[2]+delta, filled.shape[0])].groupby(lambda x: x.strftime('%H:%M')).mean()
### Fill the period
filled.iloc[gap[1]:gap[2]] = fill_one_period(avg_day, to_fill=filled.iloc[gap[1]:gap[2]])
return filled
[docs]
def fill_one_period(avg_day, to_fill):
"""Fills one single long gap using one average day."""
filled = pd.Series(None, index=to_fill.index, name=to_fill.name, dtype='float32')
for t in avg_day.index:
filled.loc[filled.index.strftime('%H:%M')==t] = avg_day.loc[t]
return filled
[docs]
def fill_all_excess(data:dict, period_indexes:dict):
"""Fills with zeros the fields that were skipped"""
for i,c in enumerate(data): # For all countries
### Fill the periods
for j,k in enumerate(data[c]): # For all elements of each country
for gap in period_indexes[c][k]:
data[c][k].iloc[gap[1]:gap[2]] = data[c][k].iloc[gap[1]:gap[2]].fillna(0) # Write 0 where gaps
return data
[docs]
def fill_occasional(data:dict):
"""Fills short gaps of data with linear interpolation."""
return {c: data[c].interpolate(method='linear', limit_direction='both') for c in data}
[docs]
def reduce_to_daytype(data, weekday):
if weekday<5: # Regular week day
return data.loc[data.index.dayofweek<5]
else: # Saturday only or Sunday only
return data.loc[data.index.dayofweek==weekday]
##########################
### SKIP AUTO-COMPLETE
###
[docs]
def report_missing(gaps:dict, datasizes:dict):
"""Count and display missings"""
displaying = {}
for c in gaps:
displaying[c] = {}
for k in gaps[c]:
if 0 in gaps[c][k].shape: # empty
displaying[c][k]=0
else:
displaying[c][k]=gaps[c][k][:,0].sum()
displaying = pd.DataFrame.from_dict(displaying).fillna(0).astype(int)
miss,size = displaying.values.sum(), pd.DataFrame(datasizes).fillna(0).values.sum()
print("="*25)
print(f"Missing data identified: {miss} ({100*miss/size:.2f}%)")
print(displaying.replace(0,'-'))
print("="*25)
return