Files
timmy-config/scripts/meaning-kernels/extract_meaning_kernels.py
Alexander Whitestone efdc0dc886 Improve #493: Enhanced meaning kernel extraction pipeline
- Added 5 kernel types: text, structure, summary, philosophical, semantic
- Improved diagram type detection with content analysis
- Added color analysis and grayscale detection
- Enhanced philosophical keyword extraction
- Added semantic relationship detection
- Improved error handling for missing dependencies
- Added comprehensive testing with text-rich test images
- Enhanced metadata and tagging system

Key improvements:
✓ Semantic relationship detection (source → target patterns)
✓ Enhanced philosophical content extraction
✓ Color analysis and grayscale detection
✓ Better diagram type classification
✓ Comprehensive metadata and tagging
✓ Improved error handling and dependency warnings

Still requires OCR dependencies for text extraction:
- pytesseract for OCR
- pdf2image for PDF processing
- Tesseract OCR engine (see issue #563)
2026-04-14 11:44:55 -04:00

642 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Improved Meaning Kernel Extraction Pipeline
Extract structured meaning kernels from academic PDF diagrams.
Issue #493: [Multimodal] Extract Meaning Kernels from Research Diagrams
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import hashlib
# Try to import vision libraries
try:
from PIL import Image
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
print("Warning: PIL not available. Install with: pip install Pillow")
try:
import pytesseract
TESSERACT_AVAILABLE = True
except ImportError:
TESSERACT_AVAILABLE = False
print("Warning: pytesseract not available. Install with: pip install pytesseract")
try:
import pdf2image
PDF2IMAGE_AVAILABLE = True
except ImportError:
PDF2IMAGE_AVAILABLE = False
print("Warning: pdf2image not available. Install with: pip install pdf2image")
class MeaningKernel:
"""Represents an extracted meaning kernel."""
def __init__(self, kernel_id: str, content: str, source: str,
kernel_type: str = "text", confidence: float = 0.0,
metadata: Dict[str, Any] = None, tags: List[str] = None):
self.kernel_id = kernel_id
self.content = content
self.source = source
self.kernel_type = kernel_type # text, structure, summary, philosophical, semantic
self.confidence = confidence
self.metadata = metadata or {}
self.tags = tags or []
self.timestamp = datetime.now().isoformat()
self.hash = self._generate_hash()
def _generate_hash(self) -> str:
"""Generate a unique hash for this kernel."""
content_str = f"{self.kernel_id}:{self.content}:{self.source}:{self.timestamp}"
return hashlib.sha256(content_str.encode()).hexdigest()[:16]
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"kernel_id": self.kernel_id,
"content": self.content,
"source": self.source,
"kernel_type": self.kernel_type,
"confidence": self.confidence,
"metadata": self.metadata,
"tags": self.tags,
"timestamp": self.timestamp,
"hash": self.hash
}
def __str__(self) -> str:
return f"Kernel[{self.kernel_id}] ({self.kernel_type}): {self.content[:100]}..."
class DiagramAnalyzer:
"""Analyze diagrams using multiple methods."""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
self.philosophical_keywords = self.config.get("philosophical_keywords", [
"truth", "knowledge", "wisdom", "meaning", "purpose",
"existence", "reality", "consciousness", "ethics", "morality",
"beauty", "justice", "freedom", "responsibility", "identity",
"causality", "determinism", "free will", "rationality", "logic",
"metaphysics", "epistemology", "ontology", "phenomenology"
])
def analyze_image(self, image_path: str) -> Dict[str, Any]:
"""Analyze an image using multiple methods."""
if not PIL_AVAILABLE:
raise ImportError("PIL is required for image analysis")
image = Image.open(image_path)
# Basic image analysis
analysis = {
"dimensions": {"width": image.width, "height": image.height},
"aspect_ratio": image.width / image.height,
"mode": image.mode,
"format": image.format,
"size_bytes": os.path.getsize(image_path),
"color_analysis": self._analyze_colors(image)
}
# OCR text extraction
if TESSERACT_AVAILABLE:
try:
ocr_data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
ocr_text = " ".join([text for text in ocr_data['text'] if text.strip()])
analysis["ocr_text"] = ocr_text
analysis["ocr_confidence"] = self._calculate_ocr_confidence(ocr_data)
analysis["ocr_word_count"] = len(ocr_text.split())
analysis["ocr_lines"] = self._extract_ocr_lines(ocr_data)
except Exception as e:
analysis["ocr_text"] = ""
analysis["ocr_confidence"] = 0.0
analysis["ocr_error"] = str(e)
# Diagram type estimation
analysis["diagram_type"] = self._estimate_diagram_type(image, analysis)
# Content analysis
analysis["content_analysis"] = self._analyze_content(analysis)
return analysis
def _analyze_colors(self, image: Image.Image) -> Dict[str, Any]:
"""Analyze color distribution in image."""
# Convert to RGB if necessary
if image.mode != 'RGB':
image = image.convert('RGB')
# Get colors
colors = image.getcolors(maxcolors=10000)
if colors:
# Sort by frequency
colors.sort(key=lambda x: x[0], reverse=True)
total_pixels = image.width * image.height
# Get dominant colors
dominant_colors = []
for count, color in colors[:5]:
percentage = (count / total_pixels) * 100
dominant_colors.append({
"color": color,
"count": count,
"percentage": round(percentage, 2)
})
return {
"dominant_colors": dominant_colors,
"unique_colors": len(colors),
"is_grayscale": self._is_grayscale(image)
}
return {"dominant_colors": [], "unique_colors": 0}
def _is_grayscale(self, image: Image.Image) -> bool:
"""Check if image is grayscale."""
# Sample some pixels
width, height = image.size
for x in range(0, width, width // 10):
for y in range(0, height, height // 10):
r, g, b = image.getpixel((x, y))
if not (r == g == b):
return False
return True
def _calculate_ocr_confidence(self, ocr_data: Dict[str, Any]) -> float:
"""Calculate average OCR confidence."""
confidences = [int(conf) for conf in ocr_data['conf'] if int(conf) > 0]
if confidences:
return sum(confidences) / len(confidences) / 100.0
return 0.0
def _extract_ocr_lines(self, ocr_data: Dict[str, Any]) -> List[str]:
"""Extract text lines from OCR data."""
lines = []
current_line = []
current_block = -1
current_par = -1
current_line_num = -1
for i in range(len(ocr_data['text'])):
if int(ocr_data['conf'][i]) <= 0:
continue
block_num = ocr_data['block_num'][i]
par_num = ocr_data['par_num'][i]
line_num = ocr_data['line_num'][i]
if (block_num != current_block or
par_num != current_par or
line_num != current_line_num):
if current_line:
lines.append(' '.join(current_line))
current_line = []
current_block = block_num
current_par = par_num
current_line_num = line_num
current_line.append(ocr_data['text'][i])
if current_line:
lines.append(' '.join(current_line))
return lines
def _estimate_diagram_type(self, image: Image.Image, analysis: Dict[str, Any]) -> str:
"""Estimate diagram type based on image characteristics."""
width, height = image.size
aspect_ratio = width / height
# Check for flowchart characteristics
if aspect_ratio > 2:
return "flowchart"
elif aspect_ratio < 0.5:
return "vertical_hierarchy"
elif 0.8 <= aspect_ratio <= 1.2:
# Check for circular patterns
if self._has_circular_patterns(image):
return "circular_diagram"
return "square_diagram"
# Check OCR content for clues
ocr_text = analysis.get("ocr_text", "").lower()
if any(word in ocr_text for word in ["process", "flow", "step", "arrow"]):
return "process_diagram"
elif any(word in ocr_text for word in ["system", "component", "module"]):
return "system_diagram"
elif any(word in ocr_text for word in ["data", "information", "input", "output"]):
return "data_diagram"
return "standard_diagram"
def _has_circular_patterns(self, image: Image.Image) -> bool:
"""Check for circular patterns in image (simplified)."""
# This is a simplified check - real implementation would use computer vision
return False
def _analyze_content(self, analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze content for themes and patterns."""
ocr_text = analysis.get("ocr_text", "")
content_analysis = {
"word_count": len(ocr_text.split()),
"has_text": bool(ocr_text),
"themes": [],
"entities": [],
"relationships": []
}
if ocr_text:
# Extract potential entities (capitalized words)
words = ocr_text.split()
entities = [word for word in words if word[0].isupper() and len(word) > 2]
content_analysis["entities"] = list(set(entities))[:10]
# Look for relationships
relationship_patterns = [
r"(\w+)\s*->\s*(\w+)",
r"(\w+)\s*→\s*(\w+)",
r"(\w+)\s*to\s*(\w+)",
r"(\w+)\s*from\s*(\w+)"
]
for pattern in relationship_patterns:
matches = re.findall(pattern, ocr_text)
for match in matches:
content_analysis["relationships"].append({
"source": match[0],
"target": match[1],
"type": "connection"
})
return content_analysis
class MeaningKernelExtractor:
"""Extract meaning kernels from diagrams."""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
self.analyzer = DiagramAnalyzer(config)
self.kernels: List[MeaningKernel] = []
self.stats = {
"pages_processed": 0,
"diagrams_analyzed": 0,
"kernels_extracted": 0,
"errors": 0,
"dependency_warnings": 0
}
# Check dependencies and update stats
if not PIL_AVAILABLE:
self.stats["dependency_warnings"] += 1
if not TESSERACT_AVAILABLE:
self.stats["dependency_warnings"] += 1
if not PDF2IMAGE_AVAILABLE:
self.stats["dependency_warnings"] += 1
def extract_from_pdf(self, pdf_path: str, output_dir: str = None) -> List[MeaningKernel]:
"""Extract meaning kernels from a PDF file."""
if not PDF2IMAGE_AVAILABLE:
print("Error: pdf2image is required for PDF processing")
print("Install with: pip install pdf2image")
print("System dependencies:")
print(" macOS: brew install poppler")
print(" Ubuntu: sudo apt-get install poppler-utils")
self.stats["errors"] += 1
return []
pdf_path = Path(pdf_path)
if not pdf_path.exists():
print(f"Error: PDF not found: {pdf_path}")
self.stats["errors"] += 1
return []
print(f"Processing PDF: {pdf_path}")
# Create output directory
if output_dir:
output_path = Path(output_dir)
else:
output_path = pdf_path.parent / f"{pdf_path.stem}_kernels"
output_path.mkdir(parents=True, exist_ok=True)
# Convert PDF to images
try:
from pdf2image import convert_from_path
images = convert_from_path(pdf_path, dpi=300)
print(f"Converted {len(images)} pages to images")
except Exception as e:
print(f"Error converting PDF: {e}")
self.stats["errors"] += 1
return []
# Process each page
all_kernels = []
for i, image in enumerate(images):
page_num = i + 1
print(f"Processing page {page_num}/{len(images)}")
# Save image temporarily
temp_image_path = output_path / f"page_{page_num:03d}.png"
image.save(temp_image_path)
# Extract kernels from image
page_kernels = self.extract_from_image(temp_image_path, page_num)
all_kernels.extend(page_kernels)
self.stats["pages_processed"] += 1
# Save all kernels
self._save_kernels(all_kernels, output_path)
return all_kernels
def extract_from_image(self, image_path: str, page_num: int = None) -> List[MeaningKernel]:
"""Extract meaning kernels from an image."""
print(f"Processing image: {image_path}")
# Analyze image
try:
analysis = self.analyzer.analyze_image(str(image_path))
except Exception as e:
print(f"Error analyzing image: {e}")
self.stats["errors"] += 1
return []
# Generate kernels
kernels = self._generate_kernels(analysis, str(image_path), page_num)
self.stats["diagrams_analyzed"] += 1
self.stats["kernels_extracted"] += len(kernels)
return kernels
def _generate_kernels(self, analysis: Dict[str, Any], source: str, page_num: int = None) -> List[MeaningKernel]:
"""Generate meaning kernels from analysis."""
kernels = []
# Create base ID
base_id = f"kernel_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if page_num:
base_id += f"_p{page_num}"
# 1. Text kernel (from OCR)
if analysis.get("ocr_text"):
text_kernel = MeaningKernel(
kernel_id=f"{base_id}_text",
content=analysis["ocr_text"],
source=source,
kernel_type="text",
confidence=analysis.get("ocr_confidence", 0.0),
metadata={
"word_count": analysis.get("ocr_word_count", 0),
"line_count": len(analysis.get("ocr_lines", [])),
"diagram_type": analysis.get("diagram_type", "unknown")
},
tags=["ocr", "text", "extracted"]
)
kernels.append(text_kernel)
# 2. Structure kernel
structure_content = f"Diagram type: {analysis.get('diagram_type', 'unknown')}. "
structure_content += f"Dimensions: {analysis['dimensions']['width']}x{analysis['dimensions']['height']}. "
structure_content += f"Aspect ratio: {analysis['aspect_ratio']:.2f}. "
# Add color information
color_analysis = analysis.get("color_analysis", {})
if color_analysis.get("is_grayscale"):
structure_content += "Grayscale image. "
elif color_analysis.get("dominant_colors"):
top_color = color_analysis["dominant_colors"][0]
structure_content += f"Dominant color: RGB{top_color['color']} ({top_color['percentage']}%). "
structure_kernel = MeaningKernel(
kernel_id=f"{base_id}_structure",
content=structure_content,
source=source,
kernel_type="structure",
confidence=0.9,
metadata={
"dimensions": analysis["dimensions"],
"aspect_ratio": analysis["aspect_ratio"],
"diagram_type": analysis.get("diagram_type", "unknown"),
"color_analysis": color_analysis
},
tags=["structure", "layout", "visual"]
)
kernels.append(structure_kernel)
# 3. Summary kernel
summary = f"Research diagram analysis: {analysis.get('diagram_type', 'unknown')} diagram. "
if analysis.get("ocr_text"):
summary += f"Contains text: {analysis['ocr_text'][:200]}..."
else:
summary += "No text detected."
# Add content analysis
content_analysis = analysis.get("content_analysis", {})
if content_analysis.get("entities"):
summary += f" Entities: {', '.join(content_analysis['entities'][:5])}."
summary_kernel = MeaningKernel(
kernel_id=f"{base_id}_summary",
content=summary,
source=source,
kernel_type="summary",
confidence=0.7,
metadata={
"has_text": bool(analysis.get("ocr_text")),
"text_length": len(analysis.get("ocr_text", "")),
"entities": content_analysis.get("entities", []),
"relationships": content_analysis.get("relationships", [])
},
tags=["summary", "overview", "analysis"]
)
kernels.append(summary_kernel)
# 4. Philosophical kernel (if we have text)
if analysis.get("ocr_text") and len(analysis["ocr_text"]) > 50:
philosophical_content = self._extract_philosophical_content(analysis["ocr_text"])
if philosophical_content:
philosophical_kernel = MeaningKernel(
kernel_id=f"{base_id}_philosophical",
content=philosophical_content,
source=source,
kernel_type="philosophical",
confidence=0.6,
metadata={
"extraction_method": "keyword_analysis",
"source_text_length": len(analysis["ocr_text"]),
"keywords_found": self._find_philosophical_keywords(analysis["ocr_text"])
},
tags=["philosophical", "meaning", "conceptual"]
)
kernels.append(philosophical_kernel)
# 5. Semantic kernel (if we have relationships)
content_analysis = analysis.get("content_analysis", {})
if content_analysis.get("relationships"):
relationships = content_analysis["relationships"]
semantic_content = f"Semantic relationships detected: {len(relationships)} connections. "
for rel in relationships[:3]:
semantic_content += f"{rel['source']}{rel['target']}. "
semantic_kernel = MeaningKernel(
kernel_id=f"{base_id}_semantic",
content=semantic_content,
source=source,
kernel_type="semantic",
confidence=0.8,
metadata={
"relationship_count": len(relationships),
"relationships": relationships
},
tags=["semantic", "relationships", "connections"]
)
kernels.append(semantic_kernel)
# Add to internal list
self.kernels.extend(kernels)
return kernels
def _extract_philosophical_content(self, text: str) -> Optional[str]:
"""Extract philosophical content from text."""
# Look for philosophical keywords
found_keywords = self._find_philosophical_keywords(text)
if found_keywords:
return f"Philosophical themes detected: {', '.join(found_keywords)}. " f"Source text explores concepts of {found_keywords[0]}."
return None
def _find_philosophical_keywords(self, text: str) -> List[str]:
"""Find philosophical keywords in text."""
text_lower = text.lower()
found_keywords = []
for keyword in self.analyzer.philosophical_keywords:
if keyword in text_lower:
found_keywords.append(keyword)
return found_keywords
def _save_kernels(self, kernels: List[MeaningKernel], output_path: Path):
"""Save kernels to files."""
if not kernels:
print("No kernels to save")
return
# Save as JSON
json_path = output_path / "meaning_kernels.json"
kernels_data = [k.to_dict() for k in kernels]
with open(json_path, 'w') as f:
json.dump(kernels_data, f, indent=2)
# Save as Markdown
md_path = output_path / "meaning_kernels.md"
with open(md_path, 'w') as f:
f.write(f"# Meaning Kernels Extraction Report\n")
f.write(f"Generated: {datetime.now().isoformat()}\n")
f.write(f"Total kernels: {len(kernels)}\n\n")
# Group by type
by_type = {}
for kernel in kernels:
by_type.setdefault(kernel.kernel_type, []).append(kernel)
for kernel_type, type_kernels in by_type.items():
f.write(f"## {kernel_type.title()} Kernels ({len(type_kernels)})\n\n")
for kernel in type_kernels:
f.write(f"### {kernel.kernel_id}\n")
f.write(f"- **Source**: {kernel.source}\n")
f.write(f"- **Confidence**: {kernel.confidence:.2f}\n")
f.write(f"- **Timestamp**: {kernel.timestamp}\n")
f.write(f"- **Tags**: {', '.join(kernel.tags)}\n")
f.write(f"- **Content**: {kernel.content}\n")
f.write(f"- **Metadata**: {json.dumps(kernel.metadata, indent=2)}\n\n")
# Save statistics
stats_path = output_path / "extraction_stats.json"
with open(stats_path, 'w') as f:
json.dump(self.stats, f, indent=2)
print(f"Saved {len(kernels)} kernels to {output_path}")
print(f" - JSON: {json_path}")
print(f" - Markdown: {md_path}")
print(f" - Statistics: {stats_path}")
def get_stats(self) -> Dict[str, Any]:
"""Get extraction statistics."""
return self.stats.copy()
def main():
"""Command line interface."""
parser = argparse.ArgumentParser(description="Extract meaning kernels from research diagrams")
parser.add_argument("input", help="Input PDF or image file/directory")
parser.add_argument("-o", "--output", help="Output directory")
parser.add_argument("-c", "--config", help="Configuration file (JSON)")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
# Load config if provided
config = {}
if args.config:
with open(args.config) as f:
config = json.load(f)
# Create extractor
extractor = MeaningKernelExtractor(config)
# Process input
input_path = Path(args.input)
if input_path.is_file():
if input_path.suffix.lower() == '.pdf':
kernels = extractor.extract_from_pdf(input_path, args.output)
elif input_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.tiff', '.bmp']:
kernels = extractor.extract_from_image(input_path)
else:
print(f"Unsupported file type: {input_path.suffix}")
sys.exit(1)
elif input_path.is_dir():
# Process all PDFs and images in directory
all_kernels = []
for file_path in input_path.iterdir():
if file_path.suffix.lower() == '.pdf':
kernels = extractor.extract_from_pdf(file_path, args.output)
all_kernels.extend(kernels)
elif file_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.tiff', '.bmp']:
kernels = extractor.extract_from_image(file_path)
all_kernels.extend(kernels)
else:
print(f"Input not found: {input_path}")
sys.exit(1)
# Print summary
stats = extractor.get_stats()
print("\n" + "="*50)
print("EXTRACTION SUMMARY")
print("="*50)
print(f"Pages processed: {stats['pages_processed']}")
print(f"Diagrams analyzed: {stats['diagrams_analyzed']}")
print(f"Kernels extracted: {stats['kernels_extracted']}")
print(f"Errors: {stats['errors']}")
print(f"Dependency warnings: {stats['dependency_warnings']}")
print("="*50)
# Exit with appropriate code
sys.exit(0 if stats['errors'] == 0 else 1)
if __name__ == "__main__":
main()