
Bolt Chaining

Bolt chaining lets you execute another Bolt and fetch its data from within your own Bolt using the Custom Script Engine. It is a convenient way to combine, compare, and enrich data from multiple data sources in one place.

For example, suppose you want to email your sellers about their orders. The order details are in a database you can access, such as an SQL database, but the sellers' email IDs can only be fetched through an API. In this case, you can use Bolt Chaining as follows:

  1. First, create a Bolt that fetches data from a REST API and performs any corrections, and set its destination to REST API.

  2. Create another Bolt that will fetch data from the SQL database. By using Bolt Chaining, you can call the first Bolt from your SQL database Bolt, combine the information of these two data sources and then configure the email as a destination to send the final report to the sellers.
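
The combining step in this scenario amounts to a lookup join on a shared key. The following is a minimal sketch in plain JavaScript, assuming the order rows and the chained Bolt's results share a hypothetical seller_id field. The field names, the helper name attachSellerEmails, and the sample data are illustrative assumptions, not part of the platform.

```javascript
// Hypothetical sample data: rows from the SQL Bolt and results from the chained REST API Bolt.
var orders = [
  { order_id: 101, seller_id: "S1", amount: 250 },
  { order_id: 102, seller_id: "S2", amount: 90 },
  { order_id: 103, seller_id: "S9", amount: 40 }
];
var sellers = [
  { seller_id: "S1", email: "s1@example.com" },
  { seller_id: "S2", email: "s2@example.com" }
];

// Attach the seller's email to each order row; rows without a matching seller get null.
function attachSellerEmails(orderRows, sellerRows) {
  // Build a lookup table keyed by seller_id so each order needs only one lookup.
  var emailBySeller = {};
  for (var i = 0; i < sellerRows.length; i++) {
    emailBySeller[sellerRows[i].seller_id] = sellerRows[i].email;
  }
  var combined = [];
  for (var j = 0; j < orderRows.length; j++) {
    var row = orderRows[j];
    var hasEmail = Object.prototype.hasOwnProperty.call(emailBySeller, row.seller_id);
    combined.push({
      order_id: row.order_id,
      seller_id: row.seller_id,
      amount: row.amount,
      seller_email: hasEmail ? emailBySeller[row.seller_id] : null
    });
  }
  return combined;
}

var enriched = attachSellerEmails(orders, sellers);
console.log(enriched);
```

Building the lookup table once keeps the join linear in the number of rows, which matters when the chained Bolt returns tens of thousands of results.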

Limitation

  • Maximum number of rows fetched from the chained Bolt. Default: 50,000; maximum: 1L (100,000).

Functions to Use in Scripts

You can use the following functions in your script to chain a Bolt:

1. Bolt Name: withName() takes the name of a Bolt as an argument. You can use this function to call another Bolt from your Bolt.

For example, bolt.withName("ExampleBolt").execute()

2. Parameter: withParameter() takes a String key and a value of any type as arguments. It is used to chain a parameterized Bolt; you set a parameter's value by passing a key-value pair.

For example, bolt.withName("Example Bolt").withParameter("key","value")

3. All Parameters: withParameters() takes an object <String, Any> as an argument. You can use this function to set all the parameter values of a Bolt by passing a dictionary of key-value pairs.

For example, bolt.withName("Example Bolt").withParameters({"param1":"value1", "param2":"value2"})

4. Source Integration: sourceIntegration() takes a String as an argument. It changes the Integration of the dataset; pass the ID of an existing Integration.

For example, bolt.withName("Example Bolt").sourceIntegration("uuid-of-the-Integration")

5. Row Limit: rowLimit() takes an Integer as an argument. It sets the maximum number of rows fetched from the chained Bolt. Default: 50,000; maximum: 1L (100,000).

For example, bolt.withName("Example Bolt").rowLimit(5)

6. Batch Size: batchSize() takes an Integer as an argument. It sets the batch size of the chained Bolt. For example, if the Bolt returns 1L (100,000) results and the batch size is 10,000, the chained Bolt processes the data in 10 batches of 10,000 rows each.

For example, bolt.withName("Example Bolt").batchSize(500)
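
The row limit and batch size arithmetic described above can be checked with a small helper. This is a sketch only, assuming that 1L means 100,000 rows and that a final partial batch still counts as a batch. The helper name planChain and the caller-side clamping are illustrative assumptions; they do not describe how the platform itself treats an out-of-range value.

```javascript
var DEFAULT_ROW_LIMIT = 50000; // documented default
var MAX_ROW_LIMIT = 100000;    // documented maximum (1L)

// Work out the effective row limit and how many batches it implies.
function planChain(requestedLimit, batchSize) {
  if (!(batchSize > 0)) {
    throw new Error("batchSize must be a positive number");
  }
  // Fall back to the default when no limit is requested, and cap at the maximum.
  var limit = requestedLimit === undefined ? DEFAULT_ROW_LIMIT : requestedLimit;
  if (limit > MAX_ROW_LIMIT) {
    limit = MAX_ROW_LIMIT;
  }
  return { rowLimit: limit, batches: Math.ceil(limit / batchSize) };
}

var plan = planChain(100000, 10000);
console.log(plan.rowLimit, plan.batches); // prints: 100000 10
```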

Script View

  1. Use this script to execute a Bolt synchronously; it does not override the response mode. You can also set Email as the destination to send the report to end users.

bolt
.withName("name")
.withParameter("key","value")
.withParameter("key",value)
.rowLimit(10)
.execute()

2. Use this script to get the Bolt results as a list; it does not allow sink_mode to run.

bolt
.withName("name")
.withParameter("key","value")
.withParameter("key",value)
.rowLimit()
.list()

3. Use this script to get a single Bolt result object.

bolt
.withName("name")
.withParameter("key","value")
.withParameter("key",value)
.rowLimit()
.mono()


Response Object

{
  "status": 200,
  "ok": true,
  "meta": {
    // contains meta information
  },
  "results": [
    // contains Bolt results
  ],
  "result": {
    // contains the Bolt result in case of mono_mode
  },
  "error": {
    "reason": "Hello",
    "app_error": {
      "type": "BAD",
      "message": "",
      "debug_message": ""
    }
  }
}
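
A script will usually want to check ok before reading results or result. The sketch below assumes the value returned to the script has the shape shown above. The helper name readResults and both sample responses (including the 400 status and the error text) are made up for illustration.

```javascript
// Return the rows from a response, or throw with the most specific error text available.
function readResults(response) {
  if (!response || response.ok !== true) {
    var error = (response && response.error) || {};
    var appError = error.app_error || {};
    var reason = appError.message || error.reason || "unknown error";
    var status = response ? response.status : "n/a";
    throw new Error("Chained Bolt failed (status " + status + "): " + reason);
  }
  // Prefer the list in "results"; fall back to the single object in "result".
  if (Array.isArray(response.results) && response.results.length > 0) {
    return response.results;
  }
  if (response.result !== undefined && response.result !== null) {
    return [response.result];
  }
  return [];
}

// Hypothetical sample responses for illustration only.
var okResponse = { status: 200, ok: true, meta: {}, results: [{ id: 1 }, { id: 2 }] };
var failedResponse = {
  status: 400,
  ok: false,
  error: {
    reason: "Bad request",
    app_error: { type: "BAD", message: "Invalid parameter", debug_message: "" }
  }
};

console.log(readResults(okResponse).length); // prints: 2
try {
  readResults(failedResponse);
} catch (e) {
  console.log(e.message); // prints: Chained Bolt failed (status 400): Invalid parameter
}
```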

Step by Step to Chain a Bolt

Step 1: Select Custom Script Engine

Once you create a Dataset, you move to the Data Transformation stage, where you can select Custom Script Engine to execute Bolt chaining.

Custom Script

Step 2: Write Script

In the script editor, write a script that calls another Bolt using the withName() function and processes the data as your business use case requires.

Refer to the Script Engine documentation for details on writing scripts.

In the following example, we have written a script that calls a Bolt so that its data can be compared with our source's data. For debugging, we also log the Bolt's status, metadata, and result.

Script

function processRow(rowNo, row, context) {
// Write your code here

return row
}


/*
rows: batch data being processed in the pipeline
context: context information for the pipeline batch
returns: modified rows of the pipeline batch data
*/
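function process(rows, context) {
  // Call the chained Bolt and log its status, metadata, and result for debugging
  var result = bolt.withName("MySQLtoRestAPIBolt666015").execute()
  script.log("Result status: " + result.ok)
  script.log("Meta: ", result.meta)
  script.log("Result: ", result.result)

  // Pass each row of the batch through processRow and return the processed rows
  var processedRows = []
  for (var i = 0; i < rows.length; i++) {
    processedRows.push(processRow(i, rows[i], context))
  }
  return processedRows
}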
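
To try a script like this outside the platform before pasting it into the editor, simple stand-ins can replace the bolt and script objects. The sketch below is an assumption-heavy local harness: the stub objects, the runLocally wrapper, and the canned response are all hypothetical and only mimic the calls used in the example. Everything is wrapped in a function so that the script's process function does not clash with the global of the same name in Node.js.

```javascript
function runLocally(inputRows) {
  // Stand-in for the platform's script object: records log calls instead of printing them.
  var logged = [];
  var script = {
    log: function () {
      logged.push(Array.prototype.slice.call(arguments));
    }
  };
  // Stand-in for the platform's bolt object: returns a canned response (assumed shape).
  var bolt = {
    withName: function (name) {
      return {
        execute: function () {
          return { status: 200, ok: true, meta: { bolt: name }, results: [], result: {} };
        }
      };
    }
  };

  function processRow(rowNo, row, context) {
    return row;
  }

  function process(rows, context) {
    var result = bolt.withName("MySQLtoRestAPIBolt666015").execute();
    script.log("Result status: " + result.ok);
    script.log("Meta: ", result.meta);
    var processedRows = [];
    for (var i = 0; i < rows.length; i++) {
      processedRows.push(processRow(i, rows[i], context));
    }
    return processedRows;
  }

  return { rows: process(inputRows, {}), logged: logged };
}

var outcome = runLocally([{ id: 1 }, { id: 2 }]);
console.log(outcome.rows.length, outcome.logged.length); // prints: 2 2
```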

Step 3: Test

Once you are done with the script, click the Test button to run it.

Test

Any Questions? 🤓

We are always an email away to help you resolve your queries. If you need any help, write to us at - 📧 support@boltic.io