From b78076cac75fc84d90c95ab401b98f3fa0fcbe1f Mon Sep 17 00:00:00 2001 From: teknium Date: Thu, 29 Jan 2026 06:04:13 +0000 Subject: [PATCH] Enhance trajectory_compressor.py with new input options and sampling functionality - Updated the main function to accept both single JSONL files and directories for compression. - Added support for sampling a percentage of trajectories before compression. - Improved usage documentation with detailed examples for various compression scenarios. - Enhanced error handling for input validation and dry run mode. - Streamlined output handling to manage temporary files during processing. --- trajectory_compressor.py | 220 ++++++++++++++++++++++++++++++++++----- 1 file changed, 192 insertions(+), 28 deletions(-) diff --git a/trajectory_compressor.py b/trajectory_compressor.py index 511b9b848..23a1d9910 100644 --- a/trajectory_compressor.py +++ b/trajectory_compressor.py @@ -14,9 +14,20 @@ Compression Strategy: 6. Keep remaining tool calls intact (model continues working after summary) Usage: - python trajectory_compressor.py --input_dir=data/my_run - python trajectory_compressor.py --input_dir=data/my_run --config=configs/trajectory_compression.yaml - python trajectory_compressor.py --input_dir=data/my_run --target_max_tokens=16000 + # Compress a directory of JSONL files + python trajectory_compressor.py --input=data/my_run + + # Compress a single JSONL file + python trajectory_compressor.py --input=data/trajectories.jsonl + + # Compress 15% sample of a file + python trajectory_compressor.py --input=data/trajectories.jsonl --sample_percent=15 + + # Compress with custom output and token target + python trajectory_compressor.py --input=data/trajectories.jsonl --output=compressed.jsonl --target_max_tokens=16000 + + # Compress 10% sample from a directory + python trajectory_compressor.py --input=data/my_run --sample_percent=10 """ import json @@ -1176,24 +1187,49 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix.""" def main( - input_dir: str, - output_dir: str = None, + input: str, + output: str = None, config: str = "configs/trajectory_compression.yaml", target_max_tokens: int = None, tokenizer: str = None, + sample_percent: float = None, + seed: int = 42, dry_run: bool = False, ): """ Compress agent trajectories to fit within a target token budget. + Supports both single JSONL files and directories containing multiple JSONL files. + Optionally sample a percentage of trajectories before compression. + Args: - input_dir: Directory containing JSONL trajectory files - output_dir: Output directory (default: input_dir + "_compressed") + input: Path to JSONL file or directory containing JSONL files + output: Output path (file for file input, directory for dir input) + Default: adds "_compressed" suffix to input name config: Path to YAML configuration file target_max_tokens: Override target token count from config tokenizer: Override tokenizer name from config + sample_percent: Sample this percentage of trajectories (1-100) before compression + seed: Random seed for sampling reproducibility (default: 42) dry_run: Analyze without compressing (just show what would happen) + + Examples: + # Compress a directory (original behavior) + python trajectory_compressor.py --input=data/my_run + + # Compress a single file + python trajectory_compressor.py --input=data/trajectories.jsonl + + # Compress 15% sample of a file + python trajectory_compressor.py --input=data/trajectories.jsonl --sample_percent=15 + + # Compress 10% sample with custom output + python trajectory_compressor.py --input=data/trajectories.jsonl --sample_percent=10 --output=data/sampled_compressed.jsonl """ + import random + import tempfile + import shutil + print("šŸ—œļø Trajectory Compressor") print("=" * 60) @@ -1212,31 +1248,159 @@ def main( if tokenizer: compression_config.tokenizer_name = tokenizer - # Setup paths - input_path = Path(input_dir) + # Validate sample_percent + if sample_percent is not None: + if sample_percent <= 0 or sample_percent > 100: + print(f"āŒ sample_percent must be between 1 and 100, got {sample_percent}") + return + print(f"šŸŽ² Will sample {sample_percent}% of trajectories (seed={seed})") + + # Setup paths and determine input type + input_path = Path(input) if not input_path.exists(): - print(f"āŒ Input directory not found: {input_dir}") + print(f"āŒ Input not found: {input}") return - if output_dir: - output_path = Path(output_dir) + is_file_input = input_path.is_file() + + if is_file_input: + print(f"šŸ“„ Input mode: Single JSONL file") + + # For file input, default output is file with _compressed suffix + if output: + output_path = Path(output) + else: + output_path = input_path.parent / (input_path.stem + compression_config.output_suffix + ".jsonl") + + # Load entries from the single file + entries = [] + with open(input_path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if line: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError as e: + print(f"āš ļø Skipping invalid JSON at line {line_num}: {e}") + + total_entries = len(entries) + print(f" Loaded {total_entries:,} trajectories from {input_path.name}") + + # Sample if requested + if sample_percent is not None: + random.seed(seed) + sample_size = max(1, int(total_entries * sample_percent / 100)) + entries = random.sample(entries, sample_size) + print(f" Sampled {len(entries):,} trajectories ({sample_percent}% of {total_entries:,})") + + if dry_run: + print(f"\nšŸ” DRY RUN MODE - analyzing without writing") + print(f"šŸ“„ Would process: {len(entries):,} trajectories") + print(f"šŸ“„ Would output to: {output_path}") + return + + # Create a temporary directory for processing + with tempfile.TemporaryDirectory() as temp_dir: + temp_input_dir = Path(temp_dir) / "input" + temp_output_dir = Path(temp_dir) / "output" + temp_input_dir.mkdir() + + # Write entries to temp file + temp_input_file = temp_input_dir / "trajectories.jsonl" + with open(temp_input_file, 'w', encoding='utf-8') as f: + for entry in entries: + f.write(json.dumps(entry, ensure_ascii=False) + '\n') + + # Initialize compressor and process + compressor = TrajectoryCompressor(compression_config) + compressor.process_directory(temp_input_dir, temp_output_dir) + + # Copy result to output path (merge all files in temp_output_dir) + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, 'w', encoding='utf-8') as out_f: + for jsonl_file in sorted(temp_output_dir.glob("*.jsonl")): + with open(jsonl_file, 'r', encoding='utf-8') as in_f: + for line in in_f: + out_f.write(line) + + # Copy metrics file if it exists + metrics_file = temp_output_dir / compression_config.metrics_output_file + if metrics_file.exists(): + metrics_output = output_path.parent / (output_path.stem + "_metrics.json") + shutil.copy(metrics_file, metrics_output) + print(f"šŸ’¾ Metrics saved to {metrics_output}") + + print(f"\nāœ… Compression complete!") + print(f"šŸ“„ Output: {output_path}") + else: - output_path = input_path.parent / (input_path.name + compression_config.output_suffix) - - if dry_run: - print(f"\nšŸ” DRY RUN MODE - analyzing without writing") - print(f"šŸ“ Would process: {input_path}") - print(f"šŸ“ Would output to: {output_path}") - # TODO: Implement dry run analysis - return - - # Initialize compressor - compressor = TrajectoryCompressor(compression_config) - - # Process directory - compressor.process_directory(input_path, output_path) - - print("\nāœ… Compression complete!") + # Directory input - original behavior + print(f"šŸ“ Input mode: Directory of JSONL files") + + if output: + output_path = Path(output) + else: + output_path = input_path.parent / (input_path.name + compression_config.output_suffix) + + # If sampling is requested for directory mode, we need to handle it differently + if sample_percent is not None: + print(f"\nāš ļø Sampling from directory: will sample {sample_percent}% from each file") + + # Create a temp directory with sampled files + with tempfile.TemporaryDirectory() as temp_dir: + temp_input_dir = Path(temp_dir) / "input" + temp_input_dir.mkdir() + + random.seed(seed) + total_original = 0 + total_sampled = 0 + + # Sample from each JSONL file + for jsonl_file in sorted(input_path.glob("*.jsonl")): + entries = [] + with open(jsonl_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + pass + + total_original += len(entries) + sample_size = max(1, int(len(entries) * sample_percent / 100)) + sampled_entries = random.sample(entries, min(sample_size, len(entries))) + total_sampled += len(sampled_entries) + + # Write sampled entries + temp_file = temp_input_dir / jsonl_file.name + with open(temp_file, 'w', encoding='utf-8') as f: + for entry in sampled_entries: + f.write(json.dumps(entry, ensure_ascii=False) + '\n') + + print(f" Sampled {total_sampled:,} from {total_original:,} total trajectories") + + if dry_run: + print(f"\nšŸ” DRY RUN MODE - analyzing without writing") + print(f"šŸ“ Would process: {temp_input_dir}") + print(f"šŸ“ Would output to: {output_path}") + return + + # Initialize compressor and process the sampled data + compressor = TrajectoryCompressor(compression_config) + compressor.process_directory(temp_input_dir, output_path) + else: + if dry_run: + print(f"\nšŸ” DRY RUN MODE - analyzing without writing") + print(f"šŸ“ Would process: {input_path}") + print(f"šŸ“ Would output to: {output_path}") + return + + # Initialize compressor and process directly + compressor = TrajectoryCompressor(compression_config) + compressor.process_directory(input_path, output_path) + + print("\nāœ… Compression complete!") if __name__ == "__main__":