Data Validation Block
The Data Validation block uses a Databricks job specified by the Job ID to clean, transform, or validate data before passing it to the next block. You can write a Databricks job that reads the input file retrieved through the source block, converts it into a table, applies the necessary validations and transformations, and writes the result to an output table. The output table serves as the data source for field mapping in the subsequent blocks.
You can add this block to your DIY template, or use it in select pre-defined Connect+ templates.
Use Case
Requirement: Cleaning Customer Data Before Loyalty Points Allocation
A retail brand uses Capillary’s loyalty platform to allocate points by importing customer data from an external system. However, the incoming file often includes duplicate records and rows with missing mobile numbers, which impacts point allocation.
Solution
Use the Validation block in Capillary to clean the data before processing it. Configure a custom Databricks job that:
- Removes duplicate customer records based on email address or mobile number.
- Eliminates rows where the mobile number is missing.
Provide the Databricks job ID in the Validation block configuration. After validation and cleaning, Capillary forwards the processed data to the next blocks for loyalty points allocation.
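The cleaning logic itself lives in the Databricks notebook behind the job. A minimal PySpark sketch of such a notebook is shown below; the column names email and mobile are assumptions about the incoming file and should be replaced with the actual headers in your data.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Loyalty Data Cleaning").getOrCreate()

# Read the parameters passed by the Data Validation block
input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
database_name = dbutils.widgets.get("database_name")

df = spark.sql(f"SELECT * FROM {database_name}.{input_table}")

# Drop rows where the mobile number is missing (column name 'mobile' is assumed)
cleaned_df = df.filter(df.mobile.isNotNull() & (df.mobile != ""))

# Remove duplicate customer records based on email address or mobile number
cleaned_df = cleaned_df.dropDuplicates(["email"]).dropDuplicates(["mobile"])

# Write the cleaned data to the output table in Delta format
cleaned_df.write.mode("overwrite").format("delta").saveAsTable(f"{database_name}.{output_table}")
```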
Configuring the Data Validation Block Interface
Pre-Requisites
- Ensure you have access to the database for your organisation's group. To get access, raise a ticket with the Capillary Access team.
- The name of the target database.

| Field Name | Description |
|---|---|
| DataBricks JobId | The Databricks Job ID that defines the validation and transformation logic. |
| Is Data Validation enabled? | Indicates whether to perform data validation. If unchecked, the block is skipped and the workflow runs as usual. If checked, the configured Databricks job runs to validate the data. |
Creating a Databricks Job and Job ID
- Log in to the Databricks portal.
- From the drop-down menu in the top-right corner, select your workspace. Your workspace is named notebook-<cluster>, where <cluster> is the cluster in which your organisation exists. Example: notebook-eucrm, where eucrm is the cluster in which the organisation exists.

- From the left pane, click Workspace > Create > Notebook.
Write a Databricks notebook for the required validation and transformation.
Note: A complex Databricks job might slow down the workflow.

Databricks Notebook
- From the left pane, click Workflows > Create job.
A Job ID is generated. Use this Job ID in the DataBricks JobId field of the Data Validation block.

- Create a job and add a task.
Note: In the Path field, enter the path to your Databricks notebook.

Create a Job and a Task
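If you prefer to script this step instead of using the UI, a job with a notebook task can also be created through the Databricks Jobs API (2.1), which returns the Job ID in its response. The sketch below is a minimal example rather than a Capillary requirement; the workspace URL, access token, cluster ID, notebook path, and job name are placeholders to replace with your own values.

```python
import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"  # placeholder

payload = {
    "name": "connectplus-data-validation",  # placeholder job name
    "tasks": [
        {
            "task_key": "validate",
            "notebook_task": {"notebook_path": "/Users/<you>/<validation-notebook>"},  # placeholder path
            "existing_cluster_id": "<cluster-id>",  # placeholder
        }
    ],
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=payload,
)
response.raise_for_status()
print("Job ID:", response.json()["job_id"])  # use this value in the DataBricks JobId field
```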
Creating the Databricks Notebook Script
You can write a Databricks notebook script that transforms and validates the data as per your requirements. The following code should be included as part of your script; it reads the input table, output table, and database name parameters that the Data Validation block passes to the job.
input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
database_name = dbutils.widgets.get("database_name")
# Define full table names
input_table_name = f"{database_name}.{input_table}"
output_table_name = f"{database_name}.{output_table}"
Use the following line to write data to the output table. You can change the name of the DataFrame in your script, but the rest of the line must remain unchanged. Here, renamed_df is the DataFrame name.
renamed_df.write.mode("overwrite").format("delta").saveAsTable(output_table_name)
Note: Use the delta format to store data in the output table.
The following is a sample Databricks notebook script. The script fetches all the rows from the input table where the value of id is less than or equal to 30. These rows are written to the output table, and the columns id and name are renamed to customer_id and customer_name, respectively.
from pyspark.sql import SparkSession
# Initialize Spark
spark = SparkSession.builder.appName("Databricks S3 to Table").getOrCreate()
# Step 1: Read Job Parameters from notebook_params
input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
database_name = dbutils.widgets.get("database_name")
# Define full table names
input_table_name = f"{database_name}.{input_table}"
output_table_name = f"{database_name}.{output_table}"
# Read data from the input table - This query selects all rows from the table where the `id` column is less than or equal to 30.
df = spark.sql(f"SELECT * FROM {input_table_name} WHERE id <= 30")
# Perform the required transformation - Here, rename columns to customer_id and customer_name
renamed_df = (
df.withColumnRenamed("id", "customer_id")
.withColumnRenamed("name", "customer_name")
) # Add more columns as needed
# Write the transformed data to the output table. It is mandatory to write the transformed data into the output table. Here, the data is written to the output table in Delta format.
renamed_df.write.mode("overwrite").format("delta").saveAsTable(output_table_name)
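When the workflow runs, the Data Validation block triggers this job and passes the inputTable, outputTable, and database_name parameters that the script reads through dbutils.widgets. If you want to verify the notebook independently before wiring it into the block, you could trigger a run yourself with the Jobs API run-now endpoint, as in the sketch below; the host, token, Job ID, and table/database names are placeholders.

```python
import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"  # placeholder

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "job_id": 123456,  # the DataBricks JobId configured in the block (placeholder)
        "notebook_params": {
            "inputTable": "sample_input_table",    # placeholder
            "outputTable": "sample_output_table",  # placeholder
            "database_name": "your_database",      # placeholder
        },
    },
)
response.raise_for_status()
print("Run ID:", response.json()["run_id"])
```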
FAQs
- Where should I create the Databricks job?
You must create the Databricks job in the notebook cluster where your organisation resides.
- What kind of transformations can I perform using this block?
You can remove duplicates, drop empty rows/columns, standardise headers, or apply custom logic such as formatting phone numbers or merging columns.
- What should the Databricks job contain?
The job should read the input table, apply transformation or validation logic, and write the results to the output table. The output table is then used by subsequent blocks for field mapping.
- Can I write a complex Databricks job to achieve my transformation?
You can write a complex Databricks job, but it can slow down the workflow.
- How are spaces and hyphens in the header names handled?
Spaces ( ) and hyphens (-) in the header names are converted to underscores (_). Example: If the header name is Customer ID, it is converted to Customer_ID. See the sketch after this list.
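This renaming is applied by the platform. If you want the headers in your own output table to already follow the same convention, the PySpark sketch below reproduces equivalent logic; it is an illustration, not the platform's internal implementation, and the sample column names are hypothetical.

```python
import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Header Normalisation Example").getOrCreate()

# Hypothetical DataFrame whose headers contain a space and a hyphen
df = spark.createDataFrame([(1, "9876543210")], ["Customer ID", "Mobile-Number"])

# Replace spaces and hyphens in every header with underscores
for column in df.columns:
    df = df.withColumnRenamed(column, re.sub(r"[ -]", "_", column))

print(df.columns)  # ['Customer_ID', 'Mobile_Number']
```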