#!/usr/bin/env python3
"""
Meaning Kernel Extraction Pipeline
Extract structured meaning kernels from academic PDF diagrams.
Issue #493: [Multimodal] Extract Meaning Kernels from Research Diagrams
"""
import os
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional
import hashlib
# Optional vision dependencies: each is probed independently so the script
# can degrade gracefully (print a warning, then skip the matching feature).
try:
    from PIL import Image  # image loading/inspection (Pillow)
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    print("Warning: PIL not available. Install with: pip install Pillow")
try:
    import pytesseract  # OCR text extraction
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False
    print("Warning: pytesseract not available. Install with: pip install pytesseract")
try:
    import pdf2image  # PDF page rasterization
    PDF2IMAGE_AVAILABLE = True
except ImportError:
    PDF2IMAGE_AVAILABLE = False
    print("Warning: pdf2image not available. Install with: pip install pdf2image")
class MeaningKernel:
    """Represents an extracted meaning kernel.

    A kernel is one self-contained unit of meaning pulled from a diagram:
    raw OCR text, structural facts, a summary, or philosophical themes.
    """

    def __init__(self, kernel_id: str, content: str, source: str,
                 kernel_type: str = "text", confidence: float = 0.0,
                 metadata: Optional[Dict[str, Any]] = None):
        """Create a kernel.

        Args:
            kernel_id: Caller-assigned identifier (unique per extraction run).
            content: The extracted text/description.
            source: Path of the image/PDF page the kernel came from.
            kernel_type: One of: text, structure, summary, philosophical.
            confidence: Extraction confidence in [0, 1].
            metadata: Optional extra fields; a fresh dict is used when None
                (avoids the shared-mutable-default pitfall).
        """
        self.kernel_id = kernel_id
        self.content = content
        self.source = source
        self.kernel_type = kernel_type  # text, structure, summary, philosophical
        self.confidence = confidence
        self.metadata = metadata or {}
        # Timestamp participates in the hash, making each kernel instance unique.
        self.timestamp = datetime.now().isoformat()
        self.hash = self._generate_hash()

    def _generate_hash(self) -> str:
        """Generate a short (16 hex chars) unique hash for this kernel."""
        content_str = f"{self.kernel_id}:{self.content}:{self.source}:{self.timestamp}"
        return hashlib.sha256(content_str.encode()).hexdigest()[:16]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "kernel_id": self.kernel_id,
            "content": self.content,
            "source": self.source,
            "kernel_type": self.kernel_type,
            "confidence": self.confidence,
            "metadata": self.metadata,
            "timestamp": self.timestamp,
            "hash": self.hash
        }

    def __str__(self) -> str:
        # Fix: only show an ellipsis when the content was actually truncated.
        preview = self.content[:100] + ("..." if len(self.content) > 100 else "")
        return f"Kernel[{self.kernel_id}]: {preview}"
class DiagramAnalyzer:
    """Analyze diagram images with whatever optional backends are installed.

    Uses Pillow for basic metrics and pytesseract (when available) for OCR.
    PIL types in annotations are written as string forward references so the
    class can still be defined when Pillow is missing — otherwise the guarded
    import's graceful degradation would be defeated by a NameError here.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store an optional configuration mapping (not consulted yet)."""
        self.config = config or {}

    def analyze_image(self, image_path: str) -> Dict[str, Any]:
        """Analyze an image using multiple methods.

        Returns a dict with keys: dimensions, aspect_ratio, mode, format,
        size_bytes, diagram_type, and — when OCR ran — ocr_text and
        ocr_confidence (plus ocr_error if OCR failed).

        Raises:
            ImportError: if Pillow is not installed.
        """
        if not PIL_AVAILABLE:
            raise ImportError("PIL is required for image analysis")
        image = Image.open(image_path)
        # Basic image analysis
        analysis = {
            "dimensions": {"width": image.width, "height": image.height},
            "aspect_ratio": image.width / image.height,
            "mode": image.mode,
            "format": image.format,
            "size_bytes": os.path.getsize(image_path)
        }
        # OCR is best-effort: failures are recorded in the result, not raised.
        if TESSERACT_AVAILABLE:
            try:
                ocr_text = pytesseract.image_to_string(image)
                analysis["ocr_text"] = ocr_text.strip()
                analysis["ocr_confidence"] = self._estimate_ocr_confidence(image)
            except Exception as e:
                analysis["ocr_text"] = ""
                analysis["ocr_confidence"] = 0.0
                analysis["ocr_error"] = str(e)
        analysis["diagram_type"] = self._estimate_diagram_type(image)
        return analysis

    def _estimate_ocr_confidence(self, image: "Image.Image") -> float:
        """Estimate OCR confidence (simplified placeholder).

        TODO: use pytesseract.image_to_data's per-word confidences instead.
        """
        return 0.8  # Placeholder

    def _estimate_diagram_type(self, image: "Image.Image") -> str:
        """Classify the diagram purely from its width/height aspect ratio."""
        width, height = image.size
        aspect_ratio = width / height
        # Heuristic: wide images read as flows, tall ones as hierarchies.
        if aspect_ratio > 2:
            return "flowchart"
        elif aspect_ratio < 0.5:
            return "vertical_hierarchy"
        elif 0.8 <= aspect_ratio <= 1.2:
            return "square_diagram"
        else:
            return "standard_diagram"
class MeaningKernelExtractor:
    """Extract meaning kernels from PDFs and diagram images.

    Orchestrates PDF-to-image conversion, per-image analysis via
    DiagramAnalyzer, kernel generation, and report writing. Annotations of
    MeaningKernel are string forward references so this class definition does
    not hard-depend on evaluation order.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create an extractor.

        Args:
            config: Optional configuration dict, forwarded to the analyzer.
        """
        self.config = config or {}
        self.analyzer = DiagramAnalyzer(config)
        self.kernels: List["MeaningKernel"] = []
        # Running counters, reported by get_stats() and saved with the output.
        self.stats = {
            "pages_processed": 0,
            "diagrams_analyzed": 0,
            "kernels_extracted": 0,
            "errors": 0
        }

    def extract_from_pdf(self, pdf_path: str, output_dir: Optional[str] = None) -> List["MeaningKernel"]:
        """Extract meaning kernels from every page of a PDF.

        Args:
            pdf_path: Path to the PDF file.
            output_dir: Where to write page images and reports; defaults to a
                sibling directory named "<pdf stem>_kernels".

        Returns:
            All kernels extracted across pages ([] on conversion failure).

        Raises:
            ImportError: if pdf2image is not installed.
            FileNotFoundError: if the PDF does not exist.
        """
        if not PDF2IMAGE_AVAILABLE:
            raise ImportError("pdf2image is required for PDF processing")
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF not found: {pdf_path}")
        print(f"Processing PDF: {pdf_path}")
        # Create output directory (default sits next to the PDF).
        if output_dir:
            output_path = Path(output_dir)
        else:
            output_path = pdf_path.parent / f"{pdf_path.stem}_kernels"
        output_path.mkdir(parents=True, exist_ok=True)
        # Convert PDF to images; 300 dpi keeps OCR quality reasonable.
        # Conversion failure is non-fatal: counted and reported as no kernels.
        try:
            from pdf2image import convert_from_path
            images = convert_from_path(pdf_path, dpi=300)
            print(f"Converted {len(images)} pages to images")
        except Exception as e:
            print(f"Error converting PDF: {e}")
            self.stats["errors"] += 1
            return []
        # Process each page
        all_kernels = []
        for i, image in enumerate(images):
            page_num = i + 1
            print(f"Processing page {page_num}/{len(images)}")
            # Persist the rendered page so the analyzer can re-open it from disk.
            temp_image_path = output_path / f"page_{page_num:03d}.png"
            image.save(temp_image_path)
            page_kernels = self.extract_from_image(temp_image_path, page_num)
            all_kernels.extend(page_kernels)
            self.stats["pages_processed"] += 1
        self._save_kernels(all_kernels, output_path)
        return all_kernels

    def extract_from_image(self, image_path: str, page_num: Optional[int] = None) -> List["MeaningKernel"]:
        """Extract meaning kernels from a single image.

        Analysis errors are swallowed deliberately (counted in stats) so one
        bad page does not abort a multi-page run.
        """
        print(f"Processing image: {image_path}")
        try:
            analysis = self.analyzer.analyze_image(str(image_path))
        except Exception as e:
            print(f"Error analyzing image: {e}")
            self.stats["errors"] += 1
            return []
        kernels = self._generate_kernels(analysis, str(image_path), page_num)
        self.stats["diagrams_analyzed"] += 1
        self.stats["kernels_extracted"] += len(kernels)
        return kernels

    def _generate_kernels(self, analysis: Dict[str, Any], source: str,
                          page_num: Optional[int] = None) -> List["MeaningKernel"]:
        """Generate up to four kernels (text/structure/summary/philosophical)
        from one image analysis dict."""
        kernels = []
        # Base ID is timestamped; page suffix added when known.
        base_id = f"kernel_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        # Fix: explicit None check so a page numbered 0 would not be dropped.
        if page_num is not None:
            base_id += f"_p{page_num}"
        # 1. Text kernel (from OCR) — only when OCR produced something.
        if analysis.get("ocr_text"):
            text_kernel = MeaningKernel(
                kernel_id=f"{base_id}_text",
                content=analysis["ocr_text"],
                source=source,
                kernel_type="text",
                confidence=analysis.get("ocr_confidence", 0.0),
                metadata={
                    "word_count": len(analysis["ocr_text"].split()),
                    "diagram_type": analysis.get("diagram_type", "unknown")
                }
            )
            kernels.append(text_kernel)
        # 2. Structure kernel — always emitted; describes geometry.
        structure_content = f"Diagram type: {analysis.get('diagram_type', 'unknown')}. "
        structure_content += f"Dimensions: {analysis['dimensions']['width']}x{analysis['dimensions']['height']}. "
        structure_content += f"Aspect ratio: {analysis['aspect_ratio']:.2f}."
        structure_kernel = MeaningKernel(
            kernel_id=f"{base_id}_structure",
            content=structure_content,
            source=source,
            kernel_type="structure",
            confidence=0.9,
            metadata={
                "dimensions": analysis["dimensions"],
                "aspect_ratio": analysis["aspect_ratio"],
                "diagram_type": analysis.get("diagram_type", "unknown")
            }
        )
        kernels.append(structure_kernel)
        # 3. Summary kernel — always emitted.
        summary = f"Research diagram analysis: {analysis.get('diagram_type', 'unknown')} diagram. "
        if analysis.get("ocr_text"):
            summary += f"Contains text: {analysis['ocr_text'][:200]}..."
        else:
            summary += "No text detected."
        summary_kernel = MeaningKernel(
            kernel_id=f"{base_id}_summary",
            content=summary,
            source=source,
            kernel_type="summary",
            confidence=0.7,
            metadata={
                "has_text": bool(analysis.get("ocr_text")),
                "text_length": len(analysis.get("ocr_text", ""))
            }
        )
        kernels.append(summary_kernel)
        # 4. Philosophical kernel — only for non-trivial amounts of text.
        if analysis.get("ocr_text") and len(analysis["ocr_text"]) > 50:
            philosophical_content = self._extract_philosophical_content(analysis["ocr_text"])
            if philosophical_content:
                philosophical_kernel = MeaningKernel(
                    kernel_id=f"{base_id}_philosophical",
                    content=philosophical_content,
                    source=source,
                    kernel_type="philosophical",
                    confidence=0.6,
                    metadata={
                        "extraction_method": "keyword_analysis",
                        "source_text_length": len(analysis["ocr_text"])
                    }
                )
                kernels.append(philosophical_kernel)
        # Also accumulate on the instance for get_stats()-style inspection.
        self.kernels.extend(kernels)
        return kernels

    def _extract_philosophical_content(self, text: str) -> Optional[str]:
        """Extract philosophical content from text via keyword matching
        (simplified). Returns None when no keyword is found."""
        philosophical_keywords = [
            "truth", "knowledge", "wisdom", "meaning", "purpose",
            "existence", "reality", "consciousness", "ethics", "morality",
            "beauty", "justice", "freedom", "responsibility", "identity"
        ]
        text_lower = text.lower()
        # Preserve keyword-list order (not text order) in the result.
        found_keywords = [kw for kw in philosophical_keywords if kw in text_lower]
        if found_keywords:
            return f"Philosophical themes detected: {', '.join(found_keywords)}. " f"Source text explores concepts of {found_keywords[0]}."
        return None

    def _save_kernels(self, kernels: List["MeaningKernel"], output_path: Path) -> None:
        """Write JSON, Markdown, and stats reports for the given kernels."""
        if not kernels:
            print("No kernels to save")
            return
        json_path = self._write_json(kernels, output_path)
        md_path = self._write_markdown(kernels, output_path)
        stats_path = self._write_stats(output_path)
        print(f"Saved {len(kernels)} kernels to {output_path}")
        print(f" - JSON: {json_path}")
        print(f" - Markdown: {md_path}")
        print(f" - Statistics: {stats_path}")

    def _write_json(self, kernels: List["MeaningKernel"], output_path: Path) -> Path:
        """Serialize all kernels to meaning_kernels.json; returns the path."""
        json_path = output_path / "meaning_kernels.json"
        kernels_data = [k.to_dict() for k in kernels]
        # Fix: explicit UTF-8 — OCR text is routinely non-ASCII and the
        # platform default encoding may not handle it.
        with open(json_path, 'w', encoding="utf-8") as f:
            json.dump(kernels_data, f, indent=2)
        return json_path

    def _write_markdown(self, kernels: List["MeaningKernel"], output_path: Path) -> Path:
        """Write a human-readable report grouped by kernel type; returns the path."""
        md_path = output_path / "meaning_kernels.md"
        with open(md_path, 'w', encoding="utf-8") as f:
            f.write(f"# Meaning Kernels Extraction Report\n")
            f.write(f"Generated: {datetime.now().isoformat()}\n")
            f.write(f"Total kernels: {len(kernels)}\n\n")
            # Group by type
            by_type = {}
            for kernel in kernels:
                by_type.setdefault(kernel.kernel_type, []).append(kernel)
            for kernel_type, type_kernels in by_type.items():
                f.write(f"## {kernel_type.title()} Kernels ({len(type_kernels)})\n\n")
                for kernel in type_kernels:
                    f.write(f"### {kernel.kernel_id}\n")
                    f.write(f"- **Source**: {kernel.source}\n")
                    f.write(f"- **Confidence**: {kernel.confidence:.2f}\n")
                    f.write(f"- **Timestamp**: {kernel.timestamp}\n")
                    f.write(f"- **Content**: {kernel.content}\n")
                    f.write(f"- **Metadata**: {json.dumps(kernel.metadata, indent=2)}\n\n")
        return md_path

    def _write_stats(self, output_path: Path) -> Path:
        """Dump the running statistics counters to JSON; returns the path."""
        stats_path = output_path / "extraction_stats.json"
        with open(stats_path, 'w', encoding="utf-8") as f:
            json.dump(self.stats, f, indent=2)
        return stats_path

    def get_stats(self) -> Dict[str, Any]:
        """Get a defensive copy of the extraction statistics."""
        return self.stats.copy()
def main():
    """Command line interface: parse arguments, run extraction, report stats."""
    parser = argparse.ArgumentParser(description="Extract meaning kernels from research diagrams")
    parser.add_argument("input", help="Input PDF or image file/directory")
    parser.add_argument("-o", "--output", help="Output directory")
    parser.add_argument("-c", "--config", help="Configuration file (JSON)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # Optional JSON configuration file.
    config = {}
    if args.config:
        with open(args.config) as cfg_file:
            config = json.load(cfg_file)

    extractor = MeaningKernelExtractor(config)
    target = Path(args.input)
    image_exts = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp')

    if target.is_file():
        suffix = target.suffix.lower()
        if suffix == '.pdf':
            extractor.extract_from_pdf(target, args.output)
        elif suffix in image_exts:
            extractor.extract_from_image(target)
        else:
            print(f"Unsupported file type: {target.suffix}")
            sys.exit(1)
    elif target.is_dir():
        # Walk the directory once, dispatching PDFs and images; other
        # entries are silently skipped.
        collected = []
        for entry in target.iterdir():
            suffix = entry.suffix.lower()
            if suffix == '.pdf':
                collected.extend(extractor.extract_from_pdf(entry, args.output))
            elif suffix in image_exts:
                collected.extend(extractor.extract_from_image(entry))
    else:
        print(f"Input not found: {target}")
        sys.exit(1)

    # Final summary banner.
    stats = extractor.get_stats()
    banner = "=" * 50
    print("\n" + banner)
    print("EXTRACTION SUMMARY")
    print(banner)
    print(f"Pages processed: {stats['pages_processed']}")
    print(f"Diagrams analyzed: {stats['diagrams_analyzed']}")
    print(f"Kernels extracted: {stats['kernels_extracted']}")
    print(f"Errors: {stats['errors']}")
    print(banner)

    # Non-zero exit code whenever any error was counted.
    sys.exit(0 if stats['errors'] == 0 else 1)


if __name__ == "__main__":
    main()