Chan Inthisone

Quant Research Platform: Data Ingestion

QuantData

How I built the data ingestion layer for my quant research platform.


Introduction

I focused on building a reliable data ingestion layer that pulls market and sentiment data into a raw data lake. My goal was to automate fetching CSV and Parquet files daily, validating them, and storing them for downstream processing.

Data Sources

  • Market data:
    • VIX (CBOE) via CSV download
    • SPX (S&P 500) via Parquet feed
    • Treasury yields (1mo, 2yr, 10yr) from FRED API
    • Credit spreads computed from corporate bond and Treasury rates
  • Sentiment feeds:
    • News headlines fetched via NewsAPI
    • Twitter sentiment via Tweepy (filtered by finance keywords)

Architecture

+--------------------+
|  Data Ingestion    |
|  - Prefect Flow    |
|  - API / CSV fetch |
+--------------------+
          |
          v
+--------------------+
|  Raw Data Lake     |
|  - S3 (Parquet)    |
|  - Local backups   |
+--------------------+

Implementation Details

  1. Prefect Flow:

    • Defined a daily schedule:
      from prefect import Flow
      from prefect.schedules import Schedule
      from prefect.schedules.clocks import CronClock

      # run every day at 04:00
      with Flow("ingest", schedule=Schedule(clocks=[CronClock("0 4 * * *")])) as flow:
          fetch_vix()
          fetch_spx()
          fetch_yield_curve()
          fetch_credit_spreads()
          fetch_sentiment()
      
    • Each task downloads or queries data, then writes to a local staging folder.
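    • A typical task is a thin wrapper around the download plus a staged write. A minimal sketch (the URL and staging path are placeholders, not the real endpoints):
      from prefect import task
      import requests

      VIX_CSV_URL = "https://example.com/vix.csv"  # placeholder, not the actual CBOE endpoint

      @task
      def fetch_vix():
          # download the raw CSV and stage it locally before validation
          resp = requests.get(VIX_CSV_URL, timeout=30)
          resp.raise_for_status()
          with open("staging/vix.csv", "wb") as f:
              f.write(resp.content)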
  2. CSV & Parquet Handling:

    • For CSV (VIX, yields), I used Pandas to read, validate, and convert to Parquet:
      import pandas as pd

      df = pd.read_csv("vix.csv", parse_dates=["Date"])
      df["VIX"] = df["Close"].astype(float)
      assert df["Date"].is_unique and df["VIX"].notna().all()  # basic validation before writing
      df.to_parquet("raw/vix.parquet", index=False)
      
    • For Parquet (SPX), I read directly and verified schema:
      df_spx = pd.read_parquet("spx_feed.parquet")
      assert "Close" in df_spx.columns
      df_spx.to_parquet("raw/spx.parquet", index=False)
      
  3. FRED API for Yields:

    • Used fredapi:
      from fredapi import Fred
      fred = Fred(api_key="YOUR_KEY")
      ten_year = fred.get_series("DGS10")  # 10-year Treasury constant maturity
      df_yield = pd.DataFrame({"Date": ten_year.index, "Yield10": ten_year.values})
      df_yield.to_parquet("raw/yield_10y.parquet", index=False)
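    • The 1-month and 2-year tenors follow the same pattern; a sketch assuming the standard FRED series IDs (DGS1MO, DGS2):
      other_tenors = {"DGS1MO": ("Yield1M", "raw/yield_1m.parquet"),
                      "DGS2": ("Yield2Y", "raw/yield_2y.parquet")}
      for series_id, (col, path) in other_tenors.items():
          s = fred.get_series(series_id)
          pd.DataFrame({"Date": s.index, col: s.values}).to_parquet(path, index=False)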
      
  4. Credit Spreads Calculation:

    • Merged corporate bond yields (from CSV) with Treasury yields to compute spread:
      df_corp = pd.read_csv("corp_bonds.csv", parse_dates=["Date"])
      df_treas = pd.read_parquet("raw/yield_10y.parquet")
      df_merge = pd.merge(df_corp, df_treas, on="Date")
      df_merge["CreditSpread"] = df_merge["CorpYield"] - df_merge["Yield10"]
      df_merge.to_parquet("raw/credit_spreads.parquet", index=False)
      
  5. Sentiment Fetch:

    • NewsAPI for headlines, stored JSON → DataFrame with sentiment score (using textblob):
      from datetime import date
      from textblob import TextBlob
      from newsapi import NewsApiClient

      newsapi = NewsApiClient(api_key="KEY")
      today = date.today().isoformat()
      articles = newsapi.get_everything(q="finance", from_param=today)
      df_news = pd.DataFrame(articles["articles"])
      # guard against missing descriptions before scoring
      df_news["sentiment"] = df_news["description"].fillna("").apply(lambda t: TextBlob(t).sentiment.polarity)
      df_news.to_parquet("raw/news_sentiment.parquet", index=False)
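    • The Twitter side has the same shape; a minimal sketch with Tweepy's v2 client (the bearer token and query are placeholders, not my actual keyword filter):
      import tweepy
      client = tweepy.Client(bearer_token="BEARER_TOKEN")
      tweets = client.search_recent_tweets(query="finance OR stocks -is:retweet", max_results=100)
      df_tweets = pd.DataFrame([{"text": t.text} for t in (tweets.data or [])], columns=["text"])
      df_tweets["sentiment"] = df_tweets["text"].apply(lambda t: TextBlob(t).sentiment.polarity)
      df_tweets.to_parquet("raw/twitter_sentiment.parquet", index=False)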
      
  6. Storage in S3:

    • After validation, each Parquet file is uploaded to an S3 bucket:
      import boto3
      s3 = boto3.client("s3")
      s3.upload_file("raw/vix.parquet", "my-bucket", "quant/raw/vix.parquet")
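    • The same call can be looped over everything in raw/ so nothing gets missed; a small sketch:
      from pathlib import Path
      for path in Path("raw").glob("*.parquet"):
          s3.upload_file(str(path), "my-bucket", f"quant/raw/{path.name}")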
      

Challenges & Solutions

  • Schema drift: feed providers occasionally change column names. I added assertions and fallback renaming logic in each task.
  • Network retries: some downloads failed intermittently. I wrapped fetch calls in retries with exponential backoff.
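  • A sketch of both guards (the rename map and retry settings here are illustrative, shown with tenacity as one way to get exponential backoff):
    from tenacity import retry, stop_after_attempt, wait_exponential
    import pandas as pd

    # Schema drift: map known alternate column names back to the expected ones, then assert.
    RENAMES = {"CLOSE": "Close", "close": "Close", "DATE": "Date"}

    def normalize(df: pd.DataFrame) -> pd.DataFrame:
        df = df.rename(columns=RENAMES)
        assert {"Date", "Close"}.issubset(df.columns), f"unexpected schema: {list(df.columns)}"
        return df

    # Network retries: exponential backoff around any flaky fetch call.
    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=60))
    def fetch_csv(url: str) -> pd.DataFrame:
        return pd.read_csv(url, parse_dates=["Date"])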

Next Steps

  • Add delta ingestion for incremental updates (only fetch newest rows).
  • Integrate Kafka streaming for live tweet ingestion.
  • Build monitoring dashboards (alert on missing data).
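
For the delta ingestion item, the rough idea is to read the newest date already in the lake and keep only rows after it; a sketch (using the VIX file as an example):
    import pandas as pd

    def append_new_rows(new: pd.DataFrame, path: str = "raw/vix.parquet") -> None:
        # keep only rows newer than what is already stored, then append
        existing = pd.read_parquet(path)
        delta = new[new["Date"] > existing["Date"].max()]
        pd.concat([existing, delta]).to_parquet(path, index=False)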

This pipeline laid the foundation. Next, I’ll build feature engineering on top of these clean raw tables.