Nueva implementación Sofia2 DataFlow

En la versión 3.1 de Sofia2 se ha cambiado la implementación de referencia del módulo Sofia 2 DataFlow, pasando de usar Spring xD a StreamSets, mejorando las capacidades de modelado visual de flujo de datos o Pipeline.


Se define Pipeline como el flujo de datos desde el sistema de origen a los sistemas de destino, permitiendo definir cómo transformar los datos a lo largo del camino. Su diseño sigue las siguiente reglas: un único modulo origen para representar el sistema de origen de la información, posibilidad de agregar múltiples procesadores intermedios para transformar los datos, y al menos un módulo de destino (pudiendo ser múltiples) para definir el grabado de la información.

 

Se pueden distinguir tres formas distintas de ejecutar pipelines:

  • Lanzar pipeline: Activa el Pipeline y únicamente se parará cuando el usuario así lo ordene. Mientras permanezca activo, el Pipeline funcionará en modo Streaming.
  • Ejecución simple: Lanza el Pipeline y, cuando haya terminado de procesar la información, se parará.
  • Ejecución en modo Batch: Lanza el Pipeline en modo Batch a través de su expresión CRON impuesta. Una vez el Pipeline haya terminado de procesar la información, se parará.

Mientras que el Pipeline esté activo se puede monitorizar el flujo de datos en tiempo real así como verificar si está funcionando según lo esperado. Esto puede ir acompañado de alertas para avisar de un comportamiento imprevisto en el flujo de datos.

 

En la pantalla de Dataflow, situada en el apartado Analytics de la consola web, se puede ver la siguiente información:

listadoPipeline

 

En el listado de Pipelines se podrán ver todos los Pipelines creados a los cuales tenga acceso el usuario logado en cuestión.

 

En caso de que los Pipelines no tengan ninguna expresión cron definida, se mostrará el campo “Expresión CRON” de la tabla vacío, indicando que la ejecución de ese Pipeline no está disponible en modo batch.

 

La columna Activo informa sobre el estado del Pipeline. En el caso de la imagen, todos los Pipelines están parados salvo el HDFSToHDFS  que al ejecutarse ha dado error. En caso de que un Pipeline estñe activo se mostrará un icono en verde.

 

Los iconos disponibles en la última columna de la tabla describen las siguientes funcionalidades (partiendo desde la izquierda hacia la derecha):

  • Editar: Permite acceder a la configuración del Pipeline, pudiendo modificar los módulos y su configuración asociada.
  • Eliminar: Elimina el Pipeline.
  • Duplicar: Duplica los módulos y configuración del Pipeline, de modo que asignándole un nuevo nombre, descripción y expresión CRON quedará totalmente operativo y disponible en el listado.
  • Cambiar expresión CRON: Permite crear o modificar la expresión CRON del Pipeline en cuestión.
  • Ejecutar/Parar. Lanza el pipeline (explicado anteriormente) o lo para en caso de que el Pipeline se encuentre ejecutándose.
  • Ejecución simple. En caso de que el Pipeline esté parado, se podrá ejecutar el Pipeline en modo simple.

 

Tanto si se crea un Pipeline nuevo como si se duplica, aparecerá una ventana como la de a continuación, en la que se podrá insertar la identificación, descripción y expresión CRON del nuevo Pipeline, siendo estos dos  últimos parámetros opcionales:

crearpipeline

 

Al hacer click sobre Crear se podrá ver una ventana como la que se muestra a continuación:

pipelineinicial.png

 

De nuevo, se procede a enumerar las funcionalidades de la barra de herramientas situada a la derecha del nombre del Pipeline . Partiendo desde la derecha:

  • Reset Origin/Snapshot: Reinicia el origen del Pipeline.
  • Issues: Informa sobre errores en el diseño del Pipeline. Haciendo click sobre el listado emergente, se accederá directamente a la sección errónea.
  • Undo: Revierte los cambios realizados
  • Redo: Rehace los cambios realizados
  • Delete: Borra un módulo o un conector siempre y cuando el Pipeline se encuentre parado.
  • Duplicate: Duplica el módulo seleccionado.
  • Preview: Permite previsualizar el flujo de datos, de modo que permite comprobar la modificación de estos sin la necesidad de escribir en el destino, es decir, puede actuar de simulador.
  • Validate: Valida la configuración del Pipeline. Complementa los errores detectados por la herramienta Issues.
  • Run: Lanza el pipeline (modo ya explicado anteriormente)
  • Library: Herramienta que desplegará todos los módulos disponibles. Es posible repararlos entre origen, destino y procesador o directamente listar todos juntos.

 

Actualmente hay disponible los siguientes módulos Origen:

  • Excel: Obtiene datos desde un fichero Excel
  • Sofia2: Obtiene datos desde la plataforma de Sofia.
  • Amazon S3: Lee archivos de Amazon S3.
  • Directory: Lee archivos totalmente escritos desde un directorio.
  • File Tail: Lee líneas de datos en archivos en los que se está escribiendo
  • HTTP Client: Lee datos de un recurso URL HTTP.
  • JDBC Consumer: Lee datos de bases de datos a través de una conexión JDBC.
  • JMS Consumer: Lee los mensajes de JMS.
  • Kafka Consumer: Lee los mensajes de Kafka
  • Kinesis Consumer: Lee datos de Kinesis Stream
  • MapR Streams Consumer: Lee datos de MAPR Stream.
  • MongoDB: Lee documentos de MongoDB.
  • Omniture: Lee los informes de uso de Internet de la API de informes de Omniture.
  • RabbitMQ Consumer: Lee los mensajes de RabbitMQ.
  • SDC RPC: Lee datos de un SDC RPC destino.
  • UDP Source: Lee los mensajes de uno o más puertos UDP
  • Hadoop FS: Lee datos desde el sistema de archivos distribuido Hadoop (HDFS).
  • Kafka Consumer: Lee los mensajes de Kafka. Utiliza la versión de clúster del origen.

 

En cuanto a los procesadores intermedios de información se cuenta con los siguientes módulos:

  • Base64 Field Decoder: Decodifica datos codificados mediante base64
  • Base64 Field Encoder: Codifica los datos usando base64
  • Expression Evaluator: Realiza cálculos sobre los datos. También puede añadir o modificar atributos del encabezado.
  • Field Converter: Convierte el tipo de datos de un campo
  • Field Hasher: Codifica datos sensibles
  • Field Masker: Permite enmascarar datos sensibles
  • Field Merger: Se fusionan datos en un mapa o lista.
  • Field Remover: Elimina campos desde un registro
  • Field Renamer: Cambia el nombre a campos de un registro
  • Field Splitter: Divide un campo String en diferentes campos de tipo String
  • Geo IP: Proporciona información sobre la ubicación geográfica sobre la base de una dirección IP.
  • Groovy Evaluator: Procesa registros basado en el código Groovy implementado
  • JavaScript Evaluator: Procesa registros basado en el código JavaScript implementado
  • JSON Parser: Parsea un Json embebido en un campo String
  • Jython Evaluator: Efectúa un procesamiento de registros basado en código Jython.
  • Log Parser: Parsea datos de log en un campo basado en un determinado format Log
  • Record Deduplicator: Elimina los registros duplicados.
  • Stream Selector: Direcciona los datos según una lógica asociada.
  • Value Replacer: Reemplaza los valores nulos o sustituye los valores con nulos. Además puede crear campos nuevos y darles un valor específico.
  • XML Parser: Analiza los datos XML en un campo de tipo String.

 

Por último, los módulos destino disponibles son:

  • Amazon S3: Escribe datos en Amazon S3.
  • Cassandra: Escribe datos en un clúster de Cassandra.
  • Elasticsearch: Escribe datos en un clúster Elasticsearch.
  • Flume: Escribe datos en una fuente Flume
  • Hadoop FS: Escribe datos en el sistema de archivos distribuido Hadoop (HDFS).
  • HBase: Escribe datos en un clúster de HBase.
  • Hive Streaming: Escribe los datos en Hive.
  • InfluxDB: Escribe datos en InfluxDB
  • JDBC Producer: Escribe datos en JDBC.
  • Kafka Producer: Escribe los datos en un clúster Kafka.
  • Kinesis Firehose: Escribe datos en un flujo de suministro  Kinesis Firehose
  • Kinesis Producer: Escribe datos en un Stream Kinesis
  • Kudu: Escribe datos en Kudu
  • Local FS: Escribe datos en un sistema de archivos local.
  • MongoDB: Escribe datos en MongoDB
  • MapR FS: Escribe datos en MapR FS
  • MapR Streams Producer: Escribe datos en MapR
  • SDC RPC: Pasa los datos a un origen SDC RPC
  • Solr: Escribe los datos en un nodo o cluster Solr
  • To Error: Pasa los registros entrantes al destino configurado en el pipeline para el tratamiento de errores.
  • Trash: Elimina los registros del pipeline

 

Adicionalmente, se quiere prestar especial atención a los módulos Sofia2, diseñados para volcar información en una ontología o para consumir información almacenada, según sea destino u origen.

 

Los módulos sofia2 poseen las siguientes características configurables:

  • Comunes a ambos módulos:

parametrosGeneralesSofiaPipeline

  • Parámetros adicionales disponibles en Sofia2 destino:sofiaDestinoPipeline.PNG
  • Parámetros adicionales disponibles en Sofia2 origen:sofiaOrigenPipeline.PNG

 

En el siguiente ejemplo se muestran una serie de módulos interconectados entre sí , siendo la ingesta de datos un sistema de ficheros contenidos en un directorio y diferentes volcados de la información según un criterio lógico.

image057

 

Todos los Pipeline tienen una serie de propiedades generales, tales como definición de constantes, definición del comportamiento en caso de error en el flujo de datos, etc…

 

A continuación se puede ver un ejemplo de un Pipeline en el que se produce la ingesta de datos por medio de un conjunto de archivos contenidos en un fichero, varios módulos de procesamiento de la información dispuestos según un criterio lógico y un volcado final de toda la información a Hadoop:pipelineEjemplo.png

 

Cada módulo se ha configurado según unas necesidades específicas. Para este caso, en la ruta en la que el módulo TMP va a buscar archivos para la ingesta de datos es /etc/local, en la que se encuentra un archivo como el que está disponible en el siguiente enlace:

 

https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv

 

El módulo Stream Selector 1 se ha configurado de tal forma que si la línea entrante tiene el campo “payment_type” igual a “CRD” (${record:value(‘/payment_type’) == ‘CRD’}), se redirija la información hacia el camino señalado como 1. En caso contrario, los datos irán por el flujo 2.

 

En el camino 1 se encuentras los módulos de procesamiento “Jython Evalulator 1” y “Field Masker  1” que se encargan respectivamente de analizar la secuencia de la tarjeta de crédito para identificar si se trata de Visa, MasterCard ,etc… y de enmascarar los dígitos de la tarjeta de crédito en cuestión.

 

La configuración de “Jython Evaluator 1” es:

for record in records:
 try:
  cc = record.value['credit_card']
  if cc == '':
    error.write(record, "Payment type was CRD, but credit card was null")
  continue
  cc_type = ''
  if cc.startswith('4'):
    cc_type = 'Visa'
  elif cc.startswith(('51','52','53','54','55')):
    cc_type = 'MasterCard'
  elif cc.startswith(('34','37')):
    cc_type = 'AMEX'
  elif cc.startswith(('300','301','302','303','304','305','36','38')):
    cc_type = 'Diners Club'
  elif cc.startswith(('6011','65')):
    cc_type = 'Discover'
  elif cc.startswith(('2131','1800','35')):
    cc_type = 'JCB'
  else:
    cc_type = 'Other'
record.value['credit_card_type'] = cc_type
output.write(record)
except Exception as e:
  error.write(record, str(e))

 

En cuanto a la configuración del módulo “Field Masker 1”:

paramFieldmasker

 

En el camino 2 se encuentra el módulo “Expression Evaluator  1” en el que se añade un nuevo campo llamado “credit_card_type” con el valor “n/a”:

paramExpresionEvaluator.png

 

Finalmente, ambos flujos de datos se almacenan en Hadoop.

 

Antes de ejecutar el Pipeline, es recomendable validar que los datos de configuración suministrados son coherentes por medio del botón Validate explicado anteriormente.

 

Una vez validado el Pipeline y como medida preventiva para evitar posibles errores en el diseño o configuración, se recomienda previsualizar el flujo de datos, ya que permite comprobar si están siendo tratados de la manera que se pretende. Al hacer click sobre Preview, aparecerá una ventana emergente como la mostrada a continuación:

previewPipeline

 

Como se puede observar, esta herramienta ofrece la posibilidad de visualizar los datos sin que estos se almacenen en el destino correspondiente, además de poder configurar si se desea comprobar el tipo de cada campo o las cabeceras de cada mensaje producido. En esta ocasión, se ha escogido una previsualización en la que no se escriba en los módulos destinos y que muestre el tipo de los diferentes campos.

 

Una vez lanzada la previsualización se podrán comprobar las distintas grabaciones entrantes y salientes de cada módulo, siguiendo siempre que:

  • El origen únicamente tendrá datos salientes.
  • Los procesadores intermedios tendrán datos entrantes a tratar y datos salientes tras la conversión.
  • Los destinos tendrán datos entrantes, pero nunca salientes.

 

En el caso de nuestro ejemplo, se puede observar el siguiente flujo de datos:

datosPipeline1.png

 

Seleccionando únicamente el módulo Jython Evaluator 1:

datosPipeline

Una vez que mediante la previsualización hemos comprobado el correcto funcionamiento del Pipeline, procedemos a ejecutarlo según el modo en que se desee (ejecución simple, ejecución en modo Batch o ejecución en modo Streaming). El botón de ejecución de nuestra pantalla de ejemplo se corresponde a la ejecución en tipo Streaming, que da como resultado las siguientes gráficas estadísticas:

estadisticas1pipeline.png

estadisticas2pipeline.png

 

A continuación se presenta un videotutorial acerca de la creación, edición, duplicación y ejecución de un pipeline sencillo.

Nueva implementación Sofia2 DataFlow

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s