"""Module reaching ENTSO-E server and downloading data"""
import os
from datetime import datetime, timedelta
from functools import partial
from getpass import getpass
from time import time
import pandas as pd
import paramiko
from ecodynelec.parameter import Parameter
from ecodynelec.progress_info import ProgressInfo
# +
#################
#################
# ## Download
##############
# -
def download(config, threshold_minutes=15, threshold_size=.9, is_verbose=False):
    """Downloads data from ENTSO-E servers and saves it.

    Parameters
    ----------
    config: ecodynelec.Parameter
        collection of parameters for the execution of ecodynelec.
        The relevant information is the start and end date, as well
        as server information and path information to save raw_generation
        and raw_exchange.
    threshold_minutes: int, default to 15
        time in minutes. A file is downloaded again only if the remote
        file was updated more than `threshold_minutes` after the last download.
    threshold_size: float, default to 0.9
        minimum ratio of local file size to remote file size. If the last
        download is recent enough (see `threshold_minutes`), the file is still
        downloaded when the local file is smaller than `threshold_size` times
        the size of the remote file.
    is_verbose: bool, default to False
        to display information during the download

    Returns
    -------
    None
    """
    t0 = time()

    ### Verify the configuration
    if isinstance(config, str):
        if any([config.endswith(k) for k in ('.xlsx', '.xls', '.ods')]):
            config = Parameter(excel=config)
        else:
            raise NotImplementedError(f"File extension for {config} is not supported.")

    ### Get the start and end dates
    dates = _set_time(config.start, config.end)

    ### Decide on the files to download
    file_list = {k: _get_file_list(*dates, getattr(config.server, f'_remote{k}Dir'),
                                   getattr(config.server, f'_name{k}File'))
                 for k in ['Generation', 'Exchanges']}

    ### Point to the saving locations
    save_list = {k: _get_file_list(*dates, getattr(config.path, k.lower()),
                                   getattr(config.server, f'_name{k}File'))
                 for k in file_list}

    ### Clear directories
    if config.server.removeUnused:
        _remove_olds(config.path.generation, save_list['Generation'])  # Remove unused generation
        _remove_olds(config.path.exchanges, save_list['Exchanges'])  # Remove unused exchanges

    ### Download files
    _reach_server(config.server, files=file_list, savepaths=save_list, is_verbose=is_verbose,
                  threshold_minutes=threshold_minutes, threshold_size=threshold_size)

    ### EOF
    if is_verbose: print(f"\tDownload from server: {time() - t0:.2f} sec" + " " * 40)
    return
# +
#################
#################
# ## Reach Server
##############
# -
def _reach_server(server_info, files, savepaths, threshold_minutes=15, threshold_size=.9, is_verbose=False,
                  progress_bar: ProgressInfo = None):
    """Function establishing the connection with the server using credentials,
    collecting files and saving them. Nothing is returned.
    """
    ### Create the connection
    if is_verbose: print("Connection...", end='\r')
    if progress_bar: progress_bar.set_sub_label('Connection...')

    ### Get the username, if required
    if server_info.username in [None, '']:
        server_info.username = input("Username: ")

    ### Connection loop
    password = server_info.password  # Get the password from parameters
    if server_info.password is None:
        password = _manage_password()
    is_valid, safety_loop = False, 1
    while not is_valid:
        transport = paramiko.Transport((server_info.host, server_info.port))
        try:
            transport.connect(username=server_info.username, password=password)
            is_valid = True  # Success
        except paramiko.AuthenticationException as authentication_error:
            if safety_loop == 0:
                message = "Connection failed. Your password may be outdated. "
                message += "Please verify by logging in at https://transparency.entsoe.eu/"
                print(message)
                raise paramiko.AuthenticationException(message) from authentication_error
            else:
                print("Error in Password or Username. Connection failed.")
                safety_loop -= 1  # One chance less
                transport.close()  # Close the transport
                # del transport
                password = _manage_password()  # New try for password

    ### Create a data pipe between local and remote
    if is_verbose: print("Create pipe...", end='\r')
    if progress_bar:
        progress_bar.set_sub_label('Create pipe...')
        dl_bar = ProgressInfo(color='green')
        dl_bar.hide()
    else:
        dl_bar = None

    with paramiko.SFTPClient.from_transport(transport) as sftp:
        ### Download all files
        for categ in files:  # Generation then Exchanges
            for i, (remote, local) in enumerate(zip(files[categ], savepaths[categ])):  # For each file
                try:
                    if progress_bar:
                        progress_bar.set_sub_label(f'Check file: {categ} {i + 1}/{len(files[categ])}')
                    if not _should_download(sftp, remote, local, threshold_minutes, threshold_size):
                        continue
                    if dl_bar:
                        dl_bar.show()
                        dl_bar.progress(local, 0)
                    callback_fct = None
                    if is_verbose or progress_bar is not None:
                        callback_fct = partial(_progressBar, info=f"{categ} {i + 1}/{len(files[categ])}",
                                               is_verbose=is_verbose, progress_bar=dl_bar)
                    sftp.get(remotepath=remote, localpath=local,
                             callback=callback_fct)
                except FileNotFoundError:
                    print(f"ERROR: File {remote} (local: {local}) not found. Skipping...")
                    continue
                if dl_bar: dl_bar.hide()

    ### Close the connection to transport (already done for sftp)
    transport.close()
    ### EOF
# +
#################
#################
# ## Set Time
##############
# -
def _set_time(start, end):
    """Set the list of year-month to target the files to download.
    If start is None: the two months ending at `end` (the month of `end` and the one before).
    If end is None: now.
    """
    if None not in [start, end]:  # Regular method
        all_months = pd.period_range(start=start, end=end, freq='M', periods=None)
    elif start is None and end is None:  # Both -> 2 months ending now
        all_months = pd.period_range(start=start, end=datetime.utcnow(), freq='M', periods=2)
    elif end is None:  # Start but no end: until now
        all_months = pd.period_range(start=start, end=datetime.utcnow(), freq='M', periods=None)
    elif start is None:  # End but no start -> 2 months ending at end
        all_months = pd.period_range(start=start, end=end, freq='M', periods=2)
    return all_months.year, all_months.month
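# Illustrative sketch, not part of this module: the enumeration of year-month pairs
# done above with `pd.period_range` can be pictured with the standard library only.
# `month_span` is a hypothetical helper; unlike `_set_time`, it assumes both bounds
# are given and does not handle None.

```python
from datetime import date


def month_span(start, end):
    """Hypothetical helper: list (year, month) pairs from `start` to `end`, inclusive."""
    # Count months from year 0 so that stepping by one month is a plain integer step
    first = start.year * 12 + start.month - 1
    last = end.year * 12 + end.month - 1
    return [(i // 12, i % 12 + 1) for i in range(first, last + 1)]


pairs = month_span(date(2022, 11, 15), date(2023, 2, 1))
```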
# +
#################
#################
# ## Get File List
##############
# -
def _get_file_list(years, months, path, rootName):
    """Create the list of files to download from the server,
    with exact names.
    """
    return [f"{path}{y}_{m:02d}_{rootName}"
            for y, m in zip(years, months)]
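# Illustrative sketch of the naming pattern built above: `<path><year>_<month>_<rootName>`,
# with the month zero-padded to two digits. The directory "/data/" and the root name
# "Example.csv" are made-up values for the illustration, not actual ENTSO-E names.

```python
def monthly_file_names(years, months, path, root_name):
    """Hypothetical stand-alone copy of the naming rule, for illustration only."""
    return [f"{path}{y}_{m:02d}_{root_name}" for y, m in zip(years, months)]


names = monthly_file_names([2022, 2023], [12, 1], "/data/", "Example.csv")
```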
# +
#################
#################
# ## Manage Password
##############
# -
def _manage_password():
    """Asks the user to give a password.
    ENTSO-E requires the password to be changed once in a while,
    it might just have expired. If the correct password is not valid
    anymore, visit https://transparency.entsoe.eu/ to login and reset.
    """
    return getpass("Password: ")
# +
#################
#################
# ## Should Download
##############
# -
def _should_download(sftp, remote, local, threshold_minutes=15, threshold_size=.9):
    """Investigates whether to download a file or not.

    Parameters
    ----------
    sftp
        the connection to the server
    remote: str
        name of the remote file
    local: str
        local version of the file name, if it were to exist
    threshold_minutes: int, default to 15
        time in minutes. A file is downloaded again only if the remote
        file was updated more than `threshold_minutes` after the last download.
    threshold_size: float, default to 0.9
        minimum ratio of local file size to remote file size. If the last
        download is recent enough (see `threshold_minutes`), the file is still
        downloaded when the local file is smaller than `threshold_size` times
        the size of the remote file.

    Returns
    -------
    bool
        True if
        - the file does not exist in the local target directory OR
        - the remote file was updated more than `threshold_minutes` after
          the download of the local file OR
        - the local file is smaller in size than `threshold_size` of the remote file's size.
    """
    ### IF NO LOCAL FILE ALREADY, DOWNLOAD.
    if not os.path.isfile(local):
        return True

    ### IF REMOTE FILE IS NEWER THAN LOCAL, DOWNLOAD.
    remote_mtime = datetime.fromtimestamp(getattr(sftp.lstat(remote), 'st_mtime'))
    local_mtime = datetime.fromtimestamp(getattr(os.stat(local), 'st_mtime'))
    delta_utc = datetime.utcnow() - datetime.now()  # Server is in UTC. Local files are in local time.
    is_newer = (remote_mtime - (local_mtime + delta_utc)) > timedelta(minutes=threshold_minutes)
    if is_newer: return True

    ### IF REMOTE IS (SIGNIFICANTLY) LARGER, DOWNLOAD
    remote_size = getattr(sftp.lstat(remote), 'st_size')  # Size of remote document
    local_size = getattr(os.stat(local), 'st_size')  # Size of local document
    is_larger = ((remote_size - local_size) / remote_size) > (1 - threshold_size)
    if is_larger: return True

    ### OTHER CASES: DO NOT DOWNLOAD
    return False
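# Illustrative sketch of the decision rule above, applied to plain values instead of an
# SFTP connection. `needs_refresh` is a hypothetical helper: it assumes both timestamps
# are already expressed in the same time zone and skips the "no local file" check.
# The size test is written as `local_size < threshold_size * remote_size`, which is an
# algebraic rearrangement of the ratio test above for a non-empty remote file.

```python
from datetime import datetime, timedelta


def needs_refresh(remote_mtime, local_mtime, remote_size, local_size,
                  threshold_minutes=15, threshold_size=0.9):
    """Hypothetical stand-alone version of the time and size checks."""
    # Remote file updated well after the local copy was written
    if remote_mtime - local_mtime > timedelta(minutes=threshold_minutes):
        return True
    # Local copy noticeably smaller than the remote file
    if remote_size > 0 and local_size < threshold_size * remote_size:
        return True
    return False


t = datetime(2023, 1, 1, 12, 0)
recent_and_complete = needs_refresh(t + timedelta(minutes=5), t, 1000, 990)
remote_much_newer = needs_refresh(t + timedelta(hours=2), t, 1000, 1000)
local_truncated = needs_refresh(t, t, 1000, 500)
```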
# +
#################
#################
# ## Remove Old Files
##############
# -
def _remove_olds(path, local_list):
    """Clears unused files in local directory"""
    # `local_list` holds full paths (directory + file name), so compare full paths
    remove_list = [path + f for f in os.listdir(path)
                   if (path + f) not in local_list and os.path.isfile(path + f)]
    for f in remove_list:
        os.remove(f)  # Delete the file
    # EOF
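# Illustrative sketch of the cleanup step, run in a temporary directory so nothing real
# is deleted. `remove_unlisted` is a hypothetical helper and the file names are made up.
# It compares full paths, on the assumption that the list of files to keep is built as
# directory + file name.

```python
import os
import tempfile


def remove_unlisted(directory, keep_paths):
    """Hypothetical helper: delete files in `directory` whose full path is not in `keep_paths`."""
    keep = set(keep_paths)
    removed = []
    for name in os.listdir(directory):
        full = os.path.join(directory, name)
        if os.path.isfile(full) and full not in keep:
            os.remove(full)
            removed.append(name)
    return sorted(removed)


with tempfile.TemporaryDirectory() as tmp:
    # Create one file to keep and one stale file
    for name in ("2023_01_Example.csv", "2022_06_Example.csv"):
        open(os.path.join(tmp, name), "w").close()
    removed = remove_unlisted(tmp, [os.path.join(tmp, "2023_01_Example.csv")])
    remaining = sorted(os.listdir(tmp))
```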
# +
#################
#################
# ## Progress Bar
##############
# -
def _progressBar(transferred, toBeTransferred, info, is_verbose=False, progress_bar: ProgressInfo = None):
    """Display a progress bar during the download"""
    if is_verbose:
        print(
            f"[{info}] Transferred: {transferred / 1024 ** 2:.1f} MB\tOut of: {toBeTransferred / 1024 ** 2:.1f}" + " " * 10,
            end="\r")
    if progress_bar is not None:
        progress_bar.set_max_value(int(toBeTransferred))
        progress_bar.set_progress(int(transferred))
        progress_bar.set_sub_label(f'{transferred / 1024 ** 2:.1f}/{toBeTransferred / 1024 ** 2:.1f} MB')