#!/usr/bin/env python3
"""
Extract encyclopedia entries from Ogden's Encyclopedia of Radio Horror PDFs
and convert them to structured JSON files.

For every ``OEORH <letter>.pdf`` in PDF_DIR the script:

1. optionally carves embedded images out of the matching ``.doc`` file,
2. classifies each PDF text line as title / field / body by font and size,
3. groups the lines into entries and their sections,
4. writes one JSON file per entry plus a combined ``index.json``.
"""

import json
import os
import re
import struct
import sys
from pathlib import Path

try:
    import fitz  # PyMuPDF
except ImportError:
    # Keep the pure text helpers importable without PyMuPDF; parse_pdf()
    # raises a clear ImportError if it is actually needed.
    fitz = None

PDF_DIR = Path("/workspace/pdf")
MSWORD_DIR = Path("/workspace/msword")
OUTPUT_DIR = Path("/workspace/json")
IMAGES_DIR = Path("/workspace/images")

# Font thresholds detected from analysis
TITLE_SIZE_MIN = 14.0  # Entry titles are Georgia Bold ~16pt
FIELD_SIZE_MAX = 12.0  # Field headers / category are Georgia Bold 7.9-9.1pt

# Bold lines starting at or left of this x position are section boundaries;
# bold text further right is treated as content of the current section.
TOP_LEVEL_X_MAX = 140

# Known field headers (text to look for at start of span/line)
FIELD_HEADERS = {
    "ORIGINATION": "origination",
    "DURATION": "duration",
    "PERSONNEL": "personnel",
    "EXTANT RECORDINGS:": "extant_recordings",
    "EXTANT RECORDINGS": "extant_recordings",
    "EXTANT RECORDING:": "extant_recordings",
    "EXTANT RECORDING": "extant_recordings",
    "[CHRONOLOGY]": "chronology",
    "[SOURCES]": "sources",
    "[GALLERY]": "gallery",
}

# Longest header first, so "EXTANT RECORDINGS:" always wins over its own
# prefixes regardless of the insertion order of FIELD_HEADERS.
_HEADERS_BY_LENGTH = sorted(FIELD_HEADERS, key=len, reverse=True)

# Entry keys that collect free text once their header has been seen.
_SECTION_KEYS = frozenset(FIELD_HEADERS.values())

# Output key order of a cleaned entry.
_OUTPUT_KEYS = (
    "title", "category", "article", "origination", "duration", "personnel",
    "extant_recordings", "chronology", "sources", "gallery", "images",
)

# Categories are like [RADIO-SERIES], [SHORT-STORY],
# [STAGE-PLAY; RADIO-SCRIPT], etc.  The closing ")" alternative handles rare
# typos like [RADIO-TALKS).
_CATEGORY_RE = re.compile(r'^\[[A-Z][A-Z\-\s;]+[\]\)]$')

_PAGE_HEADER_MARKER = "ENCYCLOPEDIA OF RADIO"
_LETTER_HEADING_RE = re.compile(r'^[A-Z]\s*$')

# Image signatures used when carving the .doc stream.
_JPEG_START = b'\xff\xd8\xff'
_JPEG_END = b'\xff\xd9'
_PNG_START = b'\x89PNG\r\n\x1a\n'
_PNG_END = b'IEND\xae\x42\x60\x82'
_BMP_HEADER_LEN = 14
# 14-byte file header + smallest (12-byte) info header.
# NOTE(review): the original size check was garbled in the source; this
# lower bound is a reconstruction -- confirm against the intended value.
_BMP_MIN_SIZE = 26


def is_category_text(text):
    """Check if text looks like a category tag such as ``[RADIO-SERIES]``."""
    return bool(_CATEGORY_RE.match(text.strip()))


def find_field_header(span_text):
    """Check if span text starts with a known field header.

    The comparison is case-insensitive.

    Returns:
        ``(json_key, value_text_after_colon)`` for a match, where the value
        is the remainder of the line with leading colons/spaces removed
        (possibly ``""``); ``(None, None)`` when no header matches.
    """
    text = span_text.strip()
    upper_text = text.upper()
    for header in _HEADERS_BY_LENGTH:
        if upper_text.startswith(header.upper()):
            # Extract the rest of the text after the header (and colon)
            rest = text[len(header):].lstrip(": ").strip()
            return FIELD_HEADERS[header], rest
    return None, None


def _carve_bmp(data):
    """Yield byte strings that look like complete BMP files inside *data*."""
    pos = 0
    limit = len(data)
    while True:
        pos = data.find(b'BM', pos)
        if pos == -1 or pos + _BMP_HEADER_LEN > limit:
            # No match, or too close to the end for a full header; every
            # later match would be even closer to the end.
            return
        size, reserved1, reserved2, pixel_offset = struct.unpack_from(
            '<IHHI', data, pos + 2)
        plausible = (
            _BMP_MIN_SIZE <= size <= limit - pos
            # "BM" is a very common byte pair; the reserved fields of a real
            # header are zero and the pixel data starts inside the file.
            and reserved1 == 0
            and reserved2 == 0
            and _BMP_MIN_SIZE <= pixel_offset <= size
        )
        if not plausible:
            pos += 1
            continue
        yield data[pos:pos + size]
        pos += size


def _carve_delimited(data, start_marker, end_marker, min_size=0):
    """Yield spans of *data* running from *start_marker* to the next
    *end_marker* (inclusive) that are at least *min_size* bytes long."""
    pos = 0
    while True:
        pos = data.find(start_marker, pos)
        if pos == -1:
            return
        end = data.find(end_marker, pos + len(start_marker))
        if end == -1:
            # No terminator after this start, so none after later ones.
            return
        end += len(end_marker)
        if end - pos < min_size:
            pos += 1
            continue
        yield data[pos:end]
        pos = end


def _carve_images(data):
    """Return ``[(extension, bytes), ...]`` for images found in *data*.

    Order is all BMPs, then all JPEGs, then all PNGs.
    """
    carved = [("bmp", blob) for blob in _carve_bmp(data)]
    carved.extend(
        ("jpg", blob)
        for blob in _carve_delimited(data, _JPEG_START, _JPEG_END, min_size=20)
    )
    carved.extend(
        ("png", blob) for blob in _carve_delimited(data, _PNG_START, _PNG_END)
    )
    return carved


def extract_images_from_doc(doc_path, letter):
    """Extract images from .doc file by scanning for image signatures.

    Only the ``WordDocument`` OLE stream is scanned.
    NOTE(review): other streams of the .doc are not inspected -- confirm
    that the images of interest really live in ``WordDocument``.

    Args:
        doc_path: Path of the .doc file.
        letter: Sub-directory name under IMAGES_DIR that receives the files.

    Returns:
        List of written file names (``image_<n>.<ext>``); empty when the
        file cannot be read or has no ``WordDocument`` stream.  Failures are
        reported on stderr and never raised (best effort).
    """
    images_dir = IMAGES_DIR / letter
    images_dir.mkdir(parents=True, exist_ok=True)

    try:
        import olefile
        ole = olefile.OleFileIO(doc_path)
        try:
            if not ole.exists("WordDocument"):
                return []
            data = ole.openstream("WordDocument").read()
        finally:
            # Close the OLE container even when reading the stream fails.
            ole.close()
    except Exception as e:
        print(f" Warning: Could not read .doc: {e}", file=sys.stderr)
        return []

    found = []
    for count, (extension, img_data) in enumerate(_carve_images(data), 1):
        filename = f"image_{count}.{extension}"
        with open(images_dir / filename, 'wb') as f:
            f.write(img_data)
        found.append(filename)
        print(f" Extracted {filename} ({len(img_data)} bytes)",
              file=sys.stderr)
    return found


def parse_page(pdf_doc, page_idx):
    """Parse a single page and return a list of annotated lines.

    Each returned dict has the keys ``type`` ("title", "field" or "body"),
    ``text`` (whole stripped line), ``title`` (large bold text, title lines
    only), ``field_text`` (small bold text, field lines only) and ``x_pos``
    (left edge of the line's bbox, 0 if absent).  Empty lines are dropped.
    """
    page = pdf_doc[page_idx]
    blocks = page.get_text("dict")["blocks"]

    lines = []
    for block in blocks:
        if "lines" not in block:
            continue  # block without text lines

        for line in block["lines"]:
            spans = line["spans"]
            full_text = "".join(span["text"] for span in spans).strip()
            if not full_text:
                continue

            title_parts = []
            field_parts = []
            for span in spans:
                if "Bold" not in span.get("font", ""):
                    continue
                size = span.get("size", 0)
                if size >= TITLE_SIZE_MIN:
                    title_parts.append(span["text"])
                elif size <= FIELD_SIZE_MAX:
                    field_parts.append(span["text"])

            title_text = "".join(title_parts).strip()
            field_text = "".join(field_parts).strip()

            # A line with any title-sized bold text is a title; otherwise
            # any field-sized bold text makes it a field line.
            if title_text:
                line_type = "title"
                field_text = ""
            elif field_text:
                line_type = "field"
            else:
                line_type = "body"

            bbox = line.get("bbox")
            lines.append({
                "type": line_type,
                "text": full_text,
                "title": title_text,
                "field_text": field_text,
                "x_pos": bbox[0] if bbox else 0,
            })

    return lines


def _is_non_entry_title(title):
    """Return True for title-styled text that does not start an entry:
    empty text, the running page header, or a single-letter heading."""
    stripped = title.strip()
    if not stripped:
        return True
    if _PAGE_HEADER_MARKER in stripped.upper():
        return True
    return bool(_LETTER_HEADING_RE.match(stripped))


def merge_multi_line_titles(all_pages_lines):
    """Merge titles that span multiple lines.

    A title that wraps will have consecutive lines that are ALL title-type
    (Georgia Bold ~16pt).  Only consecutive title lines are merged, and a
    page header or single-letter heading is never merged with a
    neighbouring title: doing so would drop or corrupt the entry that
    follows it.  Trade-off: a wrapped title whose continuation line is a
    lone capital letter is no longer joined.

    Returns:
        A new list; non-title lines are passed through unchanged, each run
        of title lines becomes one dict carrying the first line's ``x_pos``.
    """
    merged = []
    total = len(all_pages_lines)
    i = 0
    while i < total:
        line = all_pages_lines[i]
        if line["type"] != "title":
            merged.append(line)
            i += 1
            continue

        current_title = line["title"]
        j = i + 1
        if not _is_non_entry_title(current_title):
            while j < total:
                next_line = all_pages_lines[j]
                if (next_line["type"] != "title"
                        or _is_non_entry_title(next_line["title"])):
                    break
                current_title += " " + next_line["title"]
                j += 1
                if _is_non_entry_title(current_title):
                    # A wrapped page header is now complete; whatever
                    # follows is a separate title.
                    break

        merged.append({
            "type": "title",
            "text": current_title,
            "title": current_title,
            "field_text": "",
            "x_pos": line.get("x_pos", 0),
        })
        i = j
    return merged


def _new_entry(title):
    """Return an empty entry record for *title*."""
    return {
        "title": title,
        "category": "",
        "article": "",
        "origination": "",
        "duration": "",
        "personnel": "",
        "extant_recordings": "",
        "chronology": "",
        "sources": "",
        "gallery": "",
        "images": [],
    }


def _append_text(entry, text):
    """Append *text* as a new line to the entry's current section
    (the article when no field section is active)."""
    section = entry.get("_section", "article")
    key = section if section in _SECTION_KEYS else "article"
    entry[key] = f"{entry[key]}\n{text}" if entry[key] else text


def parse_pdf(pdf_path, letter):
    """Parse a single PDF and extract structured entries.

    Args:
        pdf_path: Path of the PDF to read.
        letter: Unused; kept for interface compatibility.

    Returns:
        List of entry dicts (see ``_new_entry``); each may also carry the
        internal ``_section`` bookkeeping key.

    Raises:
        ImportError: if PyMuPDF (``fitz``) is not installed.
    """
    if fitz is None:
        raise ImportError("PyMuPDF (fitz) is required to parse PDF files")

    doc = fitz.open(pdf_path)
    try:
        all_lines = []
        for page_idx in range(doc.page_count):
            all_lines.extend(parse_page(doc, page_idx))
    finally:
        doc.close()

    all_lines = merge_multi_line_titles(all_lines)

    entries = []
    current_entry = None
    for line in all_lines:
        if line["type"] == "title":
            # Page headers, letter headings and empty titles do not end the
            # current entry; its text simply continues after them.
            if _is_non_entry_title(line["title"]):
                continue
            if current_entry is not None:
                entries.append(current_entry)
            current_entry = _new_entry(line["title"].strip())
            continue

        if current_entry is None:
            continue  # text before the first entry

        full_text = line["text"]
        if line["type"] == "field":
            field_text = line["field_text"]

            # First bracketed tag of an entry is its category.  Section tags
            # such as [CHRONOLOGY] also match the category pattern, so they
            # are excluded explicitly.
            if (not current_entry["category"]
                    and is_category_text(field_text)
                    and field_text.strip().upper() not in FIELD_HEADERS):
                current_entry["category"] = field_text
                current_entry["_section"] = "article"
                continue

            # Only treat as section boundary if at top-level position;
            # indented bold text is part of the current section.
            if line.get("x_pos", 0) <= TOP_LEVEL_X_MAX:
                # Use the FULL line text so inline values are captured.
                key, inline_value = find_field_header(full_text)
                if key:
                    if inline_value:
                        current_entry[key] = inline_value
                    current_entry["_section"] = key
                    continue

        # Body text, indented bold text or unrecognized bold text.
        _append_text(current_entry, full_text)

    if current_entry is not None:
        entries.append(current_entry)
    return entries


def clean_entry(entry):
    """Clean up an entry for JSON output.

    Returns a new dict with exactly the output keys (missing ones default
    to ``""``); string values are stripped and runs of 3+ newlines are
    collapsed to two.  Internal keys such as ``_section`` are left out and
    the input dict is not modified.
    """
    cleaned = {}
    for key in _OUTPUT_KEYS:
        value = entry.get(key, "")
        if isinstance(value, str):
            value = re.sub(r'\n{3,}', '\n\n', value).strip()
        cleaned[key] = value
    return cleaned


def _slugify_title(title):
    """Return a lowercase, dash-separated file stem (max 80 chars)."""
    slug = re.sub(r'[^a-zA-Z0-9\s\-]', '', title).strip()
    return re.sub(r'\s+', '-', slug).lower()[:80]


def _assign_filenames(entries):
    """Return one unique ``*.json`` file name per entry, in order.

    Repeated titles get ``-2``, ``-3``, ... suffixes.  Names already taken
    (including the reserved ``index.json``) are skipped so no file is
    silently overwritten.
    """
    taken = {"index.json"}
    counts = {}
    names = []
    for i, entry in enumerate(entries):
        base = _slugify_title(entry["title"]) or f"entry_{i:04d}"
        n = counts.get(base, 0) + 1
        candidate = f"{base}.json" if n == 1 else f"{base}-{n}.json"
        while candidate in taken:
            n += 1
            candidate = f"{base}-{n}.json"
        counts[base] = n
        taken.add(candidate)
        names.append(candidate)
    return names


def main():
    """Convert every ``OEORH *.pdf`` in PDF_DIR to JSON under OUTPUT_DIR."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    IMAGES_DIR.mkdir(parents=True, exist_ok=True)

    all_entries = []

    # Process each letter file
    for pdf_path in sorted(PDF_DIR.glob("OEORH *.pdf")):
        token = pdf_path.stem.split()[-1]
        letter = token.lower()
        print(f"Processing {pdf_path.name}...", file=sys.stderr)

        # Try to extract images from the .doc file.  Try the PDF's own case
        # first: "OEORH a.doc" does not match "OEORH A.doc" on
        # case-sensitive filesystems.
        images = []
        for doc_name in dict.fromkeys(
                (f"OEORH {token}.doc", f"OEORH {letter}.doc")):
            doc_path = MSWORD_DIR / doc_name
            if doc_path.exists():
                images = extract_images_from_doc(doc_path, letter)
                break

        entries = parse_pdf(pdf_path, letter)

        # Attach all images to the first entry with gallery content, or to
        # the first entry when none has a gallery.
        if images and entries:
            target = next(
                (e for e in entries if e.get("gallery", "").strip()),
                entries[0],
            )
            target["images"] = images

        all_entries.extend(clean_entry(entry) for entry in entries)
        print(f" Found {len(entries)} entries", file=sys.stderr)

    # File names are computed once so files and index can never disagree.
    filenames = _assign_filenames(all_entries)

    # Write individual JSON files
    for entry, filename in zip(all_entries, filenames):
        with open(OUTPUT_DIR / filename, 'w', encoding='utf-8') as f:
            json.dump(entry, f, indent=2, ensure_ascii=False)

    # Write combined index
    index = [
        {
            "title": entry["title"],
            "category": entry["category"],
            "file": f"json/{filename}",
        }
        for entry, filename in zip(all_entries, filenames)
    ]
    with open(OUTPUT_DIR / "index.json", 'w', encoding='utf-8') as f:
        json.dump(index, f, indent=2, ensure_ascii=False)

    print(f"\nDone! Extracted {len(all_entries)} entries to {OUTPUT_DIR}",
          file=sys.stderr)


if __name__ == "__main__":
    main()