Files
nyboe-dag/quickstart_etl/assets/nyboe.py
2025-05-25 22:04:09 +00:00

102 lines
3.0 KiB
Python

import os
from io import StringIO
import pandas as pd
import curl_cffi
import dagster as dg
import datetime
import pendulum
from sqlalchemy import create_engine
IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
def get_cookies(s: curl_cffi.Session, from_date: datetime.date, to_date: datetime.date):
"""Fetch cookies into session"""
cookie_postdata = {
'lstUCOfficeType': '0',
'ddlType': '',
'txtName': '',
'txtExpenderName': '',
'txtExpenseRecipientName': '',
'lstUCCounty': '',
'lstUCMuncipality': '',
'lstUCOffice': '',
'lstUCDistrict': '',
'txtDateFrom': from_date.strftime('%m/%d/%Y'),
'txtDateTo': to_date.strftime('%m/%d/%Y'),
'lstUCYear': '- Select -',
'ddlDateType': 'Submitted',
'ddlSearchBy': 'All'
}
return s.post(
f"{IE_ENDPOINT}/BindIndExpData/",
data=cookie_postdata,
impersonate="chrome",
)
def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
"""Fill in query parameters for independent expenditures and date range"""
return {
'lstUCOfficeType': '0',
'lstUCCounty': '',
'lstUCMuncipality': '',
'ddlSearchBy': '1',
'txtFilerId': '',
'txtName': '',
'txtExpenderName': '',
'ddlAutoCompleteConName': '',
'txtExpenseRecipientName': '',
'lstAutoCompleteCommittee': '',
'lstElectionType': '',
'lstUCDistrict': '',
'ddlSelectDate': '2',
'lstUCYear': '- Select -',
'txtDateFrom': from_date.strftime('%m/%d/%Y'),
'txtDateTo': to_date.strftime('%m/%d/%Y'),
'ddlDateType': '2',
'Command': 'CSV',
'gridView24HourIE_length': '10',
}
NY_DAILY_PARTITION = dg.DailyPartitionsDefinition(
start_date="2024-01-01",
timezone="America/New_York",
)
@dg.asset(
group_name="nyboe",
compute_kind="NYBOE API",
partitions_def=NY_DAILY_PARTITION,
)
def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
"""Fetch the day before the partition date"""
target_date = pendulum.parse(context.partition_key)
with curl_cffi.Session() as s:
res = get_cookies(s, target_date, target_date)
if not res.json()["aaData"]:
return None
req = s.get(f"{IE_ENDPOINT}/IndependentExpenditure",
params=gen_ie_query(target_date, target_date),
impersonate="chrome",
)
df = pd.read_csv(StringIO(req.text), index_col=False)
engine = create_engine("postgresql://superset:PASSWORD@IP_ADDR/superset")
df.to_sql(
"independent_expenditures_raw",
con=engine,
if_exists="append",
)
return dg.MaterializeResult(
metadata={
"num_records": len(df),
"preview": dg.MetadataValue.md(df.head().to_markdown()),
}
)