strip down to just nyboe code

Alex Fox
2025-05-20 12:30:19 +00:00
parent 4a4f4c1a5d
commit 1683426275


@@ -3,12 +3,13 @@ from io import StringIO
 import pandas as pd
 import requests
-from dagster import asset
 import dagster as dg
 import datetime
 import pendulum
+from sqlalchemy import create_engine
 
 IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
 
 
 def get_cookies(s: requests.Session, from_date: datetime.date, to_date: datetime.date):
@@ -56,12 +57,12 @@ def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
         'gridView24HourIE_length': '10',
     }
 
 
-@asset(
+@dg.asset(
     group_name="nyboe",
     compute_kind="NYBOE API",
-    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10")
+    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10"),
 )
-def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
+def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
     """Fetch the day before the partition date"""
     end_date = pendulum.parse(context.partition_key).subtract(days=1)
@@ -75,80 +76,17 @@ def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
         params=gen_ie_query(start_date, end_date),
     )
     df = pd.read_csv(StringIO(req.text), index_col=False)
-    os.makedirs("data", exist_ok=True)
-    with open(f"data/expenditures_{end_date.format("YYYYMMDD")}.parquet", "wb") as f:
-        df.to_parquet(f)
-    return None
-
-
-# @asset(deps=[topstory_ids], group_name="nyboe", compute_kind="HackerNews API")
-# def topstories(context: AssetExecutionContext) -> MaterializeResult:
-#     """Get items based on story ids from the HackerNews items endpoint. It may take 30 seconds to fetch all 100 items.
-#
-#     API Docs: https://github.com/HackerNews/API#items
-#     """
-#     with open("data/topstory_ids.json") as f:
-#         topstory_ids = json.load(f)
-#
-#     results = []
-#     for item_id in topstory_ids:
-#         item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
-#         results.append(item)
-#
-#         if len(results) % 20 == 0:
-#             context.log.info(f"Got {len(results)} items so far.")
-#
-#     df = pd.DataFrame(results)
-#     df.to_csv("data/topstories.csv")
-#
-#     return MaterializeResult(
-#         metadata={
-#             "num_records": len(df),  # Metadata can be any key-value pair
-#             "preview": MetadataValue.md(df.head().to_markdown()),
-#             # The `MetadataValue` class has useful static methods to build Metadata
-#         }
-#     )
-
-
-# @asset(deps=[topstories], group_name="nyboe", compute_kind="Plot")
-# def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
-#     """Get the top 25 most frequent words in the titles of the top 100 HackerNews stories."""
-#     stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]
-#
-#     topstories = pd.read_csv("data/topstories.csv")
-#
-#     # loop through the titles and count the frequency of each word
-#     word_counts = {}
-#     for raw_title in topstories["title"]:
-#         title = raw_title.lower()
-#         for word in title.split():
-#             cleaned_word = word.strip(".,-!?:;()[]'\"-")
-#             if cleaned_word not in stopwords and len(cleaned_word) > 0:
-#                 word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1
-#
-#     # Get the top 25 most frequent words
-#     top_words = {
-#         pair[0]: pair[1]
-#         for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
-#     }
-#
-#     # Make a bar chart of the top 25 words
-#     plt.figure(figsize=(10, 6))
-#     plt.bar(list(top_words.keys()), list(top_words.values()))
-#     plt.xticks(rotation=45, ha="right")
-#     plt.title("Top 25 Words in Hacker News Titles")
-#     plt.tight_layout()
-#
-#     # Convert the image to a saveable format
-#     buffer = BytesIO()
-#     plt.savefig(buffer, format="png")
-#     image_data = base64.b64encode(buffer.getvalue())
-#
-#     # Convert the image to Markdown to preview it within Dagster
-#     md_content = f"![img](data:image/png;base64,{image_data.decode()})"
-#
-#     with open("data/most_frequent_words.json", "w") as f:
-#         json.dump(top_words, f)
-#
-#     # Attach the Markdown content as metadata to the asset
-#     return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})
+
+    engine = create_engine("postgresql://superset:PASSWORD@IP_ADDR/superset")
+    df.to_sql(
+        "independent_expenditures_raw",
+        con=engine,
+        if_exists="append",
+    )
+
+    return dg.MaterializeResult(
+        metadata={
+            "num_records": len(df),
+            "preview": dg.MetadataValue.md(df.head().to_markdown()),
+        }
+    )
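
As a quick smoke test for the reworked asset, a single daily partition can be materialized in-process with dagster's materialize() helper. This is a minimal sketch, not part of the commit: the nyboe_assets module name is a hypothetical import path for the file diffed above, and it assumes the hardcoded Postgres connection string is reachable.

    # Minimal sketch (not part of this commit): materialize one partition locally.
    # "nyboe_assets" is a hypothetical module name for the file changed above.
    import dagster as dg

    from nyboe_assets import fetch_expenditures

    if __name__ == "__main__":
        # Any key on or after the DailyPartitionsDefinition start_date (2025-05-10)
        # works; the asset itself fetches the day before the partition date.
        result = dg.materialize([fetch_expenditures], partition_key="2025-05-11")
        assert result.success

On success, the num_records and preview metadata from the MaterializeResult show up in the Dagster UI for that partition, which makes it easy to eyeball what was appended to independent_expenditures_raw.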