Add Sample-level Logging API #486
Conversation
First pass. Thanks for the PR! Multiple people are eager to see it land.
I left some comments. I think we are close to landing, but we need to make some changes. Could you please resolve the merge conflicts too? Thank you for moving forward with this!
src/forge/observability/metrics.py
Outdated
| async def finish(self) -> None:
|     pass
|
| async def log_samples(self, samples: Dict[str, List[dict]], step: int) -> None:
I am debating with myself whether we should have log_batch_samples and log_stream_samples.
If we keep only 1, then:
- it's a bit weird that we have log_stream and log_batch, but this doesn't apply to samples.
- we also need to do asyncio.create_task.
- in stream mode, it's a bit too verbose with all the "=====" printing.
If we keep 2, then it's a new method for every backend.
Any thoughts?
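To make the trade-off concrete, here is a rough sketch of the single-method option. The `stream` flag and the `_write_samples` helper are hypothetical, not part of this PR:

```python
import asyncio
from typing import Dict, List


class LoggerBackend:
    async def log_samples(
        self, samples: Dict[str, List[dict]], step: int, stream: bool = False
    ) -> None:
        if stream:
            # Stream mode: fire-and-forget so the hot path is not blocked.
            asyncio.create_task(self._write_samples(samples, step))
        else:
            await self._write_samples(samples, step)

    async def _write_samples(self, samples: Dict[str, List[dict]], step: int) -> None:
        ...  # backend-specific writing (console, wandb, ...)
```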
> in stream mode, it's a bit too verbose with all the "=====" printing.

What do you expect to see in stream mode?
Thanks @DNXie! I will review it tomorrow. Do you mind resolving the conflicts too when you have a chance?
felipemello1 left a comment
Some follow-up comments. Will finish the review soon.
| if self.reward_breakdown is not None and "reward_breakdown" not in exclude:
|     result.update(self.reward_breakdown)
This is flattening the rewards, but it is not obvious that this is what we are doing. I wonder whether we should flatten at all. But assuming we should, let's add a comment here.
Do you see any strong argument for not leaving it as a dictionary? Not sure if this is poorly displayed in wandb.
The record_episode_sample function needs this to be flattened. Alternatively, I can flatten it in record_episode_sample, wdyt?
I think we should delete record_episode_sample :X
Why does it need it to be flat?
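For reference, the difference under discussion, using reward values from the console example later in this PR (illustrative only):

```python
# Nested: reward_breakdown stays a sub-dictionary of the sample
{"reward": 0.1, "reward_breakdown": {"MathReward": 0.0, "ThinkingReward": 0.2}}

# Flattened: per-fn rewards promoted to top-level keys, one table column each
{"reward": 0.1, "MathReward": 0.0, "ThinkingReward": 0.2}
```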
| request: str | None = None
| response: str | None = None
Do we need those? I believe this is redundant; they are already in Completion:
https://github.com/DNXie/forge/blob/main/src/forge/data_models/completion.py#L19C5-L23C14
| tensor = F.pad(tensor, (0, diff), value=self.pad_id)
| return tensor
|
| def to_dict(self, exclude: list[str] | None = None) -> dict[str, Any]:
I am thinking that we could add some of the fields from Completion, e.g. prompt, text, stop_reason, generator_version, metadata. Wdyt?
| return reduced_metrics
|
|
| def record_episode_sample(table_name: str, episode):
I think we can delete this and in the code just do:

sample = episode.to_dict(exclude=["ref_logprobs", "completion"])
record_metric(table_name, sample, Reduce.SAMPLE)

I don't think we gain much with this abstraction.
| MAX = "max" | ||
| MIN = "min" | ||
| STD = "std" | ||
| SAMPLE = "sample" |
I am bothered by the term "Sample". In reality, we can log anything that is a dictionary / table. It doesn't need to be a sample, right?
It is like having a Reduce.TIME when we are just doing .mean, for example.
I think we could rename it to TABLE and keep it aligned with the wandb API (wandb.Table).
I.e., rename sample -> table everywhere.
Before you go with the change, do you see a better alternative? Should it be sample -> dict?
| def __init__(
|     self, reduction: Reduce, top_k: int = 1, bottom_k: int = 1, key: str = "reward"
| ):
I think that the key name should be "score". If someone wants to rank based on anything that is not reward (e.g. length), they can just do:

mydict["score"] = len(mydict["response"])
| logger.info(f"========== SAMPLE LOGS STEP {step} ==========") | ||
| for sample in samples: | ||
| table_name, table_rows = sample.key, sample.value | ||
| logger.info(f"[{table_name}] ({len(table_rows)} samples)") | ||
| logger.info(json.dumps(table_rows, indent=2, ensure_ascii=False)) | ||
| logger.info("==============================================\n") |
| logger.info(f"========== SAMPLE LOGS STEP {step} ==========") | |
| for sample in samples: | |
| table_name, table_rows = sample.key, sample.value | |
| logger.info(f"[{table_name}] ({len(table_rows)} samples)") | |
| logger.info(json.dumps(table_rows, indent=2, ensure_ascii=False)) | |
| logger.info("==============================================\n") | |
| for sample in samples: | |
| table_name, table_rows = sample.key, sample.value | |
| logger.info(f"[{table_name}] ({len(table_rows)} samples)") | |
| logger.info(json.dumps(table_rows, indent=2, ensure_ascii=False)) |
| # Convert to list if single sample. This happens when logging stream
| if isinstance(table_rows, dict):
|     table_rows = [table_rows]
Instead, when creating the metric, can we make sure that it's always a list, so we don't need this check in multiple places?
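A minimal sketch of that normalization, assuming it lives wherever the metric value first enters the pipeline (the helper name is hypothetical):

```python
def normalize_sample_value(value: dict | list[dict]) -> list[dict]:
    """Wrap a single sample dict into a one-element list so downstream
    accumulators and backends only ever see list[dict]."""
    return [value] if isinstance(value, dict) else value
```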
| else:
|     table = self._tables[table_name]
|
| # Add rows (fill missing columns with None)
What happens if table_rows has a new column not present in table.columns? It looks like we would drop this value silently. Can we add a warning? I think we can do that with sets. I haven't used the Python set API in a long time, but I think something like:

input_extra_columns = set(table_rows) - set(table.columns)
if input_extra_columns:
    logger.warning(f"Input table {table_name} has unexpected columns {input_extra_columns}. Extra columns will be ignored.")
This PR introduces structured sample-level logging to complement existing scalar metrics, allowing users to inspect concrete prompt–response–reward examples during RL training.
More discussion: #301
The current implementation logs 2 samples per step: one top (highest reward) and one bottom (lowest reward). Supporting customized sampling strategies is out of scope for this PR. For now, this can be achieved by changing the filter for SampleAccumulator. For example:
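(A sketch: the TopBottomKFilter signature comes from the review diff above, while how the filter is handed to SampleAccumulator is assumed here.)

```python
# Hypothetical: keep the 3 highest and 3 lowest samples per step,
# ranked by response length instead of reward.
sample_filter = TopBottomKFilter(
    reduction=Reduce.SAMPLE, top_k=3, bottom_k=3, key="response_len"
)
```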
Summary of Changes

- SampleAccumulator to support logging structured dict samples (e.g., per-episode data) via the record_episode_sample API with Reduce.SAMPLE.
- TopBottomKFilter for selecting top/bottom samples based on reward (heap-based, O(log k) per append; the idea is sketched below).
- wandb.Table for live sample inspection.
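The heap idea in its simplest form, as a standalone illustration of the O(log k) append. This is not the PR's actual TopBottomKFilter, just the same technique:

```python
import heapq


class TopBottomK:
    """Keep the k highest- and k lowest-scoring sample dicts seen so far."""

    def __init__(self, top_k: int = 1, bottom_k: int = 1, key: str = "reward"):
        self.top_k, self.bottom_k, self.key = top_k, bottom_k, key
        self._top: list = []     # min-heap holding the top_k largest scores
        self._bottom: list = []  # min-heap of negated scores -> bottom_k smallest
        self._n = 0              # unique tie-breaker so dicts are never compared

    def append(self, sample: dict) -> None:
        score, self._n = sample[self.key], self._n + 1
        # A min-heap of size k keeps the k largest: push, then evict the smallest.
        item = (score, self._n, sample)
        if len(self._top) < self.top_k:
            heapq.heappush(self._top, item)      # O(log k)
        else:
            heapq.heappushpop(self._top, item)   # O(log k)
        # Negating scores turns the min-heap into a max-heap: keeps the k smallest.
        neg = (-score, self._n, sample)
        if len(self._bottom) < self.bottom_k:
            heapq.heappush(self._bottom, neg)
        else:
            heapq.heappushpop(self._bottom, neg)

    def get(self) -> list[dict]:
        tops = [s for _, _, s in sorted(self._top, reverse=True)]
        bottoms = [s for _, _, s in sorted(self._bottom, reverse=True)]
        return tops + bottoms
```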
Each logged sample includes:
{ "episode_id": episode.episode_id, "policy_version": episode.policy_version, "prompt": episode.request, "response": episode.response, "target": str(episode.target), **( episode.reward_breakdown or {} ), # per-fn breakdown including the average reward "advantage": episode.advantage, "request_len": episode.request_len, "response_len": episode.response_len, "pad_id": episode.pad_id, }Tests:
Ran python -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yaml with modes:

- global_reduce
- per_rank_reduce: the table is logged in the run Controller_xxx.
- per_rank_no_reduce: the table is logged in the run Controller_xxx, with higher latency.

Backend: wandb
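For reference, a minimal sketch of how the sample dicts map onto a live wandb table. The project name, table key, and row values are illustrative only; the real backend code is in the diff snippets above:

```python
import wandb

run = wandb.init(project="forge-demo")  # illustrative project name
samples = [
    {"episode_id": "ea2f...", "reward": 0.1, "response_len": 512},
    {"episode_id": "b0a1...", "reward": 1.0, "response_len": 512},
]
# Derive columns from the first row; missing values fall back to None.
columns = list(samples[0].keys())
table = wandb.Table(columns=columns)
for row in samples:
    table.add_data(*(row.get(c) for c in columns))
run.log({"rollout/sample": table}, step=1)
```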
Backend: Console
```
========== SAMPLE LOGS STEP 1 ==========
[rollout/sample] (2 samples)
{
  "episode_id": "ea2f341b-bf7a-4647-9913-c5f2c8bb525f",
  "policy_version": 0,
  "prompt": "<|im_start|>system\n\n Put all your scratchpad work between <think> and </think> tags.\n Your final answer should be between <answer> and </answer> tags otherwise it will not be scored.\n <|im_end|>\n<|im_start|>user\nJames buys $3000 worth of stuff from Amazon. He has to return a TV that cost $700 and a bike that cost $500. He also sells another bike that cost 20% more than the bike he returned for 80% of what he bought it for. He then buys a toaster for $100. How much is he out of pocket for everything?<|im_end|>\n<|im_start|>assistant\n",
  "response": "<think>\nOkay, let me try to figure out how much James is out of pocket for everything. Hmm, let's start by breaking down the problem step by step.\n\nFirst, James buys $3000 worth of stuff from Amazon. So, his initial total cost is $3000. But then he has to return a TV that cost $700 and a bike that cost $500. Wait, so when he returns those items, he's getting them back, right? So maybe that means he's losing the value of those items? But I need to check if that's how it's calculated. \n\nBut the question is about how much he's out of pocket. So, when he returns the TV and bike, he's probably losing the amount he paid for them. So, the TV was $700, bike was $500. So, total money he gave back is 700 + 500 = $1200. But he had $3000 initially. So, if he returns $1200, he's out of pocket for those items? Wait, but maybe he has to pay for them back? Or does returning them mean he's just not keeping them? \n\nWait, maybe I need to think in terms of net cash flow. Let me think. When he buys stuff, he spends money. When he returns stuff, he gives money back, so that's like a deduction from his total expenditure. \n\nSo, initial purchase: $3000. Then he returns $700 (TV) and $500 (bike). So, total returned is $1200. Therefore, his net expenditure so far is $3000 - $1200 = $1800. But then he sells another bike that cost 20% more than the bike he returned. Wait, the bike he returned was $500, so the bike he sold is 20% more than that. Let me calculate that. 20% of 500 is 100, so the bike he sold cost 500 + 100 = $600. But he sells it for 80% of what he bought it for. So, he bought it for $600, so 80% of that is 0.8 * 600 = $480. So",
  "target": "2020",
  "MathReward": 0.0,
  "ThinkingReward": 0.2,
  "reward": 0.1,
  "advantage": -7.449986878782511e-05,
  "ref_logprobs": -1.359375,
  "request_len": 512,
  "response_len": 512,
  "pad_id": 151643
}
{
  "episode_id": "b0a1d37d-c3a6-403a-8e9f-8ae510b6702a",
  "policy_version": 0,
  "prompt": "<|im_start|>system\n\n Put all your scratchpad work between <think> and </think> tags.\n Your final answer should be between <answer> and </answer> tags otherwise it will not be scored.\n <|im_end|>\n<|im_start|>user\nJohn uses 5 liters of fuel per km to travel. How many liters of fuel should John plan to use if he plans to travel on two trips of 30 km and 20 km?<|im_end|>\n<|im_start|>assistant\n",
  "response": "<think>\nOkay, let's see. John is traveling twice, first 30 km and then 20 km. He uses 5 liters per km. So I need to calculate the total fuel needed for both trips.\n\nFirst, I should figure out how much fuel he uses for each trip separately and then add them together. For the first trip, 30 km multiplied by 5 liters per km. Let me do that calculation: 30 times 5 is 150 liters. Then for the second trip, 20 km times 5 liters per km. 20 times 5 is 100 liters. Adding those two amounts together, 150 plus 100 equals 250 liters. So John should plan to use 250 liters of fuel in total. I think that's it. Let me double-check. 30 km trip uses 150 liters, 20 km uses 100. Total is indeed 250. Yep, that seems right.\n</think>\n\nJohn uses 5 liters of fuel per km. For the first trip of 30 km, he uses $30 \\times 5 = 150$ liters. For the second trip of 20 km, he uses $20 \\times 5 = 100$ liters. Total fuel required is $150 + 100 = 250$ liters.\n\n<answer>250</answer>",
  "target": "250",
  "MathReward": 1.0,
  "ThinkingReward": 1.0,
  "reward": 1.0,
  "advantage": 0.5398026704788208,
  "ref_logprobs": -4.90625,
  "request_len": 512,
  "response_len": 512,
  "pad_id": 151643
}
==============================================
```

Notes