Ingest Synonym index pipeline

Synonym index field mapping from data specification

The following diagram illustrates the Synonym indexing pipeline implemented in Apache NiFi. The flow consists of mainly three stages:
  1. Generate Synonym dictionary document for Elasticsearch based on the input synonym per language.
  2. (IF POST) Extracting current synonyms in the product index dictionary, and adding them to the generated document from stage one.
  3. Update Product's language specific dictionaries with the synonyms document generated from Stage one and Stage two.
Initial
PUT or POST REST Call: http://<Hostname>:30700/connectors/JsonSynonym/data
{​​​
    ​"synonyms": {
        ​ "english": {
            "synonyms": [
                "coff => coffee",
                "driveway, road, street"​
            ]
        ​},
        "french": {
            "synonyms": [
                ​"coff => coffee",
                "driveway, road, street"​
            ]
        ​​}
    }
​}​​​
1. Generate Synonym Dictionary Document​t
The following dataflow describes how the language specific Synonym data can be transformed using the ​CreateSynonymBodyPart1​ Groovy script.​
Output​:
{
    ​"analysis" : {
        "filter" : {
            "custom_english_synonyms_dictionary" : {
                ​"synonyms" : [ "coff => coffee", "driveway, road, street" ],
                "type" : "synonym"
            },
            "custom_french_synonyms_dictionary" : {
                ​"synonyms" : [ "coff => coffee", "driveway, road, street" ],
                "type" : "synonym"
            ​}
        ​}
    }
}​
2. ​​​(IF POST) Extract current synonyms in the product index dictionary, and add them to the generated document
The following dataflow decribes what happens when the user makes a POST* request:
  1. A GET call is made to get the current Synonym Dictionaries per language from the product index.
  2. The language specific Synonym data from Step 1 will be transformed using the ​CreateSynonymBodyPart2​ Groovy script, to merge the data with the document generated from Stage 1.
*Else the user will make a PUT request, which will not add the current Language Specified Synonym Dictionaries in the index to the document from Stage 1.
Step 2 Output​:
​{
    ​"analysis" : {
        "filter" : {
            "custom_english_synonyms_dictionary" : {
                ​"synonyms" : [ "coff => coffee", "driveway, road, street" ],
                "type" : "synonym"
            },
            "custom_french_synonyms_dictionary" : {
                ​"synonyms" : [ "coff => coffee", "ibm => hcl", "driveway, road, street", "musab => musab mobashir" ],
                "type" : "synonym"
            ​},
            "custom_german_synonyms_dictionary" : {
                ​"synonyms" : [ "ibm => hcl", "driveway, road, street", "musab => musab mobashir" ],
                "type" : "synonym"
            ​}
        ​}
    }
}​
3. Update the Product's language specific dictionaries with the synonyms documen​t
The above dataflow decribes the process of updating (Overwriting) the Language Specific Dictionary with the previously generated documentation.
The process has the following steps:
  1. Close Product Index
  2. Update Product Index
  3. Open Product Index​​

Synonym index field mapping from database

Data specification:

​ The following diagram illustrates the Search Term Assosication (STA) indexing pipeline implemented in Apache NiFi. The flow consists of mainly two ​stages:
  1. Extract STAs from Database relative to StoreID (and related storeID) and Generate STA document for Elastic Search.
  2. Update Product's language specific dictionaries with the sta document generated from Stage One.
Stage 1: Extract STAs from Database relative to StoreID (and related storeID) and Generate STA document for Elastic Search
The following dataflow describes how the STA Database Data can be transformed using the ​CreateSTABody​​ Groovy script.​
SQL:
	SELECT S.SRCHTERMASSOC_ID,
		       LISTAGG(S.ASSOCIATIONTYPE, '###') WITHIN GROUP (ORDER BY T.TYPE) ASSOCIATIONTYPE,
		       LISTAGG(S.STATUS, '###') STATUS,
		       LISTAGG(L.LOCALENAME, '###') LOCALENAME,
		       LISTAGG(T.TYPE, '###') TYPE,
		       LISTAGG(T.TERM, '###') TERM
		  FROM LANGUAGE L, SRCHTERMASSOC S, SRCHTERM T, STORECAT C
		 WHERE NOT S.ASSOCIATIONTYPE = 4 AND S.STATUS = 1 AND S.SRCHTERMASSOC_ID=T.SRCHTERMASSOC_ID
		   AND L.LANGUAGE_ID=S.LANGUAGE_ID AND L.LANGUAGE_ID = ${param.langId}
		   AND S.STOREENT_ID IN
		       (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId})
		   AND C.MASTERCATALOG = 1 AND C.CATALOG_ID = ${param.catalogId}
		   AND C.STOREENT_ID IN
		       (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId})
	         GROUP BY S.SRCHTERMASSOC_ID
Input:
[
  {
    "SRCHTERMASSOC_ID": 3074457345616678000,
    "ASSOCIATIONTYPE": "1###1",
    "STATUS": "1###1",
    "LOCALENAME": "en_US           ###en_US           ",
    "TYPE": "1###1",
    "TERM": "laptop###thinkpad"
  }
]
Output:
{
  "analysis": {
    "filter": {
      "custom_en_US_sta": {
        "synonyms": [
          "laptop, thinkpad"
        ],
        "type": "synonym"
      }
    }
  }
}
Stage 2. Update Product's language specific dictionaries with the STA ​document generated from Stage One
The following dataflow decribes the proc
  1. Close Product Index
  2. Update Product Index
  3. Open Product Index​​
ess of updating (Overwriting) the Language Specific Dictionary with the previously generated documentation through the following steps: