Traitement NLP des Textes Extraits

Une fois le texte extrait par OCR des zones sélectionnées, un pipeline NLP complet prend le relais pour analyser, structurer et permettre l’interaction intelligente avec le contenu.

Architecture du Système NLP

Le module central CompleteOCRQASystem implémente une approche hybride combinant :

  • Traitement local via NLTK, spaCy et Scikit-learn

  • Modèles de langage externes (Gemini, Mistral) pour les requêtes complexes

  • Préservation intégrale du contenu sans perte d’information

        flowchart TD
    A[Texte OCR Brut] --> B[Nettoyage & Prétraitement]
    B --> C[Segmentation en Phrases]
    C --> D[Vectorisation TF-IDF]
    D --> E[Indexation Complète]
    E --> F{Type de Requête}
    F -->|Simple| G[Traitement Local]
    F -->|Complexe| H[LLM Externe]
    G --> I[Réponse Structurée]
    H --> I
    E --> J[Résumé Automatique]
    E --> K[Extraction Entités]
    

Principe de Préservation à 100%

Important

Philosophie du système : Contrairement aux approches qui tronquent ou échantillonnent le contenu, notre système préserve l’intégralité du texte extrait par OCR.

L’échantillonnage n’intervient que pour les calculs intensifs (vectorisation TF-IDF) tout en gardant le texte complet accessible pour les requêtes.

class CompleteOCRQASystem:
    def __init__(self):
        # Conservation intégrale du contenu
        self.all_sentences = []  # JAMAIS tronqué
        self.sample_sentences = []  # Échantillon pour calculs

    def preserve_full_content(self, text):
        """Préservation complète sans perte"""
        sentences = self.segment_sentences(text)
        self.all_sentences.extend(sentences)

        # Échantillonnage seulement si nécessaire
        if len(sentences) > 400:
            self.sample_sentences = random.sample(sentences, 400)
        else:
            self.sample_sentences = sentences

Étapes de Traitement

1. Nettoyage et Pré-traitement

La classe TextCleaner effectue un nettoyage robuste :

class TextCleaner:
    def __init__(self):
        # Corrections OCR pré-compilées
        self.ocr_fixes = {
            r'\brn\b': 'm',      # rn → m
            r'\bvv\b': 'w',      # vv → w
            r'\b1\b': 'l',       # 1 → l (contexte)
            r'\b0\b': 'o',       # 0 → o (contexte)
        }

    def clean_ocr_text(self, text):
        # Application des corrections OCR
        for pattern, replacement in self.ocr_fixes.items():
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

        # Normalisation générale
        text = text.lower()
        text = re.sub(r'[^\w\s]', ' ', text)  # Suppression ponctuation
        text = re.sub(r'\s+', ' ', text)      # Espaces multiples

        return text.strip()

Processus de Nettoyage :

Étapes de Nettoyage

Étape

Action

Corrections OCR

Remplacement des erreurs fréquentes (rn→m, vv→w)

Normalisation casse

Conversion en minuscules

Suppression ponctuation

Garde uniquement alphanumérique et espaces

Filtrage stop-words

Suppression mots vides (français/anglais)

Lemmatisation

Réduction aux formes canoniques (optionnel)

2. Segmentation en Phrases

def segment_sentences(self, text):
    """Segmentation robuste avec filtrage qualité"""
    import nltk
    from nltk.tokenize import sent_tokenize

    # Segmentation initiale
    sentences = sent_tokenize(text, language='french')

    # Filtrage vectorisé pour performance
    filtered_sentences = []
    for sentence in sentences:
        # Critères de qualité
        if (5 <= len(sentence.split()) <= 50 and  # Longueur raisonnable
            re.search(r'[a-zA-Z]', sentence) and   # Contient des lettres
            len(sentence.strip()) > 10):           # Minimum de contenu
            filtered_sentences.append(sentence.strip())

    return filtered_sentences

Critères de Filtrage :

  • Longueur : 5-50 mots par phrase (évite fragments et paragraphes)

  • Contenu alphabétique : Au moins une lettre (élimine les numéros purs)

  • Taille minimale : 10 caractères minimum

  • Cohérence syntaxique : Validation structure de phrase basique

3. Vectorisation et Indexation

Le système utilise une approche hybride TF-IDF + Word2Vec :

from sklearn.feature_extraction.text import TfidfVectorizer
from gensim.models import Word2Vec

def build_vectors(self, sentences):
    """Construction des vecteurs pour similarité"""

    # TF-IDF pour recherche rapide
    self.tfidf_vectorizer = TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 2),
        stop_words=self.stop_words,
        min_df=2,
        max_df=0.8
    )

    # Échantillon pour TF-IDF (performance)
    sample_size = min(400, len(sentences))
    sample_sentences = random.sample(sentences, sample_size)

    self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(sample_sentences)

    # Word2Vec pour sémantique
    tokenized_sentences = [sentence.split() for sentence in sentences]
    self.word2vec_model = Word2Vec(
        tokenized_sentences,
        vector_size=100,
        window=5,
        min_count=2,
        epochs=5,
        workers=4
    )

Avantages de l’Approche Hybride :

4. Extraction d’Entités Nommées

import spacy

def extract_entities(self, text):
    """Extraction entités avec spaCy français"""
    nlp = spacy.load("fr_core_news_sm")
    doc = nlp(text)

    entities = {
        'PERSON': [],
        'ORG': [],
        'DATE': [],
        'GPE': [],  # Lieux géopolitiques
        'MONEY': [],
        'MISC': []
    }

    for ent in doc.ents:
        if ent.label_ in entities:
            entities[ent.label_].append({
                'text': ent.text,
                'start': ent.start_char,
                'end': ent.end_char,
                'confidence': ent._.confidence if hasattr(ent._, 'confidence') else 1.0
            })

    return entities

Fonctionnalités de Traitement

Résumé Automatique Extractif

def generate_summary_from(self, sentences, max_sentences=8):
    """Résumé par extraction des phrases les plus représentatives"""

    if len(sentences) <= max_sentences:
        return sentences

    # Calcul scores TF-IDF par phrase
    sentence_scores = []
    for i, sentence in enumerate(sentences):
        if i < len(self.tfidf_matrix.toarray()):
            # Score = moyenne des poids TF-IDF des mots
            tfidf_vector = self.tfidf_matrix.toarray()[i]
            score = np.mean(tfidf_vector[tfidf_vector > 0])

            # Pénalité longueur extrême
            length_penalty = 1.0
            if len(sentence.split()) < 8:
                length_penalty = 0.7
            elif len(sentence.split()) > 40:
                length_penalty = 0.8

            sentence_scores.append((sentence, score * length_penalty))

    # Sélection des meilleures phrases
    sentence_scores.sort(key=lambda x: x[1], reverse=True)
    summary_sentences = [s[0] for s in sentence_scores[:max_sentences]]

    return summary_sentences

Détection de Sujets par Clustering

from sklearn.cluster import KMeans

def detect_dominant_topics(self, n_topics=3):
    """Clustering KMeans pour identifier les sujets"""

    if self.tfidf_matrix.shape[0] < n_topics:
        n_topics = max(1, self.tfidf_matrix.shape[0] // 2)

    # Clustering adaptatif
    kmeans = KMeans(
        n_clusters=n_topics,
        random_state=42,
        n_init=10,
        max_iter=100
    )

    clusters = kmeans.fit_predict(self.tfidf_matrix.toarray())

    # Sélection phrase représentative par cluster
    topics = []
    for cluster_id in range(n_topics):
        cluster_indices = np.where(clusters == cluster_id)[0]

        if len(cluster_indices) > 0:
            # Phrase la plus proche du centroïde
            centroid = kmeans.cluster_centers_[cluster_id]
            distances = [
                cosine_similarity([self.tfidf_matrix.toarray()[idx]], [centroid])[0][0]
                for idx in cluster_indices
            ]

            best_idx = cluster_indices[np.argmax(distances)]
            topics.append(self.sample_sentences[best_idx])

    return topics

Système Question-Réponse Local

Classification d’Intention

def classify_question_intent(self, question):
    """Classification rapide par mots-clés"""
    question_lower = question.lower()

    intent_patterns = {
        'summary': ['résume', 'résumé', 'synthèse', 'essentiel'],
        'what': ['quoi', 'que', 'qu\'est-ce', 'définition'],
        'when': ['quand', 'date', 'moment', 'période'],
        'who': ['qui', 'auteur', 'personne', 'nom'],
        'where': ['où', 'lieu', 'endroit', 'localisation'],
        'how': ['comment', 'méthode', 'procédure', 'étapes'],
        'why': ['pourquoi', 'raison', 'cause', 'motif']
    }

    for intent, keywords in intent_patterns.items():
        if any(keyword in question_lower for keyword in keywords):
            return intent

    return 'general'

Recherche par Similarité

def find_relevant_sentences(self, question, threshold=0.1):
    """Recherche sentences pertinentes par similarité cosine"""
    from sklearn.metrics.pairwise import cosine_similarity

    # Vectorisation de la question
    question_vector = self.tfidf_vectorizer.transform([question])

    # Calcul similarités
    similarities = cosine_similarity(question_vector, self.tfidf_matrix).flatten()

    # Seuil adaptatif basé sur la distribution
    if len(similarities) > 0:
        adaptive_threshold = max(threshold, np.mean(similarities) + np.std(similarities))
    else:
        adaptive_threshold = threshold

    # Sélection sentences pertinentes
    relevant_indices = np.where(similarities >= adaptive_threshold)[0]
    relevant_sentences = [
        (self.sample_sentences[idx], similarities[idx])
        for idx in relevant_indices
    ]

    # Tri par pertinence
    relevant_sentences.sort(key=lambda x: x[1], reverse=True)

    return [s[0] for s in relevant_sentences[:5]]  # Top 5

Enrichissement Heuristique

def enrich_answer_with_context(self, question, base_answer):
    """Enrichissement par recherche de mots communs"""
    question_words = set(question.lower().split())

    bonus_sentences = []
    for sentence in self.all_sentences:  # Recherche dans le corpus complet
        sentence_words = set(sentence.lower().split())
        common_words = question_words.intersection(sentence_words)

        # Si ≥2 mots communs et pas déjà dans la réponse
        if (len(common_words) >= 2 and
            sentence not in base_answer and
            len(sentence.split()) >= 5):
            bonus_sentences.append(sentence)

    # Limitation pour éviter surcharge
    return base_answer + bonus_sentences[:3]

Optimisations Performance

Mode CPU pour Machines Limitées

def enable_cpu_mode(self, max_image_height=2000):
    """Optimisations pour environnements contraints"""
    self.cpu_mode = True
    self.max_features_tfidf = 2000  # Réduit de 5000
    self.max_kmeans_iter = 50       # Réduit de 100
    self.word2vec_epochs = 3        # Réduit de 5
    self.max_sample_sentences = 200 # Réduit de 400

Cache et Mémorisation

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_similarity_search(self, question_hash):
    """Cache des recherches fréquentes"""
    # Implémentation avec hash de la question
    pass

Métriques de Performance

Benchmarks Traitement NLP

Opération

Temps (CPU)

Temps (GPU)

Précision

Nettoyage texte

0.1-0.5s

0.05-0.2s

> 95%

Vectorisation TF-IDF

1-3s

0.5-1s

N/A

Clustering topics

2-5s

1-2s

Subjectif

Recherche similarité

0.5-1s

0.2-0.5s

75-85%

Résumé extractif

1-2s

0.5-1s

80-90%

Astuce

Bonnes pratiques identifiées :

  • Préserver l’intégralité du contenu extrait

  • Utiliser l’échantillonnage seulement pour les calculs intensifs

  • Combiner approches statistiques (TF-IDF) et sémantiques (Word2Vec)

  • Implémenter un cache pour les requêtes fréquentes

  • Adapter les seuils de similarité selon le contexte