102 lines
3.0 KiB
Python
102 lines
3.0 KiB
Python
import os
|
|
from io import StringIO
|
|
|
|
import pandas as pd
|
|
import curl_cffi
|
|
import dagster as dg
|
|
|
|
import datetime
|
|
import pendulum
|
|
|
|
from sqlalchemy import create_engine
|
|
|
|
# Base URL of the independent-expenditure pages on the NY elections
# public-reporting site. The request code in this module appends an action
# path to it ("/BindIndExpData/" for the search POST, "/IndependentExpenditure"
# for the CSV export GET).
IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
|
|
|
|
def get_cookies(s: curl_cffi.Session, from_date: datetime.date, to_date: datetime.date):
    """Submit the independent-expenditure search form through session ``s``.

    The form is POSTed to the ``BindIndExpData/`` action under ``IE_ENDPOINT``
    with ``from_date`` and ``to_date`` rendered as MM/DD/YYYY. The request is
    made on the caller's session so that whatever cookies the server hands
    back are available to later requests on the same session.

    Args:
        s: Session used to send the request.
        from_date: First day of the search window.
        to_date: Last day of the search window.

    Returns:
        The response object produced by ``s.post``.
    """
    date_format = "%m/%d/%Y"

    # Field order is kept exactly as the form is submitted today.
    search_form = {
        "lstUCOfficeType": "0",
        "ddlType": "",
        "txtName": "",
        "txtExpenderName": "",
        "txtExpenseRecipientName": "",
        "lstUCCounty": "",
        "lstUCMuncipality": "",
        "lstUCOffice": "",
        "lstUCDistrict": "",
        "txtDateFrom": from_date.strftime(date_format),
        "txtDateTo": to_date.strftime(date_format),
        "lstUCYear": "- Select -",
        "ddlDateType": "Submitted",
        "ddlSearchBy": "All",
    }

    search_url = f"{IE_ENDPOINT}/BindIndExpData/"
    response = s.post(search_url, data=search_form, impersonate="chrome")
    return response
|
|
|
|
def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
    """Build the query-string parameters for the CSV export request.

    Every filter is left blank except the date window, which is rendered as
    MM/DD/YYYY, and the fixed selector values the export form sends
    (``Command`` is ``CSV``).

    Args:
        from_date: First day of the window.
        to_date: Last day of the window.

    Returns:
        A dict mapping form-field names to their string values, in the order
        the fields are listed here.
    """
    date_format = "%m/%d/%Y"
    window_start = from_date.strftime(date_format)
    window_end = to_date.strftime(date_format)

    query = {
        "lstUCOfficeType": "0",
        "lstUCCounty": "",
        "lstUCMuncipality": "",
        "ddlSearchBy": "1",
        "txtFilerId": "",
        "txtName": "",
        "txtExpenderName": "",
        "ddlAutoCompleteConName": "",
        "txtExpenseRecipientName": "",
        "lstAutoCompleteCommittee": "",
        "lstElectionType": "",
        "lstUCDistrict": "",
        "ddlSelectDate": "2",
        "lstUCYear": "- Select -",
        "txtDateFrom": window_start,
        "txtDateTo": window_end,
        "ddlDateType": "2",
        "Command": "CSV",
        "gridView24HourIE_length": "10",
    }
    return query
|
|
|
|
# One partition per calendar day, beginning 2024-01-01, with day boundaries
# taken in the America/New_York timezone.
NY_DAILY_PARTITION = dg.DailyPartitionsDefinition(
    timezone="America/New_York",
    start_date="2024-01-01",
)
|
|
|
|
@dg.asset(
    group_name="nyboe",
    compute_kind="NYBOE API",
    partitions_def=NY_DAILY_PARTITION,
)
def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Load the partition day's independent-expenditure rows into Postgres.

    The partition key is parsed into a date and that single day is used as
    both ends of the search window. The flow is:

    1. ``get_cookies`` submits the search form on a fresh session; if its JSON
       ``aaData`` list is empty there is nothing to load for that day.
    2. The CSV export is fetched on the same session and parsed with pandas.
    3. The rows are appended to the ``independent_expenditures_raw`` table.

    The database URL is read from the ``NYBOE_DATABASE_URL`` environment
    variable when it is set; otherwise the previous hard-coded URL is used.

    Args:
        context: Dagster execution context; only ``partition_key`` is read.

    Returns:
        A ``MaterializeResult`` whose metadata carries ``num_records`` and,
        when rows were loaded, a Markdown ``preview`` of the first rows.

    Raises:
        The HTTP error raised by ``raise_for_status`` when either request
        comes back with an error status.
    """
    target_date = pendulum.parse(context.partition_key)

    with curl_cffi.Session() as s:
        res = get_cookies(s, target_date, target_date)
        # Fail loudly on an HTTP error instead of trying to decode an error page.
        res.raise_for_status()
        if not res.json()["aaData"]:
            # Nothing filed that day: still honour the declared return type.
            return dg.MaterializeResult(metadata={"num_records": 0})

        req = s.get(
            f"{IE_ENDPOINT}/IndependentExpenditure",
            params=gen_ie_query(target_date, target_date),
            impersonate="chrome",
        )
        # Without this check an HTML error body would be parsed as CSV and
        # appended to the table.
        req.raise_for_status()

    df = pd.read_csv(StringIO(req.text), index_col=False)

    # Prefer a URL supplied by the environment so credentials need not live in
    # source control; the fallback keeps the previous behaviour.
    db_url = os.environ.get(
        "NYBOE_DATABASE_URL",
        "postgresql://superset:PASSWORD@IP_ADDR/superset",
    )
    engine = create_engine(db_url)
    try:
        # NOTE(review): rows are appended unconditionally, so re-running a
        # partition will insert the same day's rows again. The DataFrame index
        # is also written because ``index`` is left at its default.
        df.to_sql(
            "independent_expenditures_raw",
            con=engine,
            if_exists="append",
        )
    finally:
        # Release the connection pool created for this run.
        engine.dispose()

    return dg.MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": dg.MetadataValue.md(df.head().to_markdown()),
        }
    )
|