Tutorial: Elasticsearch file ingestion

Ingest unstructured data for MS Word .docx, PDFs, Excel.xlsx, CSV files, .txt files, and HTML files, including media metadata.

About this task

This tutorial shows you how to attach the unstructured data files to your existing Elasticsearch container. Unstructured data may be ingested into MS Word .docx, PDFs, Excel.xlsx, CSV files, .txt files, and HTML files are all acceptable formats. You can also include the media metadata from the NiFi processes groups and associate it with an existing product index, or with a new unstructured index. You can also ingest data in a new schema.

Procedure

  1. In the Elasticsearch container, enable the support (plugin) attachment docker.elastic.co/elasticsearch/elasticsearch:7.x.0.
    1. From the terminal window execute the following command:
      docker exec -it -u 0 commerce_elasticsearch_1 bash
    2. From the container bash terminal, run the following command:
      elasticsearch-plugin install ingest-attachment
    3. Restart the elasticsearch docker:
      docker restart commerce_elasticsearch_1
    4. After adding the attachment command to the dockerfile, create a new Elasticsearch image from the base image.
  2. Create the commerce/search-nifi-app:9.1.x.0. directory in the NiFi container using the following comands:
    -docker exec -it -u 0 commerce_nifi_1 bash
    -mkdir /opt/nifi/extDocs/
    chown nifi:nifi /opt/nifi/extDocs
    To copy the files to the directory you wish to ingest into Elasticsearch, execute the following command on the command line:
    docker cp /home/ingestfiles/. commerce_nifi_1:/opt/nifi/extDocs/.
    In this example the following files will be used: SampleDocs-travel-laptop.docx and SampleDocs-office-laptop.ppt.
  3. Import the following connectors into the NiFi registry commerce/search-registry-app:9.1.x.0.
    1. docker cp custom-UnstructuredIndexSchemaUpdateConnector-attachment.json commerce_registry_1:/opt/nifi-registry/flows/.

      For more information see, custom-UnstructuredIndexSchemaUpdateConnector-attachment.json.

    2. docker cp custom-UnstructuredIndexSchemaUpdate.json commerce_registry_1:/opt/nifi-registry/flows/.

      For more information see, custom-UnstructuredIndexSchemaUpdate.json.

    3. docker cp custom-UnstructuredIndexDatabaseConnectorPipe-Attachment.json commerce_registry_1:/opt/nifi-registry/flows/.

      For more information see, custom-UnstructuredIndexDatabaseConnectorPipe-Attachment.json.

    Open the NiFi registry container and run the following command:
    docker exec -it -u 0 commerce_registry_1 bash
    Run the following commands from the registry terminal.
    1. /opt/nifi-registry/scripts/import_flow.sh custom-UnstructuredIndexSchemaUpdateConnector-attachment /opt/nifi-registry/flows/custom-UnstructuredIndexSchemaUpdateConnector-attachment.json
    2. /opt/nifi-registry/scripts/import_flow.sh custom-UnstructuredIndexSchemaUpdate /opt/nifi-registry/flows/custom-UnstructuredIndexSchemaUpdate.json
    3. /opt/nifi-registry/scripts/import_flow.sh custom-UnstructuredIndexDatabaseConnectorPipe-Attachment /opt/nifi-registry/flows/custom-UnstructuredIndexDatabaseConnectorPipe-Attachment.json
  4. Using the Postman, create a ingest connector. For more information, see Swagger UI.
    Open the Postman and enter the following details:
    • URL - http://localhost:30800/connectors
    • Method = POST
    • Body = the following json body:
      { 
          "name": "auth.unstructured", 
          "description": "This is the connector for the unstructured processing", 
          "pipes": [ 
              { 
                  "name": "custom-UnstructuredIndexSchemaUpdate" 
              }, 
              { 
                  "name": "custom-UnstructuredIndexSchemaUpdateConnector-attachment" 
              },
              { 
                  "name": "custom-UnstructuredIndexDatabaseConnectorPipe-Attachment", 
                  "properties": [ 
                      { 
                          "name": "Database Driver Location(s)", 
                          "value": "${AUTH_JDBC_DRIVER_LOCATION}", 
                          "scope": { 
                              "name": "Database Connection Pool", 
                              "type": "CONTROLLER_SERVICE" 
                          } 
                      }, 
                      { 
                          "name": "Database Connection URL", 
                          "value": "${AUTH_JDBC_URL}", 
                          "scope": { 
                              "name": "Database Connection Pool", 
                              "type": "CONTROLLER_SERVICE" 
                          } 
                      }, 
                      { 
                          "name": "Database User", 
                          "value": "${AUTH_JDBC_USER_NAME}", 
                          "scope": { 
                              "name": "Database Connection Pool", 
                              "type": "CONTROLLER_SERVICE" 
                          } 
                      }, 
                      { 
                          "name": "Password", 
                          "value": "${AUTH_JDBC_USER_PASSWORD}", 
                          "scope": { 
                              "name": "Database Connection Pool", 
                              "type": "CONTROLLER_SERVICE" 
                          } 
                      } 
                  ] 
              }, 
              { 
                  "name": "Terminal" 
              } 
          ] 
      }
      
  5. Add the process group to the NiFi UI and link the input/ output ports as shown in the image below. Four process group pipes will be available in NiFi after you run the connector:

    a) custom-UnstructuredIndexSchemaUpdate
    This process group is used by Elasticsearch to enable attachment settings. If the attachment setting is already enabled, it will be skipped. Change the index name in the schema name properties file to utilise an existing schema.

    b) custom-UnstructuredIndexSchemaUpdateConnector-attachment
    This process group is used by Elasticsearch to enable attachment settings. If the attachment settings are already accessible, it will be skipped. The following settings are used by default, but you can update them to match your requirements. In the Set unstructured attachment' processor, param.attach is accessible.

    The following json code is available in the Populate unstructured Index schema processor. Here you can add or update a keyword. This keyword will be used for ingesting and searching unstructured data.
    {
      "description" : "Extract attachment information",
      "processors" : [
        {
          "attachment" : {
            "field" : "data",
            "indexed_chars_field" : "max_size",
            "properties": [ "content", "title", "keywords", "content_type", "content_length" ]
          }
        }
      ]
    }
    
    c) custom-UnstructuredIndexDatabaseConnectorPipe-Attachment
    This process group is responsible for fetching the file locations from the database, reading file content from the database's specified directory, encoding it into the base64, and ingesting it into the Elasticsearch. Multiple processors are used by this process group to import files into the Elasticsearch.

    The Set Attribute processor is used to specify the parameter that will be used to process the file, which you may change as needed.

    d) Terminal
    This process group completes the process flow and is responsible for terminating the previous process's output.
  6. Run the following queries to set the file location in Catentry.field4 (this table is currently being used in the process group). You can modify the query and add multiple transactions as needed to set the file location for catalog entries.
    UPDATE catentry SET FIELD4 = '/opt/nifi/extDocs/SampleDocs-travel-laptop.docx' WHERE PARTNUMBER = 'CLA022_2203'
    
    UPDATE catentry SET FIELD4 = '/opt/nifi/extDocs/SampleDocs-office-laptop.ppt' WHERE PARTNUMBER = 'CLA022_2205'
    Note: If you want to set the file location in a different table, you will have to modify the following properties in the Set Attribute processor under custom-UnstructuredIndexDatabaseConnectorPipe-Attachment.
    Currently the queries below are being used. You can update or modify them as needed.
    SELECT COUNT(*) as count FROM CATENTRY CE, CATENTDESC CD 
    WHERE CE.CATENTRY_ID = CD.CATENTRY_ID  AND CD.LANGUAGE_ID =-1 AND CE.MARKFORDELETE =0 AND CE.BUYABLE =1 AND CD.PUBLISHED =1 AND ce.FIELD4 IS not NULL
    AND CE.CATENTRY_ID IN (SELECT C.CATENTRY_ID FROM CATGPENREL R, CATENTRY C
    	                           WHERE R.CATALOG_ID IN
    	                                 (SELECT CATALOG_ID FROM STORECAT WHERE STOREENT_ID IN
    	                                 (SELECT RELATEDSTORE_ID FROM STOREREL
    	                                   WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
    	                             AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTTYPE_ID <> 'ItemBean')
    
    SELECT CE.CATENTRY_ID, CE.PARTNUMBER , CE.FIELD4, CD.NAME, CD.SHORTDESCRIPTION , CD.PUBLISHED FROM CATENTRY CE, CATENTDESC CD 
    WHERE CE.CATENTRY_ID = CD.CATENTRY_ID  AND CD.LANGUAGE_ID =-1 AND CE.MARKFORDELETE =0 AND CE.BUYABLE =1 AND CD.PUBLISHED =1 AND ce.FIELD4 IS not NULL
    AND CE.CATENTRY_ID IN (SELECT C.CATENTRY_ID FROM CATGPENREL R, CATENTRY C
    	                           WHERE R.CATALOG_ID IN
    	                                 (SELECT CATALOG_ID FROM STORECAT WHERE STOREENT_ID IN
    	                                 (SELECT RELATEDSTORE_ID FROM STOREREL
    	                                   WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
    	                             AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTTYPE_ID <> 'ItemBean')
    
    Note: If you want to update the name of the schema where the file attachment will be ingested, modify the following properties in the Set Attribute processor under custom-UnstructuredIndexDatabaseConnectorPipe-Attachment.

  7. Connect the auth.unstructured - custom-UnstructuredIndexSchemaUpdate process group with the Routing Service process group with the INPUT set to auth.unstructured.
    Note: If the connection procedure is followed, this process should already be linked to the routing service. Ensure that the auth.unstructured route is working.
  8. Navigate to the following process group.
    • Select the Execute SQL processor.
    • Right click and select View Configuration.
    • Select the Arrow button at the right side of the following property.

    • Make sure the Database Connection pool service is enabled.

  9. Start each of the four process groups, then go inside each one of them and right-click on the NiFi flow and then select Enable Transmission..
    Note: It is possible that transmissions have already been activated.

  10. After starting the process group run the following URL from Postman.
    POST- https://localhost:5443/wcs/resources/admin/index/dataImport/build?connectorId=auth.unstructured&storeId=1
    
    To check the status:
    GET- https://localhost:5443/wcs/resources/admin/index/dataImport/status?jobStatusId=1036 
  11. You may now validate indexed unstructured data and pass the keyword you specified when setting up the attachment:
    POST - localhost:30200/.auth.1.unstructured/_search
    Body: With Content available in file.
    { "query": { "bool": { 
    
     "must": [
                    {
                        "query_string": {
                            "query": "lightweight"
                        }
                    }
                ]
    
    } } }
    
    Body – with SKU (partnumber).
    { "query": { "bool": { 
    
     "must": [
                    {
                        "query_string": {
                            "query": "CLA022_2205"
                        }
                    }
                ]
    
    } } }
    
    Body – with file extension.
    { "query": { "bool": { 
    
     "must": [
                    {
                        "query_string": {
                            "query": "docx"
                        }
                    }
                ]
    
    } } }