Data Validation Block

The Data Validation block cleans, transforms, or validates incoming data before passing it to the next block. A custom Databricks job handles these tasks. You can add this block to your DIY template, or use it in select pre-defined Connect+ templates.

Use Cases

Requirement 1: Cleaning Customer Data Before Loyalty Points Allocation

A retail brand uses Capillary’s loyalty platform to allocate points by importing customer data from an external system. However, the incoming file often includes duplicate records and rows with missing mobile numbers, which impacts point allocation.

Solution
Use the Validation block in Capillary to clean the data before processing it. Configure a custom Databricks job that:

  • Removes duplicate customer records based on email address or mobile number.
  • Eliminates rows where the mobile number is missing.

Provide the Databricks job ID in the Validation block configuration. After validation and cleaning, Capillary forwards the processed data to the next blocks for loyalty points allocation.
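
A minimal sketch of such a notebook is shown below. It assumes the customer columns are named mobile and email, and it reads the input and output table names through the mandatory parameter-handling code described later on this page; adapt the column names and logic to your data.

import pyspark.sql.functions as F

# Table names come from notebook parameters (see the mandatory snippet later on this page)
input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
database_name = dbutils.widgets.get("database_name")

df = spark.sql(f"SELECT * FROM {database_name}.{input_table}")

# Eliminate rows where the mobile number is missing or empty (column name is an assumption)
df = df.filter(F.col("mobile").isNotNull() & (F.col("mobile") != ""))

# Remove duplicate customer records based on email address or mobile number
df = df.dropDuplicates(["email"]).dropDuplicates(["mobile"])

# Write the cleaned data to the output table in Delta format for the next block
df.write.mode("overwrite").format("delta").saveAsTable(f"{database_name}.{output_table}")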

Requirement 2: Formatting Inconsistent CSV Files for Ingestion

A US-based brand sends customer data in CSV files for loyalty points allocation through Capillary. However, these files do not contain separators (such as commas or pipes) or headers, making them difficult to process and map correctly.

Solution
Use the Validation block in Capillary to transform the data before further processing. Configure a custom Databricks job that:

  • Adds the required separators to the CSV data based on the expected format.
  • Inserts headers to enable accurate column mapping.
  • Cleans up any malformed or inconsistent data entries.

Include the Databricks job ID in the Validation block configuration. The cleaned and formatted data is then passed to subsequent blocks for loyalty points allocation in Capillary.
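
A minimal sketch of this kind of job is shown below. It assumes the raw file is read from S3 and that the records are fixed-width; the bucket, path, field widths, column names, and output table name are illustrative assumptions.

from pyspark.sql import functions as F

# Read the raw file as plain text; the bucket, path, and fixed-width layout are assumptions
raw_df = spark.read.text("s3://example-bucket/incoming/customers.txt")

# Split each fixed-width line into columns and add headers so they can be mapped downstream
formatted_df = raw_df.select(
    F.trim(F.substring("value", 1, 10)).alias("customer_id"),
    F.trim(F.substring("value", 11, 15)).alias("mobile"),
    F.trim(F.substring("value", 26, 40)).alias("email"),
)

# Clean up malformed entries, for example rows without a customer ID
formatted_df = formatted_df.filter(F.col("customer_id") != "")

# Write the formatted data to the output table in Delta format (table name is illustrative)
formatted_df.write.mode("overwrite").format("delta").saveAsTable("loyalty_db.customers_formatted")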

Configuring the Data Validation Block Interface

Prerequisites

  • Access to the database for your organisation's group. To get access, raise a ticket with the Capillary Access team.
  • The name of the target database.

The Data Validation block has the following fields:

  • DataBricks JobId: The Databricks Job ID that defines the validation and transformation logic.
  • Is Data Validation enabled?: Indicates whether to perform data validation:
    - If unchecked, the block is skipped and the workflow runs as usual.
    - If checked, the configured Databricks job runs to validate the data.

Creating a Databricks Job and Job ID

  1. Log in to the Databricks portal.
  2. From the drop-down menu in the top right corner, select your workspace.
    Your workspace name starts with notebook-. Example: notebook-eucrm.
  3. From the left pane, click Workspace > Create > Notebook.
    Write a Databricks notebook for the required validation and transformation.
Databricks Notebook

  4. From the left pane, click Workflows > Create job.
    The job ID is created. Use the Job ID in the DataBricks JobId field of the Data Validation block.
  5. Create the job and add a task.
    Note: In the Path field, enter the path of your Databricks notebook.
Create a Job and a Task
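The steps above use the Databricks UI. If you prefer to create the job programmatically, a sketch using the Databricks Jobs API 2.1 is shown below; the workspace URL, token, cluster ID, and notebook path are placeholders you must replace.

import os
import requests

# Placeholders: replace with your workspace URL, access token, cluster ID, and notebook path
workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = os.environ["DATABRICKS_TOKEN"]

payload = {
    "name": "data-validation-job",
    "tasks": [
        {
            "task_key": "validate",
            "existing_cluster_id": "<cluster-id>",
            "notebook_task": {"notebook_path": "/Workspace/Users/you@example.com/validation_notebook"},
        }
    ],
}

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
response.raise_for_status()

# The returned job_id is the value to enter in the DataBricks JobId field of the Data Validation block
print(response.json()["job_id"])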

Creating the Databricks Notebook Script

The following is a sample Databricks notebook script. The script fetches data from the input table where the ID is less than or equal to 30.

import json
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("Databricks S3 to Table").getOrCreate()

# Step 1: Read job parameters from notebook_params (not all parameters are used in this sample)
delimiter = dbutils.widgets.get("delimiter")
region_name = dbutils.widgets.get("region_name")
bucket_name = dbutils.widgets.get("bucket_name")
input_file_path = dbutils.widgets.get("input_file_path")
input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
orgId = dbutils.widgets.get("orgId")
database_name = dbutils.widgets.get("database_name")

# Define full table names
input_table_name = f"{database_name}.{input_table}"
output_table_name = f"{database_name}.{output_table}"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Read data from the input table
df = spark.sql(f"SELECT * FROM {input_table_name} WHERE id <= 30")

# Rename columns as required (the renames below are placeholders; replace them with your own mappings)
renamed_df = (
    df.withColumnRenamed("id", "id")
      .withColumnRenamed("name", "name")
)  # Add more column renames as needed

# Write the transformed data to the output table in Delta format
renamed_df.write.mode("overwrite").format("delta").saveAsTable(output_table_name)
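
To test a notebook like this outside Connect+, you can trigger the job manually and pass the same parameter names the script reads through dbutils.widgets. The sketch below uses the Databricks Jobs run-now API; the workspace URL, token, job ID, and parameter values are illustrative.

import os
import requests

workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = os.environ["DATABRICKS_TOKEN"]

# Parameter names must match the dbutils.widgets.get() calls in the notebook
payload = {
    "job_id": 123456,  # illustrative job ID
    "notebook_params": {
        "delimiter": ",",
        "region_name": "us-east-1",
        "bucket_name": "example-bucket",
        "input_file_path": "incoming/customers.csv",
        "inputTable": "customers_input",
        "outputTable": "customers_output",
        "orgId": "100123",
        "database_name": "loyalty_db",
    },
}

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
response.raise_for_status()
print(response.json()["run_id"])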

The following code must be present in your script. Do not modify it. Add your validation and transformation code after it; a brief sketch follows the snippet.

input_table = dbutils.widgets.get("inputTable")
output_table = dbutils.widgets.get("outputTable")
database_name = dbutils.widgets.get("database_name")

# Define full table names
input_table_name = f"{database_name}.{input_table}"
output_table_name = f"{database_name}.{output_table}"
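
As an illustration only, a transformation step added after the mandatory snippet could look like the following; the email column is an assumption, and any validation or transformation logic can take its place.

from pyspark.sql import functions as F

# Read the input data using the table name defined by the mandatory snippet above
df = spark.sql(f"SELECT * FROM {input_table_name}")

# Example transformation: normalise email addresses (the column name is an assumption)
transformed_df = df.withColumn("email", F.lower(F.trim(F.col("email"))))

# Write the result to the output table in Delta format so the next block can consume it
transformed_df.write.mode("overwrite").format("delta").saveAsTable(output_table_name)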