Grupos de procesos de enlace NiFi
Los grupos de procesos de enlace son grupos de procesos NiFi del programa de utilidad que realizan tareas en los canales de NiFi de HCL Commerce.
Los siguientes grupos de procesos de enlaces le permiten controlar el flujo del canal, realizar o iniciar la bifurcación o la división procesos de flujo de datos o realizar otras funciones de mantenimiento del canal. Si necesita modificar un enlace, puede personalizar las plantillas del procesador de enlaces a través de un descriptor del conector mediante el servicio Ingest. Para obtener más información, consulte Crear un conector de servicio NiFi.
Enlace de alias
El canal de enlace de alias crea un alias para el índice dado realizando las siguientes tareas:
- Recupere una lista de alias existentes de Elasticsearch.
- Busque el nombre de índice interno del índice especificado mediante la variable index.alias.name.
- El formato del nombre del índice interno es: . “environment name” . “store id” . “index name” . “time id.
- Localice el nombre del alias si está disponible para este índice.
- Vuelva a crear una alias para este índice utilizando el patrón: environment name” . “store id” . “index name.
- Elimine el índice más antiguo, si existe.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione la siguiente carga con el descriptor de conector:
- Utilice environment.alias.name para definir un nombre de entorno del índice de búsqueda que se intercambiará con un alias
- Utilice index.alias.name para definir el nombre del índice de búsqueda que se intercambiará con un alias
- Utilice keep.backup para conservar la copia del índice intercambiado más recientemente. Este índice intercambiado recientemente se puede conservar con fines de recuperación. El sistema solo puede conservar una copia adicional como máximo y elimina todas las copias antiguas para conservar el espacio de almacenamiento.
{
"// Activate Catalog index with an environment alias": {
"environment.alias.name": "environment name of the search index to be swapped with an alias",
"index.alias.name": "name of the search index to be swapped with an alias"
},
"name": "AliasLink",
"label": "AliasLink - Catalog",
"properties": [
{
"name": "environment.alias.name",
"value": "auth",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.alias.name",
"value": "catalog",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
}
]
}
Enlace de memoria caché
Este canal de enlace de memoria caché borra toda la caché interna especificada de los servicios del controlador:
- El servicio de controlador de tienda almacena en caché los idiomas admitidos, los valores predeterminados del nivel de la tienda, la ruta de la tienda relacionada.
- El servicio del controlador de la configuración almacena en memoria caché la configuración de componentes de búsqueda.
- El servicio de controlador de catálogo almacena en caché la jerarquía del catálogo y las rutas de navegación para la tienda en cuestión.
- El servicio de controlador de coincidencia almacena en caché las configuraciones de las búsquedas de coincidencias de colores y dimensiones y unidades de medida.
- El servicio de controlador NLP almacena en caché las frases lematizadas utilizadas para el procesamiento de lenguaje natural (NPL).
- El servicio de controlador de perfil almacena en memoria caché la configuración del perfil de Ingest para la personalización.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione el siguiente ul cuando se utilice con el descriptor de conector:
- Utilice Clear para borrar todo el contenido de la memoria caché del servicio de controlador correspondiente
- Utilice Refresh para borrar y volver a crear el contenido de la memoria caché del servicio de controlador correspondiente
- Utilice Skip para mantener el contenido actual de la memoria caché del servicio de controlador correspondiente
Enlace de clonación
El canal de enlace de clonación hace una copia idéntica del índice dado.
- Busca el nombre de índice interno del índice especificado mediante las variables index.source.name y index.target.name.
- Llama al punto final Elasticsearch /_clone para realizar una operación de clonación de índice.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione la siguiente carga con el descriptor de conector:
- Utilice environment.source.name para definir el nombre del entorno del índice fuente que se va a clonar.
- Utilice index.source.name como nombre del índice fuente que se va a clonar.
- Utilice environment.target.name para definir el nombre del entorno del índice de destino.
- Utilice index.target.name como nombre del índice de destino.
- Utilice wait.timeout para especificar el tiempo máximo de espera hasta que finalice la operación de clonación
Continuar enlace
El canal Continuar enlace se utiliza junto con el control de flujo WaitLink
como marcador de fin de flujo. Direcciona el flujo al servicio de terminal, que a su vez envía una señal al procesador de control de flujo para liberar el siguiente flujo. Cuando no queda más flujo, el flujo actual continúa hasta la etapa siguiente.
No es necesaria ninguna propiedad para este conducto.
Copiar enlace
El conducto Copiar enlace es un flujo de datos controlador que copia un grupo de documentos indexados de un índice a otro. Esta operación de copia se realiza de forma "inteligente" para que solo se copie en los campos del índice de destino que tengan valores diferentes. Aunque se haya actualizado el campo de índice de origen, si el valor de campo en el índice de destino es el mismo, no se realizará ninguna operación. Después de cada actualización en el índice de destino, se puede emitir una invalidación de memoria caché condicional basándose en un patrón personalizado y en un nombre de memoria caché proporcionado.
Uso
- Utilice Replacement Value en un procesador de generación de consultas de SCROLL Elasticsearch para definir la consulta de Elasticsearch.
- Utilice Entity Identifier en el documento de origen de extracción para definir la clave _id del documento de índice de destino; este identificador se puede expresar utilizando variables de registro y archivo de flujo, así como utilizando el nombre de campo de índice en la respuesta de búsqueda entre corchetes [ ].
- Utilice cache.channel.name para definir el nombre de la memoria caché local que se utilizará para almacenar temporalmente los datos extraídos.
- Utilice la variable environment.source.name para definir el nombre del entorno del que procede el índice de origen.
- Utilice la variable environment.target.name para definir el nombre del entorno en el que copiar el índice.
- Utilice la variable index.source.name para definir el nombre del índice de origen que se va a copiar.
- Utilice la variable index.target.name para definir el nombre del índice de destino que se va a copiar.
- Utilice las variables scroll.bucket.size y scroll.page.size para controlar el tamaño del lote al leer y grabar en el índice de búsqueda. Utilice el mismo valor para ambas variables.
- Utilice scroll.duration para definir la cantidad de tiempo que el motor de búsqueda debe utilizar para retener los datos de resultados de búsqueda para el desplazamiento.
- Utilice Cache Invalidation Strategy en
Update Target Document
para definir cuándo se debe emitir la invalidación. - Utilice Cache Invalidation Template en
Update Target Document
para definir el identificador de dependencia que se utilizará para la invalidación. Este identificador se puede expresar utilizando variables de registro y archivo de flujo, así como utilizando el nombre de campo de índice en la respuesta de búsqueda entre corchetes[ ]
. - Utilice Cache Name para definir el nombre de la memoria caché a la que se enviarán las invalidaciones.
- Utilice Field Name para definir un patrón para que coincida con el nombre de campo que se utilizará para aplicar la estrategia de invalidación.
Limitación
Esta función de copia solo funciona con campos de valor únicos.
{
"// Copying all inventory counts from Inventory index back to Product index": {
"environment.source.name": "the name of the environment where the source index comes from",
"environment.target.name": "the name of the environment to copy the index to",
"cache.channel.name": "the name of the local cache to be used to temporarily store the extracted data",
"index.source.name": "the name of the source index for copying",
"index.target.name": "the name of the target index for copying",
"scroll.page.size": "search query scroll page size",
"scroll.bucket.size": "size of the batch response to be received within each scroll page",
"scroll.duration": "the time period to retain the search context for scrolling"
},
"name": "CopyLink",
"label": "CopyLink - Inventory (Copy To Product)",
"properties": [
{
"name": "connector.stage.name",
"value": "Inventory Stage 2, Copy Inventories",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.channel.name",
"value": "services/cache/nifi/Inventory",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.template",
"value": "storeId:productId:[id.store]:[id.catentry]",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.channel",
"value": "baseCache",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.field",
"value": "inventories.*.quantity",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.source.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.target.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.source.name",
"value": "live.inventory",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.page.size",
"value": "6000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.bucket.size",
"value": "2000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.duration",
"value": "30s",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "properties.Route on default language",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Route on other supported languages",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Entry Identifier",
"value": "${param.storeId}-${param.langId}-[id.catalog]-[id.catentry]",
"scope": {
"name": "Extract Source Document.Extract Source Document",
"type": "PROCESSOR"
}
},
{
"name": "properties.Replacement Value",
"value": "{\"stored_fields\":[\"id.*\",\"inventories.*\"],\"size\":${es.pageSize},\"_source\":false,\"query\":{\"bool\":{\"must\":{\"match_all\":{}},\"filter\":[{\"term\":{\"id.store\":\"${param.storeId}\"}},{\"term\":{\"id.catalog\":\"${param.catalogId}\"}} ${extCatentryES} ${extDataloadES} ]}}}",
"scope": {
"name": "SCROLL Elasticsearch.Generate query",
"type": "PROCESSOR"
}
}
]
}
Enlace de actualización
El canal de enlace de actualización renueva explícitamente el buscador de índices especificado para recoger las últimas actualizaciones.
- Busca el nombre de índice interno del índice especificado mediante la variable index.refresh.name.
- Determina el formato del nombre del índice que se debe utilizar en función de si el índice actual es Store o se utiliza para el procesamiento en tiempo casi real (NRT):
- Formato del índice de la tienda:
“environment name” . “index name”
. - Formato para los usos de NRT:
“environment name” . “store id” . “index name”
. - Formato para la reindexación completa:
. “environment name” . “store id” . “index name” . “time id”
.
- Formato del índice de la tienda:
- Llamar a Elasticsearch para realizar una operación de actualización del índice.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione el siguiente ul cuando se utilice con el descriptor de conector:
- Utilice index.refresh.name para definir el nombre del índice de búsqueda que debe actualizarse
{
"// Explicitly refresh the Description index searcher to pick up the latest updates": {
"index.refresh.name": "name of the search index to be refreshed"
},
"name": "RefreshLink",
"label": "RefreshLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Refresh Index",
"type": "PROCESS_GROUP"
}
}
]
}
Enlace de bloqueo
El canal de enlace de bloqueo cambia el estado de bloqueo de un índice determinado. Cuando se bloquea un índice, no se permite ninguna operación de escritura.
- Busca el nombre de índice interno del índice especificado mediante las variables environment.lock.name y index.lock.name.
- Llama al punto final Elasticsearch /_settings para realizar la operación de bloqueo o desbloqueo del índice
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione el siguiente ul cuando se utilice con el descriptor de conector:
- Utilice environment.lock.name como nombre del entorno del índice de búsqueda que se va a bloquear
- Utilice index.lock.name para definir el nombre del índice de búsqueda que se va a bloquear
- Utilice enable.lock para bloquear todas las operaciones de escritura en el índice cuando se establece en false
Enlace de fusión
El canal de enlace de fusión realiza una operación de fusión de fragmentos de índice explícita contra el índice dado:
- Busca el nombre de índice interno del índice especificado mediante la variable index.merge.name.
- Llama al punto final Elasticsearch /_forcemerge para realizar la operación de fusión de índices de forma síncrona.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione el siguiente ul cuando se utilice con el descriptor de conector:
- Utilice index.merge.name para definir el nombre del índice de búsqueda que se va a fusionar
- Utilice index.merge.segment para definir el número de segmentos a fusionar; para fusionar completamente los índices, ajústelo a 1
Enlace de réplica
El canal de enlace de réplica habilita de nuevo las réplicas de índices especificadas y el intervalo de actualización automática de los índices:
- Busca el nombre de índice interno del índice especificado mediante la variable index.refresh.name.
- Llama a Elasticsearch para actualizar los valores de índice utilizando las réplicas recién definidas y el intervalo de actualización.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione la siguiente carga cuando se utilice con el descriptor de conector:
- Utilice index.refresh.name para definir el nombre del índice de búsqueda que se va a replicar
- Utilice index.refresh.interval para definir la cantidad de tiempo de espera entre cada actualización automática del índice
- Utilice index.replicas para definir el número de réplicas de índice deseadas que se utilizarán para el índice de búsqueda dado
{
"// Enabling back Description index replicas and automatic index refresh interval": {
"index.refresh.name": "name of the search index affected",
"index.refresh.interval": "amount of wait time between each index refresh",
"index.replicas": "number of desired index replicas to be used"
},
"name": "ReplicateLink",
"label": "ReplicateLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.refresh.interval",
"value": "1s",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.replicas",
"value": "0",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
}
]
}
Enlace de reinicio
El canal de enlace de reinicio debe ser la primera sección de canal de cada conector. Simplemente reinicia todos los contadores NiFi relacionados para la tienda actual. No es necesaria ninguna propiedad para este conducto.
Enlace de guardado
El canal de enlace de guardado debe ser la penúltima sección de canal de cada conector. Simplemente guarda todos los contadores de progreso relacionados dentro de Nifi para esta operación de indexación actual en Elasticsearch. No es necesaria ninguna propiedad para este conducto.
Terminal
El canal terminal debe ser el la última sección de canal de cada Conector. Simplemente dirige el flujo al Servicio de Terminal para desencadenar un informe de resumen que se generará para la operación de ejecución actual. No es necesaria ninguna propiedad para este conducto.
Enlace de tienda
El canal de enlace de la tienda divide la solicitud única de indexación a nivel de la tienda en varias solicitudes individuales, en función del número de catálogos e idiomas admitidos en la tienda en cuestión. Este canal debe utilizarse al principio de un conector para inicializar el lenguaje y los flujos específicos del catálogo para cada tienda.
Además de dividir el flujo de entrada por catálogo por idioma del almacén dado, este canal también inicializa los siguientes atributos del archivo de flujo para cada uno de sus flujos de salida generados:
- master.catalog es el ID de catálogo maestro para la tienda actual.
- default.catalog es el ID de catálogo predeterminado para la tienda actual.
- default.language es el ID de idioma predeterminado para la tienda actual.
- default.contract es el ID de contrato predeterminado para la tienda actual.
- default.currency es la divisa predeterminada para la tienda actual.
- param.storeId es el ID de tienda actual.
- param.catalogId es el ID de catálogo utilizado para la tienda actual.
- param.langId es el ID de idioma utilizado para la tienda actual.
- flow.lf define si el flujo actual utiliza o no reserva de idioma.
Enlace NRT
El canal de enlace NRT divide una única solicitud de indexación en tiempo casi real (NRT) en varias solicitudes individuales de tiendas relacionadas, en función del número de catálogos e idiomas admitidos en todas sus tiendas relacionadas. Este canal debe utilizarse al principio de un conector NRT para inicializar los flujos específicos del lenguaje y del catálogo para cada tienda relacionada.
Además de dividir el flujo entrante por catálogo por idioma de cada tienda relacionada, este conducto también inicializa los siguientes atributos de archivo de flujo para cada uno de los flujos de salida generados:
- master.catalog es el ID de catálogo maestro para la tienda actual.
- default.catalog es el ID de catálogo predeterminado para la tienda actual.
- default.language es el ID de idioma predeterminado para la tienda actual.
- default.contract es el ID de contrato predeterminado para la tienda actual.
- default.currency es la divisa predeterminada para la tienda actual.
- param.storeId es el ID de tienda actual.
- param.catalogId es el ID de catálogo utilizado para la tienda actual.
- param.langId es el ID de idioma utilizado para la tienda actual.
- flow.lf define si el flujo actual utiliza o no reserva de idioma.
Dependiendo del tipo de canalización que se utilice con este conector NRT, se establecerán diferentes conjuntos de variables como atributos adicionales del archivo de flujo para la composición NRT SQL posterior. Consulte la carga asociada del siguiente procesador UpdateAttribute para obtener más información:
- Canalización de carga de datos: Procesador de NiFi Prepare Dataload Join SQL
extDataloadES
- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
- Canalización de atributos: Procesador de NiFi Prepare Attribute NRT SQL extension
- extAttributeAnd
- Canalización de productos: Procesador de NiFi Prepare Product NRT SQL extension
extCatentryAndSQL
extCatentryAndSQL1a
extCatentryAndSQL1b
extCatentryES
extCatentryIdFromAndSQL
extCatentryIdWhereParentOrChild
extCatentryParentIdWhereSQL
extCatentryURLES
extCatentryWhereSQL
extDataCatentryId
extDataCatentryIdParent
- Canalización de categorías: Procesador de NiFi Prepare Category NRT SQL extension
extCatgroupAndSQL
extCatgroupAndSQL1a
extCatgroupAndSQL1b
extCatgroupES
extCatgroupURLES
Enlace de carga de datos
El canal de enlace de carga de datos divide una sola solicitud de indexación de carga de datos en varias solicitudes individuales de tiendas relacionadas, según el número de catálogos e idiomas que admitan todas sus tiendas relacionadas. Este canal debe utilizarse al principio de un conector de carga de datos para inicializar los flujos específicos del lenguaje y del catálogo para cada tienda relacionada.
Además de dividir el flujo entrante por catálogo por idioma de cada tienda relacionada, este conducto también inicializa los siguientes atributos de archivo de flujo para cada uno de los flujos de salida generados:
- master.catalog es el ID de catálogo maestro para la tienda actual.
- default.catalog es el ID de catálogo predeterminado para la tienda actual.
- default.language es el ID de idioma predeterminado para la tienda actual.
- default.contract es el ID de contrato predeterminado para la tienda actual.
- default.currency es la divisa predeterminada para la tienda actual.
- param.storeId es el ID de tienda actual.
- param.catalogId es el ID de catálogo utilizado para la tienda actual.
- param.langId es el ID de idioma utilizado para la tienda actual.
- flow.lf define si el flujo actual utiliza o no reserva de idioma.
Dependiendo del tipo de canalización que se utilice con este conector de carga de datos, se establecerán diferentes conjuntos de variables como atributos adicionales del archivo de flujo para la composición de carga de datos SQL posterior. Consulte los siguientes procesadores UpdateAttribute para obtener más información:
extDataloadES
- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
Enlace de reindexación
El canal de enlace de reindexación prepara todos los atributos del archivo de flujo necesarios para realizar una solicitud de reindexación completa para el almacén dado. Este canal debe utilizarse al principio de un conector de reindexación completo.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione la siguiente carga cuando se utilice con el descriptor de conector:
- Utilice flow.language.fallback para definir si se debe realizar o no la operación de reserva de idioma durante el tiempo de indexación
Enlace de división
Este canal de enlace de división se puede utilizar para bifurcar el flujo de datos actual para lanzar un nuevo flujo asíncrono contra un conector separado. Se generará un informe de resumen independiente inmediatamente después de la finalización del flujo de datos en este conector.
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, se recomienda personalizar esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Ampliación de los conectores Ingest.
Utilice la variable split.connector.name para definir el nombre del conector para remitir el flujo de datos actual
{
"// Launch the post-index connector to perform indexing as a background task": {
"split.connector.name": "the name of the Connector to send the event to"
},
"name": "SplitLink",
"label": "SplitLink - Post-Index",
"properties": [
{
"name": "split.connector.name",
"value": "auth.postindex",
"scope": {
"name": "Launch Connector",
"type": "PROCESS_GROUP"
}
}
]
}
Enlace de espera
El canal de enlace de espera es un responsable del flujo de datos que bloquea un flujo de datos NiFi hasta que se cumpla una determinada condición:
- Llegando al final de cierta canalización dentro de un conector. Cuando este canal está configurado como un controlador de flujo de datos a nivel de conector, solo limita hasta un número definido de flujos simultáneos que se ejecutan dentro del mismo conector.
- Finalización del procesamiento de todos los archivos de flujo de NiFi de un canal determinado y sus solicitudes de indexación masiva asociadas enviadas a Elasticsearch. De forma alternativa, si este canal se configura como un oyente de eventos que solo espera un determinado tipo de eventos entrantes, en función de cuál de los siguientes "Método de espera" se haya elegido:
- Método de evento: escucha el canal Redis definido en la propiedad Nombre de canal del Subscribe Redis de NiFi. Utilice la
Wait Strategy
para controlar el ámbito de bloqueo y elWait Link
se desbloqueará solo cuando se reciba una clave de mensaje que coincida con este canal de Redis:- Bloqueo a nivel de conector
- Para utilizar como control de flujo. La clave de mensaje tiene este formato:
Su contenido de mensaje JSON tiene estas propiedades:<connector.name> - <stage.name>
catálogo, conector, entorno, flujo, idioma, ejecutar, estado, almacenar, esperar
- Bloqueo a nivel de tienda
- Para utilizar como control de flujo. La clave de mensaje tiene este formato:
Su contenido de mensaje JSON tiene estas propiedades:<connector.name> - <stage.name> - <store.id>
catálogo, conector, entorno, flujo, idioma, ejecutar, estado, almacenar, esperar
- Bloqueo a nivel de ejecución
- Para utilizarse como control de flujo y la clave de mensaje tiene este formato:
Su contenido de mensaje JSON tiene estas propiedades:<connector.name> - <stage.name> - <run.id>
esperar, entorno, ejecutar, almacenar, idioma, catálogo, conector, flujo, estado
- Bloqueo a nivel de etapa
- Para utilizar con
WaitLink
. La clave de mensaje tiene este formato:
Su contenido de mensaje JSON tiene estas propiedades:<connector.name> - <stage.name> - <store.id> - <language.id> - <catalog.id> - <run.id>
esperar, entorno, ejecutar, almacenar, idioma, catálogo, conector, flujo, estado
Para ilustrar un ejemplo de
WaitLink
en laProduct Stage 1a
del conector auth.reindex, supongamos que hay un flujo de datos con 11 store.id, -1 language.id y 10001 catalog.id para ejecutar9d310aba-756b-4690-adae-856528d70f10
. Cuando se establece unWaitLink
para bloquear en esta etapa de producto 1a, el flujo de datos solo se liberará cuando se reciba un mensaje JSON con clave“auth.reindex-DatabaseProductStage1a-11--1-10001-9d310aba-756b-4690-adae-856528d70f10”
desde el canal Redis services/nifi/wait para el entorno de autoría. - Método de espera: pone en pausa el flujo de datos actual durante el tiempo especificado en la "Duración de la penalización".
- Método de exploración: supervisa el tamaño de la consulta de nivel superior del canal especificado para esperar hasta que el contador llegue a cero antes de que el flujo de datos se libere para su posterior procesamiento
- Método de evento: escucha el canal Redis definido en la propiedad Nombre de canal del Subscribe Redis de NiFi. Utilice la
Uso
Para permitir que el servicio Ingest pueda gestionar y actualizar esta plantilla, personalice esta plantilla a través de un descriptor de conector mediante un servicio Ingest. Para obtener más información, consulte Gestionar conectores en el servicio Ingestion.
Proporcione la siguiente carga cuando se utilice con el descriptor de conector:
Como controlador de flujo:
- Establezca connector.flow.control en true.
- Utilice connector.flow.limit para definir el número máximo de flujos paralelos que puede realizar el conector en cualquier momento.
- Utilice wait.strategy para controlar el nivel de bloqueo:
connector
,store
orun
. - Asegúrese de que la variable wait.flow.strategy esté establecida en uno de los valores siguientes, basándose en el nivel de bloqueo establecido anteriormente:
"Connector" = "connector.id" "Store" = "store.id" "Run" = "run.id"
Como oyente de eventos:
- Opcionalmente se establece connector.flow.control en false
- Utilice connector.stage.name para definir el nombre del grupo de procesos NiFi a supervisar
- Utilice wait.connector.name para definir el nombre de otro conector a supervisar (opcional)
- Utilice wait.error.strategy para definir si debe continuar o detenerse inmediatamente cuando se produzca un error
- Utilice wait.error.limit para definir el nivel de tolerancia de errores antes de aplicar la estrategia de error
- Utilice wait.method para definir si se utiliza un desencadenante basado en eventos o para explorar el tamaño de la cola
WaitLink
.{
"// Wait for completion of all Product documents before proceeding to the next stage": {
"connector.stage.name": "name of the process group to monitor",
"wait.error.limit": "define the error limit to stop or empty for no limit",
"wait.method": "define whether to use event based trigger or to scan queue size"
},
"name": "WaitLink",
"label": "WaitLink - Product Stage 1a",
"properties": [
{
"name": "connector.stage.name",
"value": "DatabaseProductStage1a",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.error.limit",
"value": "0",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.method",
"value": "event",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
}
]
}