Simplifique la infraestructura de transmisión con soporte de distribución mejorada para Kinesis Knowledge Streams en transmisión estructurada


Históricamente, la sabiduría convencional sostenía que el procesamiento de flujo es el dominio de casos de uso operativo de latencia estrictamente baja, como actualizaciones de inventario en fracciones de segundo en un sitio internet de comercio electrónico. En contradicción directa con eso, hemos observado que muchas de las cargas de trabajo más importantes de nuestros clientes procesan grandes volúmenes de datos generados continuamente para su uso en análisis, BI y ML. Eso no quiere decir que esos casos de uso operativo no sean importantes, absolutamente lo son. En cambio, la conclusión aquí es que el mundo del procesamiento de flujo no está definido por la latencia, sino por el paradigma de procesamiento incremental de datos generados continuamente. Varios consumidores con diferentes objetivos pueden confiar en las mismas fuentes de datos.

Databricks tiene la misión de hacer que el procesamiento de flujo sea más rápido, más consistente y más fácil de usar para nuestros clientes, por lo que estamos creando nuevos conectores y agregando nuevas funciones a los existentes como parte del esfuerzo basic para desarrollar el próxima generación de Streaming Estructurado. Nos complace anunciar que el Conector de Kinesis en Databricks Runtime ahora es suitable con Fan-out mejorado (EFO) para flujos de datos de Kinesis.

¿Por qué importa esto? Nuestros clientes utilizan Transmisión estructurada para potenciar las cargas de trabajo de ETL críticas para el negocio, pero el análisis y el ML no son los únicos usos para esos datos: puede haber una cantidad de otras aplicaciones que consumen de un flujo de Kinesis determinado. Con soporte para EFO, nuestros clientes ahora pueden consolidar su infraestructura en menos flujos que potencian los casos de uso tanto operativos como de ETL. Nuestro conector sigue siendo totalmente suitable con transmisiones no mejoradas, por lo que nuestros clientes conservan la opción de cuándo tiene sentido asumir el costo adicional de ejecutar consumidores de EFO.

Descripción basic de Kinesis Knowledge Streams

Dado que este es un weblog técnico, vamos a entrar en detalles sobre cómo funciona todo esto, comenzando con una descripción basic de Flujos de datos de Amazon Kinesis (KDS).

KDS es un servicio de transmisión de datos sin servidor que facilita la captura, el procesamiento y el almacenamiento de flujos de datos a cualquier escala. KDS sirve como la columna vertebral de las aplicaciones de transmisión personalizadas que recopilan de fuentes como registros de aplicaciones, fuentes de redes sociales y datos de flujo de clics. Los datos se colocan en Kinesis Knowledge Streams, lo que garantiza durabilidad y elasticidad. El aspecto de servicio administrado de Kinesis Knowledge Streams libera a los usuarios de la carga operativa de crear y ejecutar una canalización de entrada de datos. La elasticidad de Kinesis Knowledge Streams le permite escalar el flujo hacia arriba o hacia abajo, para que nunca pierda registros de datos antes de que caduquen.

Figura 1: Kinesis Data Streams (enlace)
Figura 1: Kinesis Knowledge Streams (enlace)

Modos de consumo

KDS logra el paralelismo de rendimiento a través del concepto de fragmentos, cada uno de los cuales contiene una secuencia de mensajes o registros. Hay dos formas diferentes de consumir datos de fragmentos:

  1. Modo de rendimiento compartido: Este es el modo por defecto. Cada fragmento en una secuencia proporciona un máximo de 2 MB/s de rendimiento de lectura. Este rendimiento se comparte entre los consumidores que leen un fragmento. Por lo tanto, si varias aplicaciones consumen mensajes de un fragmento, se comparten entre ellas 2 MB/s de rendimiento de lectura.
  2. Modo Fan-Out mejorado (rendimiento dedicado): Como parte de este modo, cada aplicación que lee desde un fragmento se registra como consumidor y obtiene un rendimiento de lectura de 2 MB/s de forma particular person. Esto permite que varias aplicaciones lean en paralelo desde un fragmento sin compartir el rendimiento de lectura, lo que a su vez mejora el rendimiento del procesamiento.
    • Nota: hay un límite de 20 consumidores registrados para cada transmisión mejorada y un costo de recuperación de datos asociado
Figura 2: Conector de Kinesis de Spark de Databricks (modo de rendimiento compartido frente a EFO)
Figura 2: Conector de Kinesis de Spark de Databricks (modo de rendimiento compartido frente a EFO)

Conector Kinesis de ladrillos de datos

Anteriormente, el conector de Kinesis en Databricks Runtime solo admitía el modo de rendimiento compartido junto con la compatibilidad con la creación de fragmentos (tanto la fusión como la división de fragmentos). Se basa en las API de Kinesis y utiliza el SDK de Java de AWS.

Con el lanzamiento de Tiempo de ejecución de ladrillos de datos 11.3 LTSel conector de Kinesis ahora admite secuencias que utilizan EFO.

El seguimiento configuraciones se introducen para habilitar y configurar esta función:

  • streamName – Una lista separada por comas de nombres de secuencias.
    • Predeterminado: Ninguno (Este es un parámetro obligatorio)
  • región – Región para que se especifiquen los flujos.
    • Valor predeterminado: región resuelta localmente
  • posición inicial – Outline desde dónde empezar a leer en stream.newest,.
    • Predeterminado: último
    • Valores posibles: último, trim_horizon, más temprano, at_timestamp
  • modo consumidor – Tipo de consumidor para ejecutar la consulta de transmisión con…
    • Predeterminado: sondeo
    • Valores posibles: sondeo o efo

Este es un ejemplo de Scala de una aplicación de transmisión estructurada que lee desde un flujo de datos de Kinesis utilizando EFO y escribe en una tabla de Delta Lake:


val kinesisDF = spark.readStream
  .format("kinesis")
  .choice("streamName","<stream_name>" )
  .choice("area", "<aws_region>")
  .choice("initialPosition", "newest")
  .choice("awsAccessKey", "<access_key>")
  .choice("awsSecretKey", "<secret_key>")
  .choice("consumerMode", "efo")
  .choice("consumerName", "<consumer_name>") // makes use of question id by default
  .choice("requireConsumerDeregistration", <true|false>) // false by default
  .load()

val question = kinesisDF
  .writeStream
  .format("delta")
  .outputMode("append")
  .choice("checkpointLocation", "(checkpoint location)")
  .set off(as soon as=True) 
  .desk("delta desk identify")

Funcionamiento en modo Fan-out mejorado (EFO)

Aquí hay información útil para tener en cuenta al ejecutar canalizaciones de transmisión en modo EFO.

Registro de Consumidores

  • Amazon Kinesis requiere que los consumidores se registren explícitamente para usar con el modo EFO
  • El conector de Databricks Kinesis ofrece la opción para registrar un nuevo consumidor o reutilizar un consumidor existente
  • El nombre del consumidor es un campo opcional. Si se proporciona uno explícitamente con la consulta de transmisión, ese nombre se usa como el nombre del consumidor de EFO. Si no se proporciona uno, entonces el identificador de consulta único asociado con la consulta de transmisión se usa como el nombre del consumidor.
  • Si el consumidor EFO con el nombre anterior ya está registrado en Amazon Kinesis, ese consumidor se usa directamente. De lo contrario, se registra un nuevo consumidor EFO con Amazon Kinesis.
  • Por ejemplo, para registrar un nuevo consumidor con el nombre “testConsumer” para el flujo “testStream”, puede escribir la consulta como

val kinesisDF = spark.readStream
  .format("kinesis")
  .choice("streamName","testStream")
  .choice("consumerMode", "efo")
  .choice("consumerName", "testConsumer")
  .load()

Baja del Consumidor

  • De manera related al registro, los consumidores de Kinesis EFO también deben cancelar su registro explícitamente para evitar incurrir en cargos por hora por consumidor
  • El conector de Kinesis de Databricks proporciona un par de opciones para realizar esta cancelación de registro desde dentro de la plataforma de Databricks:
    
    val kinesisDF = spark.readStream
      .format("kinesis")
      .choice("streamName","testStream")
      .choice("consumerMode", "efo")
      .choice("consumerName", "testConsumer")
      .choice("requireConsumerDeregistration", "true")
      .load()
    
    
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager
    
    // === Configurations for Kinesis streams ===
    // If you're utilizing IAM roles to connect with a Kinesis stream (beneficial), you don't want to set the entry key and the key key
    val awsAccessKeyId = "YOUR ACCESS KEY ID"
    val awsSecretKey = "YOUR SECRET KEY"
    val kinesisRegion = "YOUR REGION" // e.g.- "us-west-2"
    
    // Create an AWS Kinesis client supervisor
    val supervisor = AWSKinesisConsumerManager.newManager()
    .choice("awsAccessKey", awsAccessKeyId)
    .choice("awsSecretKey", awsSecretKey)
    .choice("area", kinesisRegion)
    .create()
    
    // Checklist shoppers registered with "testStream"
    val shoppers = supervisor.listConsumers("testStream")
    show(shoppers)
    
    // De-register client with identify "testConsumer"
    supervisor.deregisterConsumer("testStream", "testConsumer")
    
    • Uno opción es establecer el indicador “requireConsumerDeregistration” en verdadero, lo que realizaría la cancelación del registro del consumidor al salir de la consulta en un mejor esfuerzo base. Si Databricks Runtime anula el registro del consumidor porque una consulta se detiene o falla, la consulta se vuelve a ejecutar y creará un consumidor con el mismo nombre, pero con un ARN diferente. Si confía en el ARN del consumidor por algún motivo, no utilice esta función de cancelación automática del registro.
      • Nota: es posible que la cancelación del registro del consumidor falle debido a situaciones como bloqueos del programa e interrupciones de la pink, por lo que recomendamos que los clientes verifiquen periódicamente a todos los consumidores registrados para asegurarse de que no haya consumidores huérfanos que incurran en costos innecesariamente.
    • El otro opción es ejecutar la utilidad de administración de consumidores fuera de línea desde un cuaderno de Databricks que le permite enumerar, registrar y cancelar el registro de los consumidores asociados con un flujo de datos de Kinesis de forma advert hoc.

Combinación de encuestas y consumidores de EFO

  • amazonas kinesis permite para el uso de ambos tipos de consumidores para el mismo flujo al mismo tiempo
    • Nota: los consumidores del modo de sondeo aún están sujetos al mismo límite de rendimiento de 2 MB/s que se comparte entre todos esos consumidores
  • El conector de Databricks Kinesis también permite ejecutar ambos tipos de consumidor en paralelo con solo proporcionar el mismo nombre de transmisión de Kinesis y diferentes modos de consumidor, según sea necesario por consulta.

Arquitectura de referencia

Figura 3: Arquitectura de referencia de Databricks con modo de rendimiento compartido
Figura 3: Arquitectura de referencia de Databricks con modo de rendimiento compartido

Figura 4: Arquitectura de referencia de Databricks con modo EFO
Figura 4: Arquitectura de referencia de Databricks con modo EFO

Esta arquitectura de referencia muestra cuántos de nuestros clientes utilizan actualmente Kinesis dentro de su ecosistema de datos. Tienen consumidores basados ​​en Spark, como aplicaciones de transmisión estructurada para ETL, y consumidores que no son de Spark, como funciones Lambda para casos de uso operativo.

Si tomamos un ejemplo de un servicio de transmisión de video, emite una serie de eventos, ya sean producidos por el usuario (eventos de visualización como pausa, reproducción, salida, and so forth.) o la propia aplicación (fotogramas eliminados, almacenamiento en búfer de transmisión de video, and so forth.). ). Si separamos los casos de uso en función de lo operativo frente a lo ETL, los consumidores de los mismos datos pueden verse así:

  • Operacional: el equipo de SRE puede tener comprobaciones de estado que utilicen eventos de vigilancia que se producen en un usuario determinado y otra telemetría de la aplicación, que pueden requerir una acción inmediata si fallan. Estos casos de uso suelen ser de máquina a máquina y también pueden incluir paneles en tiempo actual, por lo que la sensibilidad a la latencia es alta. Por lo basic, estos datos no se conservan durante mucho tiempo debido al costo de la reducción de la latencia.
  • ETL: puede haber equipos de análisis / ML que deseen utilizar los mismos eventos de vigilancia y telemetría de aplicaciones para ejecutar análisis descriptivos para informes o análisis predictivos para cosas como la detección de anomalías y la rotación. Su sensibilidad a la latencia es mucho más baja, pero necesitan que los datos se remonten mucho más atrás históricamente y necesitan que sean duraderos.

Para los casos de uso de ETL, nuestros clientes escriben los datos con la menor cantidad de transformaciones posible en un lago delta de bronce mesa. Por lo basic, estos son trabajos sin estado o solo de mapas, que leen los datos de un flujo de datos de Kinesis, los deserializan y luego los escriben en Delta Lake. Este patrón se mantiene, independientemente de si el flujo de datos de Kinesis está en modo de rendimiento compartido o en modo EFO. Tenga en cuenta que con el modo EFO, la arquitectura es más easy porque los usuarios ya no tendrán que duplicar secuencias para eludir la falta de compatibilidad con EFO en el conector de Databricks Kinesis. Los trabajos posteriores se leen de la tabla de bronce Delta Lake como fuente de transmisión y transforman gradualmente nuevos registros en tablas plateadas y doradas para el consumo.

¿Por qué Delta Lake? Su caso de uso canónico fue transmisión de ETL y análisis – las propiedades ACID combinadas con la tolerancia a fallas de Structured Streaming hacen que la recuperación sea sencilla y una API sencilla compactar los archivos de forma asíncrona después de las escrituras alivia el problema de los archivos pequeños para los lectores posteriores. Es un formato de almacenamiento abierto fácil de usar que brinda a los usuarios una forma de procesar de manera confiable grandes volúmenes de datos para una amplia variedad de casos de uso.

Conclusión

La compatibilidad con EFO brinda a los usuarios de Kinesis en Databricks una forma de simplificar su infraestructura al hacer que las cargas de trabajo de Spark y las que no son de Spark consuman los mismos flujos sin competir por la capacidad de rendimiento. A medida que las empresas continúan desarrollando capacidades operativas que se basan en fuentes de transmisión de datos, la simplicidad de la infraestructura es una de las claves para la escalabilidad. Un flujo duplicado menos es un elemento menos para administrar y un punto de falla menos. A medida que ampliamos nuestro ecosistema de conectores y simplificamos el procesamiento de transmisiones como parte de proyecto velocidad de la luz¡esté atento a las nuevas funciones que harán su vida más fácil!

Related Articles

Comments

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Same Category

spot_img

Stay in touch!

Follow our Instagram