Introduction
I focused on building a reliable data ingestion layer that pulls market and sentiment data into a raw data lake. My goal was to automate fetching CSV and Parquet files daily, validate them, and store them for downstream processes.
Data Sources
- Market data:
  - VIX (CBOE) via CSV download
  - SPX (S&P 500) via Parquet feed
  - Treasury yields (1mo, 2yr, 10yr) from FRED API
  - Credit spreads computed from corporate bond and Treasury rates
- Sentiment feeds:
  - News headlines fetched via NewsAPI
  - Twitter sentiment via Tweepy (filtered by finance keywords)
Architecture
+--------------------+
|  Data Ingestion    |
|  - Prefect Flow    |
|  - API / CSV fetch |
+--------------------+
          |
          v
+--------------------+
|  Raw Data Lake     |
|  - S3 (Parquet)    |
|  - Local backups   |
+--------------------+
Implementation Details
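- Prefect Flow:
  - Defined a daily schedule (04:00 every day) with the Prefect 1.x API:

        from prefect import Flow
        from prefect.schedules import Schedule
        from prefect.schedules.clocks import CronClock

        # Cron "0 4 * * *" = minute 0, hour 4, every day
        with Flow("ingest", schedule=Schedule(clocks=[CronClock("0 4 * * *")])) as flow:
            fetch_vix()
            fetch_spx()
            fetch_yield_curve()
            fetch_credit_spreads()
            fetch_sentiment()

  - Each task downloads or queries data, then writes to a local staging folder.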
- CSV & Parquet Handling:
  - For CSV (VIX, yields), I used pandas to read, validate, and convert to Parquet:

        import pandas as pd

        # Parse the date column up front so later joins use a datetime key
        df = pd.read_csv("vix.csv", parse_dates=["Date"])
        df["VIX"] = df["Close"].astype(float)  # raises on non-numeric values
        df.to_parquet("raw/vix.parquet", index=False)

  - For Parquet (SPX), I read the file directly and verified the schema:

        df_spx = pd.read_parquet("spx_feed.parquet")
        assert "Close" in df_spx.columns, "SPX feed is missing the Close column"
        df_spx.to_parquet("raw/spx.parquet", index=False)
- FRED API for Yields:
  - Used fredapi:

        import pandas as pd
        from fredapi import Fred

        fred = Fred(api_key="YOUR_KEY")
        tarex = fred.get_series("DGS10")  # 10-year Treasury constant-maturity yield
        df_yield = pd.DataFrame({"Date": tarex.index, "Yield10": tarex.values})
        df_yield.to_parquet("raw/yield_10y.parquet", index=False)
- Credit Spreads Calculation:
  - Merged corporate bond yields (from CSV) with Treasury yields to compute the spread:

        import pandas as pd

        # Parse Date so it matches the datetime key in the Treasury table
        df_corp = pd.read_csv("corp_bonds.csv", parse_dates=["Date"])
        df_treas = pd.read_parquet("raw/yield_10y.parquet")
        # Inner join: keeps only dates present in both tables
        df_merge = pd.merge(df_corp, df_treas, on="Date")
        df_merge["CreditSpread"] = df_merge["CorpYield"] - df_merge["Yield10"]
        df_merge.to_parquet("raw/credit_spreads.parquet", index=False)
- Sentiment Fetch:
  - Fetched headlines from NewsAPI, loaded the JSON response into a DataFrame, and scored each description with textblob:

        from datetime import date
        import pandas as pd
        from newsapi import NewsApiClient
        from textblob import TextBlob

        today = date.today().isoformat()
        newsapi = NewsApiClient(api_key="KEY")
        articles = newsapi.get_everything(q="finance", from_param=today)
        df_news = pd.DataFrame(articles["articles"])
        # Descriptions can be missing; empty text scores as neutral (0.0)
        df_news["sentiment"] = df_news["description"].fillna("").apply(lambda t: TextBlob(t).sentiment.polarity)
        df_news.to_parquet("raw/news_sentiment.parquet", index=False)
- Storage in S3:
  - After validation, each Parquet file is uploaded to an S3 bucket:

        import boto3

        s3 = boto3.client("s3")
        # upload_file(local_path, bucket, key)
        s3.upload_file("raw/vix.parquet", "my-bucket", "quant/raw/vix.parquet")
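The single upload call extends naturally to every file in the staging folder. What follows is a minimal sketch rather than the pipeline's actual code: the helper names `plan_uploads` and `upload_all` are hypothetical, the `quant/raw` prefix is borrowed from the example key, and the client is passed in as an argument so that anything exposing `upload_file(filename, bucket, key)`, such as `boto3.client("s3")`, can be used.

```python
from pathlib import Path


def plan_uploads(raw_dir, prefix="quant/raw"):
    """Return sorted (local_path, s3_key) pairs for every .parquet file in raw_dir."""
    pairs = []
    for path in sorted(Path(raw_dir).glob("*.parquet")):
        pairs.append((str(path), f"{prefix}/{path.name}"))
    return pairs


def upload_all(client, bucket, raw_dir, prefix="quant/raw"):
    """Upload each planned file and return the list of keys that were written.

    `client` is assumed to expose upload_file(filename, bucket, key).
    """
    uploaded = []
    for local_path, key in plan_uploads(raw_dir, prefix):
        client.upload_file(local_path, bucket, key)
        uploaded.append(key)
    return uploaded
```

With boto3 this would be called as `upload_all(boto3.client("s3"), "my-bucket", "raw")`. Keeping the path planning separate from the upload makes the key layout easy to check without touching S3.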
Challenges & Solutions
- Schema drift: feed providers occasionally change column names. I added assertions and fallback renaming logic in each task.
- Network retries: some downloads failed intermittently. I wrapped fetch calls in retries with exponential backoff.
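Both fixes can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code that runs in the tasks: the alias table `COLUMN_ALIASES`, the helper names `build_rename_map` and `retry_with_backoff`, and the default delays are all hypothetical.

```python
import random
import time

# Hypothetical alias table: canonical column name -> names a provider has used for it.
COLUMN_ALIASES = {
    "Date": ["date", "DATE", "Timestamp"],
    "Close": ["close", "CLOSE", "Close Price"],
}


def build_rename_map(columns, aliases=COLUMN_ALIASES):
    """Map drifted column names back to canonical ones; raise if one is missing."""
    present = set(columns)
    rename = {}
    for canonical, alternatives in aliases.items():
        if canonical in present:
            continue  # already canonical, nothing to rename
        match = next((alt for alt in alternatives if alt in present), None)
        if match is None:
            raise ValueError(f"required column {canonical!r} not found in {sorted(present)}")
        rename[match] = canonical
    return rename


def retry_with_backoff(func, attempts=4, base_delay=1.0, max_delay=30.0,
                       exceptions=(OSError,), sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**n (capped, plus jitter) and retry."""
    for attempt in range(attempts):
        try:
            return func()
        except exceptions:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Up to 10% random jitter so parallel tasks do not retry in lockstep
            sleep(delay + random.uniform(0, delay * 0.1))
```

Inside a task, the rename map would be applied with `df = df.rename(columns=build_rename_map(df.columns))`, and a download wrapped as `retry_with_backoff(lambda: fetch_vix())`. The `exceptions` tuple should list whatever error types the HTTP client in use actually raises.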
Next Steps
- Add delta ingestion for incremental updates (only fetch newest rows).
- Integrate streaming Kafka for live tweet ingestion.
- Build monitoring dashboards (alert on missing data).
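As a starting point for the missing-data alert, a check over the staging folder might look like the sketch below. It is not built yet, so everything here is an assumption: the function name `find_missing_or_stale`, the 26-hour freshness threshold, and the idea of using file modification times as the freshness signal. The expected file names are the ones written in Implementation Details.

```python
import time
from pathlib import Path

# Files the daily run is expected to produce (names from Implementation Details).
EXPECTED_FILES = [
    "vix.parquet",
    "spx.parquet",
    "yield_10y.parquet",
    "credit_spreads.parquet",
    "news_sentiment.parquet",
]


def find_missing_or_stale(raw_dir, expected=EXPECTED_FILES, max_age_hours=26, now=None):
    """Return {filename: "missing" | "stale"} for files absent or older than max_age_hours."""
    now = time.time() if now is None else now
    problems = {}
    for name in expected:
        path = Path(raw_dir) / name
        if not path.exists():
            problems[name] = "missing"
        elif now - path.stat().st_mtime > max_age_hours * 3600:
            problems[name] = "stale"
    return problems
```

An empty result means every expected file exists and was written recently; anything else could be forwarded to whatever alerting channel the dashboard ends up using.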
This pipeline laid the foundation. Next, I’ll build feature engineering on top of these clean raw tables.
