Source code for ecodynelec.preprocessing.downloading

"""Module reaching ENTSO-E server and downloading data"""

import os
from datetime import datetime, timedelta
from functools import partial
from getpass import getpass
from time import time

import pandas as pd
import paramiko

from ecodynelec.parameter import Parameter
from ecodynelec.progress_info import ProgressInfo


# +

#################
#################
# ## Download
##############

# -


def download(config, threshold_minutes=15, threshold_size=.9, is_verbose=False):
    """Downloads data from ENTSO-E servers and save it.

    Parameters
    ----------
        config: ecodynelec.Parameter or str
            collection of parameters for the execution of ecodynelec. The relevant
            information is the start and end date, as well as server information
            and path information to save raw_generation and raw_exchange.
            A string is interpreted as the path to a spreadsheet (`.xlsx`, `.xls`
            or `.ods`) from which a `Parameter` object is built.
        threshold_minutes: int, default to 15
            time in minutes. Maximum time between last download and last remote
            update to not download the file.
        threshold_size: float, default to 0.9
            minimum ratio of size difference between local and remote file below
            which to download, if the last download is newer than `threshold_minutes`.
        is_verbose: bool, default to False
            to display information during the download

    Returns
    -------
        None

    Raises
    ------
        NotImplementedError
            if `config` is a string whose extension is not a supported spreadsheet format.
    """
    t0 = time()

    ### Verify the configuration
    if isinstance(config, str):
        # str.endswith accepts a tuple of suffixes: one call covers all formats
        if config.endswith(('.xlsx', '.xls', '.ods')):
            config = Parameter(excel=config)
        else:
            raise NotImplementedError(f"File extension for {config} is not supported.")

    ### Get the start and end dates
    dates = _set_time(config.start, config.end)

    ### Decide on the files to download
    file_list = {k: _get_file_list(*dates,
                                   getattr(config.server, f'_remote{k}Dir'),
                                   getattr(config.server, f'_name{k}File'))
                 for k in ['Generation', 'Exchanges']}

    ### Point to the saving locations (same file names, local directories)
    save_list = {k: _get_file_list(*dates,
                                   getattr(config.path, k.lower()),
                                   getattr(config.server, f'_name{k}File'))
                 for k in file_list}

    ### Clear directories
    if config.server.removeUnused:
        _remove_olds(config.path.generation, save_list['Generation'])  # Remove unused generation
        _remove_olds(config.path.exchanges, save_list['Exchanges'])  # Remove unused exchanges

    ### Download files
    _reach_server(config.server, files=file_list, savepaths=save_list, is_verbose=is_verbose,
                  threshold_minutes=threshold_minutes, threshold_size=threshold_size)

    ### EOF
    if is_verbose:
        # Trailing spaces overwrite what remains of the '\r' progress line
        print(f"\tDownload from server: {time() - t0:.2f} sec" + " " * 40)
# +

#################
#################
# ## Reach Server
##############

# -

def _open_transport(server_info, password):
    """Opens and authenticates a transport to the server, allowing one password retry.

    Parameters
    ----------
        server_info
            object providing the `host`, `port` and `username` attributes.
        password: str
            first password to try. After a failed attempt, a new one is asked
            to the user via `_manage_password`.

    Returns
    -------
        paramiko.Transport
            a connected transport. The caller is responsible for closing it.

    Raises
    ------
        paramiko.AuthenticationException
            if the second authentication attempt also fails.
    """
    safety_loop = 1  # Number of additional attempts allowed after a failure
    while True:
        transport = paramiko.Transport((server_info.host, server_info.port))
        try:
            transport.connect(username=server_info.username, password=password)
        except paramiko.AuthenticationException as authentication_error:
            transport.close()  # Never leave a failed transport open
            if safety_loop == 0:
                message = "Connection failed. Your password may be outdated. "
                message += "Please verify by logging in at https://transparency.entsoe.eu/"
                print(message)
                raise paramiko.AuthenticationException(message) from authentication_error
            print("Error in Password or Username. Connection failed.")
            safety_loop -= 1  # One chance less
            password = _manage_password()  # New try for password
        except Exception:
            transport.close()  # Any other failure: release the socket, then propagate
            raise
        else:
            return transport  # Success


def _reach_server(server_info, files, savepaths, threshold_minutes=15, threshold_size=.9, is_verbose=False,
                  progress_bar: "ProgressInfo | None" = None):
    """Function establishing the connection with the server using credentials,
    collecting files and saving them. Nothing is returned.

    Parameters
    ----------
        server_info
            object providing `host`, `port`, `username` and `password`. An empty
            or missing username is asked to the user and stored back in
            `server_info.username`; a missing password is asked to the user.
        files: dict
            for each category, the list of remote files to download.
        savepaths: dict
            for each category, the list of local paths, in the same order as `files`.
        threshold_minutes: int, default to 15
            passed to `_should_download`.
        threshold_size: float, default to 0.9
            passed to `_should_download`.
        is_verbose: bool, default to False
            to display information during the download
        progress_bar: ProgressInfo, default to None
            if given, receives status labels; a second bar is created to show
            the progress of each individual file.

    Raises
    ------
        paramiko.AuthenticationException
            if the authentication fails twice.
    """
    ### Create the connection
    if is_verbose:
        print("Connection...", end='\r')
    if progress_bar:
        progress_bar.set_sub_label('Connection...')

    ### Get the username, if required
    if server_info.username in [None, '']:
        server_info.username = input("Username: ")

    ### Connexion loop
    password = server_info.password  # Get the password from parameters
    if password is None:
        password = _manage_password()
    transport = _open_transport(server_info, password)

    try:
        ### Create a data pipe between local and remote
        if is_verbose:
            print("Create pipe...", end='\r')
        if progress_bar:
            progress_bar.set_sub_label('Create pipe...')
            dl_bar = ProgressInfo(color='green')
            dl_bar.hide()
        else:
            dl_bar = None

        with paramiko.SFTPClient.from_transport(transport) as sftp:
            ### Download all files
            for categ in files:  # Generation then Exchanges
                n_files = len(files[categ])
                for i, (remote, local) in enumerate(zip(files[categ], savepaths[categ])):  # For each file
                    label = f"{categ} {i + 1}/{n_files}"
                    try:
                        if progress_bar:
                            progress_bar.set_sub_label(f'Check file: {label}')
                        if not _should_download(sftp, remote, local, threshold_minutes, threshold_size):
                            continue
                        if dl_bar:
                            dl_bar.show()
                            dl_bar.progress(local, 0)
                        callback_fct = None
                        if is_verbose or progress_bar is not None:
                            callback_fct = partial(_progressBar, info=label,
                                                   is_verbose=is_verbose, progress_bar=dl_bar)
                        sftp.get(remotepath=remote, localpath=local, callback=callback_fct)
                    except FileNotFoundError:
                        # A missing file must not abort the other downloads
                        print(f"ERROR: File {remote} (local: {local}) not found. Skipping...")
                        continue
                    if dl_bar:
                        dl_bar.hide()
    finally:
        ### Close the connection to transport, even if a transfer failed
        transport.close()
    ### EOF


# +

#################
#################
# ## Set Time
##############

# -

def _set_time(start, end):
    """Set the list of year-month to target the files to download.

    If start is None: the 2 months ending at `end`.
    If end is None: now (UTC).

    Parameters
    ----------
        start
            first date to cover, or None.
        end
            last date to cover, or None.

    Returns
    -------
        tuple
            the years and the months of every monthly period to cover,
            as two aligned sequences.
    """
    if end is None:
        # Naive UTC "now"; equivalent to the deprecated datetime.utcnow()
        end = pd.Timestamp.now(tz="UTC").tz_localize(None)
    # Without a start, only the 2 months ending at `end` are targeted
    periods = 2 if start is None else None
    all_months = pd.period_range(start=start, end=end, freq='M', periods=periods)
    return all_months.year, all_months.month


# +

#################
#################
# ## Get File List
##############

# -

def _get_file_list(years, months, path, rootName):
    """Create the list of files to download from the server, with exact names.

    Each name is `<path><year>_<month on 2 digits>_<rootName>`; `path` is used
    as a raw prefix, thus must include its trailing separator.
    """
    return [f"{path}{y}_{m:02d}_{rootName}" for y, m in zip(years, months)]


# +

#################
#################
# ## Manage Password
##############

# -

def _manage_password():
    """Asks the user to give a password.

    ENTSO-E requires the password to be changed once in a while, it might just
    have expired. If the correct password is not valid anymore, visit
    https://transparency.entsoe.eu/ to login and reset.
    """
    return getpass("Password: ")


# +

#################
#################
# ## Should Download
##############

# -

def _should_download(sftp, remote, local, threshold_minutes=15, threshold_size=.9):
    """Investigates whether to download a file or not.

    Parameters
    ----------
        sftp
            the connexion to server. Only its `lstat` method is used.
        remote: str
            name of the remote file
        local: str
            local version of the file name, if it were to exist
        threshold_minutes: int, default to 15
            time in minutes. Maximum time between last download and last remote
            update to not download the file.
        threshold_size: float, default to 0.9
            minimum ratio of size difference between local and remote file below
            which to download, if the last download is newer than `threshold_minutes`.

    Returns
    -------
        bool
            True if
                - the file does not exist in the local target directory OR
                - the remote file is newer than `threshold_minutes` after download of the local file OR
                - the local file is smaller in size than `threshold_size` of the remote file's size.
    """
    ### IF NO LOCAL FILE ALREADY, DOWNLOAD.
    if not os.path.isfile(local):
        return True

    remote_stat = sftp.lstat(remote)  # Single round-trip for both time and size
    local_stat = os.stat(local)

    ### IF REMOTE FILE IS NEWER THAN LOCAL, DOWNLOAD.
    # Both modification times are taken as seconds since the epoch, which do not
    # depend on the timezone: they are compared directly, without any offset.
    if (remote_stat.st_mtime - local_stat.st_mtime) > threshold_minutes * 60:
        return True

    ### IF REMOTE IS (SIGNIFICANTLY) LARGER, DOWNLOAD
    remote_size = remote_stat.st_size  # Size of remote document
    if not remote_size:
        return False  # Empty remote file: nothing more to fetch, and no ratio to compute
    local_size = local_stat.st_size  # Size of local document
    return ((remote_size - local_size) / remote_size) > (1 - threshold_size)


# +

#################
#################
# ## Remove Old Files
##############

# -

def _remove_olds(path, local_list):
    """Clears unused files in local directory

    Parameters
    ----------
        path: str
            directory to clean, used as a raw prefix (trailing separator included).
        local_list: list
            files to keep. Entries can be bare file names or full paths: only
            their base name is compared with the content of the directory.
    """
    # os.listdir returns bare names, so entries of local_list are reduced to theirs
    keep = {os.path.basename(f) for f in local_list}
    for f in os.listdir(path):
        if (f not in keep) and os.path.isfile(path + f):  # Sub-directories are left alone
            os.remove(path + f)  # Delete the file
    # EOF


# +

#################
#################
# ## Progress Bar
##############

# -

def _progressBar(transferred, toBeTransferred, info, is_verbose=False,
                 progress_bar: "ProgressInfo | None" = None):
    """Display a progress bar during the download

    Parameters
    ----------
        transferred: int
            number of bytes transferred so far.
        toBeTransferred: int
            total number of bytes to transfer.
        info: str
            label of the current file, displayed in verbose mode.
        is_verbose: bool, default to False
            to print the progress on a single, overwritten, line.
        progress_bar: ProgressInfo, default to None
            if given, graphical bar to update.
    """
    if is_verbose:
        print(f"[{info}] Transferred: {transferred / 1024 ** 2:.1f} MB\t"
              f"Out of: {toBeTransferred / 1024 ** 2:.1f}" + " " * 10, end="\r")
    if progress_bar is not None:
        progress_bar.set_max_value(int(toBeTransferred))
        progress_bar.set_progress(int(transferred))
        progress_bar.set_sub_label(f'{transferred / 1024 ** 2:.1f}/{toBeTransferred / 1024 ** 2:.1f} MB')