Kreuzberg provides a flexible system for adding and removing custom extractors through the ExtractorRegistry. This allows you to extend Kreuzberg's capabilities to handle additional file formats or customize how existing formats are processed.
To create a custom extractor, you need to subclass the Extractor abstract base class and implement its required methods:
| from kreuzberg import ExtractorRegistry, ExtractionResult, ExtractionConfig
from kreuzberg._extractors._base import Extractor
from pathlib import Path
class CustomExtractor(Extractor):
    """Example extractor that turns a custom file format into plain text.

    Both the synchronous and the asynchronous entry points are implemented.
    Each path-based method loads the file's bytes and hands them to the
    matching bytes-based method.
    """

    # MIME types handled by this extractor
    SUPPORTED_MIME_TYPES = {"application/x-custom", "application/x-custom-format"}

    async def extract_bytes_async(self, content: bytes) -> ExtractionResult:
        """Extract text from raw bytes (asynchronous entry point)."""
        # Put your own extraction logic here
        text = self._process_content(content)
        return ExtractionResult(
            content=text,
            mime_type=self.mime_type,
            metadata={"extractor": "CustomExtractor"},
        )

    async def extract_path_async(self, path: Path) -> ExtractionResult:
        """Extract text from the file at ``path`` (asynchronous entry point)."""
        # Load the raw bytes, then reuse the bytes-based implementation
        raw = await self._read_file_async(path)
        return await self.extract_bytes_async(raw)

    def extract_bytes_sync(self, content: bytes) -> ExtractionResult:
        """Extract text from raw bytes (synchronous entry point)."""
        # Put your own extraction logic here
        text = self._process_content(content)
        return ExtractionResult(
            content=text,
            mime_type=self.mime_type,
            metadata={"extractor": "CustomExtractor"},
        )

    def extract_path_sync(self, path: Path) -> ExtractionResult:
        """Extract text from the file at ``path`` (synchronous entry point)."""
        # Load the raw bytes, then reuse the bytes-based implementation
        raw = Path(path).read_bytes()
        return self.extract_bytes_sync(raw)

    def _process_content(self, content: bytes) -> str:
        """Turn the raw bytes into text.

        This placeholder decodes the bytes as UTF-8 and silently drops any
        byte sequences that cannot be decoded; replace it with your own
        format-specific processing.
        """
        return content.decode("utf-8", errors="ignore")

    async def _read_file_async(self, path: Path) -> bytes:
        """Return the full contents of the file at ``path`` as bytes.

        This is a deliberately simple implementation: the read itself is an
        ordinary blocking call. In practice you might want to use a library
        such as aiofiles instead.
        """
        return Path(path).read_bytes()
|
Once you've created your custom extractor, you can register it with Kreuzberg using the ExtractorRegistry:
| from kreuzberg import ExtractorRegistry
from my_module import CustomExtractor
# Register the custom extractor (the class itself is passed, not an instance)
ExtractorRegistry.add_extractor(CustomExtractor)
# Now you can use it with standard extraction functions
from kreuzberg import extract_file
# NOTE: `await` is only valid inside an `async def` function (or an async-aware REPL)
result = await extract_file("custom_document.xyz")
|
Extractors are tried in the order they are registered. When extracting content, Kreuzberg will:
- Try all user-registered extractors first (in the order they were added)
- Then try all default extractors
This means your custom extractors take precedence over the built-in ones. If you want to override how a specific MIME type is handled, you can register a custom extractor that supports that MIME type.
You can remove a previously registered extractor:
| from kreuzberg import ExtractorRegistry
from my_module import CustomExtractor
# First register it (the class itself is passed, not an instance)
ExtractorRegistry.add_extractor(CustomExtractor)
# Later, remove it when no longer needed, passing the same class
ExtractorRegistry.remove_extractor(CustomExtractor)
|
When creating custom extractors that need OCR capabilities, you can leverage Kreuzberg's OCR configuration options:
| from kreuzberg import ExtractorRegistry, ExtractionResult, ExtractionConfig, TesseractConfig, PSMMode
from kreuzberg._extractors._base import Extractor
class CustomImageExtractor(Extractor):
    """Example image extractor that reads its OCR settings from the extraction config."""

    SUPPORTED_MIME_TYPES = {"image/custom"}

    def extract_bytes_sync(self, content: bytes) -> ExtractionResult:
        """Synchronously extract text from image bytes.

        Looks up the OCR configuration on ``self.config`` and, when it is a
        ``TesseractConfig``, reads the Tesseract-specific options so they can
        be used by your OCR processing.
        """
        # The OCR configuration comes from the extraction config
        ocr_settings = self.config.ocr_config

        if isinstance(ocr_settings, TesseractConfig):
            # Tesseract-specific settings
            language = ocr_settings.language  # Language model to use (e.g., "eng", "deu")
            psm = ocr_settings.psm  # Page segmentation mode
            # Use these settings in your OCR processing
            # ...

        # Implement the rest of your extraction logic
        # ...
        return ExtractionResult(
            content="Extracted text",
            mime_type=self.mime_type,
            metadata={"ocr_engine": "tesseract"},
        )

    # Implement other required methods...
|
Here's a complete example of a custom CSV extractor that extracts text from CSV files:
| from kreuzberg import ExtractorRegistry, ExtractionResult, ExtractionConfig
from kreuzberg._extractors._base import Extractor
from pathlib import Path
import csv
import io
class CSVExtractor(Extractor):
    """Custom extractor that renders CSV files as pipe-separated plain text.

    The first row is treated as the header row and is followed by a dashed
    separator line; every subsequent row is emitted on its own line with the
    cells joined by " | ".
    """

    SUPPORTED_MIME_TYPES = {"text/csv", "application/csv"}

    async def extract_bytes_async(self, content: bytes) -> ExtractionResult:
        """Asynchronously extract text from CSV bytes.

        Delegates to :meth:`extract_bytes_sync`.
        """
        return self.extract_bytes_sync(content)

    async def extract_path_async(self, path: Path) -> ExtractionResult:
        """Asynchronously extract text from the CSV file at ``path``."""
        with open(path, "rb") as f:
            content = f.read()
        return await self.extract_bytes_async(content)

    def extract_bytes_sync(self, content: bytes) -> ExtractionResult:
        """Synchronously extract text from CSV bytes.

        Bytes that cannot be decoded as UTF-8 are dropped rather than raising.
        """
        # "utf-8-sig" strips a leading byte-order mark (common in spreadsheet
        # exports) so it does not end up inside the first header cell.
        text_content = content.decode("utf-8-sig", errors="ignore")
        extracted_text = self._process_csv(text_content)
        return ExtractionResult(
            content=extracted_text,
            mime_type=self.mime_type,
            metadata={"extractor": "CSVExtractor", "format": "csv"},
        )

    def extract_path_sync(self, path: Path) -> ExtractionResult:
        """Synchronously extract text from the CSV file at ``path``."""
        with open(path, "rb") as f:
            content = f.read()
        return self.extract_bytes_sync(content)

    def _process_csv(self, csv_content: str) -> str:
        """Process CSV content and convert it to plain text.

        Returns the rendered rows joined by newlines. If the CSV parser
        rejects the input, the rows parsed so far are kept and an
        "Error processing CSV: ..." line is appended instead of raising.
        """
        output = []
        # newline="" leaves line endings untouched so the csv module can do
        # its own newline handling, as the csv documentation recommends.
        csv_file = io.StringIO(csv_content, newline="")
        try:
            reader = csv.reader(csv_file)
            headers = next(reader, None)
            if headers:
                output.append(" | ".join(headers))
                output.append("-" * 40)
            for row in reader:
                output.append(" | ".join(row))
        except csv.Error as e:
            # Only parser errors are expected here; anything else is a real bug
            # and should propagate instead of being hidden in the output.
            output.append(f"Error processing CSV: {e}")
        return "\n".join(output)
# Register the custom extractor (the class itself is passed, not an instance)
ExtractorRegistry.add_extractor(CSVExtractor)
|
Best Practices
- Define clear MIME type support: Be specific about which MIME types your extractor supports
- Implement both sync and async methods: Ensure your extractor works in both synchronous and asynchronous contexts
- Handle errors gracefully: Catch and handle exceptions within your extractor methods
- Provide rich metadata: Include useful information about the extraction process in the result metadata
- Test with various inputs: Ensure your extractor works with a variety of file formats and edge cases
- Consider performance: For large files, implement streaming or chunking to avoid memory issues
- Document your extractors: Include clear documentation for your custom extractors