Monitorear datos y modelos en las operaciones aéreas con Evidently y Streamlit en producción

La monitorización de datos y modelos en las operaciones aéreas con Evidently y Streamlit en producción

Introducción

¿Has experimentado la frustración de un modelo que funciona bien en entrenamiento y evaluación, pero que funciona peor en el entorno de producción? Es un desafío común en la fase de producción, y ahí es donde entra en juego Evidently.ai, una fantástica herramienta de código abierto, para hacer que nuestro modelo de aprendizaje automático sea observable y fácil de monitorear. Esta guía cubrirá las razones detrás de los cambios en los datos y el rendimiento del modelo en el entorno de producción, así como las acciones necesarias a implementar. También aprenderemos cómo integrar esta herramienta con la aplicación de predicción Streamlit. Comencemos nuestro increíble viaje.

Este artículo fue publicado como parte del Data Science Blogathon.

Prerrequisitos necesarios

1) Clonar el repositorio

git clone "https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture.git"

2) Crear y activar el entorno virtual

#crear un entorno virtualpython3 -m venv venv#Activar tu entorno virtual en la carpeta de tu proyecto source venv/bin/activate

#Este comando instala paquetes de Python listados en el archivo requirements.txtpip install -r requirements.txt

4) Instalar Streamlit y Evidently

pip install streamlitpip install evidently

Estructura del proyecto:

project_root/│├── assets/│├── data/│   └── Monitoring_data.csv│├── models/│   ├── best_model.pkl│   ├── lightgml_model.pkl│   ├── linear_regression_model.pkl│   ├── random_forest_model.pkl│   ├── ridge_regression.pkl│   ├── svm_model.pkl│   └── xgboost_model.pkl│├── notebooks/│   └── EDA.ipynb│├── src/│   ├── controller/│   │   └── flight_delay_controller.py│   ││   ├── model/│   │   └── flight_delay_model.py│   ││   ├── view/│   │   └── flight_delay_view.py│   ││   ├── data_preprocessing.py│   ├── model_evaluation.py│   └── modeling.py│├── .gitignore├── Dockerfile├── LICENSE├── Readme.md├── app.py└── requirements.txt

Preprocesamiento de datos

Obteniendo datos de la nube

En este proyecto, obtendremos el conjunto de datos de Azure. Primero, crearemos un contenedor de almacenamiento en Azure, luego un almacenamiento en blob, subiremos nuestro conjunto de datos sin procesar allí, lo haremos accesible públicamente y lo utilizaremos para futuros pasos de preprocesamiento de datos.

Enlace del conjunto de datos: https://www.kaggle.com/datasets/giovamata/airlinedelaycauses

Aquí está el fragmento de código para obtener datos de la nube,

class DataPreprocessorTemplate:    """    Patrón de método de plantilla para el preprocesamiento de datos con pasos personalizables.    """    def __init__(self, data_url):        """        Inicializar la plantilla de preprocesamiento de datos con la URL de datos incluyendo el token SAS.        Args:            data_url (str): La URL de los datos con el token SAS.        """        self.data_url = data_url    def fetch_data(self):        """        Obtener datos de Azure Blob Storage.        Returns:            pd.DataFrame: El conjunto de datos obtenido como un DataFrame de Pandas.        """        try:            # Obtener el conjunto de datos utilizando la URL proporcionada            print("Obteniendo los datos de la nube...")            data = pd.read_csv(self.data_url)            return data        except Exception as e:            raise Exception("Ocurrió un error durante la recuperación de datos: " + str(e))def main():  # URL de los datos incluyendo el token SAS  data_url = "https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv"  output_path = "../data/cleaned_flight_delays.csv"  data_preprocessor = DataPreprocessorTemplate(data_url)  data = data_preprocessor.fetch_data()  cleaned_data=data_preprocessor.clean_data(data)  data_preprocessor.save_cleaned_data(cleaned_data, output_path)     

Imagen Azure:

Limpieza y transformación de datos

En el mundo de los datos, la transformación de datos convierte los diamantes en bruto en diamantes pulidos. Aquí, procederemos con todos los pasos de preprocesamiento de datos, como eliminar características no deseadas, completar valores faltantes, codificar columnas categóricas y eliminar valores atípicos. Aquí hemos implementado la codificación ficticia. Luego, eliminaremos los valores atípicos utilizando la prueba de Z-score.

Código de limpieza de datos:

def clean_data(self,df):        """        Limpia y preprocesa los datos de entrada.        Args:            df (pd.DataFrame): El conjunto de datos de entrada.        Returns:            pd.DataFrame: El conjunto de datos limpio y preprocesado.        """        print("Limpiando datos...")        df=self.remove_features(df)        df=self.impute_missing_values(df)        df=self.encode_categorical_features(df)        df=self.remove_outliers(df)        return df    def remove_features(self,df):        """        Elimina las columnas innecesarias del conjunto de datos.        Args:            df (pd.DataFrame): El conjunto de datos de entrada.        Returns:            pd.DataFrame: El conjunto de datos con las columnas innecesarias eliminadas.        """        print("Eliminando columnas innecesarias...")        df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)        return df    def impute_missing_values(self,df):        """        Completa los valores faltantes en el conjunto de datos.        Args:            df (pd.DataFrame): El conjunto de datos de entrada.        Returns:            pd.DataFrame: El conjunto de datos con los valores faltantes completados.        """        print("Completando valores faltantes...")        delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']        # Completar los valores faltantes con 0 para estas columnas        df[delay_colns]=df[delay_colns].fillna(0)        # Completar los valores faltantes con la mediana para estas columnas        columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']        df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())        return df    def encode_categorical_features(self,df):        """        Codifica las características categóricas en el conjunto de datos.        Args:            df (pd.DataFrame): El conjunto de datos de entrada.        Returns:            pd.DataFrame: El conjunto de datos con las características categóricas codificadas.        """        print("Codificando características categóricas...")        df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)        return df    def remove_outliers(self,df):        """        Elimina los valores atípicos del conjunto de datos.        Args:            df (pd.DataFrame): El conjunto de datos de entrada.        Returns:            pd.DataFrame: El conjunto de datos con los valores atípicos eliminados.        """        print("Eliminando valores atípicos...")        z_threshold=3        z_scores=np.abs(stats.zscore(df[self.numerical_columns]))        outliers=np.where(z_scores<z_threshold)        df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]        print("Forma después de la limpieza de datos:", df_no_outliers.shape)        return df_no_outliers

Luego, guardaremos el conjunto de datos limpios para nuestros futuros fines de entrenamiento y evaluación del modelo. Usaremos joblib para guardar el conjunto de datos limpios.

Aquí está el fragmento de código:

def save_cleaned_data(self,cleaned_data, output_path):    """    Guarda los datos limpios en un archivo CSV.    Args:        cleaned_data (pd.DataFrame): El conjunto de datos limpio.        output_path (str): La ruta para guardar los datos limpios.    """    print("Guardando datos limpios...")                cleaned_data.to_csv(output_path,index=False)

Entrenamiento y evaluación

Después de la limpieza de datos, dividiremos nuestro conjunto de datos en 2 componentes: conjunto de entrenamiento y conjunto de prueba. Luego, entrenaremos varios modelos de regresión, como regresión lineal, regresión de bosque aleatorio, xgboost, regresión de crestas y modelos lightgbm. Luego, guardaremos todos los archivos como archivos .pkl usando joblib, lo cual reduce el tiempo y optimiza el uso de recursos adicionales.

Ahora, podemos usar este archivo de modelo entrenado para fines de evaluación y predicción. Luego, evaluaremos el modelo en función de métricas de evaluación como el puntaje R2, el valor MAE (Error absoluto medio) y el valor RMSE (Error cuadrático medio). Luego, guardaremos el modelo de mejor rendimiento para usarlo en la implementación.

Código de ejemplo para entrenamiento de modelos:

# Función para crear modelos de aprendizaje automático
def crear_modelo(nombre_modelo):
    if nombre_modelo == "random_forest":
        return RandomForestRegressor(n_estimators=50, random_state=42)
    elif nombre_modelo == "linear_regression":
        return LinearRegression()
    elif nombre_modelo == "xgboost":
        return xgb.XGBRegressor()
    elif nombre_modelo == "ridge_regression":
        return Ridge(alpha=1.0)  # Ajustar alpha según sea necesario
    elif nombre_modelo == "lightgbm":
        return lgb.LGBMRegressor()

# Cargar el conjunto de datos limpios
print("Iniciando carga del modelo...")
datos_limpios = pd.read_csv("../data/cleaned_flight_delays.csv")
print("Carga del modelo completada")

# Definir la variable objetivo (ArrDelay) y las características (X)
variable_objetivo = "ArrDelay"
X = datos_limpios.drop(columns=[variable_objetivo])
y = datos_limpios[variable_objetivo]

# Dividir los datos en conjuntos de entrenamiento y prueba
print("Dividiendo los datos en conjuntos de entrenamiento y prueba...")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print("División de datos completada.")

# Función para entrenar y guardar un modelo
def entrenar_y_guardar_modelo(nombre_modelo, X_train, y_train):
    modelo = crear_modelo(nombre_modelo)
    print(f"Entrenando el modelo {nombre_modelo}...")
    start_time = time.time()
    modelo.fit(X_train, y_train)
    print("Entrenamiento del modelo completado...")
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Tiempo de entrenamiento transcurrido: {elapsed_time:.2f} segundos")
    # Guardar el modelo entrenado para usarlo más adelante
    joblib.dump(modelo, f"../models/{nombre_modelo}_modelo.pkl")
    print(f"Modelo {nombre_modelo} guardado como {nombre_modelo}_modelo.pkl")

# Crear y entrenar el modelo Random Forest
entrenar_y_guardar_modelo("random_forest", X_train, y_train)

# Entrenar el modelo de regresión lineal
entrenar_y_guardar_modelo("regresión_lineal", X_train, y_train)

# Crear y entrenar el modelo XGBoost
entrenar_y_guardar_modelo("xgboost", X_train, y_train)

# Crear y entrenar el modelo de regresión ridge
entrenar_y_guardar_modelo("regresión_ridge", X_train, y_train)

# Crear y entrenar el modelo lightgbm
entrenar_y_guardar_modelo("lightgbm", X_train, y_train)

Código de ejemplo para evaluar modelos

# Cargar el conjunto de datos limpios
datos_limpios = pd.read_csv("../data/cleaned_flight_delays.csv")

# Cargar los modelos de aprendizaje automático entrenados
modelo_random_forest = joblib.load("../models/random_forest_model.pkl")
modelo_regresión_lineal = joblib.load("../models/regresión_lineal_model.pkl")
modelo_xgboost = joblib.load("../models/xgboost_model.pkl")
modelo_regresión_ridge = joblib.load("../models/regresión_ridge_model.pkl")
modelo_lightgbm = joblib.load("../models/lightgbm_model.pkl")

# Definir la variable objetivo (ArrDelay) y las características (X)
variable_objetivo = "ArrDelay"
X = datos_limpios.drop(columns=[variable_objetivo])
y = datos_limpios[variable_objetivo]

# Dividir los datos en conjuntos de entrenamiento y prueba
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Definir una función para evaluar los modelos
def evaluar_modelo(modelo, X_test, y_test):
    y_pred = modelo.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    return mae, mse, r2

# Crear un diccionario de modelos para evaluar
modelos = {
    "Random Forest": modelo_random_forest,
    "Regresión Lineal": modelo_regresión_lineal,
    "XGBoost": modelo_xgboost,
    "Regresión Ridge": modelo_regresión_ridge,
    "LightGBM": modelo_lightgbm,
}

# Evaluar cada modelo y almacenar sus métricas
metricas = {}
for nombre_modelo, modelo in modelos.items():
    mae, mse, r2 = evaluar_modelo(modelo, X_test, y_test)
    metricas[nombre_modelo] = {"MAE": mae, "MSE": mse, "R2": r2}

# Imprimir las métricas de evaluación para todos los modelos
for nombre_modelo, metricas_modelo in metricas.items():
    print(f"Métricas para el modelo {nombre_modelo}:")
    print(f"Error Absoluto Medio (MAE): {metricas_modelo['MAE']:.2f}")
    print(f"Error Cuadrado Medio (MSE): {metricas_modelo['MSE']:.2f}")
    print(f"Puntuación R-cuadrado (R2): {metricas_modelo['R2']:.2f}")
    print()

Después de la evaluación, elegiremos el mejor modelo de implementación.

Fragmento de código para guardar e imprimir el mejor modelo:

# Encontrar el mejor modelo basado en la puntuación de R2best_model = max(metrics, key=lambda model: metrics[model]["R2"])# Imprimir los resultadosprint(f"El mejor modelo de todos los modelos entrenados es {best_model} con las siguientes métricas:")print(f"Error Absoluto Medio (MAE): {metrics[best_model]['MAE']}")print(f"Error Cuadrático Medio (MSE): {metrics[best_model]['MSE']}")print(f"Puntuación R-cuadrado (R2): {metrics[best_model]['R2']}")# Guardar el mejor modelo para usarlo más adelantejoblib.dump(models[best_model], "../models/best_model.pkl")print("Mejor modelo guardado como best_model.pkl")

Salida:

Evidentemente: Para el Monitoreo de Datos y Modelos

En el entorno de producción, varios factores pueden afectar el funcionamiento adecuado del modelo. Algunos de los aspectos clave son:

  1. Disparidad entre entrenamiento y servicio: Esto ocurre cuando hay una diferencia significativa entre los datos que usamos para el entrenamiento y los experimentos.
  2. Problemas de calidad e integridad de los datos: Pueden existir problemas de procesamiento de datos, como tuberías rotas, problemas de infraestructura, cambios en el esquema de datos u otros problemas de origen.
  3. Modelo anterior roto: En el entorno de producción, los modelos a menudo forman una cadena, donde la entrada de un modelo depende de la salida de otro. Por lo tanto, cualquier problema en la salida de un modelo afectará la predicción general del modelo.

4. Deriva gradual del concepto: Con el tiempo, puede haber cambios en el concepto en el que estamos trabajando o en la variable objetivo, por lo que es necesario el monitoreo. También puede haber situaciones inesperadas donde el monitoreo es crucial y nuestras predicciones del modelo pueden fallar. Por ejemplo, catástrofes o calamidades naturales.

Para evaluar la calidad de los datos y del modelo, consideraremos 2 conjuntos de datos: el conjunto de referencia y el conjunto de datos actuales.

Conjunto de referencia: Este conjunto de datos sirve como referencia para las métricas de calidad del conjunto de datos actual. Aunque Evidently elige valores de umbral predeterminados, basados en nuestro conjunto de referencia, también podemos proporcionar valores de umbral de métricas personalizadas según nuestras necesidades específicas.

Conjunto de datos actual: Representa los datos desconocidos en vivo que se tomaron para la evaluación.

Calcularemos informes de deriva de datos, deriva del objetivo, calidad de datos y rendimiento del modelo con estos dos conjuntos de datos.

Los informes cubrirán métricas de deriva de datos, deriva del objetivo, calidad de datos y rendimiento del modelo. Por lo general, el monitoreo se realiza en intervalos específicos en lotes.

Aspectos a tener en cuenta antes de volver a entrenar el modelo:

1. Verificar la deriva de datos: Si se detecta deriva de datos, se recomienda verificar primero la calidad de los datos y buscar cualquier factor externo que pueda influir en la deriva, como pandemias y calamidades naturales.

2. Evaluar el rendimiento del modelo: Considerar el rendimiento del modelo después de abordar los problemas de deriva de datos. Si los informes de datos y modelo muestran deriva, puede ser una buena idea volver a entrenar el modelo. Antes de tomar esta decisión, considere el tercer punto.

3. Consideración de volver a entrenar: No siempre es necesario volver a entrenar. En muchas situaciones, no tendremos suficientes datos nuevos para volver a entrenar el modelo. Tenga cuidado al entrenar con nuevos datos, ya que también existe la posibilidad de que los datos sean inestables y incorrectos por razones específicas.

4. Establecer alertas para la deriva de datos: Antes de establecer advertencias para las derivas de datos, analice la importancia de esos datos/características específicos en la predicción. No todas las derivas de datos son importantes y requieren acciones. Siempre analice la importancia de la influencia de cada característica en la predicción.

Los informes se pueden generar en varios formatos, como HTML, JPEG, JSON, etc. Vamos a explorar el fragmento de código para generar los informes.

Fragmento de Código para Informe de Rendimiento del Modelo

# Creación de un objeto de informe de rendimiento de regresiónregression_performance_report = Report(metrics=[RegressionPreset()])# Ejecución del informe de rendimiento de regresiónregression_performance_report.run(    reference_data=reference,  # Conjunto de datos de referencia para comparación    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Conjunto de datos actual para análisis    column_mapping=column_mapping  # Mapeo entre columnas en conjuntos de datos de referencia y actuales)# Especificación de la ruta para guardar el informe de rendimiento del modelo en formato HTMLmodel_performance_report_path = reports_dir / 'model_performance.html'# Guardar el informe de rendimiento de regresión como un archivo HTMLregression_performance_report.save_html(model_performance_report_path)

Fragmento de Código para Drift del Objetivo

# Creación de un objeto de informe de drift del objetivotarget_drift_report = Report(metrics=[TargetDriftPreset()])# Ejecución del informe de drift del objetivotarget_drift_report.run(    reference_data=reference,  # Conjunto de datos de referencia para comparación    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Conjunto de datos actual para análisis    column_mapping=column_mapping  # Mapeo entre columnas en conjuntos de datos de referencia y actuales)# Especificación de la ruta para guardar el informe de drift del objetivo en formato HTMLtarget_drift_report_path = reports_dir / 'target_drift.html'# Guardar el informe de drift del objetivo como un archivo HTMLtarget_drift_report.save_html(target_drift_report_path)

Drift de Datos

# Creación de un objeto de mapeo de columnascolumn_mapping = ColumnMapping()column_mapping.numerical_features = numerical_features  # Definir características numéricas para el mapeo# Creación de un objeto de informe de drift de datosdata_drift_report = Report(metrics=[DataDriftPreset()])# Ejecución del informe de drift de datosdata_drift_report.run(    reference_data=reference,  # Conjunto de datos de referencia para comparación    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Conjunto de datos actual para análisis    column_mapping=column_mapping  # Mapeo entre características numéricas en conjuntos de datos de referencia y actuales)# Especificación de la ruta para guardar el informe de drift de datos en formato HTMLdata_drift_report_path = reports_dir / 'data_drift.html'# Guardar el informe de drift de datos como un archivo HTMLdata_drift_report.save_html(data_drift_report_path)column_mapping = ColumnMapping()column_mapping.numerical_features = numerical_featuresdata_drift_report = Report(metrics=[DataDriftPreset()])data_drift_report.run(    reference_data=reference,    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],    column_mapping=column_mapping)data_drift_report_path = reports_dir / 'data_drift.html'data_drift_report.save_html(data_drift_report_path)

Fragmento de Código para Calidad de Datos

# Creación de un objeto de mapeo de columnascolumn_mapping = ColumnMapping()column_mapping.numerical_features = numerical_features  # Definir características numéricas para el mapeo# Creación de un objeto de informe de calidad de datosdata_quality_report = Report(metrics=[DataQualityPreset()])# Ejecución del informe de calidad de datosdata_quality_report.run(    reference_data=reference,  # Conjunto de datos de referencia para comparación    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Conjunto de datos actual para análisis    column_mapping=column_mapping  # Mapeo entre características numéricas en conjuntos de datos de referencia y actuales)# Especificación de la ruta para guardar el informe de calidad de datos en formato HTMLdata_quality_report_path = reports_dir / 'data_quality.html'# Guardar el informe de calidad de datos como un archivo HTMLdata_quality_report.save_html(data_quality_report_path)

Explorar la Arquitectura del Modelo-Vista-Controlador (MVC)

Vamos a aprender sobre la arquitectura MVC en esta sección:-

La arquitectura Modelo-Vista-Controlador (MVC) es un patrón de diseño utilizado en aplicaciones web. Se utiliza para simplificar la complejidad del código y mejorar la legibilidad del mismo. Veamos sus componentes.

Modelo: La Lógica

El componente Modelo representa la lógica central de nuestra aplicación. Se encarga del procesamiento de datos y las interacciones con los modelos de aprendizaje automático. El script del Modelo se encuentra en el directorio src/model/.

Vista: Construcción de Interfaces de Usuario

El componente Vista es responsable de crear la interfaz de usuario. Interactúa con el usuario y muestra información. El script de la Vista se encuentra en el directorio src/view/.

Controlador: El Puente Orquestador

El componente Controlador actúa como un puente intermedio entre los componentes Modelo y Vista. Maneja la entrada y las solicitudes del usuario. El script del Controlador se encuentra en el directorio src/controller/.

Diagrama MVC:

A continuación se muestra una representación visual de la arquitectura MVC:

Veamos el fragmento de código para predecir retrasos en la arquitectura MVC:

1) Modelo (flight_delay_model.py):

# Importar las bibliotecas y módulos necesariosimport pandas as pdimport joblibimport xgboost as xgbfrom typing import Dictclass FlightDelayModel:    def __init__(self, model_file="models/best_model.pkl"):        """        Inicializa FlightDelayModel.        Args:            model_file (str): Ruta al archivo de modelo entrenado.        """        # Carga el modelo pre-entrenado del archivo proporcionado        self.model = joblib.load(model_file)    def predecir_retraso(self, input_data: Dict):        """        Predecir el retraso del vuelo en base a los datos de entrada.        Args:            input_data (Dict): Un diccionario que contiene características de entrada para la predicción.        Returns:            float: El retraso del vuelo predicho.        """        # Convierte el diccionario de datos de entrada en un DataFrame para la predicción        input_df = pd.DataFrame([input_data])        # Usa el modelo pre-entrenado para hacer una predicción        prediction = self.model.predict(input_df)        return prediction

2) Vista (flight_delay_view.py):

import streamlit as stfrom typing import Dictclass FlightDelayView:    def display_input_form(self):        """        Muestra el formulario de entrada en la barra lateral de Streamlit para que los usuarios ingresen datos.        """        st.sidebar.write("Valores de entrada:")        # La parte de programación para crear elementos de formulario de entrada estará aquí    def display_selected_inputs(self, selected_data: Dict):        """        Muestra los valores de entrada seleccionados según la entrada del usuario.        Args:            selected_data (Dict): Un diccionario que contiene valores de entrada seleccionados.        Returns:            Dict: El mismo diccionario con valores seleccionados para referencia.        """        # La parte de programación para mostrar los valores de entrada seleccionados estará aquí        # Muestra los valores de entrada seleccionados, como barras deslizantes, entradas numéricas y select boxes        return selected_data    def display_predicted_delay(self, flight_delay):        """        Muestra el retraso del vuelo predicho al usuario.        Args:            flight_delay: El valor del retraso del vuelo predicho.        """        # La parte de programación para mostrar el retraso predicho estará aquí        # Muestra el valor del retraso del vuelo predicho al usuario

3) Controlador (flight_delay_controller.py):

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelfrom typing import Dictclass FlightDelayController:    def __init__(self):        self.model = FlightDelayModel()        self.view = FlightDelayView()        self.selected_data = self.model.selected_data()    def run_prediction(self):        # Muestra el formulario de entrada, recopila las entradas del usuario y muestra las entradas seleccionadas        self.view.display_input_form()        input_data = self.get_user_inputs()        self.view.display_selected_inputs(input_data)        if st.button("Predecir retraso del vuelo"):            # Cuando se hace clic en el botón de predicción, predice el retraso del vuelo y muestra el resultado            flight_delay = self.model.predict_delay(input_data)            self.view.display_predicted_delay(flight_delay)    def get_user_inputs(self):        # La parte de programación para recopilar las entradas del usuario de la barra lateral de Streamlit estará aquí.        user_inputs = {}        # Crea un diccionario vacío para almacenar las entradas del usuario        # Puedes usar los widgets de la barra lateral de Streamlit para recopilar las entradas del usuario, p.ej., st.sidebar.slider, st.sidebar.selectbox, etc.        # Aquí puedes agregar código para recopilar las entradas del usuario y poblar el diccionario 'user_inputs'.        # Ejemplo:        # user_inputs['selected_feature'] = st.sidebar.slider("Seleccionar característica", valor_mínimo, valor_máximo, valor_predeterminado)        # Repite esto para cada entrada de usuario que quieras recopilar.        return user_inputs        # Devuelve el diccionario que contiene las entradas del usuario

Integración de la predicción y el monitoreo con Streamlit

Aquí integraremos el monitoreo de Evidently con la predicción de Streamlit para permitir a los usuarios hacer predicciones y monitorear los datos y el modelo.

Este enfoque permite a los usuarios predecir, monitorear o ambos, escrito sin problemas, siguiendo el patrón de diseño de arquitectura MVC.

Código:

Fragmento de código para seleccionar el conjunto de datos de referencia y actual, implementado en src/flight_delay_Controller.py

# Importar las bibliotecas necesariasimport streamlit as stimport pandas as pdimport timeclass FlightDelayController:    def run_monitoring(self):        # Título e introducción de la aplicación Streamlit        st.title("Aplicación de Monitoreo de Datos y Modelos")        st.write("Estás en la Aplicación de Monitoreo de Datos y Modelos. Selecciona el rango de fecha y mes en la barra lateral y haz clic en 'Enviar' para comenzar el entrenamiento y monitoreo del modelo.")        # Permite al usuario elegir su rango de fecha preferido        new_start_month = st.sidebar.selectbox("Mes de inicio", range(1, 12), 1)        new_end_month = st.sidebar.selectbox("Mes de fin", range(1, 12), 1)        new_start_day = st.sidebar.selectbox("Día de inicio", range(1, 32), 1)        new_end_day = st.sidebar.selectbox("Día de fin", range(1, 32), 30)        # Si el usuario hace clic en el botón "Enviar"        if st.button("Enviar"):            st.write("Obteniendo tus datos actuales...")            # Mide el tiempo que se tarda en obtener los datos            data_start = time.time()            df = pd.read_csv("data/Monitoring_data.csv")            data_end = time.time()            time_taken = data_end - data_start            st.write(f"Datos obtenidos en {time_taken:.2f} segundos")            # Filtrar los datos en base al rango de fechas seleccionado            date_range = (                (df['Mes'] >= new_start_month) & (df['DíaDelMes'] >= new_start_day) &                (df['Mes'] <= new_end_month) & (df['DíaDelMes'] <= new_end_day)            )            # Crear el conjunto de datos de referencia y actual a partir del rango de fechas.            datos_de_referencia = df[~date_range]            datos_actuales = df[date_range]

Después de elegir los conjuntos de datos de referencia y actuales, generaremos informes sobre la deriva de datos, la calidad de los datos, la calidad del modelo y la deriva del objetivo. El código para implementar la generación de informes se divide en 3 partes según el diseño de la arquitectura MVC. Veamos el código en estos 3 archivos.

Fragmento de código para flight_delay_Controller.py

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelimport numpy as npimport pandas as pdfrom scipy import statsimport time# Controllerclass FlightDelayController:    """    El componente Controlador para la aplicación de predicción de retraso de vuelo.    Esta clase orquesta la interacción entre los componentes Modelo y Vista.    """    def __init__(self):        # Inicializa el controlador creando instancias del Modelo y la Vista.        self.model = FlightDelayModel()        # Crea una instancia del FlightDelayModel.        self.view = FlightDelayView()        # Crea una instancia del FlightDelayView.        self.selected_data = self.model.selected_data()        # Obtiene los datos seleccionados del modelo.        self.categorical_options = self.model.categorical_features()        # Obtiene las características categóricas del modelo.    def run_monitoring(self):        # Función para ejecutar la aplicación de monitoreo.        # Establece el título y una breve descripción.        st.title("Aplicación de Monitoreo de Datos y Modelo")        st.write("Estás en la Aplicación de Monitoreo de Datos y Modelo. Selecciona el rango de fecha y mes en la barra lateral y haz clic en 'Enviar' para comenzar el entrenamiento y monitoreo del modelo.")        # Selecciona qué informes generar usando casillas de verificación.        st.subheader("Selecciona los Informes a Generar")        generate_model_report = st.checkbox("Generar Informe de Rendimiento del Modelo")        generate_target_drift = st.checkbox("Generar Informe de Deriva del Objetivo")        generate_data_drift = st.checkbox("Generar Informe de Deriva de Datos")        generate_data_quality = st.checkbox("Generar Informe de Calidad de Datos")        if st.button("Enviar"):            # 'date_range' y 'df' no se muestran aquí ya que ya se muestran en los fragmentos de código anteriores            # Datos de referencia sin el rango de fecha seleccionado.            reference_data = df[~date_range]            # Datos actuales dentro del rango de fecha seleccionado.            current_data = df[date_range]            self.view.display_monitoring(reference_data, current_data)  # Muestra los datos de monitoreo.            self.model.train_model(reference_data, current_data)  # Entrena el modelo.            # Genera los informes seleccionados y los muestra.            if generate_model_report:                st.write("### Informe de Rendimiento del Modelo")                st.write("Generando Informe de Rendimiento del Modelo...")                performance_report = self.model.performance_report(reference_data, current_data)                self.view.display_report(performance_report, "Informe de Rendimiento del Modelo")            if generate_target_drift:                st.write("### Informe de Deriva del Objetivo")                st.write("Generando Informe de Deriva del Objetivo...")                target_report = self.model.target_report(reference_data, current_data)                self.view.display_report(target_report, "Informe de Deriva del Objetivo")            if generate_data_drift:                st.write("### Informe de Deriva de Datos")                st.write("Generando Informe de Deriva de Datos...")                data_drift_report = self.model.data_drift_report(reference_data, current_data)                self.view.display_report(data_drift_report, "Informe de Deriva de Datos")            if generate_data_quality:                st.write("### Informe de Calidad de Datos")                st.write("Generando Informe de Calidad de Datos...")                data_quality_report = self.model.data_quality_report(reference_data, current_data)                self.view.display_report(data_quality_report, "Informe de Calidad de Datos")

Código para Flight_delay_model.py

import pandas as pdimport joblibimport xgboost as xgbfrom evidently.pipeline.column_mapping import ColumnMappingfrom evidently.report import Reportfrom evidently.metric_preset import DataDriftPresetfrom evidently.metric_preset import TargetDriftPresetfrom evidently.metric_preset import DataQualityPresetfrom evidently.metric_preset.regression_performance import RegressionPresetimport timeimport streamlit as stfrom typing import Dict# Modelclass FlightDelayModel:    """    El componente Modelo para la aplicación de predicción de retraso de vuelo.    Esta clase maneja la carga de datos, la carga del modelo y las predicciones de retraso.    """    def __init__(self, model_file="models/best_model.pkl"):        # Inicializa la clase FlightDelayModel        # Define el mapeo de columnas para el análisis de datos        self.column_mapping = ColumnMapping()        self.column_mapping.target = self.target        self.column_mapping.prediction = 'prediction'        self.column_mapping.numerical_features = self.numerical_features    # Informe de rendimiento del modelo    def performance_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Genera un informe de rendimiento para las predicciones del modelo.        Args:            reference_data (pd.DataFrame): Conjunto de datos de referencia para la comparación.            current_data (pd.DataFrame): Conjunto de datos actual con predicciones.        Returns:            Report: Un informe que contiene métricas de rendimiento de regresión.        """        regression_performance_report = Report(metrics=[RegressionPreset()])        regression_performance_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return regression_performance_report    def target_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Genera un informe para el análisis de deriva del objetivo.        Args:            reference_data (pd.DataFrame): Conjunto de datos de referencia para la comparación.            current_data (pd.DataFrame): Conjunto de datos actual con predicciones.        Returns:            Report: Un informe que contiene métricas de deriva del objetivo.        """        target_drift_report = Report(metrics=[TargetDriftPreset()])        target_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return target_drift_report    def data_drift_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Genera un informe para el análisis de deriva de datos.        Args:            reference_data (pd.DataFrame): Conjunto de datos de referencia para la comparación.            current_data (pd.DataFrame): Conjunto de datos actual con predicciones.        Returns:            Report: Un informe que contiene métricas de deriva de datos.        """        data_drift_report = Report(metrics=[DataDriftPreset()])        data_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_drift_report    def data_quality_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Genera un informe de calidad de datos (nota: puede llevar alrededor de 10 minutos).        Args:            reference_data (pd.DataFrame): Conjunto de datos de referencia para la comparación.            current_data (pd.DataFrame): Conjunto de datos actual con predicciones.        Returns:            Report: Un informe que contiene métricas de calidad de datos.        """        st.write("Generar el Informe de Calidad de Datos llevará más tiempo, alrededor de 10 minutos, debido a su análisis exhaustivo. Puedes esperar o explorar otros informes si tienes poco tiempo.")        data_quality_report = Report(metrics=[DataQualityPreset()])        data_quality_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_quality_report

 Código flight_delay_view.py

# Importar bibliotecas necesarias
import streamlit as st
import pandas as pd

# Definir una clase para la vista de retraso de vuelo
class FlightDelayView:
    """
    Componente de vista para la aplicación de predicción de retraso de vuelo.
    Esta clase maneja la visualización de la aplicación web de Streamlit.
    """

    @staticmethod
    def display_input_form():
        """
        Muestra el formulario de entrada en la aplicación de Streamlit.
        """
        st.title("Aplicación de Predicción de Retraso de Vuelo")
        st.write("Esta aplicación predice la duración del retraso de un vuelo en minutos.")
        st.sidebar.header("Entrada del Usuario")

    @staticmethod
    def display_monitoring(reference_data, current_data):
        """
        Muestra información de monitoreo.
        Args:
            reference_data (DataFrame): Dataset de referencia.
            current_data (DataFrame): Dataset actual.
        """
        st.write("Por favor, desplácese hacia abajo para ver su informe")
        st.write("Forma del Dataset de Referencia:", reference_data.shape)
        st.write("Forma del Dataset Actual:", current_data.shape)
        # Información de entrenamiento del modelo
        st.write("### El modelo está entrenando...")

    @staticmethod
    def display_selected_inputs(selected_data):
        """
        Muestra las entradas de usuario seleccionadas.
        Args:
            selected_data (dict): Datos de entrada proporcionados por el usuario.
        """
        input_data = pd.DataFrame([selected_data])
        st.write("Entradas Seleccionadas:")
        st.write(input_data)
        return input_data

    @staticmethod
    def display_predicted_delay(flight_delay):
        """
        Muestra el retraso de vuelo predicho.
        Args:
            flight_delay (float): Retraso de vuelo predicho en minutos.
        """
        st.write("Retraso de Vuelo Predicho (minutos):", round(flight_delay[0], 2))

    @staticmethod
    def display_report(report, report_name: str):
        """
        Muestra un informe generado por Evidently.
        Args:
            report (Report): El informe de Evidently a mostrar.
            report_name (str): Nombre del informe (por ejemplo, "Informe de Rendimiento del Modelo").
        """
        st.write(f"{report_name}")
        # Mostrar el informe de Evidently como HTML con desplazamiento habilitado
        st.components.v1.html(report.get_html(), height=1000, scrolling=True)

Ahora, llega la aplicación final de Streamlit, en la que se han integrado tanto la predicción como el monitoreo.

app.py

import streamlit as st
import pandas as pd
from src.controller.flight_delay_controller import FlightDelayController

def main():
    st.set_page_config(page_title="Aplicación de Predicción de Retraso de Vuelo", layout="wide")
    # Inicializar el controlador
    controller = FlightDelayController()
    # Crear la aplicación de Streamlit
    st.sidebar.title("Aplicación de Predicción de Retraso de Vuelo y Monitoreo de Datos y Modelo")
    choice = st.sidebar.radio("Seleccione una opción:", ("Realizar Predicciones", "Monitorear Datos y Modelo"))
    if choice == "Realizar Predicciones":
        controller.run_prediction()
    elif choice == "Monitorear Datos y Modelo":
        controller.run_monitoring()

if __name__ == '__main__':
    principal()

Para ejecutar la aplicación Streamlit, ejecute

# Para ejecutar la aplicación, ejecute lo siguiente:
streamlit run app.py

Visite http://localhost:8501 en su navegador web para usar la aplicación.

Tablero de Predicción:

Tablero de Monitoreo:

Desbloqueando Ideas con Informes de Evidently

Informe de Cambio de Datos:

Aquí, podemos ver el informe de cambio de datos para nuestro proyecto. Podemos ver que 5 columnas han cambiado. Por lo tanto, nuestro siguiente paso sería analizar las posibles razones detrás del cambio de estas características con los respectivos expertos en el dominio.

Cambio en el Objetivo:

Aquí, podemos ver la desviación del objetivo; no se detecta ninguna desviación aquí.

Informe de rendimiento del modelo

Aquí, podemos ver todas las métricas de rendimiento de nuestro modelo después de analizar el nuevo conjunto de datos en lotes. Podemos tomar las decisiones necesarias en función de las métricas.

Informe de calidad de datos

Por lo general, utilizamos informes de calidad de datos para datos sin procesar y no procesados. Podemos ver todas las métricas relacionadas con los datos, como el número de columnas, la correlación, los valores duplicados, etc. También podemos implementarlo para realizar un análisis exploratorio de datos.

Integración de Docker para implementación:

Comprensión de Docker y su importancia:

Es importante dockerizar nuestro proyecto para ejecutarlo en cualquier entorno sin problemas de dependencia. Docker es una herramienta esencial para empaquetar y distribuir aplicaciones. Aquí están los pasos para configurar y usar Docker para este proyecto.

Dockerfile:

El Dockerfile en el directorio de nuestro proyecto contiene las instrucciones para construir una imagen de Docker. Escriba el Dockerfile con la sintaxis correcta.

Estructura del proyecto Archivo Docker:

Redacción de un Dockerfile eficiente

Es esencial tener el tamaño de la imagen de Docker lo más mínimo posible. Podemos utilizar técnicas como multi-staging en Docker para reducir el tamaño de la imagen y utilizar una imagen base muy ligera para Python.

Código:

# Use una versión oficial de Python como imagen principalFROM python: 3.9-slim# Establecer el directorio de trabajo en /appWORKDIR /app# Crear los directorios necesariosRUN mkdir -p /app/data /app/models /app/src/controller /app/src/model /app/src/view# Copiar los archivos de su máquina host al contenedorCOPY app.py /app/COPY src/controller/flight_delay_controller.py /app/src/controller/COPY src/model/flight_delay_model.py /app/src/model/COPY src/view/flight_delay_view.py /app/src/view/COPY requirements.txt /app/# Crear y activar un entorno virtualRUN python -m venv venvRUN /bin/bash -c "source venv/bin/activate"# Instalar los paquetes necesarios especificados en requirements.txtRUN pip install -r requirements.txt# Instalar wgetRUN apt-get update & apt-get install -y wget# Descargar el archivo de conjunto de datos usando wgetRUN wget -O /app/data/DelayedFlights.csv https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv# Descargar el archivo de mejor modelo usando wgetRUN wget -O /app/models/best_model.pkl https://flightdelay.blob.core.windows.net/flight-delayed-dataset/best_model.pkl# Hacer que el puerto 8501 esté disponible para el mundo exterior de este contenedorEXPOSE 8501# Definir el comando predeterminado para ejecutar tu aplicación StreamlitCMD ["streamlit", "run", "app.py"]

Construcción y ejecución de contenedores Docker

Sigue estos pasos para construir la imagen de Docker y ejecutar un contenedor en la terminal después de escribir el Dockerfile.

docker build -t flight-delay-prediction .docker run -p 8501:8501 flight-delay-prediction

Compartir nuestra aplicación Streamlit con Streamlit Sharing:

Después de construir nuestra aplicación Streamlit para compartirla con otros, podemos usar Streamlit Sharing para alojar nuestras aplicaciones de forma gratuita. Aquí están los siguientes pasos a seguir:

1) Organizar el proyecto: Asegúrese de tener un archivo app.py en el directorio raíz.

2) Definir dependencias: Cree un archivo requirements.txt para que la compartición de Streamlit instale todos los paquetes necesarios mencionados en este archivo.

3) Usar GitHub: Envíe el repositorio a GitHub para integrarlo sin problemas con la compartición de Streamlit.

Luego, regístrate en Streamlit sharing, pega la URL de tu repositorio de GitHub y haz clic en “deploy”, ahora se generará la URL de tu proyecto. Puedes compartirla con otras personas.

Conclusión

Esta guía nos enseñó cómo integrar herramientas como Streamlit, Azure, Docker y Evidently para construir increíbles aplicaciones basadas en datos. Al concluir esta guía, tendrás suficiente conocimiento para construir aplicaciones web con Streamlit, aplicaciones portátiles a través de Docker y asegurar la calidad de los datos y el modelo a través de Evidently. Sin embargo, esta guía se trata de leer y aplicar este conocimiento en tus próximos proyectos de ciencia de datos. Intenta experimentar, innovar y explorar mejoras adicionales que se puedan realizar. Gracias por acompañarme hasta el final de la guía. ¡Sigue aprendiendo, sigue haciéndolo y sigue creciendo!

Puntos clave

  • Es esencial dockerizar nuestro proyecto de aprendizaje automático para que se pueda ejecutar en cualquier entorno sin problemas de dependencias.
  • Para reducir el tamaño de la imagen Docker, considera utilizar la técnica de multi-etapas.
  • Implementar la arquitectura MVC en el código reduce la complejidad y mejora la legibilidad del código en aplicaciones web complejas.
  • La integración nos ayuda a monitorear la calidad de los datos y el modelo en el entorno de producción.
  • Después de analizar todos los informes de datos y modelos, se deben tomar las acciones apropiadas.

Preguntas frecuentes

Repositorio de GitHub: https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Inteligencia Artificial

Los Anunciantes más Grandes del Mundo Aceptan el Poder de la IA Un Cambio de Paradigma en la Publicidad

En un movimiento que podría remodelar el panorama publicitario, algunos de los anunciantes más renombrados del mundo ...

Inteligencia Artificial

Aterrizaje de Chandrayaan 3 Cómo la IA y los sensores ayudaron en la épica empresa lunar de la ISRO.

En la fascinante expansión de la exploración espacial, cada misión es una apuesta cósmica, cada una un lanzamiento de...

Ciencias de la Computación

La inteligencia artificial se utilizó para crear una nueva canción final de los Beatles, según Paul McCartney.

El músico dice que utilizó tecnología para 'extraer' la voz de John Lennon de una vieja demo y completar una canción ...

Inteligencia Artificial

Conoce los Modelos de Difusión Compartimentados (CDM) Un enfoque de IA para entrenar diferentes modelos de difusión o indicaciones en distintas fuentes de datos.

Con los avances recientes en tecnología y en el campo de la Inteligencia Artificial, ha habido mucho progreso y mejor...

Inteligencia Artificial

El mundo natural potencia el futuro de la visión por computadora

Un sistema de software de código abierto tiene como objetivo mejorar el entrenamiento de sistemas de visión por compu...