Introduction
I focused on building a reliable data ingestion layer that pulls market and sentiment data into a raw data lake. My goal was to automate the daily fetching of CSV and Parquet files, validate them, and store them for downstream processing.
Data Sources
- Market data:
  - VIX (CBOE) via CSV download
  - SPX (S&P 500) via Parquet feed
  - Treasury yields (1mo, 2yr, 10yr) from the FRED API
  - Credit spreads computed from corporate bond and Treasury yields
- Sentiment feeds:
  - News headlines fetched via NewsAPI
  - Twitter sentiment via Tweepy (filtered by finance keywords)
Architecture
```
+--------------------+
|   Data Ingestion   |
|  - Prefect Flow    |
|  - API / CSV fetch |
+--------------------+
          |
          v
+--------------------+
|   Raw Data Lake    |
|  - S3 (Parquet)    |
|  - Local backups   |
+--------------------+
```
Implementation Details
- Prefect Flow:
  - Defined a daily schedule:

    ```python
    from prefect import Flow
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # Prefect 1.x flow: run the ingestion every day at 04:00 UTC.
    # fetch_vix, fetch_spx, etc. are @task-decorated functions defined elsewhere.
    with Flow("ingest", schedule=Schedule(clocks=[CronClock("0 4 * * *")])) as flow:
        fetch_vix()
        fetch_spx()
        fetch_yield_curve()
        fetch_credit_spreads()
        fetch_sentiment()
    ```

  - Each task downloads or queries data, then writes to a local staging folder (a sketch of one task follows this list).
- CSV & Parquet Handling:
  - For CSV (VIX, yields), I used Pandas to read, validate, and convert to Parquet:

    ```python
    import pandas as pd

    df = pd.read_csv("vix.csv", parse_dates=["Date"])
    df["VIX"] = df["Close"].astype(float)
    df.to_parquet("raw/vix.parquet", index=False)
    ```

  - For Parquet (SPX), I read directly and verified the schema:

    ```python
    df_spx = pd.read_parquet("spx_feed.parquet")
    assert "Close" in df_spx.columns
    df_spx.to_parquet("raw/spx.parquet", index=False)
    ```
- FRED API for Yields:
  - Used fredapi to pull each Treasury series (the 10-year, DGS10, shown here):

    ```python
    import pandas as pd
    from fredapi import Fred

    fred = Fred(api_key="YOUR_KEY")
    rates_10y = fred.get_series("DGS10")  # daily 10-year Treasury yield
    df_yield = pd.DataFrame({"Date": rates_10y.index, "Yield10": rates_10y.values})
    df_yield.to_parquet("raw/yield_10y.parquet", index=False)
    ```
- Credit Spreads Calculation:
  - Merged corporate bond yields (from CSV) with Treasury yields to compute the spread:

    ```python
    # parse_dates keeps the Date dtype consistent with the Parquet file for the merge
    df_corp = pd.read_csv("corp_bonds.csv", parse_dates=["Date"])
    df_treas = pd.read_parquet("raw/yield_10y.parquet")
    df_merge = pd.merge(df_corp, df_treas, on="Date")
    df_merge["CreditSpread"] = df_merge["CorpYield"] - df_merge["Yield10"]
    df_merge.to_parquet("raw/credit_spreads.parquet", index=False)
    ```
- Sentiment Fetch:
  - NewsAPI for headlines, stored JSON → DataFrame with a sentiment score (using textblob); the Tweepy side is sketched after this list:

    ```python
    import datetime

    import pandas as pd
    from newsapi import NewsApiClient
    from textblob import TextBlob

    today = datetime.date.today().isoformat()
    newsapi = NewsApiClient(api_key="KEY")
    articles = newsapi.get_everything(q="finance", from_param=today)

    df_news = pd.DataFrame(articles["articles"])
    # Guard against missing descriptions before scoring polarity
    df_news["sentiment"] = df_news["description"].apply(
        lambda t: TextBlob(t or "").sentiment.polarity
    )
    df_news.to_parquet("raw/news_sentiment.parquet", index=False)
    ```
- Storage in S3:
  - After validation, each Parquet file is uploaded to an S3 bucket:

    ```python
    import boto3

    s3 = boto3.client("s3")
    s3.upload_file("raw/vix.parquet", "my-bucket", "quant/raw/vix.parquet")
    ```
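The flow above only wires tasks together, and the post doesn't show how an individual task is defined. For reference, here is a minimal sketch of what one fetch task might look like under Prefect 1.x; the URL, staging layout, and column names are assumptions for illustration, not the production values.

```python
from pathlib import Path

import pandas as pd
from prefect import task

# Hypothetical source URL; the real download link isn't shown in this post.
VIX_CSV_URL = "https://example.com/vix_history.csv"


@task
def fetch_vix() -> str:
    """Download the VIX CSV, validate it, and write it to the local staging folder."""
    staging = Path("staging")
    staging.mkdir(exist_ok=True)

    df = pd.read_csv(VIX_CSV_URL, parse_dates=["Date"])
    assert "Close" in df.columns, "unexpected VIX schema"
    df["VIX"] = df["Close"].astype(float)

    out_path = staging / "vix.parquet"
    df.to_parquet(out_path, index=False)
    return str(out_path)
```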
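The Twitter side of the sentiment feed (listed under Data Sources) isn't shown in the snippets above. A rough sketch of what the Tweepy fetch might look like, assuming Tweepy v4 against the Twitter API v2 with a bearer token and an illustrative keyword query:

```python
import pandas as pd
import tweepy
from textblob import TextBlob

# Assumed credentials; Tweepy v4's Client wraps the Twitter API v2.
client = tweepy.Client(bearer_token="YOUR_BEARER_TOKEN")

# Recent-search endpoint filtered by finance keywords, excluding retweets.
resp = client.search_recent_tweets(
    query="(finance OR stocks OR SPX OR VIX) -is:retweet lang:en",
    max_results=100,
)

texts = [tweet.text for tweet in (resp.data or [])]
df_tweets = pd.DataFrame({"text": texts})
df_tweets["sentiment"] = df_tweets["text"].apply(lambda t: TextBlob(t).sentiment.polarity)
df_tweets.to_parquet("raw/twitter_sentiment.parquet", index=False)
```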
Challenges & Solutions
- Schema drift: feed providers occasionally change column names. I added assertions and fallback renaming logic in each task (a minimal sketch follows this list).
- Network retries: some downloads failed intermittently. I wrapped fetch calls in retries with exponential backoff (second sketch below).
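The fallback renaming isn't shown in the task code above; a minimal sketch of the idea, with an illustrative (not exhaustive) alias map:

```python
import pandas as pd

# Illustrative alias map: alternate column names occasionally seen from providers.
COLUMN_ALIASES = {
    "CLOSE": "Close",
    "close": "Close",
    "DATE": "Date",
    "date": "Date",
}


def normalize_columns(df: pd.DataFrame, required: list[str]) -> pd.DataFrame:
    """Rename known aliases, then assert the required columns are present."""
    df = df.rename(columns=COLUMN_ALIASES)
    missing = [col for col in required if col not in df.columns]
    assert not missing, f"schema drift: missing columns {missing}"
    return df


# Example: df = normalize_columns(pd.read_csv("vix.csv"), required=["Date", "Close"])
```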
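And a sketch of the retry wrapper, written here as a plain decorator with exponential backoff and jitter; the attempt counts and delays are placeholders, not the production settings:

```python
import random
import time
from functools import wraps


def with_retries(max_attempts: int = 5, base_delay: float = 1.0):
    """Retry a flaky fetch with exponential backoff plus a little jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    # 1s, 2s, 4s, 8s, ... plus up to 1s of jitter
                    time.sleep(base_delay * 2 ** (attempt - 1) + random.random())
        return wrapper
    return decorator


# Example:
# @with_retries(max_attempts=3)
# def download_vix(): ...
```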
Next Steps
- Add delta ingestion for incremental updates (only fetch the newest rows; a rough sketch follows this list).
- Integrate Kafka streaming for live tweet ingestion.
- Build monitoring dashboards (alert on missing data).
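Delta ingestion could be as simple as comparing incoming rows against the latest date already stored before appending. A rough sketch of that idea, using a hypothetical helper that mirrors the Date columns and Parquet paths above:

```python
from pathlib import Path

import pandas as pd


def append_new_rows(new_df: pd.DataFrame, path: str, date_col: str = "Date") -> None:
    """Append only rows newer than the latest date already stored in the Parquet file."""
    target = Path(path)
    if target.exists():
        existing = pd.read_parquet(target)
        latest = existing[date_col].max()
        fresh = new_df[new_df[date_col] > latest]
        combined = pd.concat([existing, fresh], ignore_index=True)
    else:
        combined = new_df
    combined.to_parquet(target, index=False)
```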
This pipeline laid the foundation. Next, I’ll build feature engineering on top of these validated raw tables.