Maneje las operaciones de datos de UPSERT utilizando Delta Lake y AWS Glue de código abierto


Muchos clientes necesitan un lago de datos de transacciones ACID (atómico, consistente, aislado, duradero) que pueda registrar la captura de datos modificados (CDC) de fuentes de datos operativos. También existe la demanda de fusionar datos en tiempo actual en datos por lotes. El marco de Delta Lake proporciona estas dos capacidades. En esta publicación, discutimos cómo manejar UPSERT (actualizaciones e inserciones) de los datos operativos utilizando Delta Lake integrado de forma nativa con Pegamento AWSy consulta el Delta Lake usando amazona atena.

Examinamos una organización de seguros hipotética que emite pólizas comerciales a pequeñas y medianas empresas. Los precios de los seguros varían según varios criterios, como la ubicación de la empresa, el tipo de empresa, la cobertura contra terremotos o inundaciones, and so forth. Esta organización planea construir una plataforma de análisis de datos, y los datos de la póliza de seguro son una de las entradas de esta plataforma. Debido a que el negocio está creciendo, cada mes se inscriben y renuevan cientos y miles de nuevas pólizas de seguro. Por lo tanto, todos estos datos operativos deben enviarse a Delta Lake casi en tiempo actual para que la organización pueda realizar varios análisis y crear modelos de aprendizaje automático (ML) para servir a sus clientes de una manera más eficiente y rentable.

Descripción basic de la solución

Los datos pueden tener su origen en cualquier fuente, pero, por lo basic, los clientes desean llevar datos operativos a los lagos de datos para realizar análisis de datos. Una de las soluciones es traer los datos relacionales usando Servicio de migración de bases de datos de AWS (AWS DMS). Las tareas de AWS DMS se pueden configurar para copiar la carga completa y los cambios continuos (CDC). La carga completa y la carga de CDC se pueden llevar a las capas de almacenamiento sin procesar y seleccionadas (Delta Lake) en el lago de datos. Para mantenerlo easy, en esta publicación optamos por las fuentes de datos y la capa de ingesta; se supone que los datos ya se copiaron en el depósito sin procesar en forma de archivos CSV. Un trabajo de ETL de AWS Glue realiza la transformación necesaria y copia los datos en la capa de Delta Lake. La capa de Delta Lake garantiza el cumplimiento de ACID de los datos de origen.

El siguiente diagrama ilustra la arquitectura de la solución.
Diagrama de arquitectura

El caso de uso que usamos en esta publicación es sobre una compañía de seguros comercial. Usamos un conjunto de datos easy que contiene las siguientes columnas:

  • Política – Número de póliza, ingresado como texto
  • Expiración – Fecha de vencimiento de la póliza
  • Ubicación – Tipo de ubicación (Urbana o Rural)
  • Estado – Nombre del estado donde se encuentra la propiedad
  • Región – Región geográfica donde se encuentra la propiedad
  • Valor asegurado – El valor de la propiedad
  • Tipo de negocio – Tipo de uso comercial para la propiedad, como agricultura o comercio minorista
  • Terremoto – Está incluida la cobertura contra terremotos (S o N)
  • Inundación – ¿Está incluida la cobertura contra inundaciones? (S o N)

El conjunto de datos contiene una muestra de 25 pólizas de seguro. En el caso de un conjunto de datos de producción, puede contener millones de registros.

policy_id,expiry_date,location_name,state_code,region_name,insured_value,business_type,earthquake,flood
200242,2023-01-02,City,NY,East,1617630,Retail,N,N
200314,2023-01-02,City,NY,East,8678500,Condominium,Y,Y
200359,2023-01-02,Rural,WI,Midwest,2052660,Farming,N,N
200315,2023-01-02,City,NY,East,17580000,Condominium,Y,Y
200385,2023-01-02,City,NY,East,1925000,Hospitality,N,N
200388,2023-01-04,City,IL,Midwest,12934500,Condominium,Y,Y
200358,2023-01-05,City,WI,Midwest,928300,Workplace Bldg,N,N
200264,2023-01-07,Rural,NY,East,2219900,Farming,N,N
200265,2023-01-07,City,NY,East,14100000,Condominium,Y,Y
100582,2023-03-25,City,NJ,East,4651680,Condominium,Y,Y
100487,2023-03-25,City,NY,East,5990067,Condominium,N,N
100519,2023-03-25,Rural,NY,East,4102500,Farming,N,N
100462,2023-03-25,City,NY,East,3400000,Development,Y,Y
100486,2023-03-26,City,NY,East,9973900,Condominium,Y,Y
100463,2023-03-27,City,NY,East,15480000,Workplace Bldg,Y,Y
100595,2023-03-27,Rural,NY,East,2446600,Farming,N,N
100617,2023-03-27,City,VT,Northeast,8861500,Workplace Bldg,N,N
100580,2023-03-30,City,NH,Northeast,97920,Workplace Bldg,Y,Y
100581,2023-03-30,City,NY,East,5150000,Condominium,Y,Y
100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,N
100503,2023-03-31,City,NJ,East,1761960,Workplace Bldg,N,N
100504,2023-03-31,Rural,NY,East,1649105,Farming,N,N
100616,2023-03-31,City,NY,East,2329500,Condominium,N,N
100611,2023-04-25,City,NJ,East,1595500,Workplace Bldg,Y,Y
100621,2023-04-25,City,MI,Central,394220,Retail,N,N

En las siguientes secciones, recorremos los pasos para realizar las operaciones UPSERT de Delta Lake. usamos el Consola de administración de AWS para realizar todos los pasos. Sin embargo, también puede automatizar estos pasos usando herramientas como Formación en la nube de AWSel Equipment de desarrollo de la nube de AWS (AWS CDK), Terraforms, and so forth.

requisitos previos

Esta publicación está dirigida a arquitectos, ingenieros, desarrolladores y científicos de datos que construyen, diseñan y crean soluciones analíticas en AWS. Esperamos una comprensión básica de la consola, AWS Glue, Servicio de almacenamiento easy de Amazon (Amazon S3) y Atenea. Además, la persona puede crear Administración de acceso e identidad de AWS (IAM), crear y ejecutar trabajos y rastreadores de AWS Glue, y puede trabajar con el editor de consultas de Athena.

Utilice la versión 3 del motor de consulta de Athena para consultar las tablas del lago delta, más adelante en la sección “Consulta de la carga completa con Athena”.

Atenea QE V3

Configurar un depósito S3 para fuentes de datos de carga completa y CDC

Para configurar su depósito S3, full los siguientes pasos:

  1. Inicie sesión en su cuenta de AWS y elija la región más cercana a usted.
  2. En la consola de Amazon S3, cree un depósito nuevo. Asegúrese de que el nombre sea único (por ejemplo, delta-lake-cdc-blog-<some random quantity>).
  3. Cree las siguientes carpetas:
    1. $nombre_del_depósito/carga completa – Esta carpeta se usa para una carga completa única desde la fuente de datos ascendente
    2. $nombre_de_depósito/cdcload – Esta carpeta se utiliza para copiar los cambios de datos ascendentes
    3. $nombre_del_depósito/delta – Esta carpeta contiene los archivos de datos de Delta Lake
  4. Copie el conjunto de datos de muestra y guárdelo en un archivo llamado full-load.csv a su máquina native.
  5. Cargue el archivo usando la consola de Amazon S3 en la carpeta $bucket_name/fullload.

carpetas s3

Configurar una política y un rol de IAM

En esta sección, creamos una política de IAM para el acceso al depósito de S3 y un rol para que se ejecuten los trabajos de AWS Glue, y también usamos el mismo rol para consultar Delta Lake con Athena.

  1. En la consola de IAM, elija Políticas en el panel de navegación.
  2. Elegir Crear política.
  3. Seleccionar JSON tab y pegue el siguiente código de política. Reemplace la {bucket_name} que creó en el paso anterior.
{
    "Model": "2012-10-17",
    "Assertion": (
        {
            "Sid": "AllowListingOfFolders",
            "Motion": (
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ),
            "Impact": "Enable",
            "Useful resource": (
                "arn:aws:s3:::{bucket_name}"
            )
        },
        {
            "Sid": "ObjectAccessInBucket",
            "Impact": "Enable",
            "Motion": (
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ),
            "Useful resource": "arn:aws:s3:::{bucket_name}/*"
        }
    )
}

  1. Nombra la política delta-lake-cdc-blog-policy y seleccione Crear política.
  2. En la consola de IAM, elija roles en el panel de navegación.
  3. Elegir Crear rol.
  4. Seleccione AWS Glue como su entidad de confianza y elija Próximo.
  5. Seleccione la política que acaba de crear y con dos políticas adicionales administradas por AWS:
    1. delta-lake-cdc-blog-policy
    2. AWSGlueServiceRole
    3. CloudWatchFullAccess
  1. Elegir Próximo.
  2. Asigne un nombre al rol (por ejemplo, delta-lake-cdc-blog-role).

función de gestión de identidades y accesos

Configurar trabajos de AWS Glue

En esta sección, configuramos dos trabajos de AWS Glue: uno para carga completa y otro para la carga de CDC. Comencemos con el trabajo de carga completa.

  1. En la consola de AWS Glue, en Integración de datos y ETL en el panel de navegación, elija Trabajos. AWS Glue Studio se abre en una nueva pestaña.
  2. Seleccionar Editor de secuencias de comandos de chispa y elige Crear.

Editor de estudio de pegamento

  1. En el editor de secuencias de comandos, reemplace el código con el siguiente fragmento de código
import sys
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.sql.sorts import *

## @params: (JOB_NAME)
args = getResolvedOptions(sys.argv, ('JOB_NAME','s3_bucket'))

# Initialize Spark Session with Delta Lake
spark = SparkSession 
.builder 
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
.getOrCreate()

#Outline the desk schema
schema = StructType() 
      .add("policy_id",IntegerType(),True) 
      .add("expiry_date",DateType(),True) 
      .add("location_name",StringType(),True) 
      .add("state_code",StringType(),True) 
      .add("region_name",StringType(),True) 
      .add("insured_value",IntegerType(),True) 
      .add("business_type",StringType(),True) 
      .add("earthquake_coverage",StringType(),True) 
      .add("flood_coverage",StringType(),True) 

# Learn the total load
sdf = spark.learn.format("csv").choice("header",True).schema(schema).load("s3://"+ args('s3_bucket')+"/fullload/")
sdf.printSchema()

# Write information as DELTA TABLE
sdf.write.format("delta").mode("overwrite").save("s3://"+ args('s3_bucket')+"/delta/insurance coverage/")

  1. Navegar a la Detalles del trabajo pestaña.
  2. Proporcione un nombre para el trabajo (por ejemplo, Full-Load-Job).
  3. Para Rol de gestión de identidades y accesos¸ elige el rol delta-lake-cdc-blog-role que creaste anteriormente.
  4. Para tipo de trabajadorelegir G 2X.
  5. Para marcador de trabajoelegir Desactivar.
  6. Colocar Número de reintentos a 0.
  7. Bajo Propiedades avanzadas¸ mantenga los valores predeterminados, pero proporcione la ruta del archivo JAR delta core para Ruta de la biblioteca de Python y Ruta de los JAR dependientes.
  8. En los parámetros del trabajo:
    1. Añadir la clave --s3_bucket con el nombre del depósito que creó anteriormente como valor.
    2. Añadir la clave --datalake-formats y dar el valor delta
  9. Mantenga los valores predeterminados restantes y elija Ahorrar.

Detalles del trabajo

Ahora vamos a crear el trabajo de carga de CDC.

  1. Cree un segundo trabajo llamado CDC-Load-Job.
  2. Siga los pasos en el Detalles del trabajo ficha como en el trabajo anterior.
  3. Alternativamente, puede elegir la opción “Clonar trabajo” del trabajo de carga completa, esto llevará todos los detalles del trabajo del trabajo de carga completa.
  4. En el editor de secuencias de comandos, ingrese el siguiente fragmento de código para la lógica de CDC:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.sql.session import SparkSession
from pyspark.sql.capabilities import col
from pyspark.sql.capabilities import expr

## For Delta lake
from delta.tables import DeltaTable


## @params: (JOB_NAME)
args = getResolvedOptions(sys.argv, ('JOB_NAME','s3_bucket'))

# Initialize Spark Session with Delta Lake
spark = SparkSession 
.builder 
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
.getOrCreate()

# Learn the CDC load
cdc_df = spark.learn.csv("s3://"+ args('s3_bucket')+"/cdcload")
cdc_df.present(5,True)

# now learn the total load (newest information) as delta desk
delta_df = DeltaTable.forPath(spark, "s3://"+ args('s3_bucket')+"/delta/insurance coverage/")
delta_df.toDF().present(5,True)

# UPSERT course of if matches on the situation the replace else insert
# if there is no such thing as a key phrase then create an information set with Insert, Replace and Delete flag and do it individually.
# for delete it has to run in loop with delete situation, this script don't deal with deletes.
    
final_df = delta_df.alias("prev_df").merge( 
supply = cdc_df.alias("append_df"), 
#matching on primarykey
situation = expr("prev_df.policy_id = append_df._c1"))
.whenMatchedUpdate(set= {
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")} )
.whenNotMatchedInsert(values =
#inserting a brand new row to Delta desk
{   "prev_df.policy_id"             : col("append_df._c1"),
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")
})
.execute()

Ejecutar el trabajo de carga completa

En la consola de AWS Glue, abra full-load-job y elige Correr. El trabajo tarda aproximadamente 2 minutos en completarse y el estado de ejecución del trabajo cambia a logrado. Ir a $bucket_name y abre el delta carpeta, que contiene la carpeta del seguro. Puede anotar los archivos de Delta Lake en él. Ubicación delta en S3

Crear y ejecutar el rastreador de AWS Glue

En este paso, creamos un rastreador de AWS Glue con Delta Lake como tipo de fuente de datos. Después de ejecutar correctamente el rastreador, inspeccionamos los datos con Athena.

  1. En la consola de AWS Glue, elija rastreadores en el panel de navegación.
  2. Elegir Crear rastreador.
  3. Proporcione un nombre (por ejemplo, delta-lake-crawler) y elige Próximo.
  4. Elegir Agregar una fuente de datos y elige lago delta como su fuente de datos.
  5. Copie el URI de su carpeta delta (por ejemplo, s3://delta-lake-cdc-blog-123456789/delta/insurance coverage) e ingrese la ubicación de la ruta de la tabla Delta Lake.
  6. Mantener la selección predeterminada Crear tablas nativasy elige Agregar una fuente de datos de Delta Lake.
  7. Elegir Próximo.
  8. Elija el rol de IAM que creó anteriormente, luego elija Próximo.
  9. Selecciona el default base de datos de destino y proporcionar delta_ para el prefijo del nombre de la tabla. Si no default existe una base de datos, puede crear una.
  10. Elegir Próximo.
  11. Elegir Crear rastreador.
  12. Ejecute el rastreador recién creado. Una vez que el rastreador está completo, el delta_insurance la tabla está disponible bajo Databases/Tables.
  13. Abra la tabla para comprobar el resumen de la tabla.

Puede observar nueve columnas y sus tipos de datos. Mesa de pegamento

Consulta la carga completa usando Athena

En el paso anterior, creamos el delta_insurance desk ejecutando un rastreador en la ubicación de Delta Lake. En esta sección, consultamos la delta_insurance tabla usando Athena. Tenga en cuenta que si utiliza Athena por primera vez, configure la carpeta de resultados de la consulta para almacenar los resultados de la consulta de Athena (por ejemplo, s3://<your-s3-bucket>/query-output/).

  1. En la consola de Athena, abra el editor de consultas.
  2. Mantenga las selecciones predeterminadas para Fuente de datos y Base de datos.
  3. Ejecutar la consulta SELECT * FROM delta_insurance;. Esta consulta devuelve un complete de 25 filas, lo mismo que en el feed de datos de carga completa.
  4. Para la comparación de CDC, ejecute la siguiente consulta y almacene los resultados en una ubicación donde pueda comparar estos resultados más tarde:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462,100463,100475,110001,110002)
order by policy_id;

La siguiente captura de pantalla muestra el resultado de la consulta de Athena.

Resultados de la consulta de carga completa

Cargue la fuente de datos de CDC y ejecute el trabajo de CDC

En esta sección, actualizamos tres pólizas de seguro e insertamos dos pólizas nuevas.

  1. Copie los siguientes datos de la póliza de seguro y guárdelos localmente como cdc-load.csv:
U,100462,2024-12-31,City,NY,East,3400000,Development,Y,Y
U,100463,2023-03-27,City,NY,East,1000000,Workplace Bldg,Y,Y
U,100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,Y
I,110001,2024-03-31,City,CA,WEST,210000,Workplace Bldg,N,N
I,110002,2024-03-31,Rural,FL,East,975000,Retail,N,Y

La primera columna del feed de CDC describe las operaciones UPSERT. U es para actualizar un registro existente, y I es para insertar un nuevo registro.

  1. Cargue el archivo cdc-load.csv en el $bucket_name/cdcload/ carpeta.
  2. En la consola de AWS Glue, ejecute CDC-Load-Job. Este trabajo se encarga de actualizar el Lago Delta en consecuencia.

Los detalles del cambio son los siguientes:

  • 100462 – La fecha de caducidad cambia al 31/12/2024
  • 100463 – Valor asegurado cambia a 1 millón
  • 100475 – Esta política ahora está bajo una nueva zona de inundación
  • 110001 y 110002 – Nuevas políticas añadidas a la tabla
  1. Vuelva a ejecutar la consulta:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462, 100463,100475,110001,110002)
order by policy_id;

Como se muestra en la siguiente captura de pantalla, los cambios en la fuente de datos de CDC se reflejan en los resultados de la consulta de Athena.
Resultados de la consulta de Athena

Limpiar

En esta solución, usamos todos los servicios administrados y no hay costo si los trabajos de AWS Glue no se están ejecutando. Sin embargo, si desea limpiar las tareas, puede eliminar los dos trabajos de AWS Glue, la tabla de AWS Glue y el depósito S3.

Conclusión

Las organizaciones buscan continuamente soluciones analíticas de alto rendimiento, rentables y escalables para extraer el valor de sus fuentes de datos operativos casi en tiempo actual. La plataforma analítica debe estar lista para recibir cambios en los datos operativos tan pronto como ocurran. Las soluciones típicas de lagos de datos enfrentan desafíos para manejar los cambios en los datos de origen; el marco de Delta Lake puede cerrar esta brecha. Esta publicación demostró cómo crear lagos de datos para operaciones UPSERT utilizando AWS Glue y tablas nativas de Delta Lake, y cómo consultar tablas de AWS Glue desde Athena. Puede implementar sus operaciones de datos UPSERT a gran escala con AWS Glue, Delta Lake y realizar análisis con Amazon Athena.

Referencias


Sobre los autores

Praveen allam es arquitecto de soluciones en AWS. Ayuda a los clientes a diseñar aplicaciones escalables y rentables de nivel empresarial utilizando la nube de AWS. Crea soluciones para ayudar a las organizaciones a tomar decisiones basadas en datos.

vivek singh es arquitecto sénior de soluciones del equipo de laboratorio de datos de AWS. Ayuda a los clientes a desbloquear su viaje de datos en el ecosistema de AWS. Sus áreas de interés son la automatización de canalizaciones de datos, la calidad y el gobierno de datos, los lagos de datos y las arquitecturas de casas de lagos.

Related Articles

China ataca a Estados Unidos y los demócratas apuntan a Biden en los dudosos tabloides de esta semana

investigador nacional "George Santos - Psicópata americano!" grita la portada típicamente...

Comments

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Same Category

spot_img

Stay in touch!

Follow our Instagram