Ingest Price index pipeline

For information about the Product index schema design, see Ingest Product index schema. For information on calling the Ingest service, see Search Ingest Service API. For a complete listing of Elasticsearch index fields and parameters, see Elasticsearch index field types.

Price field mapping from database

Stage 1: Associate prices to Product document
In this stage, price data is retrieved from the Commerce database, transformed, and loaded into the Product index.

You begin the process by running the following SQL to retrieve price data from the Commerce database:

WITH RELATED_STORES ( STORE_ID ) AS (
	    SELECT RELATEDSTORE_ID 
	      FROM STOREREL
	     WHERE STATE       =  1 
	       AND STRELTYP_ID = -4 
	       AND STORE_ID    =  ${param.storeId} ),
	CATALOG_CATENTRY ( CATENTRY_ID ) AS (
	      SELECT G.CATENTRY_ID
	        FROM CATGPENREL G, 
	             CATENTRY   E,
	             STORECAT   SC,
	             RELATED_STORES RS
	       WHERE G.CATENTRY_ID   = E.CATENTRY_ID 
	         AND G.CATALOG_ID    = ${param.catalogId} 
	         AND E.MARKFORDELETE = 0
	         AND G.CATALOG_ID    = SC.CATALOG_ID
	         AND RS.STORE_ID     = SC.STOREENT_ID )     
SELECT CER.CATENTRY_ID_CHILD CATENTRY_ID,
       LISTAGG(TO_CHAR(P.PRICE),', ') WITHIN GROUP (ORDER BY FILTER.USAGE DESC) PRICE,
       LISTAGG(P.CURRENCY,'###') CURRENCY,
       LISTAGG(FILTER.USAGE,', ') USAGE
  FROM OFFER O,
       OFFERPRICE P,
       ( SELECT A.CATENTRY_ID_PARENT CATENTRY_ID_PARENT, 
                A.CATENTRY_ID_CHILD  CATENTRY_ID_CHILD
           FROM CATENTREL        A,
                CATALOG_CATENTRY B,
                CATENTRY         C
          WHERE A.CATENTRY_ID_PARENT = B.CATENTRY_ID 
            AND C.CATENTRY_ID        = A.CATENTRY_ID_CHILD 
            AND C.MARKFORDELETE      = 0
            AND A.CATRELTYPE_ID IN ('PRODUCT_VARIANT')
          UNION
         SELECT A.CATENTRY_ID CATENTRY_ID_PARENT, 
                A.CATENTRY_ID CATENTRY_ID_CHILD
           FROM CATALOG_CATENTRY A ) CER,
       CATENTRY C,
       ( SELECT MAX(O.PRECEDENCE) MAX_PRECEDENCE, 
                O.TRADEPOSCN_ID, 
                O.CATENTRY_ID, 
                TS.STTPCUSG_ID USAGE
          FROM OFFER      O,
               TRADEPOSCN T,
               STORECENT  S, 
               STORETPC   TS,
               RELATED_STORES RS
         WHERE T.TRADEPOSCN_ID = TS.TRADEPOSCN_ID 
           AND S.STOREENT_ID   = TS.STOREENT_ID
           AND S.STOREENT_ID   = RS.STORE_ID
           AND O.TRADEPOSCN_ID = T.TRADEPOSCN_ID 
           AND O.CATENTRY_ID   = S.CATENTRY_ID
           AND (O.STARTDATE IS NULL OR CURRENT_TIMESTAMP > O.STARTDATE) AND (O.ENDDATE IS NULL OR O.ENDDATE > CURRENT_TIMESTAMP)
           AND O.PUBLISHED     = 1 
           AND (O.MINIMUMQUANTITY IN (1,0) OR O.MINIMUMQUANTITY IS NULL)
         GROUP BY O.TRADEPOSCN_ID, O.CATENTRY_ID, TS.STTPCUSG_ID ) FILTER
 WHERE O.TRADEPOSCN_ID = FILTER.TRADEPOSCN_ID
   AND O.CATENTRY_ID   = FILTER.CATENTRY_ID
   AND O.PRECEDENCE    = FILTER.MAX_PRECEDENCE
   AND (O.STARTDATE IS NULL OR CURRENT_TIMESTAMP > O.STARTDATE)
   AND (O.ENDDATE IS NULL OR O.ENDDATE > CURRENT_TIMESTAMP)
   AND O.PUBLISHED   = 1
   AND O.OFFER_ID    = P.OFFER_ID
   AND C.CATENTRY_ID = CER.CATENTRY_ID_CHILD ${extCatentryAndSQL}
   AND C.CATENTTYPE_ID <> 'BundleBean'
   AND O.CATENTRY_ID = CER.CATENTRY_ID_PARENT
   AND ( ( CER.CATENTRY_ID_PARENT = CER.CATENTRY_ID_CHILD ) 
   		OR NOT EXISTS    
         ( SELECT 1
             FROM OFFER, 
                  TRADEPOSCN, 
                  STORECENT, 
                  CATGRPTPC
            WHERE OFFER.CATENTRY_ID        = CER.CATENTRY_ID_CHILD 
              AND OFFER.PUBLISHED          = 1 
              AND OFFER.TRADEPOSCN_ID      = TRADEPOSCN.TRADEPOSCN_ID
              AND OFFER.CATENTRY_ID        = STORECENT.CATENTRY_ID 
              AND CATGRPTPC.STORE_ID       = STORECENT.STOREENT_ID
              AND TRADEPOSCN.TRADEPOSCN_ID = CATGRPTPC.TRADEPOSCN_ID ) )
 GROUP BY CER.CATENTRY_ID_CHILD
 ORDER BY CATENTRY_ID
 LIMIT ${param.offset}, ${param.pageSize}

Next, the result set is passed to the FindPricesFromDatabase processor for transformation, using the following table to map the database fields returned from the SQL above to index fields in the Product index:

Index Field Name                 Index Field Type   Description
Properties
prices/list/<currency>/price     float              List prices in the provided currency; mapped to table OFFERPRICE
prices/offer/<currency>/price    float              Contracted price or standard offer price in the provided currency; mapped to table OFFERPRICE

Sample
The following code is an example of the input data for the FindPricesFromDatabase processor.
{
  "CATENTRY_ID": 10010,
  "PRICE": "50.00000, 50.00000",
  "CURRENCY": "USD###USD",
  "USAGE": "1, 2"
}
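
The aggregated columns in this row are delimited strings: the LISTAGG calls in the SQL join PRICE and USAGE with ", " and CURRENCY with "###". The following Python sketch shows one way to split them back into per-offer entries. The function name split_price_row is hypothetical, and pairing the three lists by position is an assumption. The actual processor runs inside NiFi and its implementation is not shown in this topic.

```python
import json

def split_price_row(row):
    """Split the LISTAGG-delimited columns of one result row into entries.

    Assumption: the n-th price, currency, and usage belong together.
    """
    prices = [float(p) for p in row["PRICE"].split(", ")]
    currencies = row["CURRENCY"].split("###")
    usages = [int(u) for u in row["USAGE"].split(", ")]
    if not (len(prices) == len(currencies) == len(usages)):
        raise ValueError("PRICE, CURRENCY and USAGE must have the same number of items")
    return [
        {"usage": u, "currency": c, "price": p}
        for p, c, u in zip(prices, currencies, usages)
    ]

# The sample input row shown above.
row = json.loads(
    '{"CATENTRY_ID": 10010, "PRICE": "50.00000, 50.00000", '
    '"CURRENCY": "USD###USD", "USAGE": "1, 2"}'
)
entries = split_price_row(row)
```

For the sample row, entries holds two items, one per usage value, each with currency USD and price 50.0.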
The FindPricesFromDatabase processor transforms the input data into the following output data. The store ID, language ID, and catalog ID are passed in as NiFi FlowFile attributes.
{ "update": { "_id": "1--1-10001-10010", "_index": ".auth.1.product.202006160325", "retry_on_conflict": 5, "_source": false } }
{ "doc": {"prices":{"offer":{"usd":50.0},"list":{"usd":50.0},"10005":{"usd":50.0}},"__meta":{"modified":"2020-07-27T17:50:41.951Z"}} }
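
The two output lines have the shape of an Elasticsearch bulk request: an update action line followed by a partial document. The Python sketch below builds such a pair from the sample input row. It rests on several assumptions that this topic does not confirm: the _id format (store ID, language ID, catalog ID, and catalog entry ID joined with "-") is inferred from the sample value "1--1-10001-10010"; the mapping of usage 1 to "offer" and usage 2 to "list" is assumed; and the "10005" key that appears in the sample output is not reproduced because its source is not described here. The function name build_bulk_update is hypothetical.

```python
import json
from datetime import datetime, timezone

# Assumption: usage 1 is the offer price and usage 2 is the list price.
USAGE_TO_KEY = {1: "offer", 2: "list"}

def build_bulk_update(row, store_id, lang_id, catalog_id, index_name, now=None):
    """Return the (action, doc) NDJSON lines for one price row."""
    prices = {}
    columns = zip(
        row["PRICE"].split(", "),
        row["CURRENCY"].split("###"),
        row["USAGE"].split(", "),
    )
    for price, currency, usage in columns:
        key = USAGE_TO_KEY.get(int(usage))
        if key is None:
            continue  # usages outside the assumed mapping are skipped
        prices.setdefault(key, {})[currency.lower()] = float(price)

    # Timestamp in UTC with millisecond precision, as in the sample output.
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    modified = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"

    action = {
        "update": {
            # Assumption: _id format inferred from the sample document.
            "_id": f"{store_id}-{lang_id}-{catalog_id}-{row['CATENTRY_ID']}",
            "_index": index_name,
            "retry_on_conflict": 5,
            "_source": False,
        }
    }
    doc = {"doc": {"prices": prices, "__meta": {"modified": modified}}}
    return json.dumps(action), json.dumps(doc)

row = {"CATENTRY_ID": 10010, "PRICE": "50.00000, 50.00000",
       "CURRENCY": "USD###USD", "USAGE": "1, 2"}
action_line, doc_line = build_bulk_update(
    row, 1, -1, 10001, ".auth.1.product.202006160325",
    now=datetime(2020, 7, 27, 17, 50, 41, 951000, tzinfo=timezone.utc),
)
```

Passing the timestamp in explicitly keeps the sketch deterministic; a real pipeline would normally use the current time.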