Ingest Catalog index pipeline

The complete data mappings from the specification, database, and schema are shown for the Catalog category.

Catalog index field mapping from database

The following sequence of steps illustrates the Catalog indexing pipeline implemented in Apache NiFi. The flow consists of two main stages:
  1. Creating a Catalog document
  2. Associating Catalog filters

Stage 1: Creating a Catalog document

This stage describes how to transform the Catalog data and load it into the Catalog index. It starts by running the following SQL to retrieve Catalog data from the Commerce database:
SELECT C.CATALOG_ID, C.IDENTIFIER, C.MEMBER_ID, D.NAME, D.SHORTDESCRIPTION,
       COALESCE(D.LANGUAGE_ID, L.LANGUAGE_ID) LANGUAGE_ID,
       L.LOCALENAME, T.MASTERCATALOG, F.STOREENT_ID
  FROM LANGUAGE L, STORELANG F, STORECAT T, CATALOG C
       LEFT OUTER JOIN CATALOGDSC D ON (C.CATALOG_ID = D.CATALOG_ID AND D.LANGUAGE_ID = ${param.langId})
 WHERE T.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId})
   AND T.CATALOG_ID = C.CATALOG_ID AND C.CATALOG_ID = ${param.catalogId}
   AND F.STOREENT_ID = ${param.storeId} AND F.LANGUAGE_ID = L.LANGUAGE_ID AND L.LANGUAGE_ID = ${param.langId}
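
The `${param.storeId}`, `${param.langId}`, and `${param.catalogId}` tokens in the SQL above are placeholders that must be resolved to concrete values before the query runs. As a minimal sketch, assuming the values arrive as string attributes keyed by the placeholder name, the substitution could look like the following Python. The function name `resolve_placeholders` and the attribute dictionary are hypothetical, and this is a plain-Python stand-in, not NiFi's own expression handling.

```python
import re

# Matches placeholders of the form ${param.name}, as used in the SQL above.
_PLACEHOLDER = re.compile(r"\$\{(param\.[A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_placeholders(sql, attributes):
    """Replace each ${param.x} token with the integer value of attributes['param.x']."""

    def substitute(match):
        name = match.group(1)
        if name not in attributes:
            raise KeyError(f"missing attribute: {name}")
        # int() rejects non-numeric input, so only a number is written into the SQL text.
        return str(int(attributes[name]))

    return _PLACEHOLDER.sub(substitute, sql)


# Hypothetical attribute values matching the Stage 1 sample data.
attributes = {"param.storeId": "1", "param.langId": "-1", "param.catalogId": "10001"}
fragment = "AND C.CATALOG_ID = ${param.catalogId} AND L.LANGUAGE_ID = ${param.langId}"
resolved = resolve_placeholders(fragment, attributes)
print(resolved)
```

Converting every value with `int()` is a design choice for this sketch: all three parameters are numeric identifiers, so anything that is not a number is rejected instead of being spliced into the query.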

Next, the result set is passed to the CreateCatalogDocumentFromDatabase processor for transformation. The following table maps each database field returned by the SQL above to an index field in the Catalog index.

| Index Field Name | Index Field Type | Description |
|---|---|---|
| Document Identifier | | |
| id/catalog | id_string | Internal ID of the operational sales catalog; mapped to CATALOG.CATALOG_ID |
| id/member | id_string | The internal reference number that identifies the owner of the catalog; mapped to CATALOG.MEMBER_ID |
| id/language | id_string | The language used for all language-specific data in this document for the current store; mapped to CATALOGDSC.LANGUAGE_ID |
| identifier/specification | id_string | Set to "catalog" |
| identifier/catalog | id_string | A string that uniquely identifies the owning catalog; mapped to CATALOG.IDENTIFIER |
| identifier/language | id_string | The language string of this supported language; mapped to CATALOGDSC.LANGUAGE_ID and LANGUAGE.LANGUAGE |
| Language Sensitive Data | | |
| name/raw | raw | The language-dependent name of this catalog group; mapped to CATALOGDSC.NAME |
| name/normalized | normalized | Same as name/raw |
| description/raw | raw | A short description of this catalog group; mapped to CATALOGDSC.SHORTDESCRIPTION |
| Properties | | |
| type | id_string | "master" for the master catalog, or "sales" for a sales catalog; mapped to STORECAT.MASTERCATALOG |

For example code, see Stage 1 samples.

Stage 2: Associating Catalog filters

This stage describes how to transform the catalog filter data and load it into the Catalog index. It starts by running the following SQL to retrieve catalog filter data from the Commerce database:
SELECT F.STOREENT_ID, F.CATALOG_ID, F.USAGE, F.CATFILTER_ID, F.IDENTIFIER, X.EXPRESSION_ID, X.MEMBER_ID,
       CAST(X.QUERY AS VARCHAR) QUERY, X.TRADING_ID
  FROM EXPRESSION X, CATFILTER F
 WHERE X.CATFILTER_ID = F.CATFILTER_ID
   AND F.CATALOG_ID IN (SELECT C.CATALOG_ID FROM STORECAT C WHERE C.MASTERCATALOG = 1
                           AND C.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL
                                                  WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
   AND F.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -32 AND STORE_ID = ${param.storeId})

Next, the result set is passed to the FindFiltersFromDatabase processor for transformation. The following table maps each database field returned by the SQL above to an index field in the Catalog index:

| Index Field Name | Index Field Type | Description |
|---|---|---|
| Filters | | |
| filters/id/filter | id_string | The internal identifier of the catalog filter that applies to the current catalog |
| filters/id/contract | id_string | The internal identifier of the contract that is associated with this catalog filter for the current catalog |
| filters/id/member | id_string | The internal identifier of the owner that is associated with this catalog filter for the current catalog |
| filters/id/expression | raw | The internal identifier of the expression of this catalog filter |
| filters/identifier | id_string | The external identifier of this catalog filter |
| filters/query | raw | The query string that represents this catalog filter |
| filters/usage | raw | The usage of this catalog filter |

For example code, see Stage 2 samples.

Stage 1 samples

The following code is an example of the input data for the CreateCatalogDocumentFromDatabase processor:

{
  "CATALOG_ID": 10001,
  "IDENTIFIER": "Extended Sites Catalog Asset Store",
  "MEMBER_ID": 7000000000000001000,
  "NAME": "Extended Sites Catalog Asset Store",
  "SHORTDESCRIPTION": null,
  "LANGUAGE_ID": -1,
  "LOCALENAME": "en_US           ",
  "MASTERCATALOG": "1",
  "STOREENT_ID": 1
}
The CreateCatalogDocumentFromDatabase processor transforms the input data into the following output data:

{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325", "retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "identifier": {
      "catalog": "Extended Sites Catalog Asset Store",
      "specification": "catalog",
      "language": "en_US"
    },
    "name": {
      "normalized": "Extended Sites Catalog Asset Store",
      "raw": "Extended Sites Catalog Asset Store"
    },
    "id": {
      "catalog": "10001",
      "member": "7000000000000001000",
      "language": "-1",
      "store": "1"
    },
    "type": "master",
    "__meta": {
      "created": "2020-07-28T18:42:27.926Z",
      "modified": "2020-07-28T18:42:27.942Z",
      "version": {
        "min": 0,
        "max": 0
      }
    }
  },
  "doc_as_upsert": true
}
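
The Stage 1 mapping table and the sample above can be tied together with a short sketch. The Python below is not the processor's actual code; it is an illustration that rebuilds the sample output from the sample input. The function name `create_catalog_document`, the `_id` layout (store ID, language ID, and catalog ID joined by hyphens), and the rule that null columns are omitted are assumptions inferred from this single sample. The `__meta` block is left out because its timestamps are generated at run time.

```python
import json


def create_catalog_document(row, index_name):
    """Build the bulk action line and update body for one Stage 1 row (illustrative)."""
    store_id = str(row["STOREENT_ID"])
    language_id = str(row["LANGUAGE_ID"])
    catalog_id = str(row["CATALOG_ID"])

    # Assumption: _id is "<store>-<language>-<catalog>", which yields "1--1-10001".
    action = {"update": {
        "_id": f"{store_id}-{language_id}-{catalog_id}",
        "_index": index_name,
        "retry_on_conflict": 5,
        "_source": False,
    }}

    doc = {
        "identifier": {
            "catalog": row["IDENTIFIER"],
            "specification": "catalog",
            # LOCALENAME is space padded in the sample, so trim it.
            "language": row["LOCALENAME"].strip(),
        },
        "id": {
            "catalog": catalog_id,
            "member": str(row["MEMBER_ID"]),
            "language": language_id,
            "store": store_id,
        },
        "type": "master" if str(row["MASTERCATALOG"]).strip() == "1" else "sales",
    }
    # Assumption: null language-sensitive columns are left out of the document.
    if row.get("NAME") is not None:
        doc["name"] = {"normalized": row["NAME"], "raw": row["NAME"]}
    if row.get("SHORTDESCRIPTION") is not None:
        doc["description"] = {"raw": row["SHORTDESCRIPTION"]}

    return action, {"doc": doc, "doc_as_upsert": True}


sample_row = {
    "CATALOG_ID": 10001,
    "IDENTIFIER": "Extended Sites Catalog Asset Store",
    "MEMBER_ID": 7000000000000001000,
    "NAME": "Extended Sites Catalog Asset Store",
    "SHORTDESCRIPTION": None,
    "LANGUAGE_ID": -1,
    "LOCALENAME": "en_US           ",
    "MASTERCATALOG": "1",
    "STOREENT_ID": 1,
}

action, body = create_catalog_document(sample_row, ".auth.1.catalog.202006160325")
# A bulk request is newline-delimited JSON: one action line, then one body line.
print(json.dumps(action))
print(json.dumps(body))
```

Each JSON object is serialized onto a single line because the bulk format treats a newline as the record separator.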

Stage 2 samples

The following code is an example of the input data for the FindFiltersFromDatabase processor:
{
  "STOREENT_ID": 1,
  "CATALOG_ID": 10001,
  "USAGE": null,
  "CATFILTER_ID": 3074457345616679000,
  "IDENTIFIER": "TestCatalogFilter",
  "EXPRESSION_ID": 3074457345618259500,
  "MEMBER_ID": null,
  "QUERY": "( +*:* -parentCatgroup_id_search:\"10001_10006\")",
  "TRADING_ID": null
}

The FindFiltersFromDatabase processor combines the input data with the store ID, language ID, and catalog ID passed in from the NiFi FlowFile and transforms it into the following output data:

{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325", "retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "filters": [
      {
        "identifier": "TestCatalogFilter",
        "usage": "",
        "query": "( +*:* -path.tree:\"10006\")",
        "id": {
          "filter": "3074457345616679000",
          "expression": "3074457345618259500",
          "contract": "",
          "member": ""
        }
      }
    ],
    "__meta": {
      "modified": "2020-07-28T18:54:46.440Z"
    }
  }
}
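
The Stage 2 transformation can be sketched in the same way. The Python below is an illustration, not the processor's actual code, and several details are assumptions inferred from the single sample above: null columns become empty strings, `TRADING_ID` feeds `filters/id/contract`, and a `parentCatgroup_id_search:"<catalog>_<category>"` clause in the query is rewritten to `path.tree:"<category>"`. The names `build_filter_entry`, `rewrite_query`, and `build_filters_update` are hypothetical.

```python
import re

# Assumption from the sample: the catalog prefix is dropped and the field is renamed.
_PARENT_CATGROUP = re.compile(r'parentCatgroup_id_search:"\d+_(\d+)"')


def to_text(value):
    """Render a column value as a string; null becomes an empty string."""
    return "" if value is None else str(value)


def rewrite_query(query):
    """Rewrite parentCatgroup_id_search clauses into path.tree clauses."""
    return _PARENT_CATGROUP.sub(r'path.tree:"\1"', query)


def build_filter_entry(row):
    """Map one Stage 2 result row to one entry of the filters array."""
    return {
        "identifier": to_text(row["IDENTIFIER"]),
        "usage": to_text(row["USAGE"]),
        "query": rewrite_query(to_text(row["QUERY"])),
        "id": {
            "filter": to_text(row["CATFILTER_ID"]),
            "expression": to_text(row["EXPRESSION_ID"]),
            "contract": to_text(row["TRADING_ID"]),  # assumed source column
            "member": to_text(row["MEMBER_ID"]),
        },
    }


def build_filters_update(rows):
    """Collect all filter rows for one catalog into a partial update body."""
    return {"doc": {"filters": [build_filter_entry(row) for row in rows]}}


sample_row = {
    "STOREENT_ID": 1,
    "CATALOG_ID": 10001,
    "USAGE": None,
    "CATFILTER_ID": 3074457345616679000,
    "IDENTIFIER": "TestCatalogFilter",
    "EXPRESSION_ID": 3074457345618259500,
    "MEMBER_ID": None,
    "QUERY": '( +*:* -parentCatgroup_id_search:"10001_10006")',
    "TRADING_ID": None,
}

update = build_filters_update([sample_row])
print(update["doc"]["filters"][0]["query"])
```

Unlike Stage 1, the update body here carries no `doc_as_upsert` flag, which matches the sample output: the filters are merged into a Catalog document that Stage 1 has already created.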