diff --git a/quickstart_etl/assets/nyboe.py b/quickstart_etl/assets/nyboe.py
new file mode 100644
index 0000000..71debc0
--- /dev/null
+++ b/quickstart_etl/assets/nyboe.py
@@ -0,0 +1,87 @@
+import datetime
+import os
+from io import StringIO
+
+import dagster as dg
+import pandas as pd
+import pendulum
+import requests
+
+IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
+
+
+def get_cookies(s: requests.Session, from_date: datetime.date, to_date: datetime.date):
+    """Prime the session with the search cookies the NYBOE site requires."""
+    cookie_postdata = {
+        'lstUCOfficeType': '0',
+        'ddlType': '',
+        'txtName': '',
+        'txtExpenderName': '',
+        'txtExpenseRecipientName': '',
+        'lstUCCounty': '',
+        'lstUCMuncipality': '',
+        'lstUCOffice': '',
+        'lstUCDistrict': '',
+        'txtDateFrom': from_date.strftime('%m/%d/%Y'),
+        'txtDateTo': to_date.strftime('%m/%d/%Y'),
+        'lstUCYear': '- Select -',
+        'ddlDateType': 'Submitted',
+        'ddlSearchBy': 'All',
+    }
+
+    return s.post(f"{IE_ENDPOINT}/BindIndExpData/", json=cookie_postdata)
+
+
+def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
+    """Build the query parameters for an independent-expenditure CSV export over a date range."""
+    return {
+        'lstUCOfficeType': '0',
+        'lstUCCounty': '',
+        'lstUCMuncipality': '',
+        'ddlSearchBy': '1',
+        'txtFilerId': '',
+        'txtName': '',
+        'txtExpenderName': '',
+        'ddlAutoCompleteConName': '',
+        'txtExpenseRecipientName': '',
+        'lstAutoCompleteCommittee': '',
+        'lstElectionType': '',
+        'lstUCDistrict': '',
+        'ddlSelectDate': '2',
+        'lstUCYear': '- Select -',
+        'txtDateFrom': from_date.strftime('%m/%d/%Y'),
+        'txtDateTo': to_date.strftime('%m/%d/%Y'),
+        'ddlDateType': '2',
+        'Command': 'CSV',
+        'gridView24HourIE_length': '10',
+    }
+
+
+@dg.asset(
+    group_name="nyboe",
+    compute_kind="NYBOE API",
+    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10"),
+)
+def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
+    """Fetch expenditures filed the day before the partition date."""
+    # Search a two-day window ending the day before the partition date.
+    end_date = pendulum.parse(context.partition_key).subtract(days=1)
+    start_date = end_date.subtract(days=1)
+
+    with requests.Session() as s:
+        # The search endpoint only returns data once the session cookies are set.
+        res = get_cookies(s, start_date, end_date)
+        if not res.json()["aaData"]:
+            # No expenditures were filed in this window; nothing to materialize.
+            return None
+
+        req = s.get(
+            f"{IE_ENDPOINT}/IndependentExpenditure",
+            params=gen_ie_query(start_date, end_date),
+        )
+
+    df = pd.read_csv(StringIO(req.text), index_col=False)
+    os.makedirs("data", exist_ok=True)
+    df.to_parquet(f"data/expenditures_{end_date.format('YYYYMMDD')}.parquet")
+
+    return None
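Not part of the diff, but a quick way to sanity-check the asset locally: `dg.materialize` runs it in-process for a single partition. This sketch assumes the file is importable as `quickstart_etl.assets.nyboe`; the partition key is an arbitrary date on or after the 2025-05-10 start date.

```python
import dagster as dg

from quickstart_etl.assets.nyboe import fetch_expenditures

if __name__ == "__main__":
    # Run the asset in-process for one partition; writes to ./data on success.
    result = dg.materialize([fetch_expenditures], partition_key="2025-05-12")
    assert result.success
```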
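If the intent is to pull each day's filings automatically, one option (an assumption, not something in this diff) is to wrap the asset in a partitioned job and derive a daily schedule from it. The job and variable names below are hypothetical; `build_schedule_from_partitioned_job` derives the daily cron from the `DailyPartitionsDefinition`, which must match the one on the asset.

```python
import dagster as dg

from quickstart_etl.assets.nyboe import fetch_expenditures

# Hypothetical job name; the explicit partitions_def mirrors the asset's.
nyboe_daily_job = dg.define_asset_job(
    "nyboe_daily_job",
    selection=[fetch_expenditures],
    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10"),
)

defs = dg.Definitions(
    assets=[fetch_expenditures],
    jobs=[nyboe_daily_job],
    schedules=[dg.build_schedule_from_partitioned_job(nyboe_daily_job)],
)
```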