# Source code for ecodynelec.preprocessing.sfoe_extracting

"""
Module: OFEN PDF parsing & updating
-----------------------------------
Purpose: extract tables from OFEN PDFs (pages 47/48 or 49/50 depending on the year),
reshape some columns, and aggregate them into a final DataFrame for use in EcoDynElec.
"""

import re

import pandas as pd
import tabula


def _extract_numbers(cell, page):
    """
    Extract numeric values from a cell string depending on the PDF page layout.

    Parameters
    ----------
    cell : Any
        Raw content of the PDF cell (often str, sometimes NaN).
    page : int
        Logical page number controlling the extraction strategy (47, 48, 50).

    Returns
    -------
    list

        - page 47: 4 values (Nuclear, Thermical, Wind, PV), or an empty list
          when the cell holds fewer than 4 tokens.
        - page 48: 2 values (Total, Conso_pompes_STEP), padded with pd.NA.
        - page 50: 2 values (Conso_pompes_STEP, Prod_nette), padded with pd.NA.
        - missing cell: one pd.NA per value expected for the page
          (4 for an unsupported page).
        - unsupported page with a non-missing cell: empty list.

    """
    # Number of values each supported page layout is expected to yield.
    expected_width = {47: 4, 48: 2, 50: 2}

    if pd.isna(cell):
        # The placeholder row must match the page width, otherwise a
        # 2-column page would receive a 4-value row.
        return [pd.NA] * expected_width.get(page, 4)

    # Normalize spaces and separators
    s = str(cell).replace("\u00A0", " ").replace("\u202F", " ").strip()
    s = re.sub(r"[;,\t]", " ", s)
    s = re.sub(r"\s+", " ", s)
    parts = s.split(" ")

    def to_num(token):
        # Tokens that are not parseable become NaN instead of raising.
        return pd.to_numeric(token, errors="coerce")

    if page == 47:
        if len(parts) == 4:
            return [to_num(token) for token in parts]
        if len(parts) >= 5:
            # First value may be split across 2 tokens ("x xxx"); any token
            # beyond the fifth is ignored.
            return [to_num(parts[0] + parts[1])] + [to_num(token) for token in parts[2:5]]
        # Fewer than 4 tokens: nothing reliable to extract.
        return []

    if page == 48:
        if len(parts) >= 3 and all(token.isdigit() for token in parts[:3]):
            # "x xxx yyy": the first two tokens form Total, the third is
            # Conso_pompes_STEP.
            nums = [to_num(parts[0] + parts[1]), to_num(parts[2])]
        else:
            # Fallback: take the first 2 digit runs found in the cell.
            nums = [to_num(match) for match in re.findall(r'\d+', s)[:2]]
        return nums + [pd.NA] * (2 - len(nums))

    if page == 50:
        if len(parts) == 3 and all(token.isdigit() for token in parts):
            # "yyy x xxx": the first token is Conso_pompes_STEP, the last
            # two form Prod_nette.
            return [to_num(parts[0]), to_num(parts[1] + parts[2])]
        return [pd.NA, pd.NA]

    # Unsupported page: keep the return type a list.
    return []


def split_col(df: pd.DataFrame, page: int, names: list) -> pd.DataFrame:
    """
    Replace one source column in the DataFrame by multiple new columns,
    depending on the page and extraction logic.

    Parameters
    ----------
    df : pd.DataFrame
        Table extracted with tabula.read_pdf(...) for a given page.
    page : int
        Logical page number (47, 48 or 50).
    names : list[str]
        Names of the new columns to insert. Their count must match the
        number of values extracted for the page (4 for page 47, 2 for
        pages 48 and 50).

    Returns
    -------
    pd.DataFrame
        New DataFrame with the source column replaced by `names`.

    Raises
    ------
    ValueError
        If `page` is not one of the supported pages.
    """
    # page -> (position of the packed source column, number of values in it)
    layouts = {47: (2, 4), 48: (6, 2), 50: (5, 2)}
    if page not in layouts:
        raise ValueError(
            f"Unsupported page {page!r}; expected one of {sorted(layouts)}."
        )
    src_idx, width = layouts[page]

    rows = []
    for cell in df.iloc[:, src_idx]:
        values = list(_extract_numbers(cell, page=page) or [])
        # Pad short rows and trim long ones so that every row has exactly
        # `width` entries; a ragged row would make the constructor fail.
        rows.append((values + [pd.NA] * width)[:width])
    new = pd.DataFrame(rows, columns=list(names), index=df.index)

    # Reconstruct DataFrame with left | new | right. Slicing past the last
    # column simply yields an empty frame, so no special case is needed.
    left = df.iloc[:, :src_idx]
    right = df.iloc[:, src_idx + 1:]
    return pd.concat([left, new, right], axis=1)
def ofen_pdf_to_df(file, page):
    """
    Load a specific page from an OFEN PDF and return a pre-processed DataFrame.

    Parameters
    ----------
    file : str
        Path to the OFEN PDF (e.g. ".../2022.pdf", ".../2024.pdf").
    page : int
        Page number to extract (47, 48, 49, 50).

    Returns
    -------
    pd.DataFrame
        - File ending in '24.pdf', page 47: Nuclear/Thermical/Wind/PV
          columns extracted
        - File ending in '24.pdf', page 48: Total/Conso_pompes_STEP columns
          extracted
        - Any other file, page 50: Conso_pompes_STEP/Prod_nette columns
          extracted
        - Others: raw first table of the page
    """
    tables = tabula.read_pdf(
        file, pages=page, pandas_options={'header': None}, stream=True
    )

    if file.endswith('24.pdf'):
        if page == 47:
            df = pd.DataFrame(tables[0])
            if len(df) > 9:
                df = df.iloc[9:].reset_index(drop=True)  # skip header rows
            return split_col(
                df, page=page, names=("Nuclear", "Thermical", "Wind", "PV")
            )
        if page == 48:
            # On this page the table of interest is the second one detected.
            df = pd.DataFrame(tables[1])
            return split_col(df, page=page, names=("Total", "Conso_pompes_STEP"))
    elif page == 50:
        df = pd.DataFrame(tables[0])
        df[7] = df[6]  # keep original logic: duplicate column 6 to 7
        return split_col(
            df, page=page, names=("Conso_pompes_STEP", "Prod_nette")
        )

    # Every other file/page combination: hand back the raw table.
    return pd.DataFrame(tables[0])
def updating_ofen_data(file):
    """
    Full pipeline for an OFEN PDF to obtain the SFOE_data dataframe:

    - Load expected pages (depending on year),
    - Apply column splits and cleanup,
    - Concatenate tables,
    - Rename and reorder final columns,
    - Add 'mois' column and sort by year.

    Warning: only works for the 2024 update.

    Parameters
    ----------
    file : str
        Path to the OFEN PDF (e.g. '.../2024.pdf', '.../2022.pdf').

    Returns
    -------
    pd.DataFrame
        Final DataFrame with columns:
        ["annee","mois","Hydro","Nuclear","Thermical","Conso_pompes_STEP",
        "Prod_nette","Imports","Exports","Conso_CH","Pertes","Conso_Finale_CH"]
        Sorted by year (stable sort, so the row order within a year is kept),
        with the last 11 rows removed.

    Raises
    ------
    ValueError
        If `file` ends with neither '24.pdf' nor '22.pdf'.
    """
    print('Reading', file)

    if file.endswith('24.pdf'):
        # Page 47
        table_1a = ofen_pdf_to_df(file, page=47)
        table_1a.drop(table_1a.columns[[4, 5, 6, 14]], axis=1, inplace=True)
        table_1a.columns = range(table_1a.shape[1])

        # Page 48
        table_1b = ofen_pdf_to_df(file, page=48)
        table_1b.drop(table_1b.columns[[4, 5, 6, 9, 15]], axis=1, inplace=True)
        table_1b.columns = range(table_1b.shape[1])

        df = pd.concat([table_1a, table_1b])

    elif file.endswith('22.pdf'):
        table_1a = ofen_pdf_to_df(file, page=49)
        table_1b = ofen_pdf_to_df(file, page=50)
        table_1a.columns = range(table_1a.shape[1])
        table_1b.columns = range(table_1b.shape[1])

        # Shift columns (year-specific logic): columns 9..12 move one
        # position to the left, in that order.
        table_1b[8] = table_1b[9]
        table_1b[9] = table_1b[10]
        table_1b[10] = table_1b[11]
        table_1b[11] = table_1b[12]

        df = pd.concat([table_1a, table_1b])
        df.drop(df.columns[[4, 12, 13]], axis=1, inplace=True)

    else:
        # Without this guard the code below would fail on an unbound `df`.
        raise ValueError(
            f"Unsupported OFEN file {file!r}: expected a name ending in "
            "'24.pdf' or '22.pdf'."
        )

    # Final column names
    df.columns = ["annee", "Hydro", "Nuclear", "Thermical", "Conso_pompes_STEP",
                  "Prod_nette", "Imports", "Exports", "Conso_CH", "Pertes",
                  "Conso_Finale_CH"]

    # Normalize 'annee' to Int64: keep the first 4-digit run of each label
    year_series = df['annee'].astype(str).str.extract(r'(\d{4})')[0]
    df['annee'] = pd.to_numeric(year_series, errors='coerce').astype('Int64')

    # Derive 'mois' (1..12) per year from the order of appearance
    mois = (df.groupby(df['annee']).cumcount() % 12 + 1).astype('Int16')
    df.insert(loc=1, column='mois', value=mois)

    # Sort by year and reset index
    df_sorted = df.sort_values(by='annee', kind='mergesort').reset_index(drop=True)

    # Drop last 11 rows (incomplete year)
    df_sorted = df_sorted.iloc[:-11]
    return df_sorted