Scalable Architecture Design

Overview

This document describes a scalable architecture for the PKScreener ecosystem that operates entirely within GitHub Actions, using Telegram for orchestration signals and GitHub as the data layer.

Current Architecture Problems

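+----------------------------------+-------------------------------------+----------------------+
| Problem                          | Impact                              | Current Workaround   |
+----------------------------------+-------------------------------------+----------------------+
| Telegram as data bus             | Slow, rate-limited, 50MB file limit | Split files, polling |
| No persistent data layer         | Each workflow downloads full data   | Cache in artifacts   |
| Bot-to-bot communication fragile | Depends on Telegram reliability     | Retry logic          |
| Large file transfers             | 10-50MB ticks.json                  | Zip compression      |
| No parallel data access          | Workflows queue for data            | Sequential runs      |
| High GitHub minutes usage        | Long-running data jobs              | Manual optimization  |
+----------------------------------+-------------------------------------+----------------------+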

Proposed Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         ORCHESTRATION LAYER                                 │
│                        (Telegram Bots - Signals Only)                       │
├─────────────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐              │
│  │  PKScreenerBot  │  │   PKTickBot     │  │   User CLI      │              │
│  │  (Commands)     │  │  (Tick Status)  │  │  (Local Runs)   │              │
│  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘              │
│           │                    │                    │                       │
│           │    CONTROL SIGNALS ONLY (no data)       │                       │
│           └────────────────────┼────────────────────┘                       │
└────────────────────────────────┼────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                          GITHUB ACTIONS LAYER                               │
│                       (Compute - Ephemeral Workers)                         │
├─────────────────────────────────────────────────────────────────────────────┤
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    Workflow: Data Collector                          │   │
│  │  • Runs during market hours (9:15 AM - 3:30 PM IST)                  │   │
│  │  • Connects to Zerodha WebSocket                                     │   │
│  │  • Aggregates ticks into candles (InMemoryCandleStore)               │   │
│  │  • Publishes to GitHub Data Repository every 5 minutes               │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
│                                    │                                        │
│                                    ▼                                        │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    Workflow: Scan Workers (Parallel)                 │   │
│  │  • Triggered by schedule or user command                             │   │
│  │  • Pulls latest data from GitHub Data Repository                     │   │
│  │  • Runs scans using PKScreener                                       │   │
│  │  • Posts results to Telegram                                         │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌────────────────────────────────────────────────────────────────────────────┐
│                        GITHUB DATA REPOSITORY                              │
│                  (Persistent Storage - Free & Fast)                        │
├────────────────────────────────────────────────────────────────────────────┤
│  Repository: pkjmesra/PKScreener-Data (or actions-data-download branch)    │
│                                                                            │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  /data/                                                             │   │
│  │  ├── candles/                                                       │   │
│  │  │   ├── 2024-01-15/                                                │   │
│  │  │   │   ├── candles_0915.json.gz    (9:15 AM snapshot)             │   │
│  │  │   │   ├── candles_0920.json.gz    (9:20 AM incremental)          │   │
│  │  │   │   ├── candles_0925.json.gz    (9:25 AM incremental)          │   │
│  │  │   │   └── ...                                                    │   │
│  │  │   └── latest.json.gz              (copy of most recent snapshot) │   │
│  │  │                                                                  │   │
│  │  ├── ticks/                                                         │   │
│  │  │   └── ticks_latest.json.gz        (current day's OHLCV summary)  │   │
│  │  │                                                                  │   │
│  │  └── metadata.json                   (last update time, version)    │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
│  Access: Raw GitHub URLs (separate from REST API rate limits)              │
│  Example: https://raw.githubusercontent.com/.../data/candles/latest.json.gz│
└────────────────────────────────────────────────────────────────────────────┘

Key Design Principles

1. Broadcast Pattern (Push) Instead of Request-Response (Pull)

Before (Request-Response):

Scanner Workflow → Request /ticks → PKTickBot → Send File → Scanner Workflow
                   (waits 30-60s)        (blocked)

After (Broadcast/Push):

Data Collector → Push to GitHub → (available immediately)
                    ↓
Scanner Workflow 1 → Pull from GitHub (parallel)
Scanner Workflow 2 → Pull from GitHub (parallel)
Scanner Workflow N → Pull from GitHub (parallel)

2. Incremental Updates

Instead of sending full 50MB files, send only changed data:

// metadata.json
{
  "last_full_snapshot": "2024-01-15T09:15:00Z",
  "last_incremental": "2024-01-15T14:25:00Z",
  "incremental_files": [
    "candles_0920.json.gz",
    "candles_0925.json.gz",
    "candles_0930.json.gz"
  ],
  "version": "1.0.0"
}
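A consumer has to rebuild the current state from the full snapshot plus the incremental files listed in metadata.json. The sketch below makes two hypothetical assumptions. First, every file uses the same shape as the publisher's output, `{symbol: {"token": ..., "candles": {interval: [candle, ...]}}}`. Second, each candle dict carries an ISO-8601 `"timestamp"` string. `apply_incrementals` is an illustrative name, not an existing PKScreener function.

```python
from typing import Any, Dict, Iterable

# Assumed (hypothetical) file shape:
# {symbol: {"token": int, "candles": {interval: [{"timestamp": str, ...}, ...]}}}
Snapshot = Dict[str, Dict[str, Any]]


def apply_incrementals(base: Snapshot, increments: Iterable[Snapshot]) -> Snapshot:
    """Replay incremental files, oldest first, on top of a full snapshot.

    Candles are keyed by timestamp, so a candle that was still forming in an
    earlier file is overwritten by its newer version in a later file.
    The inputs are not modified.
    """
    merged: Snapshot = {}
    for snapshot in [base, *increments]:
        for symbol, entry in snapshot.items():
            target = merged.setdefault(
                symbol, {"token": entry.get("token"), "candles": {}}
            )
            for interval, candles in entry.get("candles", {}).items():
                by_ts = target["candles"].setdefault(interval, {})
                for candle in candles:
                    by_ts[candle["timestamp"]] = candle
    # Turn each {timestamp: candle} map back into a time-ordered list
    # (ISO-8601 strings in one format sort chronologically).
    for entry in merged.values():
        entry["candles"] = {
            interval: [by_ts[ts] for ts in sorted(by_ts)]
            for interval, by_ts in entry["candles"].items()
        }
    return merged
```

Keying by timestamp makes the replay idempotent. Applying the same incremental file twice gives the same result, which helps when a consumer retries a partial download.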

3. Telegram for Signals Only

┌─────────────────────────────────────────────────────────────────┐
│ User sends: /scan X:12:7:4                                      │
│                                                                 │
│ PKScreenerBot responds:                                         │
│   "🚀 Scan queued! Workflow triggered."                         │
│   (triggers GitHub workflow via API)                            │
│                                                                 │
│ Workflow completes → Posts results to Telegram                  │
└─────────────────────────────────────────────────────────────────┘
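Before dispatching the workflow, the bot only has to extract the scan options from the command text. A minimal parser is sketched below. Its name matches the `parse_scan_command` helper that the PKScreenerBot handler calls later in this document, but the body is an assumption. It treats everything after `/scan` as one opaque token rather than validating the `X:12:7:4` grammar, and it also accepts Telegram's group-chat form `/scan@BotName ...`.

```python
import re
from typing import Optional

# "/scan X:12:7:4", or "/scan@SomeBot X:12:7:4" when sent in a group chat
_SCAN_RE = re.compile(r"^/scan(?:@\w+)?\s+(\S+)$")


def parse_scan_command(text: str) -> Optional[str]:
    """Return the scan option string, or None if the message is malformed."""
    match = _SCAN_RE.match(text.strip())
    return match.group(1) if match else None
```

A `None` result is a natural point to reply with usage help instead of triggering a workflow.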

4. Layered Data Access

class ScalableDataProvider:
    """
    Data access priority:
    1. Local cache (if fresh < 5 min)
    2. GitHub raw content (latest.json.gz)
    3. GitHub API (if raw fails)
    4. Telegram fallback (last resort)
    """
    
    GITHUB_RAW_BASE = "https://raw.githubusercontent.com/pkjmesra/PKScreener/actions-data-download"
    
    def get_candle_data(self):
        # Try local cache first
        if self._is_cache_fresh():
            return self._load_from_cache()
        
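        # Try GitHub raw content (fastest; outside the REST API rate limits)
        data = self._fetch_from_github_raw()
        if data:
            self._update_cache(data)
            return data
        
        # Try the GitHub API if raw content fails
        data = self._fetch_from_github_api()
        if data:
            self._update_cache(data)
            return data
        
        # Fallback to Telegram (last resort)
        return self._fetch_from_telegram()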
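The same priority order can also be written as a generic chain. This keeps each source independent and makes the order easy to change. The sketch is not the `ScalableDataProvider` API: `first_available` and the loader callables are hypothetical. A loader signals "unavailable" by returning `None` or by raising.

```python
from typing import Callable, Iterable, Optional, Tuple, TypeVar

T = TypeVar("T")


def first_available(
    sources: Iterable[Tuple[str, Callable[[], Optional[T]]]],
) -> Tuple[Optional[str], Optional[T]]:
    """Try each (name, loader) pair in priority order; return the first hit.

    A loader that returns None or raises is skipped, and the next source is
    tried. Returns (None, None) when every source fails.
    """
    for name, loader in sources:
        try:
            data = loader()
        except Exception as exc:  # one broken source must not stop the chain
            print(f"{name} failed: {exc}")
            continue
        if data is not None:
            return name, data
    return None, None
```

Returning the source name lets the caller log or report which layer served the data. Unlike the `if data:` checks above, an empty but valid payload counts as a hit here. Switch to a truthiness test if empty payloads should fall through to the next source.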

Implementation Components

Component 1: Data Publisher Workflow

# .github/workflows/w-data-publisher.yml
name: Data Publisher

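on:
  schedule:
    # Every 5 minutes, Mon-Fri, 03:00-10:55 UTC (= 08:30-16:25 IST),
    # a window that covers market hours (IST 9:15-15:30) with some margin
    - cron: '*/5 3-10 * * 1-5'
  workflow_dispatch:

# The job pushes commits, so GITHUB_TOKEN needs write access to contents
permissions:
  contents: write

# Queue runs instead of overlapping them; concurrent pushes would race
concurrency:
  group: data-publisher
  cancel-in-progress: false
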
jobs:
  publish_data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          ref: actions-data-download
          
      - name: Setup Python
        uses: actions/setup-python@v5
        
      - name: Collect and Publish Data
        run: |
          python scripts/collect_and_publish.py
        env:
          KTOKEN: ${{ secrets.KTOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          
      - name: Commit Data
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "actions@github.com"
          git add data/
          git commit -m "Data update $(date -u +%Y-%m-%dT%H:%M:%SZ)" || true
          git push

Component 2: Scalable Data Fetcher

# PKDevTools/classes/PKScalableDataFetcher.py

import gzip
import json
import os
import time
import zlib
from typing import Any, Dict, Optional
from urllib.error import URLError
from urllib.request import Request, urlopen

class PKScalableDataFetcher:
    """
    Scalable data fetcher that uses GitHub as primary data source.
    Eliminates Telegram dependency for data transfer.
    """
    
    GITHUB_RAW_BASE = "https://raw.githubusercontent.com/pkjmesra/PKScreener/actions-data-download/data"
    CACHE_TTL_SECONDS = 300  # 5 minutes
    
    def __init__(self, cache_dir: Optional[str] = None):
        self.cache_dir = cache_dir or os.path.join(
            os.path.expanduser("~"), ".pkscreener", "cache"
        )
        os.makedirs(self.cache_dir, exist_ok=True)
        self._metadata_cache: Optional[Dict[str, Any]] = None
        self._metadata_fetch_time = 0.0
    
    def get_latest_candles(self) -> Optional[Dict[str, Any]]:
        """Get latest candle data from the most efficient source."""
        cache_file = "candles_latest.json"
        
        # 1. Check local cache; freshness comes from the file's mtime, so no
        #    bookkeeping key gets mixed into the symbol -> data mapping
        cached = self._get_from_cache(cache_file)
        if cached is not None and self._is_fresh(self._cache_mtime(cache_file)):
            return cached
        
        # 2. Fetch from GitHub (primary source)
        data = self._fetch_from_github("candles/latest.json.gz")
        if data is not None:
            self._save_to_cache(cache_file, data)
            return data
        
        # 3. Return stale cache if available (None if there is none)
        return cached
    
    def get_ticks_summary(self) -> Optional[Dict[str, Any]]:
        """Get current day's OHLCV summary for all instruments."""
        return self._fetch_from_github("ticks/ticks_latest.json.gz")
    
    def get_metadata(self) -> Optional[Dict[str, Any]]:
        """Get data repository metadata (kept in memory for CACHE_TTL_SECONDS)."""
        if self._metadata_cache and self._is_fresh(self._metadata_fetch_time):
            return self._metadata_cache
        
        self._metadata_cache = self._fetch_from_github("metadata.json", compressed=False)
        self._metadata_fetch_time = time.time()
        return self._metadata_cache
    
    def _fetch_from_github(
        self,
        path: str,
        compressed: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Fetch JSON from GitHub raw content; return None on any failure."""
        url = f"{self.GITHUB_RAW_BASE}/{path}"
        
        try:
            request = Request(url, headers={"User-Agent": "PKScreener/1.0"})
            with urlopen(request, timeout=30) as response:
                content = response.read()
            
            if compressed and path.endswith(".gz"):
                content = gzip.decompress(content)
            
            return json.loads(content.decode("utf-8"))
        
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # truncated or corrupt gzip raises EOFError, OSError or zlib.error;
        # bad JSON or UTF-8 raises ValueError subclasses.
        except (URLError, OSError, EOFError, zlib.error, ValueError) as e:
            print(f"GitHub fetch failed for {path}: {e}")
            return None
    
    def _is_fresh(self, cache_time: float) -> bool:
        """Check if a timestamp is younger than CACHE_TTL_SECONDS."""
        return (time.time() - cache_time) < self.CACHE_TTL_SECONDS
    
    def _cache_mtime(self, filename: str) -> float:
        """Modification time of a cache file, or 0.0 if it does not exist."""
        try:
            return os.path.getmtime(os.path.join(self.cache_dir, filename))
        except OSError:
            return 0.0
    
    def _get_from_cache(self, filename: str) -> Optional[Dict[str, Any]]:
        """Load data from local cache."""
        cache_path = os.path.join(self.cache_dir, filename)
        
        try:
            with open(cache_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing or corrupt cache file: treat as a cache miss
            return None
    
    def _save_to_cache(self, filename: str, data: Dict[str, Any]) -> None:
        """Save data to local cache (write a temp file, then rename it)."""
        cache_path = os.path.join(self.cache_dir, filename)
        tmp_path = f"{cache_path}.{os.getpid()}.tmp"
        
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
            # os.replace is atomic on POSIX filesystems, so a concurrent
            # reader never sees a half-written cache file
            os.replace(tmp_path, cache_path)
        except OSError as e:
            print(f"Cache save failed: {e}")

Component 3: Data Publisher Script

# scripts/collect_and_publish.py

import gzip
import json
import os
import shutil
from datetime import datetime, timedelta, timezone

# GitHub-hosted runners run in UTC; name snapshots in IST so that
# candles_0915.json.gz really is the 9:15 AM IST snapshot.
IST = timezone(timedelta(hours=5, minutes=30))

def collect_candle_data():
    """Collect candle data from InMemoryCandleStore."""
    try:
        from pkbrokers.kite import get_candle_store
    except ImportError as e:
        print(f"pkbrokers is not installed: {e}")
        return None
    
    store = get_candle_store()
    
    # Export to JSON form: {symbol: {"token": ..., "candles": {interval: [...]}}}
    all_data = {}
    
    for token in store.instruments:
        symbol = store.instrument_symbols.get(token, str(token))
        all_data[symbol] = {
            "token": token,
            "candles": {}
        }
        
        # Export the last 100 candles of each interval
        for interval in ["1m", "5m", "15m", "60m", "day"]:
            candles = store.get_candles(
                instrument_token=token,
                interval=interval,
                count=100
            )
            if candles:
                all_data[symbol]["candles"][interval] = candles
    
    return all_data

def publish_to_github(data, output_dir="data"):
    """Write the snapshot, latest.json.gz and metadata.json under output_dir."""
    now_ist = datetime.now(IST)
    today = now_ist.strftime("%Y-%m-%d")
    current_time = now_ist.strftime("%H%M")
    
    # Create directory structure
    candles_dir = os.path.join(output_dir, "candles", today)
    os.makedirs(candles_dir, exist_ok=True)
    
    # Save timestamped snapshot
    snapshot_path = os.path.join(candles_dir, f"candles_{current_time}.json.gz")
    with gzip.open(snapshot_path, "wt", encoding="utf-8") as f:
        json.dump(data, f)
    
    # Refresh latest.json.gz as a plain copy of the newest snapshot (not a symlink)
    latest_path = os.path.join(output_dir, "candles", "latest.json.gz")
    shutil.copyfile(snapshot_path, latest_path)
    
    # Update metadata
    metadata = {
        "last_update": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "snapshot_count": len(os.listdir(candles_dir)),
        "instrument_count": len(data),
        "version": "2.0.0"
    }
    
    metadata_path = os.path.join(output_dir, "metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)
    
    print(f"Published {len(data)} instruments to {snapshot_path}")

if __name__ == "__main__":
    data = collect_candle_data()
    if data:
        publish_to_github(data)
    else:
        print("No data to publish")

Workflow Optimization

Before: Sequential Bot Communication

Workflow Start → Wait for PKTickBot → Download File → Process → Complete
    (0s)              (30-60s)           (10-30s)      (60s)     (2-3 min)

After: Parallel GitHub Access

Workflow Start → Fetch from GitHub → Process → Complete
    (0s)              (2-5s)          (60s)     (1 min)

GitHub Minutes Savings

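+----------------------------+---------+--------+-------------+
| Scenario                   | Before  | After  | Savings     |
+----------------------------+---------+--------+-------------+
| Single scan                | 3 min   | 1 min  | 66%         |
| 10 parallel scans          | 30 min  | 10 min | 66%         |
| Daily scheduled (50 scans) | 150 min | 50 min | 100 min/day |
+----------------------------+---------+--------+-------------+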
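The table assumes each scan fits within a whole number of minutes. For billing purposes, GitHub rounds each job's runtime up to the next whole minute, so the savings depend on where a run lands relative to a minute boundary. A small calculator makes this explicit (the function names are illustrative):

```python
import math
from typing import Tuple


def billed_minutes(job_seconds: float, jobs: int = 1) -> int:
    """Billed minutes for `jobs` runs of the same length, each rounded up."""
    return jobs * math.ceil(job_seconds / 60)


def minutes_saved(before_s: float, after_s: float, jobs: int) -> Tuple[int, int, int]:
    """Return (billed before, billed after, minutes saved) for `jobs` runs."""
    before = billed_minutes(before_s, jobs)
    after = billed_minutes(after_s, jobs)
    return before, after, before - after


print(minutes_saved(180, 60, 50))  # (150, 50, 100): the "Daily scheduled" row
print(minutes_saved(180, 65, 50))  # (150, 100, 50): each scan runs 5 s over a minute
```

A scan that takes 65 seconds is billed as two minutes. The "After" column therefore holds only while end-to-end scan time (fetch plus processing) stays under 60 seconds.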

Telegram Bot Changes

PKScreenerBot (Minimal Changes)

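# Only change: Add workflow trigger instead of data fetch
# (parse_scan_command and trigger_github_workflow are bot helpers defined elsewhere)

import asyncio

async def handle_scan_command(update, context):
    user_id = update.effective_user.id
    scan_params = parse_scan_command(update.message.text)
    
    # Trigger GitHub workflow (no data transfer via Telegram). The trigger is a
    # blocking HTTP call, so run it in a worker thread to keep the event loop free.
    success = await asyncio.to_thread(
        trigger_github_workflow,
        workflow="w8-workflow-alert-scan_generic.yml",
        params={
            "params": scan_params,
            "user": str(user_id)
        }
    )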
    
    if success:
        await update.message.reply_text(
            f"🚀 Scan queued!\n"
            f"Parameters: {scan_params}\n"
            f"Results will be posted here when ready."
        )
    else:
        await update.message.reply_text("❌ Failed to queue scan. Try again.")
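The handler depends on a `trigger_github_workflow` helper. One way to implement it is GitHub's REST endpoint for creating a workflow dispatch event, `POST /repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches`. This endpoint accepts the workflow file name as the ID and answers 204 No Content on success. The sketch rests on several hypothetical choices:

  • the owner/repo and `ref="main"` defaults

  • the `GH_WORKFLOW_TOKEN` environment variable

  • the `params`/`user` input names, which must match the `workflow_dispatch.inputs` declared in the target workflow

The `opener` parameter exists only so the call can be tested without a network.

```python
import json
import os
from typing import Callable, Dict, Optional
from urllib.request import Request, urlopen

GITHUB_API = "https://api.github.com"


def build_dispatch_request(owner: str, repo: str, workflow: str, ref: str,
                           inputs: Dict[str, str], token: str) -> Request:
    """Build the 'create a workflow dispatch event' REST call."""
    url = f"{GITHUB_API}/repos/{owner}/{repo}/actions/workflows/{workflow}/dispatches"
    body = json.dumps({"ref": ref, "inputs": inputs}).encode("utf-8")
    return Request(url, data=body, method="POST", headers={
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {token}",
        "X-GitHub-Api-Version": "2022-11-28",
        "Content-Type": "application/json",
    })


def trigger_github_workflow(workflow: str, params: Dict[str, str],
                            token: Optional[str] = None,
                            owner: str = "pkjmesra", repo: str = "PKScreener",
                            ref: str = "main",
                            opener: Callable = urlopen) -> bool:
    """Dispatch `workflow` with `params` as inputs; True if GitHub accepted it."""
    # Hypothetical env var holding a token that is allowed to run workflows
    token = token or os.environ.get("GH_WORKFLOW_TOKEN", "")
    request = build_dispatch_request(owner, repo, workflow, ref, params, token)
    try:
        with opener(request, timeout=30) as response:
            return response.status == 204  # success is "204 No Content"
    except OSError as exc:  # URLError/HTTPError (e.g. 404, 422) and timeouts
        print(f"Workflow dispatch failed: {exc}")
        return False
```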

PKTickBot (Simplified)

# PKTickBot now only needs to:
# 1. Collect ticks and aggregate to candles
# 2. Push to GitHub (not Telegram)
# 3. Report status via Telegram

from pkbrokers.kite import get_candle_store

class PKTickBot:
    async def status(self, update, context):
        """Report data collection status."""
        store = get_candle_store()
        stats = store.get_stats()
        
        await update.message.reply_text(
            f"📊 Data Collection Status\n"
            f"Instruments: {stats['instrument_count']}\n"
            f"Ticks Processed: {stats['ticks_processed']}\n"
            f"Last Update: {stats['last_tick_time']}\n"
            f"Data available on GitHub ✓"
        )
    
    # Remove: send_ticks, send_db (no longer needed via Telegram)

Migration Path

Phase 1: Add GitHub Publishing (Week 1)

  • Add data publisher workflow

  • Keep existing Telegram data transfer as fallback

  • Test GitHub raw content access

Phase 2: Update Data Consumers (Week 2)

  • Add PKScalableDataFetcher to PKDevTools

  • Update screenerStockDataFetcher to use new fetcher

  • Test parallel workflow execution

Phase 3: Simplify Telegram Bots (Week 3)

  • Remove data transfer commands from PKTickBot

  • Keep status and control commands

  • Remove PKTickBotConsumer (no longer needed)

Phase 4: Optimization (Week 4)

  • Implement incremental updates

  • Add compression optimization

  • Monitor and tune cache TTL

Fallback Strategy

┌─────────────────────────────────────────────────────────────────┐
│                     Data Access Priority                        │
├─────────────────────────────────────────────────────────────────┤
│ 1. Local Cache (< 5 min old)     │ Instant           │ Primary  │
│ 2. GitHub Raw Content            │ 2-5 seconds       │ Primary  │
│ 3. GitHub API                    │ 5-10 seconds      │ Fallback │
│ 4. InMemoryCandleStore (if live) │ Instant (in-proc) │ Fallback │
│ 5. Telegram Bot (/ticks)         │ 30-60 seconds     │ Last     │
└─────────────────────────────────────────────────────────────────┘

Monitoring & Alerts

# Add to metadata.json for monitoring
{
    "health": {
        "last_successful_publish": "2024-01-15T14:25:00Z",
        "consecutive_failures": 0,
        "data_freshness_seconds": 300
    },
    "alerts": {
        "stale_data_threshold_minutes": 15,
        "alert_channel": "@pktickbot_alerts"
    }
}
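A consumer or a scheduled health check can turn these fields into an alert. The sketch below uses a hypothetical `check_staleness` helper and reads only the keys shown in the example. Outside market hours the data is published only every 2 hours, so the 15-minute threshold should be applied during market hours only.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def check_staleness(metadata: Dict[str, Any],
                    now: Optional[datetime] = None) -> Optional[str]:
    """Return an alert message if the last publish is too old, else None."""
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(
        metadata["health"]["last_successful_publish"].replace("Z", "+00:00")
    )
    age_min = (now - last).total_seconds() / 60
    limit = metadata["alerts"]["stale_data_threshold_minutes"]
    failures = metadata["health"].get("consecutive_failures", 0)
    if age_min > limit:
        return (f"Data is {age_min:.0f} min old (limit {limit} min), "
                f"{failures} consecutive publish failures")
    return None
```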

24x7 Data Availability

The system is designed to keep stock data available around the clock, so users can trigger scans from the Telegram bot at any time.

Architecture Overview

+---------------------------------------------------------------------+
|                      24x7 DATA AVAILABILITY                         |
+---------------------------------------------------------------------+
|                                                                     |
|  MARKET HOURS (9:15 AM - 3:30 PM IST, Mon-Fri)                      |
|  +-- w-data-publisher.yml runs every 5 minutes                      |
|  +-- Uses real-time ticks from PKTickBot/InMemoryCandleStore        |
|  +-- Aggregates into 1m, 2m, 3m, 4m, 5m, 10m, 15m, 30m, 60m candles |
|  +-- Publishes fresh data to GitHub (results/Data/)                 |
|                                                                     |
|  AFTER MARKET HOURS                                                 |
|  +-- w9-workflow-download-data.yml runs at 3:28 PM IST              |
|  +-- Downloads 52-week historical data for all stocks               |
|  +-- Saves pickle files to actions-data-download/                   |
|  +-- w-data-publisher.yml runs every 2 hours, uses pickle data      |
|                                                                     |
|  WEEKENDS & HOLIDAYS                                                |
|  +-- Data publisher continues running every 2 hours                 |
|  +-- Uses last available pickle/cached data                         |
|  +-- Ensures data is always accessible via GitHub                   |
|                                                                     |
|  USER TRIGGERS SCAN (anytime, 24x7)                                 |
|  +-- Telegram bot -> GitHub workflow dispatch                       |
|  +-- Scan workflow fetches from GitHub raw content                  |
|  +-- Data always available (fresh during market, cached otherwise)  |
|  +-- 2-5 second latency vs 30-60 seconds via Telegram               |
|                                                                     |
+---------------------------------------------------------------------+

Data Source Priority (24x7)

The system uses a layered approach to ensure data is always available:

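+----------+---------------------------------------+------------------------------------+---------+
| Priority | Source                                | When Used                          | Latency |
+----------+---------------------------------------+------------------------------------+---------+
| 1        | Real-time ticks (InMemoryCandleStore) | Market hours, live tick collection | Instant |
| 2        | GitHub ticks.json                     | Fresh data published during market | 2-5s    |
| 3        | Pickle files (w9 workflow)            | After market, weekends, holidays   | 2-5s    |
| 4        | ticks.json.zip fallback               | Database blocked (quota exceeded)  | 2-5s    |
| 5        | Local disk cache                      | Network issues, fallback           | Instant |
| 6        | Stale cache data                      | All sources fail                   | Instant |
+----------+---------------------------------------+------------------------------------+---------+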

Fallback: ticks.json.zip to Pickle Conversion

When the primary data sources fail (e.g., Turso database quota exceeded), the system uses ticks.json.zip as a fallback:

+------------------------------------------------------------+
|                   FALLBACK MECHANISM                       |
+------------------------------------------------------------+
|                                                            |
|  1. w9-workflow-download-data.yml runs                     |
|     +-- pkscreenercli -d -a Y (downloads stock data)       |
|     +-- If database blocked -> no pkl files created        |
|                                                            |
|  2. Fallback step activates                                |
|     +-- Downloads ticks.json.zip from GitHub               |
|     +-- Extracts ticks.json                                |
|     +-- Runs ticks_to_pickle.py                            |
|     +-- Creates stock_data_YYMMDD.pkl                      |
|                                                            |
|  3. Result: pkl files available for scans                  |
+------------------------------------------------------------+

Script: .github/workflows/ticks_to_pickle.py

Converts ticks.json to pickle format:

  • Reads instrument data from ticks.json

  • Converts each symbol’s OHLCV data to pandas DataFrame

  • Saves as stock_data_YYMMDD.pkl and intraday_stock_data_YYMMDD.pkl
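The conversion flow can be sketched with the standard library alone. The ticks.json layout assumed here is hypothetical: each instrument token maps to a `trading_symbol` and an `ohlcv` dict. Where the real script builds pandas DataFrames, this sketch pickles plain per-symbol dicts to stay dependency-free. Only the `stock_data_YYMMDD.pkl` naming comes from the description above.

```python
import io
import json
import pickle
import zipfile
from datetime import date
from pathlib import Path
from typing import Any, Dict

OHLCV_FIELDS = ("open", "high", "low", "close", "volume")


def load_ticks_from_zip(zip_bytes: bytes, member: str = "ticks.json") -> Dict[str, Any]:
    """Read ticks.json out of a downloaded ticks.json.zip, in memory."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        return json.loads(zf.read(member).decode("utf-8"))


def ticks_to_rows(ticks: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """One OHLCV row per symbol (hypothetical ticks.json layout)."""
    rows = {}
    for token, inst in ticks.items():
        symbol = inst.get("trading_symbol", str(token))
        rows[symbol] = {field: inst["ohlcv"][field] for field in OHLCV_FIELDS}
    return rows


def pickle_filename(day: date) -> str:
    return f"stock_data_{day:%y%m%d}.pkl"  # YYMMDD


def save_pickle(rows: Dict[str, Any], out_dir: Path, day: date) -> Path:
    path = out_dir / pickle_filename(day)
    with open(path, "wb") as f:
        pickle.dump(rows, f, protocol=pickle.HIGHEST_PROTOCOL)
    return path
```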

Local SQLite Database (Offline Support)

For complete offline scan support, a local SQLite database system stores candle data:

+------------------------------------------------------------+
|               LOCAL SQLITE DATABASE SYSTEM                 |
+------------------------------------------------------------+
|                                                            |
|  DAILY CANDLES                                             |
|  +-- candles_daily_YYMMDD.db                               |
|  +-- Contains 1-year historical OHLCV data                 |
|  +-- ~40MB compressed                                      |
|                                                            |
|  INTRADAY CANDLES                                          |
|  +-- candles_YYMMDD_intraday.db                            |
|  +-- Contains 1-minute candles for trading day             |
|  +-- ~10MB per day                                         |
|                                                            |
|  SYNC WORKFLOW (w-local-candle-sync.yml)                   |
|  +-- During market hours: Every 30 minutes                 |
|  +-- After market: Once at 3:35 PM IST                     |
|  +-- Commits SQLite files to repository                    |
|                                                            |
+------------------------------------------------------------+

Module: pkbrokers.kite.localCandleDatabase.LocalCandleDatabase

Features:

  • Syncs from Turso when available

  • Falls back to tick data aggregation

  • Exports to PKScreener-compatible pickle format

  • Thread-safe operations
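"Thread-safe operations" usually comes down to how SQLite connections are shared. The sketch below is not the LocalCandleDatabase schema or API: the `CandleDB` class, the single `candles` table and its columns are hypothetical. It shares one connection across threads behind a lock. It also uses `INSERT OR REPLACE` on a `(symbol, ts)` primary key, so re-syncing the same candles (from Turso or from tick aggregation) is idempotent.

```python
import sqlite3
import threading
from typing import Iterable, List, Tuple

# symbol, timestamp, open, high, low, close, volume
Candle = Tuple[str, str, float, float, float, float, int]


class CandleDB:
    """Minimal thread-safe SQLite candle store (hypothetical schema)."""

    def __init__(self, path: str = ":memory:"):
        # One shared connection: check_same_thread=False lets worker threads
        # use it, and the lock serialises their access.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock, self._conn:  # the connection context manager commits
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS candles ("
                " symbol TEXT NOT NULL, ts TEXT NOT NULL,"
                " open REAL, high REAL, low REAL, close REAL, volume INTEGER,"
                " PRIMARY KEY (symbol, ts))"
            )

    def upsert(self, candles: Iterable[Candle]) -> None:
        # Same (symbol, ts) overwrites the old row, so repeated syncs are safe
        with self._lock, self._conn:
            self._conn.executemany(
                "INSERT OR REPLACE INTO candles VALUES (?, ?, ?, ?, ?, ?, ?)",
                candles,
            )

    def history(self, symbol: str) -> List[tuple]:
        with self._lock:
            return self._conn.execute(
                "SELECT * FROM candles WHERE symbol = ? ORDER BY ts", (symbol,)
            ).fetchall()

    def close(self) -> None:
        with self._lock:
            self._conn.close()
```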

Workflow Schedule

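# w-data-publisher.yml schedule (cron times are UTC; IST = UTC+5:30)
on:
  schedule:
    # Market hours: every 5 minutes, Mon-Fri, 03:00-10:55 UTC (IST 08:30-16:25)
    - cron: '*/5 3-10 * * 1-5'
    # 24x7 availability: every 2 hours, every day, at :30 UTC (even IST hours,
    # e.g. 22:30 UTC = 04:00 IST). This also fires during weekday market hours.
    - cron: '30 */2 * * *'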
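The 2-hourly trigger also fires on weekdays during market hours. The publisher therefore has to decide at run time whether to use live ticks or the pickle data, and the `is_market_hours` field in metadata.json records that decision. Below is a minimal check with an illustrative function name. It assumes the 9:15 AM to 3:30 PM IST session used throughout this document and ignores exchange holidays, which would need a holiday calendar.

```python
from datetime import datetime, time, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))
MARKET_OPEN = time(9, 15)
MARKET_CLOSE = time(15, 30)


def is_market_hours(now: datetime) -> bool:
    """True if an aware datetime falls in the Mon-Fri 9:15-15:30 IST session.

    Exchange holidays are not handled here.
    """
    local = now.astimezone(IST)
    if local.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return False
    return MARKET_OPEN <= local.time() <= MARKET_CLOSE
```

A run at 03:00 UTC is inside the 5-minute cron window but outside the session (08:30 IST), so a check like this keeps the publisher from labelling pre-open data as live.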

Data Freshness Guarantees

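+-------------------------+------------------+----------+------------------+
| Time Period             | Data Type        | Max Age  | Update Frequency |
+-------------------------+------------------+----------+------------------+
| Market hours            | Real-time ticks  | 5 min    | Every 5 min      |
| After market (same day) | End-of-day OHLCV | 2 hours  | Every 2 hours    |
| Weekends/Holidays       | Last trading day | 2 hours  | Every 2 hours    |
| Network failure         | Cached data      | 24 hours | On recovery      |
+-------------------------+------------------+----------+------------------+

These are targets rather than hard guarantees. GitHub can delay scheduled workflow runs during periods of high load and may drop some queued runs.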

Integration with Existing Workflows

The 24x7 data availability integrates with existing PKScreener workflows:

  1. w7-workflow-prod-scans-trigger.yml - Scheduled production scans

    • Now uses GitHub data layer instead of Telegram

    • Can run anytime with available data

  2. w8-workflow-alert-scan_generic.yml - User-triggered scans

    • Fetches data from GitHub raw content

    • Works 24x7 with appropriate data (fresh or cached)

  3. w9-workflow-download-data.yml - After-market data download

    • Downloads 52-week historical data

    • Pickle files used by data publisher for non-market hours

User Experience

Users can trigger scans from Telegram at any time:

+------------------------------------------------------------------+
| User: /scan X:12:7:4 (at 11:00 PM IST)                           |
|                                                                  |
| System:                                                          |
| 1. Bot receives command -> Triggers GitHub workflow              |
| 2. Workflow starts -> Fetches data from GitHub                   |
| 3. Data source: End-of-day OHLCV from last trading session       |
| 4. Scan runs with available data                                 |
| 5. Results posted to Telegram                                    |
|                                                                  |
| Response: "Scan complete! Using data from 2024-01-15 EOD"        |
+------------------------------------------------------------------+

Metadata Tracking

The data publisher maintains metadata for transparency:

{
  "last_update": "2024-01-15T22:30:00Z",
  "last_update_ist": "2024-01-16T04:00:00+05:30",
  "is_market_hours": false,
  "data_source": "pickle",
  "instrument_count": 2500,
  "version": "2.0.0",
  "publisher": "w-data-publisher.yml",
  "availability": "24x7",
  "health": {
    "status": "healthy"
  }
}
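The two timestamps describe the same instant. 22:30 UTC on 15 January is 04:00 IST on 16 January, which lines up with the 2-hourly `30 */2 * * *` run. The sketch below shows how a publisher might produce both fields. The `build_metadata` name and its parameters are illustrative; the keys mirror the example above.

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))


def build_metadata(now_utc: datetime, data_source: str, instrument_count: int,
                   market_open: bool) -> dict:
    """Assemble metadata.json fields; both timestamps denote the same instant."""
    return {
        "last_update": now_utc.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
        "last_update_ist": now_utc.astimezone(IST).isoformat(),
        "is_market_hours": market_open,
        "data_source": data_source,
        "instrument_count": instrument_count,
        "version": "2.0.0",
        "publisher": "w-data-publisher.yml",
        "availability": "24x7",
        "health": {"status": "healthy"},
    }
```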

Summary

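+-------------------+--------------------------+------------------------------------+
| Aspect            | Before                   | After                              |
+-------------------+--------------------------+------------------------------------+
| Data Transfer     | Telegram (slow, limited) | GitHub (fast, 100 MB per-file cap) |
| Parallelism       | 1 workflow at a time     | Parallel, within Actions limits    |
| Latency           | 30-60 seconds            | 2-5 seconds                        |
| Reliability       | Depends on Telegram      | Git-backed durability              |
| Cost              | High GitHub minutes      | 66% reduction                      |
| Complexity        | Bot-to-bot messaging     | Simple HTTP GET                    |
| Availability      | Market hours only        | 24x7                               |
| After-hours scans | Not supported            | Full support with EOD data         |
+-------------------+--------------------------+------------------------------------+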