"""Dagster asset that copies NY BOE independent-expenditure filings into a SQL table."""
import datetime
import os
from io import StringIO

import curl_cffi
import dagster as dg
import pandas as pd
import pendulum
from sqlalchemy import create_engine

IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"

# Format used for the txtDateFrom / txtDateTo form fields.
_DATE_FORMAT = "%m/%d/%Y"

# Environment variable that, when set, overrides the built-in database URL so
# credentials do not have to live in source control.
_DB_URL_ENV_VAR = "NYBOE_DATABASE_URL"
_DEFAULT_DB_URL = "postgresql://superset:PASSWORD@IP_ADDR/superset"

# Table that each materialization appends its rows to.
_TARGET_TABLE = "independent_expenditures_raw_redo"


def get_cookies(s: curl_cffi.Session, from_date: datetime.date, to_date: datetime.date):
    """Fetch cookies into session.

    POSTs the search form for the given date range to ``BindIndExpData/``
    through ``s`` and returns the response. The caller in this module reads the
    ``aaData`` key of the response's JSON body.

    Args:
        s: Session the request is sent through.
        from_date: Start of the date range, sent as ``txtDateFrom``.
        to_date: End of the date range, sent as ``txtDateTo``.

    Returns:
        The response object returned by ``s.post``.
    """
    # NOTE(review): presumably the server ties the later CSV export to the
    # cookies set by this request -- confirm against the site's behaviour.
    cookie_postdata = {
        'lstUCOfficeType': '0',
        'ddlType': '',
        'txtName': '',
        'txtExpenderName': '',
        'txtExpenseRecipientName': '',
        'lstUCCounty': '',
        'lstUCMuncipality': '',
        'lstUCOffice': '',
        'lstUCDistrict': '',
        'txtDateFrom': from_date.strftime(_DATE_FORMAT),
        'txtDateTo': to_date.strftime(_DATE_FORMAT),
        'lstUCYear': '- Select -',
        'ddlDateType': 'Submitted',
        'ddlSearchBy': 'All',
    }
    return s.post(
        f"{IE_ENDPOINT}/BindIndExpData/",
        data=cookie_postdata,
        impersonate="chrome",
    )


def gen_ie_query(from_date: datetime.date, to_date: datetime.date) -> dict[str, str]:
    """Fill in query parameters for independent expenditures and date range.

    Args:
        from_date: Start of the date range, sent as ``txtDateFrom``.
        to_date: End of the date range, sent as ``txtDateTo``.

    Returns:
        A dict of query-string parameters; both dates are rendered as
        ``MM/DD/YYYY`` and ``Command`` is set to ``CSV``.
    """
    return {
        'lstUCOfficeType': '0',
        'lstUCCounty': '',
        'lstUCMuncipality': '',
        'ddlSearchBy': '1',
        'txtFilerId': '',
        'txtName': '',
        'txtExpenderName': '',
        'ddlAutoCompleteConName': '',
        'txtExpenseRecipientName': '',
        'lstAutoCompleteCommittee': '',
        'lstElectionType': '',
        'lstUCDistrict': '',
        'ddlSelectDate': '2',
        'lstUCYear': '- Select -',
        'txtDateFrom': from_date.strftime(_DATE_FORMAT),
        'txtDateTo': to_date.strftime(_DATE_FORMAT),
        'ddlDateType': '2',
        'Command': 'CSV',
        'gridView24HourIE_length': '10',
    }


NY_DAILY_PARTITION = dg.DailyPartitionsDefinition(
    start_date="2024-01-01",
    timezone="America/New_York",
)


@dg.asset(
    group_name="nyboe",
    compute_kind="NYBOE API",
    partitions_def=NY_DAILY_PARTITION,
)
def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Fetch expenditures for the days just before the partition date.

    The queried range runs from two days before the partition date
    (``txtDateFrom``) to one day before it (``txtDateTo``). The CSV export for
    that range is appended to the target table.

    NOTE(review): consecutive daily partitions share one calendar day in their
    ranges; whether that produces duplicate rows depends on how the endpoint
    treats the bounds -- confirm.

    Args:
        context: Dagster execution context; only ``partition_key`` is read.

    Returns:
        A ``MaterializeResult`` carrying ``num_records`` and, when rows were
        loaded, a markdown preview of the first rows.

    Raises:
        curl_cffi HTTP errors if either request returns an error status.
    """
    end_date = pendulum.parse(context.partition_key).subtract(days=1)
    start_date = end_date.subtract(days=1)

    with curl_cffi.Session() as s:
        res = get_cookies(s, start_date, end_date)
        # Fail loudly on an HTTP error instead of trying to decode an error page.
        res.raise_for_status()
        if not res.json()["aaData"]:
            # Nothing reported for this range: skip the export and the DB write,
            # but still return the annotated result type.
            return dg.MaterializeResult(metadata={"num_records": 0})

        req = s.get(
            f"{IE_ENDPOINT}/IndependentExpenditure",
            params=gen_ie_query(start_date, end_date),
            impersonate="chrome",
        )
        # Never let an error page be parsed as CSV and appended to the table.
        req.raise_for_status()

    df = pd.read_csv(StringIO(req.text), index_col=False)

    engine = create_engine(os.environ.get(_DB_URL_ENV_VAR, _DEFAULT_DB_URL))
    try:
        df.to_sql(
            _TARGET_TABLE,
            con=engine,
            if_exists="append",
        )
    finally:
        # Release pooled connections; a new engine is created on every run.
        engine.dispose()

    return dg.MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": dg.MetadataValue.md(df.head().to_markdown()),
        }
    )