diff --git a/quickstart_etl/assets/nyboe.py b/quickstart_etl/assets/nyboe.py
index 71debc0..3a03839 100644
--- a/quickstart_etl/assets/nyboe.py
+++ b/quickstart_etl/assets/nyboe.py
@@ -3,12 +3,13 @@
 from io import StringIO
 
 import pandas as pd
 import requests
-from dagster import asset
 import dagster as dg
 import datetime
 import pendulum
 
+from sqlalchemy import create_engine
+
 IE_ENDPOINT = "https://publicreporting.elections.ny.gov/IndependentExpenditure"
 
 def get_cookies(s: requests.Session, from_date: datetime.date, to_date: datetime.date):
@@ -56,12 +57,12 @@ def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
        'gridView24HourIE_length': '10',
    }
 
 
-@asset(
+@dg.asset(
     group_name="nyboe",
     compute_kind="NYBOE API",
-    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10")
+    partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10"),
 )
-def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
+def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
     """Fetch the day before the partition date"""
     end_date = pendulum.parse(context.partition_key).subtract(days=1)
@@ -75,80 +76,17 @@ def fetch_expenditures(context: dg.AssetExecutionContext) -> None:
         params=gen_ie_query(start_date, end_date),
     )
     df = pd.read_csv(StringIO(req.text), index_col=False)
-    os.makedirs("data", exist_ok=True)
-    with open(f"data/expenditures_{end_date.format("YYYYMMDD")}.parquet", "wb") as f:
-        df.to_parquet(f)
-    return None
+    engine = create_engine("postgresql://superset:PASSWORD@IP_ADDR/superset")
+    df.to_sql(
+        "independent_expenditures_raw",
+        con=engine,
+        if_exists="append",
+    )
 
-# @asset(deps=[topstory_ids], group_name="nyboe", compute_kind="HackerNews API")
-# def topstories(context: AssetExecutionContext) -> MaterializeResult:
-#     """Get items based on story ids from the HackerNews items endpoint. It may take 30 seconds to fetch all 100 items.
-
-#     API Docs: https://github.com/HackerNews/API#items
-#     """
-#     with open("data/topstory_ids.json") as f:
-#         topstory_ids = json.load(f)
-
-#     results = []
-#     for item_id in topstory_ids:
-#         item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
-#         results.append(item)
-
-#         if len(results) % 20 == 0:
-#             context.log.info(f"Got {len(results)} items so far.")
-
-#     df = pd.DataFrame(results)
-#     df.to_csv("data/topstories.csv")
-
-#     return MaterializeResult(
-#         metadata={
-#             "num_records": len(df),  # Metadata can be any key-value pair
-#             "preview": MetadataValue.md(df.head().to_markdown()),
-#             # The `MetadataValue` class has useful static methods to build Metadata
-#         }
-#     )
-
-
-# @asset(deps=[topstories], group_name="nyboe", compute_kind="Plot")
-# def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
-#     """Get the top 25 most frequent words in the titles of the top 100 HackerNews stories."""
-#     stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]
-
-#     topstories = pd.read_csv("data/topstories.csv")
-
-#     # loop through the titles and count the frequency of each word
-#     word_counts = {}
-#     for raw_title in topstories["title"]:
-#         title = raw_title.lower()
-#         for word in title.split():
-#             cleaned_word = word.strip(".,-!?:;()[]'\"-")
-#             if cleaned_word not in stopwords and len(cleaned_word) > 0:
-#                 word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1
-
-#     # Get the top 25 most frequent words
-#     top_words = {
-#         pair[0]: pair[1]
-#         for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
-#     }
-
-#     # Make a bar chart of the top 25 words
-#     plt.figure(figsize=(10, 6))
-#     plt.bar(list(top_words.keys()), list(top_words.values()))
-#     plt.xticks(rotation=45, ha="right")
-#     plt.title("Top 25 Words in Hacker News Titles")
-#     plt.tight_layout()
-
-#     # Convert the image to a saveable format
-#     buffer = BytesIO()
-#     plt.savefig(buffer, format="png")
-#     image_data = base64.b64encode(buffer.getvalue())
-
-#     # Convert the image to Markdown to preview it within Dagster
-#     md_content = f"![img](data:image/png;base64,{image_data.decode()})"
-
-#     with open("data/most_frequent_words.json", "w") as f:
-#         json.dump(top_words, f)
-
-#     # Attach the Markdown content as metadata to the asset
-#     return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})
+
+    return dg.MaterializeResult(
+        metadata={
+            "num_records": len(df),
+            "preview": dg.MetadataValue.md(df.head().to_markdown()),
+        }
+    )
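A note on the new create_engine call: PASSWORD and IP_ADDR are placeholders, so anyone applying this patch must substitute real connection details. To keep credentials out of the source entirely, a minimal sketch of reading the URL from the environment instead (the SUPERSET_DB_URL variable name is hypothetical, not something this repo defines):

    import os

    from sqlalchemy import create_engine

    # Hypothetical env var; set it to the real Postgres URL, e.g.
    #   export SUPERSET_DB_URL="postgresql://superset:<password>@<host>/superset"
    engine = create_engine(os.environ["SUPERSET_DB_URL"])

Separately, df.to_sql as written also stores the DataFrame's RangeIndex as an extra column, since pandas defaults to index=True; passing index=False would drop it if it isn't wanted.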