got initial fetching working

This commit is contained in:
Alex Fox
2025-05-20 03:00:11 +00:00
parent ef80c94d2e
commit 4a4f4c1a5d

View File

@@ -0,0 +1,154 @@
import os
from io import StringIO
import pandas as pd
import requests
from dagster import asset
import dagster as dg
import datetime
import pendulum
IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
def get_cookies(s: requests.Session, from_date: datetime.date, to_date: datetime.date):
"""Fetch cookies into session"""
cookie_postdata = {
'lstUCOfficeType': '0',
'ddlType': '',
'txtName': '',
'txtExpenderName': '',
'txtExpenseRecipientName': '',
'lstUCCounty': '',
'lstUCMuncipality': '',
'lstUCOffice': '',
'lstUCDistrict': '',
'txtDateFrom': from_date.strftime('%m/%d/%Y'),
'txtDateTo': to_date.strftime('%m/%d/%Y'),
'lstUCYear': '- Select -',
'ddlDateType': 'Submitted',
'ddlSearchBy': 'All'
}
return s.post(f"{IE_ENDPOINT}/BindIndExpData/", json=cookie_postdata)
def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
"""Fill in query parameters for independent expenditures and date range"""
return {
'lstUCOfficeType': '0',
'lstUCCounty': '',
'lstUCMuncipality': '',
'ddlSearchBy': '1',
'txtFilerId': '',
'txtName': '',
'txtExpenderName': '',
'ddlAutoCompleteConName': '',
'txtExpenseRecipientName': '',
'lstAutoCompleteCommittee': '',
'lstElectionType': '',
'lstUCDistrict': '',
'ddlSelectDate': '2',
'lstUCYear': '- Select -',
'txtDateFrom': from_date.strftime('%m/%d/%Y'),
'txtDateTo': to_date.strftime('%m/%d/%Y'),
'ddlDateType': '2',
'Command': 'CSV',
'gridView24HourIE_length': '10',
}
@asset(
group_name="nyboe",
compute_kind="NYBOE API",
partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10")
)
def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
"""Fetch the day before the partition date"""
end_date = pendulum.parse(context.partition_key).subtract(days=1)
start_date = end_date.subtract(days=1)
with requests.Session() as s:
res = get_cookies(s, start_date, end_date)
if not res.json()["aaData"]:
return None
req = s.get(f"{IE_ENDPOINT}/IndependentExpenditure",
params=gen_ie_query(start_date, end_date),
)
df = pd.read_csv(StringIO(req.text), index_col=False)
os.makedirs("data", exist_ok=True)
with open(f"data/expenditures_{end_date.format("YYYYMMDD")}.parquet", "wb") as f:
df.to_parquet(f)
return None
# @asset(deps=[topstory_ids], group_name="nyboe", compute_kind="HackerNews API")
# def topstories(context: AssetExecutionContext) -> MaterializeResult:
# """Get items based on story ids from the HackerNews items endpoint. It may take 30 seconds to fetch all 100 items.
# API Docs: https://github.com/HackerNews/API#items
# """
# with open("data/topstory_ids.json") as f:
# topstory_ids = json.load(f)
# results = []
# for item_id in topstory_ids:
# item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
# results.append(item)
# if len(results) % 20 == 0:
# context.log.info(f"Got {len(results)} items so far.")
# df = pd.DataFrame(results)
# df.to_csv("data/topstories.csv")
# return MaterializeResult(
# metadata={
# "num_records": len(df), # Metadata can be any key-value pair
# "preview": MetadataValue.md(df.head().to_markdown()),
# # The `MetadataValue` class has useful static methods to build Metadata
# }
# )
# @asset(deps=[topstories], group_name="nyboe", compute_kind="Plot")
# def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
# """Get the top 25 most frequent words in the titles of the top 100 HackerNews stories."""
# stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]
# topstories = pd.read_csv("data/topstories.csv")
# # loop through the titles and count the frequency of each word
# word_counts = {}
# for raw_title in topstories["title"]:
# title = raw_title.lower()
# for word in title.split():
# cleaned_word = word.strip(".,-!?:;()[]'\"-")
# if cleaned_word not in stopwords and len(cleaned_word) > 0:
# word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1
# # Get the top 25 most frequent words
# top_words = {
# pair[0]: pair[1]
# for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
# }
# # Make a bar chart of the top 25 words
# plt.figure(figsize=(10, 6))
# plt.bar(list(top_words.keys()), list(top_words.values()))
# plt.xticks(rotation=45, ha="right")
# plt.title("Top 25 Words in Hacker News Titles")
# plt.tight_layout()
# # Convert the image to a saveable format
# buffer = BytesIO()
# plt.savefig(buffer, format="png")
# image_data = base64.b64encode(buffer.getvalue())
# # Convert the image to Markdown to preview it within Dagster
# md_content = f"![img](data:image/png;base64,{image_data.decode()})"
# with open("data/most_frequent_words.json", "w") as f:
# json.dump(top_words, f)
# # Attach the Markdown content as metadata to the asset
# return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})