This repository contains the backend services for the Riverline project, focusing on a robust data ingestion pipeline and Next Best Action (NBA) analytics.
To get the Riverline Backend up and running, follow these steps:
- Prerequisites:
  - Python 3.9+
  - Java (for PySpark, if you plan to use the Spark data engine)
  - Docker (for local database setup, if not using cloud instances)
  - uv (recommended for dependency management)
- Clone the repository:

      git clone git@github.com:Arvind-puthucode/Multiplatform-Customer-Support-Next-Best-Action-Engine.git
      cd Multiplatform-Customer-Support-Next-Best-Action-Engine/

- Install Dependencies: It's recommended to use uv for dependency management.

      # Install uv if you haven't already
      pip install uv
      # Install project dependencies
      uv sync

  If you prefer pip:

      # You might need to generate requirements.txt if it is not present
      pip install -r requirements.txt

- Environment Configuration (.env file): Create a .env file in the riverline/backend/ directory (one level up from riverline_backend) with your database credentials and API keys. This file is automatically loaded by the application using dotenv.

  Example .env content:

      # Supabase (PostgreSQL) Configuration
      SUPABASE_URL="https://your-supabase-url.supabase.co"
      SUPABASE_KEY="your-supabase-anon-key"

      # ClickHouse Configuration
      CLICKHOUSE_HOST="your-clickhouse-host"
      CLICKHOUSE_PORT="your-clickhouse-port" # e.g., 8443 for HTTPS
      CLICKHOUSE_USER="your-clickhouse-user"
      CLICKHOUSE_PASSWORD="your-clickhouse-password"

      # Azure OpenAI Configuration (for LLM Enhancement)
      AZURE_OPENAI_API_KEY="your_azure_openai_key"
      AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
      AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4" # or your specific deployment name

      # Default data engine (can be overridden via command-line argument)
      DATA_ENGINE=pandas # or "spark"
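A missing credential usually only surfaces when the first database call fails, so it can help to check the environment up front. The sketch below is not part of the repository: the variable names come from the example .env content, but the grouping into "required per backend" and the helper name missing_settings are assumptions made for illustration.

```python
import os

# Variable names are taken from the example .env content.
# Which ones each backend strictly requires is an assumption of this sketch.
REQUIRED_BY_DB = {
    "supabase": ["SUPABASE_URL", "SUPABASE_KEY"],
    "clickhouse": [
        "CLICKHOUSE_HOST",
        "CLICKHOUSE_PORT",
        "CLICKHOUSE_USER",
        "CLICKHOUSE_PASSWORD",
    ],
}


def missing_settings(db_target, env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_BY_DB[db_target] if not env.get(name)]


if __name__ == "__main__":
    for target in REQUIRED_BY_DB:
        missing = missing_settings(target)
        status = "ok" if not missing else "missing " + ", ".join(missing)
        print(f"{target}: {status}")
```

Run it after the .env file has been loaded into the environment (for example, in the same shell session that exported the variables).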
The Riverline Backend provides a robust system for processing customer interaction data and generating Next Best Actions (NBA) to address open issues.
The data pipeline is responsible for:
- Reading Data: Ingests raw customer interaction data (e.g., from CSV files).
- Normalization: Transforms raw data into a standardized format.
- Quality Checks: Ensures data integrity and consistency.
- Batch Insertion: Efficiently loads processed data into the chosen database.
- Watermarking: Guarantees idempotent re-runs by tracking the last processed timestamp, preventing duplicate data ingestion.
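The watermarking step can be sketched as follows. This is a minimal illustration and not the repository's implementation: the created_at field, the state dictionary, and the insert_batch callable are hypothetical stand-ins for the real schema, watermark storage, and database client.

```python
from datetime import datetime, timezone


def select_new_rows(rows, watermark):
    """Keep only rows strictly newer than the stored watermark."""
    if watermark is None:
        return list(rows)
    return [row for row in rows if row["created_at"] > watermark]


def ingest(rows, state, insert_batch, batch_size=1000):
    """Insert unseen rows in batches and advance the watermark.

    state is a dict holding the last processed timestamp under "watermark";
    insert_batch is any callable that writes a list of rows to the database.
    """
    new_rows = sorted(
        select_new_rows(rows, state.get("watermark")),
        key=lambda row: row["created_at"],
    )
    for start in range(0, len(new_rows), batch_size):
        batch = new_rows[start:start + batch_size]
        insert_batch(batch)
        # Advance the watermark only after the batch has been written.
        state["watermark"] = batch[-1]["created_at"]
    return len(new_rows)


if __name__ == "__main__":
    rows = [
        {"id": i, "created_at": datetime(2024, 1, 1, 0, 0, i, tzinfo=timezone.utc)}
        for i in range(5)
    ]
    state, sink = {}, []
    print(ingest(rows, state, sink.extend, batch_size=2))  # first run inserts rows
    print(ingest(rows, state, sink.extend, batch_size=2))  # re-run finds nothing new
```

One caveat of this simplified version: because the comparison is strict, rows that share a timestamp with the watermark would be skipped if a run failed between two batches split on that timestamp. A production pipeline would need a tie-breaker such as a unique row ID.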
The NBA engine is designed to determine the optimal next step for open customer issues, leveraging a hybrid approach:
- Data Fetching: Retrieves relevant customer profiles, conversation summaries, and detailed interaction histories from the database.
- Feature Extraction: Extracts simple, actionable features from the raw interaction data (e.g., total interactions, hours since last interaction, presence of urgent keywords).
- Rule-Based Decision Engine: Applies a set of predefined business rules to determine an initial channel and timing for the next action. These rules are designed to capture common scenarios and provide explainable decisions (e.g., long conversations suggest a phone call, dormant issues suggest an email follow-up).
- LLM Enhancement Layer: Uses a Large Language Model (Azure OpenAI's GPT-4) to enhance the message and reasoning generated by the rule engine. The LLM receives the rule-based output, the conversation summary, and the full conversation history, and is prompted to:
  - Generate a personalized, concise message for the customer, using their ID.
  - Provide specific, concise reasoning for the chosen channel and timing, drawing on the conversation and the customer's behavior.

  The LLM is strictly instructed to return its output as JSON so that the response can be parsed reliably.
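The feature extraction and rule steps above can be sketched as follows. The three features and the channel names appear in this README; the keyword list, the thresholds (10 interactions, 48 hours), the delay values, and the message fields text and created_at are assumptions chosen for illustration, not the project's actual rules.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Illustrative keyword list and thresholds; the real values are not documented here.
URGENT_KEYWORDS = ("urgent", "asap", "immediately")
LONG_CONVERSATION = 10   # interactions
DORMANT_HOURS = 48       # hours without activity


@dataclass
class Features:
    total_interactions: int
    hours_since_last: float
    has_urgent_keyword: bool


def extract_features(messages, now):
    """Derive simple features from a list of {"text", "created_at"} dicts."""
    last_seen = max(message["created_at"] for message in messages)
    text = " ".join(message["text"].lower() for message in messages)
    return Features(
        total_interactions=len(messages),
        hours_since_last=(now - last_seen).total_seconds() / 3600,
        has_urgent_keyword=any(keyword in text for keyword in URGENT_KEYWORDS),
    )


def decide(features):
    """Pick a channel and a send delay, with a human-readable reason."""
    if features.total_interactions >= LONG_CONVERSATION:
        channel, reason = "scheduling_phone_call", "long conversation"
    elif features.hours_since_last >= DORMANT_HOURS:
        channel, reason = "email_reply", "dormant issue"
    else:
        channel, reason = "twitter_dm_reply", "active conversation"
    # Urgent wording shortens the delay regardless of channel (assumption).
    delay_hours = 0 if features.has_urgent_keyword else 2
    return {"channel": channel, "delay_hours": delay_hours, "reason": reason}


if __name__ == "__main__":
    now = datetime(2024, 1, 3, tzinfo=timezone.utc)
    messages = [{"text": "Still broken", "created_at": now - timedelta(hours=72)}]
    print(decide(extract_features(messages, now)))
```

Keeping each rule to a single comparison is what makes the decision easy to explain: the reason string can be passed along to the LLM layer or shown to a stakeholder unchanged.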
The NBA engine supports two primary operational modes:
- Batch Processing (for Evaluation): Designed for processing a large number of customer profiles and exporting predictions for analysis. This mode is crucial for evaluating the effectiveness of the NBA engine.
- API Server (for Production Use): Provides a FastAPI endpoint for real-time, single-customer NBA predictions. This is intended for seamless integration into live customer support systems.
Note: Supabase was used only for the initial sample CSV runs, and the full dataset has not been ingested into my Supabase instance. If you want to run the pipeline with Supabase, you need to ingest the data first.
The ClickHouse instance, however, already has the Twitter CSV file ingested.
The pipeline supports two different database backends: Supabase (PostgreSQL) and ClickHouse. This design allows for a direct comparison of their performance and scalability characteristics for data ingestion and analytical workloads.
- Supabase (PostgreSQL): A robust, general-purpose relational database. It's excellent for transactional workloads, complex queries, and provides a familiar SQL interface. It's a solid choice for applications requiring strong consistency and a wide range of features.
- ClickHouse: An analytical column-oriented database optimized for high-speed data processing and real-time analytics. It excels at ingesting and querying massive volumes of data, making it ideal for analytical workloads like those found in customer interaction logs.
Performance Observation: During development and testing, we observed a significant performance difference in data ingestion. ClickHouse demonstrated superior ingestion speeds, handling over 2 million records in under 2 minutes from a local system to a free-tier cluster. This highlights ClickHouse's strength for large-scale data warehousing and analytical use cases, making it the preferred choice for high-volume data pipelines in this project.
The decision to combine a rule-based engine with an LLM enhancement layer offers several advantages:
- Explainability: The rule-based layer provides a clear, auditable, and easily understandable foundation for core decisions. This is crucial for business stakeholders who need to understand why a particular action was recommended.
- Efficiency: Simple, common scenarios can be handled quickly and efficiently by rules, reducing reliance on costly LLM calls for every interaction.
- Flexibility and Personalization: The LLM layer adds a layer of sophistication, allowing for more nuanced messaging and reasoning that can adapt to specific conversation contexts and customer nuances. It enhances the output without replacing the core business logic.
- Robustness: By having a rule-based fallback, the system remains functional even if LLM services are unavailable or return unexpected output.
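The fallback behavior described above can be sketched as follows. This is an illustration under stated assumptions: call_llm is a hypothetical callable that returns the model's raw text, and the JSON keys message and reasoning are assumed from the description of what the LLM is asked to produce.

```python
import json


def enhance_with_fallback(rule_output, call_llm):
    """Return the LLM-enhanced result, or the rule output if anything fails.

    call_llm is a hypothetical callable: it takes the rule output and returns
    the model's raw text response, which is expected to be a JSON object.
    """
    try:
        parsed = json.loads(call_llm(rule_output))
        message = parsed["message"]
        reasoning = parsed["reasoning"]
        if not isinstance(message, str) or not isinstance(reasoning, str):
            raise ValueError("message and reasoning must be strings")
    except Exception:
        # Service errors, invalid JSON, and missing keys all fall back to the
        # rule-based output so the pipeline keeps producing results.
        return {**rule_output, "llm_enhanced": False}
    return {
        **rule_output,
        "message": message,
        "reasoning": reasoning,
        "llm_enhanced": True,
    }


if __name__ == "__main__":
    rule = {
        "channel": "email_reply",
        "message": "Following up on your issue.",
        "reasoning": "Dormant issue.",
    }
    print(enhance_with_fallback(rule, lambda _: "not json"))
```

The channel and timing always come from the rule engine in this sketch; the LLM can only replace the message and reasoning text, which matches the idea of enhancing the output without replacing the core business logic.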
You can specify the database target and the data processing engine (Pandas or Spark).
Command Structure:

    python main.py --action run_pipeline --db <database_target>

Parameters:
- --action run_pipeline: Specifies that the data ingestion pipeline should be executed.
- --db <database_target>:
  - supabase: Ingests data into the configured Supabase (PostgreSQL) instance.
  - clickhouse: Ingests data into the configured ClickHouse instance.
Data Engine Selection (Environment Variable):
The DATA_ENGINE environment variable determines which processing engine is used.
- DATA_ENGINE=pandas: Uses the Pandas-based data engine (default if not set). Suitable for smaller datasets or local development.
- DATA_ENGINE=spark: Uses the PySpark-based data engine. Recommended for larger datasets and distributed processing.
Examples:
- Run pipeline with Pandas engine and Supabase:

      DATA_ENGINE=pandas python main.py --action run_pipeline --db supabase

- Run pipeline with Spark engine and ClickHouse:

      DATA_ENGINE=spark python main.py --action run_pipeline --db clickhouse
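For readers who want to see how such a command line could be parsed, here is a minimal argparse sketch. It is not the contents of main.py: the flag names, action names, and database targets come from this README, while making both flags required and rejecting unknown engine names are assumptions.

```python
import argparse
import os

ACTIONS = ["run_pipeline", "process_nba_data", "run_nba_predictions", "run_nba_api"]
DB_TARGETS = ["supabase", "clickhouse"]
ENGINES = ["pandas", "spark"]


def build_parser():
    """Build a parser for the flags documented in this README."""
    parser = argparse.ArgumentParser(prog="main.py")
    parser.add_argument("--action", required=True, choices=ACTIONS)
    parser.add_argument("--db", required=True, choices=DB_TARGETS)
    parser.add_argument("--customers", type=int, default=None,
                        help="optional limit for run_nba_predictions")
    return parser


def resolve_engine(env=None):
    """Read DATA_ENGINE, defaulting to pandas when it is not set."""
    env = os.environ if env is None else env
    engine = env.get("DATA_ENGINE", "pandas").strip().lower()
    if engine not in ENGINES:
        raise ValueError(f"DATA_ENGINE must be one of {ENGINES}, got {engine!r}")
    return engine


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(f"action={args.action} db={args.db} engine={resolve_engine()}")
```

Using choices lets argparse reject a mistyped action or database target with a usage message before any connection is opened.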
The process_nba_data action processes raw interactions to generate aggregated conversation threads and customer profiles, storing them in the selected database. This step is a prerequisite for running NBA predictions.

    python main.py --action process_nba_data --db <database_target>

The run_nba_predictions action fetches processed customer data and runs the NBA prediction engine in batch mode. It's suitable for evaluation and generating predictions for a specified number of customers.
Command Structure:

    python main.py --action run_nba_predictions --db <database_target> [--customers <limit>]

Parameters:
- --action run_nba_predictions: Specifies the batch NBA prediction mode.
- --db <database_target>:
  - supabase: Uses the Supabase (PostgreSQL) database as the data source.
  - clickhouse: Uses the ClickHouse database as the data source.
- --customers <limit> (Optional): Limits the number of customer profiles to process. If omitted, all available customer profiles will be processed. This is useful for controlling the scope of evaluation runs.
Example:
- Process 1000 customers for evaluation using ClickHouse:

      python main.py --action run_nba_predictions --db clickhouse --customers 1000
The run_nba_api action starts a FastAPI server that exposes an endpoint for real-time NBA predictions for individual customers. This is intended for production deployment.
Command Structure:

    python main.py --action run_nba_api --db <database_target>

Parameters:
- --action run_nba_api: Specifies that the FastAPI NBA prediction server should be started.
- --db <database_target>:
  - supabase: Configures the API to fetch data from Supabase.
  - clickhouse: Configures the API to fetch data from ClickHouse.
Example:
- Start NBA API server using Supabase:

      python main.py --action run_nba_api --db supabase

- Example API Call (after server is running):

      curl -X POST "http://localhost:8080/predict_nba" \
        -H "Content-Type: application/json" \
        -d '{"customer_id": "twitter_user_123"}'
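The same request can be made from Python using only the standard library. The URL, path, and request body mirror the curl example; the helper names are hypothetical, and treating the response as a JSON object is an assumption, since the response schema is not documented here.

```python
import json
import urllib.request


def build_predict_request(customer_id, base_url="http://localhost:8080"):
    """Build the POST request that the curl example sends."""
    body = json.dumps({"customer_id": customer_id}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/predict_nba",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict_nba(customer_id, base_url="http://localhost:8080", timeout=10):
    """Send the request and decode the response body as JSON (assumed format)."""
    request = build_predict_request(customer_id, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Requires the API server to be running locally.
    print(predict_nba("twitter_user_123"))
```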
The run_nba_predictions action generates an nba_predictions.csv file that serves as the primary output for evaluation.
The generated CSV file includes the following columns, designed to align with the provided evaluation requirements:
- customer_id: Unique identifier for the customer.
- chat_log: A readable representation of the customer's conversation history, differentiating between customer and support agent interactions.
- channel: The recommended communication channel (e.g., twitter_dm_reply, email_reply, scheduling_phone_call).
- message: The personalized message generated for the customer.
- send_time: The proposed timestamp for sending the message.
- reasoning: Detailed explanation for the chosen channel, message, and timing.
- issue_status: The predicted status of the issue after the recommended action (e.g., pending_customer_response, escalated).
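A quick way to sanity-check an export before evaluation is to confirm that all expected columns are present and to look at the channel distribution. The column names below come from the list above; the helper names and the assumption that the file is UTF-8 with a header row are illustrative.

```python
import csv
from collections import Counter

EXPECTED_COLUMNS = [
    "customer_id",
    "chat_log",
    "channel",
    "message",
    "send_time",
    "reasoning",
    "issue_status",
]


def load_predictions(path):
    """Read the CSV and fail early if any expected column is missing."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        present = reader.fieldnames or []
        missing = [column for column in EXPECTED_COLUMNS if column not in present]
        if missing:
            raise ValueError(f"missing columns: {missing}")
        return list(reader)


def channel_counts(rows):
    """Count how often each channel was recommended."""
    return Counter(row["channel"] for row in rows)


if __name__ == "__main__":
    predictions = load_predictions("nba_predictions.csv")
    for channel, count in channel_counts(predictions).most_common():
        print(f"{channel}: {count}")
```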
The run_nba_predictions process automatically identifies and skips customer issues that are already marked as resolved in the conversations table. The number of skipped customers is printed to the console during execution. This ensures that the NBA engine focuses only on open issues.
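The skip step amounts to a filter plus a count. In the sketch below, the is_resolved key and the helper name are hypothetical; the actual column in the conversations table may be named differently.

```python
def split_open_issues(conversations):
    """Return (open_issues, skipped_count) for a list of conversation dicts.

    The "is_resolved" key is an assumed field name, used here for illustration.
    """
    open_issues = [c for c in conversations if not c.get("is_resolved", False)]
    skipped = len(conversations) - len(open_issues)
    print(f"Skipped {skipped} customers with resolved issues")
    return open_issues, skipped


if __name__ == "__main__":
    sample = [
        {"customer_id": "a", "is_resolved": True},
        {"customer_id": "b", "is_resolved": False},
        {"customer_id": "c"},
    ]
    split_open_issues(sample)
```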
While a full resolution prediction model would require historical outcome data, our current issue_status column provides a preliminary indication of the expected state after the recommended action. Further development could involve training a model to predict the likelihood of resolution based on the NBA output and historical patterns.