appears to be working
This commit is contained in:
@@ -1,96 +0,0 @@
|
|||||||
import base64
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from io import BytesIO
|
|
||||||
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
import pandas as pd
|
|
||||||
import requests
|
|
||||||
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset
|
|
||||||
|
|
||||||
|
|
||||||
@asset(group_name="hackernews", compute_kind="HackerNews API")
def topstory_ids() -> None:
    """Get up to 100 top stories from the HackerNews topstories endpoint.

    The ids are written as a JSON list to ``data/topstory_ids.json``; the
    ``data`` directory is created when it does not exist yet.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not answer within 30 seconds.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"

    # requests applies no timeout by default; bound the wait so a stalled
    # connection fails the run instead of hanging it indefinitely.
    response = requests.get(topstories_url, timeout=30)
    # Fail loudly on an HTTP error instead of slicing/persisting an error body.
    response.raise_for_status()
    top_story_ids = response.json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_story_ids, f)
|
|
||||||
|
|
||||||
|
|
||||||
@asset(deps=[topstory_ids], group_name="hackernews", compute_kind="HackerNews API")
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Get items based on story ids from the HackerNews items endpoint.

    Reads the ids from ``data/topstory_ids.json``, fetches every item one by
    one and writes the collected items to ``data/topstories.csv``. It may take
    30 seconds to fetch all 100 items.

    API Docs: https://github.com/HackerNews/API#items

    Args:
        context: Dagster execution context; only its logger is used, to report
            progress every 20 fetched items.

    Returns:
        A ``MaterializeResult`` whose metadata holds the record count and a
        Markdown preview of the first rows.

    Raises:
        requests.HTTPError: If an item request answers with a 4xx/5xx status.
        requests.Timeout: If an item request does not answer within 30 seconds.
    """
    with open("data/topstory_ids.json") as f:
        story_ids = json.load(f)

    results = []
    # One Session reuses the underlying connection across the item requests
    # instead of opening a new one for every id.
    with requests.Session() as session:
        for item_id in story_ids:
            response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                timeout=30,  # requests never times out unless told to
            )
            response.raise_for_status()
            item = response.json()
            if item is None:
                # A JSON ``null`` body decodes to None and carries no fields;
                # presumably the id no longer resolves to an item. Skip it so
                # the DataFrame is built from dict records only.
                continue
            results.append(item)

            if len(results) % 20 == 0:
                context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),  # Metadata can be any key-value pair
            # The `MetadataValue` class has useful static methods to build Metadata
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
|
|
||||||
|
|
||||||
|
|
||||||
@asset(deps=[topstories], group_name="hackernews", compute_kind="Plot")
def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
    """Get the top 25 most frequent words in the titles of the top 100 HackerNews stories.

    Reads ``data/topstories.csv``, counts the lower-cased, punctuation-stripped
    title words (ignoring a small stopword set), writes the 25 most frequent
    words with their counts to ``data/most_frequent_words.json`` and renders
    them as a bar chart.

    Args:
        context: Dagster execution context (not used by the computation).

    Returns:
        A ``MaterializeResult`` whose ``plot`` metadata is a Markdown image
        embedding the bar chart as a base64-encoded PNG.
    """
    # A set gives constant-time membership tests inside the word loop.
    stopwords = {"a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"}

    stories_df = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    # Empty title cells are read back from the CSV as NaN (a float), which has
    # no ``.lower()``; drop them before tokenizing.
    for raw_title in stories_df["title"].dropna():
        for word in str(raw_title).lower().split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word and cleaned_word not in stopwords:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words (stable sort: ties keep first-seen order)
    top_words = dict(
        sorted(word_counts.items(), key=lambda pair: pair[1], reverse=True)[:25]
    )

    # Make a bar chart of the top 25 words
    fig = plt.figure(figsize=(10, 6))
    try:
        plt.bar(list(top_words.keys()), list(top_words.values()))
        plt.xticks(rotation=45, ha="right")
        plt.title("Top 25 Words in Hacker News Titles")
        plt.tight_layout()

        # Convert the image to a saveable format
        buffer = BytesIO()
        plt.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is closed; release it so
        # repeated materializations in one process do not accumulate figures.
        plt.close(fig)
    image_data = base64.b64encode(buffer.getvalue())

    # Convert the image to Markdown to preview it within Dagster
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach the Markdown content as metadata to the asset
    return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})
|
|
||||||
@@ -57,11 +57,16 @@ def gen_ie_query(from_date: datetime.date, to_date: datetime.date):
|
|||||||
'gridView24HourIE_length': '10',
|
'gridView24HourIE_length': '10',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NY_DAILY_PARTITION = dg.DailyPartitionsDefinition(
|
||||||
|
start_date="2025-01-01",
|
||||||
|
timezone="America/New_York",
|
||||||
|
)
|
||||||
|
|
||||||
@dg.asset(
|
@dg.asset(
|
||||||
group_name="nyboe",
|
group_name="nyboe",
|
||||||
compute_kind="NYBOE API",
|
compute_kind="NYBOE API",
|
||||||
partitions_def=dg.DailyPartitionsDefinition(start_date="2025-05-10"),
|
partitions_def=NY_DAILY_PARTITION,
|
||||||
)
|
)
|
||||||
def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
|
def fetch_expenditures(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
|
||||||
"""Fetch the day before the partition date"""
|
"""Fetch the day before the partition date"""
|
||||||
end_date = pendulum.parse(context.partition_key).subtract(days=1)
|
end_date = pendulum.parse(context.partition_key).subtract(days=1)
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import dagster as dg
|
||||||
|
|
||||||
from dagster import (
|
from dagster import (
|
||||||
Definitions,
|
Definitions,
|
||||||
ScheduleDefinition,
|
ScheduleDefinition,
|
||||||
@@ -14,39 +16,16 @@ from dagster._core.definitions.metadata.source_code import AnchorBasedFilePathMa
|
|||||||
|
|
||||||
from . import assets
|
from . import assets
|
||||||
|
|
||||||
daily_refresh_schedule = ScheduleDefinition(
|
nyboe_assets = with_source_code_references(load_assets_from_package_module(assets))
|
||||||
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
|
nyboe_job = define_asset_job("nyboe_job", selection=nyboe_assets)
|
||||||
)
|
daily_job_schedule = dg.build_schedule_from_partitioned_job(
|
||||||
|
nyboe_job,
|
||||||
|
hour_of_day=1,
|
||||||
@op
|
minute_of_hour=9,
|
||||||
def foo_op():
|
default_status=dg.DefaultScheduleStatus.RUNNING,
|
||||||
return 5
|
|
||||||
|
|
||||||
|
|
||||||
@graph_asset
|
|
||||||
def my_asset():
|
|
||||||
return foo_op()
|
|
||||||
|
|
||||||
|
|
||||||
my_assets = with_source_code_references(
|
|
||||||
[
|
|
||||||
my_asset,
|
|
||||||
*load_assets_from_package_module(assets),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
my_assets = link_code_references_to_git(
|
|
||||||
assets_defs=my_assets,
|
|
||||||
git_url="https://github.com/dagster-io/dagster/",
|
|
||||||
git_branch="master",
|
|
||||||
file_path_mapping=AnchorBasedFilePathMapping(
|
|
||||||
local_file_anchor=Path(__file__).parent,
|
|
||||||
file_anchor_path_in_repository="examples/quickstart_etl/quickstart_etl/",
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
defs = Definitions(
|
defs = Definitions(
|
||||||
assets=my_assets,
|
assets=nyboe_assets,
|
||||||
schedules=[daily_refresh_schedule],
|
schedules=[daily_job_schedule],
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user