Build intelligent search experiences with semantic understanding and AI-powered retrieval
import json
import logging
from datetime import datetime

import numpy as np
import requests
from sklearn.metrics.pairwise import cosine_similarity
class AISearchEngine:
    """In-memory semantic search engine backed by a remote embeddings API.

    Each indexed document is stored in ``self.documents`` together with its
    embedding vector. Queries are embedded through the same endpoint and
    ranked against the stored vectors by cosine similarity.
    """

    # Seconds to wait on the embeddings endpoint; without a timeout
    # ``requests`` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key):
        """Create an empty index.

        Args:
            api_key: Bearer token sent with every embeddings request.
        """
        self.api_key = api_key
        self.base_url = "https://api.anyapi.ai/v1"
        # Never written by this class; kept so existing callers that read
        # the attribute keep working.
        self.document_embeddings = {}
        self.documents = {}

    def add_document(self, doc_id, content, metadata=None):
        """Embed ``content`` and store it in the index under ``doc_id``.

        Args:
            doc_id: Key for the document; an existing entry is overwritten.
            content: Text that is embedded and returned with search hits.
            metadata: Optional dict stored alongside the document; a falsy
                value is stored as an empty dict.
        """
        embedding = self.get_embedding(content)
        self.documents[doc_id] = {
            "content": content,
            "metadata": metadata or {},
            "embedding": embedding,
        }

    def get_embedding(self, text):
        """Return the embedding vector for ``text``.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status
                (previously this surfaced as a confusing ``KeyError``).
            requests.Timeout: If no answer arrives within
                ``REQUEST_TIMEOUT`` seconds.
        """
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "model": "text-embedding-3-large",
                "input": text,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

    def search(self, query, top_k=10):
        """Rank the indexed documents against ``query``.

        Args:
            query: Free-text query to embed.
            top_k: Maximum number of hits; ``None`` returns every document.

        Returns:
            A list of dicts with ``doc_id``, ``similarity``, ``content`` and
            ``metadata`` keys, best match first. Empty when the index is
            empty or ``top_k`` is not positive.
        """
        # Nothing can be returned, so skip the (billable) embedding request.
        if not self.documents or (top_k is not None and top_k <= 0):
            return []

        query_embedding = self.get_embedding(query)

        # One batched similarity call instead of one call per document.
        entries = list(self.documents.items())
        scores = cosine_similarity(
            [query_embedding],
            [doc_data["embedding"] for _, doc_data in entries],
        )[0]

        ranked = [
            {
                "doc_id": doc_id,
                "similarity": score,
                "content": doc_data["content"],
                "metadata": doc_data["metadata"],
            }
            for (doc_id, doc_data), score in zip(entries, scores)
        ]
        ranked.sort(key=lambda hit: hit["similarity"], reverse=True)
        return ranked[:top_k]
# Usage example: index two small documents, then run one semantic query.
search_engine = AISearchEngine("YOUR_API_KEY")

# (doc_id, content, metadata) triples to add to the index.
sample_documents = [
    (
        "doc1",
        "Python is a programming language used for web development",
        {"category": "programming", "author": "John Doe"},
    ),
    (
        "doc2",
        "Machine learning helps computers learn patterns from data",
        {"category": "AI", "author": "Jane Smith"},
    ),
]
for sample_id, sample_text, sample_metadata in sample_documents:
    search_engine.add_document(sample_id, sample_text, sample_metadata)

# Query the index and print each hit with its similarity score.
results = search_engine.search("coding languages for websites")
for result in results:
    print(f"Score: {result['similarity']:.3f} - {result['content']}")
import pinecone
import requests
class AdvancedSearchEngine:
    """Semantic search that stores vectors in a Pinecone index.

    Embeddings and query expansion come from the anyapi.ai HTTP API; vector
    storage and nearest-neighbour lookup are delegated to Pinecone.
    """

    EMBEDDINGS_URL = "https://api.anyapi.ai/v1/embeddings"
    CHAT_URL = "https://api.anyapi.ai/v1/chat/completions"
    EMBEDDING_MODEL = "text-embedding-3-large"
    # Seconds to wait on any API call; without a timeout ``requests`` can
    # block forever on a stalled connection.
    REQUEST_TIMEOUT = 60

    def __init__(self, api_key, pinecone_key, index_name):
        """Connect to Pinecone and remember the API credentials.

        Args:
            api_key: Bearer token for the embeddings/chat API.
            pinecone_key: Pinecone API key.
            index_name: Name of the Pinecone index to read and write.
        """
        self.api_key = api_key
        pinecone.init(api_key=pinecone_key, environment="us-west1-gcp")
        self.index = pinecone.Index(index_name)

    def _post(self, url, payload):
        """POST ``payload`` as JSON and return the decoded response body.

        Raises:
            requests.HTTPError: On an error status code.
        """
        response = requests.post(
            url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def _embed(self, text_or_texts):
        """Return the ``data`` list of the embeddings response for the input."""
        payload = {"model": self.EMBEDDING_MODEL, "input": text_or_texts}
        return self._post(self.EMBEDDINGS_URL, payload)["data"]

    def index_documents(self, documents):
        """Embed ``documents`` in one request and upsert them to Pinecone.

        Args:
            documents: Sequence of dicts with required ``id`` and ``content``
                keys and optional ``title``, ``category``, ``url`` and
                ``timestamp`` keys.
        """
        if not documents:
            # Nothing to embed; avoid sending an empty input list.
            return

        # NOTE(review): pairing by position assumes the API returns one
        # embedding per input, in input order - confirm against the API docs.
        embeddings = self._embed([doc["content"] for doc in documents])

        vectors = [
            {
                "id": doc["id"],
                "values": embedding["embedding"],
                "metadata": {
                    "content": doc["content"][:1000],  # Truncate for metadata
                    "title": doc.get("title", ""),
                    "category": doc.get("category", ""),
                    "url": doc.get("url", ""),
                    "timestamp": doc.get("timestamp", ""),
                },
            }
            for doc, embedding in zip(documents, embeddings)
        ]

        # Upsert in fixed-size batches to keep each request small.
        batch_size = 100
        for start in range(0, len(vectors), batch_size):
            self.index.upsert(vectors=vectors[start:start + batch_size])

    def search_with_filters(self, query, filters=None, top_k=10):
        """Return the Pinecone matches for ``query``.

        Args:
            query: Free-text query to embed.
            filters: Optional metadata filter passed to Pinecone unchanged.
            top_k: Number of matches to request.
        """
        query_embedding = self._embed(query)[0]["embedding"]
        search_results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            filter=filters,
        )
        return search_results["matches"]

    def hybrid_search(self, query, top_k=10):
        """Search with both the raw query and an AI-expanded query.

        Each search over-fetches ``top_k * 2`` matches so that merging still
        leaves enough distinct documents.
        """
        semantic_results = self.search_with_filters(query, top_k=top_k * 2)

        enhanced_query = self.enhance_query(query)
        keyword_results = self.search_with_filters(enhanced_query, top_k=top_k * 2)

        merged_results = self.merge_results(semantic_results, keyword_results)
        return merged_results[:top_k]

    def merge_results(self, semantic_results, keyword_results):
        """Merge two match lists into one, best score first.

        A document found by both searches appears once, keeping the match
        with the higher score. Ties keep first-seen order.

        Args:
            semantic_results: Matches for the original query.
            keyword_results: Matches for the enhanced query.

        Returns:
            A new list of matches sorted by descending score.
        """
        # Assumes every match supports ["id"] / ["score"] item access, as
        # the query response itself does - TODO confirm for the installed
        # Pinecone client version.
        best_by_id = {}
        for match in list(semantic_results) + list(keyword_results):
            match_id = match["id"]
            current = best_by_id.get(match_id)
            if current is None or match["score"] > current["score"]:
                best_by_id[match_id] = match
        return sorted(
            best_by_id.values(),
            key=lambda match: match["score"],
            reverse=True,
        )

    def enhance_query(self, query):
        """Ask the chat model to expand ``query`` with related terms.

        Returns:
            The model's reply text, used verbatim as the enhanced query.
        """
        prompt = (
            "\n"
            "Enhance this search query with relevant synonyms and related terms.\n"
            f'Original query: "{query}"\n'
            "Return only the enhanced query with additional relevant terms.\n"
        )
        body = self._post(
            self.CHAT_URL,
            {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        return body["choices"][0]["message"]["content"]
class MultiModalSearchEngine:
    """In-memory search over text documents and described images.

    Images are indexed through text: a description is supplied or generated
    by a vision model, and that description's text embedding is stored.
    """

    # Seconds to wait on any API call; without a timeout ``requests`` can
    # block forever on a stalled connection.
    REQUEST_TIMEOUT = 60

    def __init__(self, api_key):
        """Create an empty index.

        Args:
            api_key: Bearer token for the embeddings/chat API.
        """
        self.api_key = api_key
        self.text_embeddings = {}
        # Never written by this class; kept for callers that read it.
        self.image_embeddings = {}
        self.documents = {}

    def get_text_embedding(self, text):
        """Return the embedding vector for ``text``.

        Uses the same endpoint and model as the other engines in this file.

        Raises:
            requests.HTTPError: On an error status code.
        """
        response = requests.post(
            "https://api.anyapi.ai/v1/embeddings",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "model": "text-embedding-3-large",
                "input": text,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

    def index_text_document(self, doc_id, content, metadata=None):
        """Embed ``content`` and store it as a ``"text"`` document."""
        embedding = self.get_text_embedding(content)
        self.text_embeddings[doc_id] = embedding
        self.documents[doc_id] = {
            "type": "text",
            "content": content,
            "metadata": metadata or {},
        }

    def index_image_document(self, doc_id, image_path, description=None, metadata=None):
        """Store an image as an ``"image"`` document, indexed by description.

        Args:
            doc_id: Key for the document.
            image_path: Path of the image file; only read when a description
                has to be generated.
            description: Text describing the image; generated with
                :meth:`describe_image` when falsy.
            metadata: Optional dict stored with the document.
        """
        if not description:
            description = self.describe_image(image_path)

        text_embedding = self.get_text_embedding(description)
        self.text_embeddings[doc_id] = text_embedding
        self.documents[doc_id] = {
            "type": "image",
            "image_path": image_path,
            "description": description,
            "metadata": metadata or {},
        }

    @staticmethod
    def _image_data_url(image_path):
        """Return the file at ``image_path`` as a base64 ``data:`` URL.

        The MIME type is guessed from the file name and falls back to
        ``image/jpeg`` (the previous hard-coded value) when unknown.
        """
        import base64
        import mimetypes

        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"
        with open(image_path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode("utf-8")
        return f"data:{mime_type};base64,{encoded}"

    def describe_image(self, image_path):
        """Ask the vision model for a search-oriented description of an image.

        Returns:
            The model's reply text.

        Raises:
            OSError: If the image file cannot be read.
            requests.HTTPError: On an error status code.
        """
        response = requests.post(
            "https://api.anyapi.ai/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "model": "gpt-4o",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": "Describe this image in detail for search indexing. Include objects, colors, setting, mood, and any text visible.",
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": self._image_data_url(image_path),
                                },
                            },
                        ],
                    }
                ],
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    def search_multimodal(self, query, content_types=None, top_k=10):
        """Rank indexed documents of the requested types against ``query``.

        Args:
            query: Free-text query to embed.
            content_types: Document types to include; defaults to
                ``["text", "image"]``.
            top_k: Maximum number of hits; ``None`` returns all candidates.

        Returns:
            A list of dicts with ``doc_id``, ``similarity``, ``type`` and
            ``document`` keys, best match first.
        """
        if content_types is None:
            content_types = ["text", "image"]

        candidates = [
            (doc_id, embedding, self.documents[doc_id])
            for doc_id, embedding in self.text_embeddings.items()
            if self.documents[doc_id]["type"] in content_types
        ]
        # Nothing can be returned, so skip the embedding request entirely.
        if not candidates or (top_k is not None and top_k <= 0):
            return []

        query_embedding = self.get_text_embedding(query)

        # One batched similarity call instead of one call per document.
        scores = cosine_similarity(
            [query_embedding],
            [embedding for _, embedding, _ in candidates],
        )[0]

        ranked = [
            {
                "doc_id": doc_id,
                "similarity": score,
                "type": doc["type"],
                "document": doc,
            }
            for (doc_id, _, doc), score in zip(candidates, scores)
        ]
        ranked.sort(key=lambda hit: hit["similarity"], reverse=True)
        return ranked[:top_k]

    def visual_search(self, image_path, top_k=10):
        """Find indexed images similar to the image at ``image_path``.

        The query image is described by the vision model and that
        description is used as a text query restricted to image documents.
        """
        query_description = self.describe_image(image_path)
        return self.search_multimodal(query_description, content_types=["image"], top_k=top_k)
class IntelligentSearchEngine:
    """Search layer adding AI reranking, personalization and query logging.

    This class does not define ``semantic_search(query, top_k=...)``; a
    subclass or mixin must provide it. Judging by how its results are used
    here, it should return dicts with ``doc_id``, ``similarity`` and
    ``document`` (holding ``content`` and ``metadata``) keys.
    """

    def __init__(self, api_key):
        """Create the engine with empty analytics and preference stores.

        Args:
            api_key: Bearer token for the chat API used for reranking.
        """
        self.api_key = api_key
        self.search_analytics = {}
        self.user_preferences = {}

    def search_with_reranking(self, query, user_id=None, top_k=10):
        """Search, rerank with AI, optionally personalize, and log the query.

        Args:
            query: Free-text query.
            user_id: When truthy, stored preferences for this user are
                applied after reranking.
            top_k: Number of results to return.
        """
        # Over-fetch so reranking has more candidates than are returned.
        initial_results = self.semantic_search(query, top_k=top_k * 3)

        reranked_results = self.rerank_results(query, initial_results, user_id)

        # NOTE(review): personalize_results re-sorts by similarity, which
        # replaces the AI ordering for users that have stored preferences.
        if user_id:
            reranked_results = self.personalize_results(reranked_results, user_id)

        self.log_search(query, user_id, reranked_results[:top_k])
        return reranked_results[:top_k]

    def rerank_results(self, query, results, user_id=None):
        """Reorder ``results`` by asking the chat model for a relevance ranking.

        Args:
            query: The user's query, included in the prompt.
            results: Candidate results to reorder.
            user_id: Accepted for interface compatibility; not used.

        Returns:
            The reordered results, or ``results`` unchanged when the model's
            reply cannot be parsed as a JSON array of IDs.
        """
        if not results:
            # Nothing to rank; skip the API call.
            return results

        candidates = [
            {
                "id": r["doc_id"],
                "content": r["document"]["content"][:500],  # Truncate
                "metadata": r["document"]["metadata"],
            }
            for r in results
        ]
        prompt = (
            "\n"
            f'Rerank these search results for the query: "{query}"\n'
            "Results:\n"
            f"{json.dumps(candidates, indent=2)}\n"
            "Consider:\n"
            "1. Relevance to the query\n"
            "2. Content quality and completeness\n"
            "3. Recency (if timestamp available)\n"
            "4. Authority (if available in metadata)\n"
            "Return a JSON array with the result IDs in order of relevance.\n"
            'Example: ["id1", "id3", "id2", ...]\n'
        )
        response = requests.post(
            "https://api.anyapi.ai/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
            },
        )

        # Reranking is best-effort: a malformed reply keeps the original
        # order. Only parsing/shape errors are caught so that interrupts
        # and programming errors are no longer swallowed.
        try:
            rerank_order = json.loads(
                response.json()["choices"][0]["message"]["content"]
            )
            return self._apply_rerank_order(results, rerank_order)
        except (ValueError, KeyError, IndexError, TypeError):
            return results

    @staticmethod
    def _apply_rerank_order(results, rerank_order):
        """Return ``results`` reordered to follow ``rerank_order``.

        Unknown and repeated IDs are ignored. Results the model left out
        are appended in their original order instead of being dropped.

        Raises:
            TypeError: If ``rerank_order`` is not a list.
        """
        if not isinstance(rerank_order, list):
            raise TypeError("rerank order must be a JSON array")

        by_id = {}
        for result in results:
            # First result wins for a repeated doc_id, as before.
            by_id.setdefault(result["doc_id"], result)

        ordered = []
        placed = set()
        for result_id in rerank_order:
            try:
                result = by_id.get(result_id)
            except TypeError:
                # Unhashable entry (e.g. a nested list) cannot be an ID.
                continue
            if result is not None and result_id not in placed:
                placed.add(result_id)
                ordered.append(result)

        ordered.extend(r for r in results if r["doc_id"] not in placed)
        return ordered

    @staticmethod
    def _is_recent(timestamp, max_age_days=7):
        """Return True when ISO ``timestamp`` is under ``max_age_days`` old.

        Missing or unparsable timestamps count as not recent. Timezone-aware
        timestamps are compared against "now" in their own timezone.
        """
        if not timestamp:
            return False
        try:
            published = datetime.fromisoformat(timestamp)
        except (TypeError, ValueError):
            return False
        now = datetime.now(published.tzinfo)
        return (now - published).days < max_age_days

    def personalize_results(self, results, user_id):
        """Boost similarity scores using the user's stored preferences.

        Multiplies ``similarity`` in place (x1.2 preferred category, x1.1
        preferred author, x1.15 content under 7 days old when the user
        prefers recent content) and re-sorts ``results`` by the new score.

        Returns:
            ``results`` itself; unchanged when the user has no preferences.
        """
        if user_id not in self.user_preferences:
            return results

        preferences = self.user_preferences[user_id]
        preferred_categories = preferences.get("preferred_categories", [])
        preferred_authors = preferences.get("preferred_authors", [])
        prefer_recent = preferences.get("prefer_recent", False)

        for result in results:
            metadata = result["document"]["metadata"]

            if metadata.get("category") in preferred_categories:
                result["similarity"] *= 1.2

            if metadata.get("author") in preferred_authors:
                result["similarity"] *= 1.1

            if prefer_recent and self._is_recent(metadata.get("timestamp")):
                result["similarity"] *= 1.15

        results.sort(key=lambda x: x["similarity"], reverse=True)
        return results

    def log_search(self, query, user_id, results):
        """Append a log entry for ``query`` to ``self.search_analytics``.

        The entry records the current time, the user, the result count and
        the ``doc_id`` of the top result (``None`` when there are none).
        """
        search_log = {
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "user_id": user_id,
            "results_count": len(results),
            "top_result": results[0]["doc_id"] if results else None,
        }
        self.search_analytics.setdefault(query, []).append(search_log)

    def update_user_preferences(self, user_id, clicked_results, query):
        """Record the categories of clicked results as user preferences.

        Args:
            user_id: User whose preferences are updated (created on demand).
            clicked_results: Results the user clicked.
            query: Accepted for interface compatibility; not used.
        """
        if user_id not in self.user_preferences:
            self.user_preferences[user_id] = {
                "preferred_categories": [],
                "preferred_authors": [],
                "prefer_recent": False,
            }

        prefs = self.user_preferences[user_id]["preferred_categories"]
        for result in clicked_results:
            category = result["document"]["metadata"].get("category")
            if category and category not in prefs:
                prefs.append(category)
class EcommerceSearchEngine(AISearchEngine):
    """Product search built on the base engine's index and ranking."""

    def __init__(self, api_key):
        super().__init__(api_key)

    def index_product(self, product_id, product_data):
        """Index one product.

        ``product_data`` must contain ``name``, ``description``, ``category``
        and ``price``; ``tags``, ``brand``, ``rating``, ``image_url`` and
        ``in_stock`` are optional.
        """
        # Name, description, category, tags and brand, one per line, form
        # the text that gets embedded.
        text_for_embedding = (
            f"\n{product_data['name']}"
            f"\n{product_data['description']}"
            f"\n{product_data['category']}"
            f"\n{' '.join(product_data.get('tags', []))}"
            f"\n{product_data.get('brand', '')}\n"
        )
        product_metadata = {
            "name": product_data["name"],
            "price": product_data["price"],
            "category": product_data["category"],
            "brand": product_data.get("brand", ""),
            "rating": product_data.get("rating", 0),
            "image_url": product_data.get("image_url", ""),
            "in_stock": product_data.get("in_stock", True),
        }
        self.add_document(product_id, text_for_embedding.strip(), product_metadata)

    def search_products(self, query, filters=None, sort_by=None, top_k=20):
        """Search products, then filter and sort the hits.

        Args:
            query: Free-text query.
            filters: Optional dict; recognised keys are ``price_min``,
                ``price_max``, ``categories``, ``brands`` and
                ``in_stock_only``.
            sort_by: ``"price_low"``, ``"price_high"`` or ``"rating"``; any
                other value keeps the similarity order.
            top_k: Maximum number of products returned.
        """
        # Over-fetch so that filtering still leaves enough candidates.
        candidates = self.search(query, top_k=top_k * 2)

        if filters:
            def is_allowed(meta):
                """Return True when ``meta`` passes every active filter."""
                if "price_min" in filters and meta["price"] < filters["price_min"]:
                    return False
                if "price_max" in filters and meta["price"] > filters["price_max"]:
                    return False
                if "categories" in filters and meta["category"] not in filters["categories"]:
                    return False
                if "brands" in filters and meta["brand"] not in filters["brands"]:
                    return False
                if filters.get("in_stock_only", False) and not meta["in_stock"]:
                    return False
                return True

            candidates = [hit for hit in candidates if is_allowed(hit["metadata"])]

        # (sort_by value, metadata field, descending?)
        sort_options = (
            ("price_low", "price", False),
            ("price_high", "price", True),
            ("rating", "rating", True),
        )
        for option, field, descending in sort_options:
            if sort_by == option:
                candidates.sort(key=lambda hit: hit["metadata"][field], reverse=descending)
                break

        return candidates[:top_k]
class KnowledgeBaseSearch(AISearchEngine):
    """Question answering over indexed knowledge-base articles."""

    def __init__(self, api_key):
        super().__init__(api_key)

    def index_article(self, article_id, title, content, metadata=None):
        """Index an article; the title is embedded together with the body.

        Keys in ``metadata`` override the default ``title`` and ``type``.
        """
        article_metadata = {"title": title, "type": "article"}
        article_metadata.update(metadata or {})
        self.add_document(article_id, f"{title}\n\n{content}", article_metadata)

    def answer_question(self, question, top_k=3):
        """Answer ``question`` from the ``top_k`` most relevant articles.

        Returns:
            A plain string when no article is found; otherwise a dict with
            the model's ``answer`` and a ``sources`` list of
            ``{"title", "relevance"}`` entries.
        """
        matches = self.search(question, top_k=top_k)
        if not matches:
            return "I couldn't find relevant information to answer your question."

        # One section per article: its title and first 1000 characters.
        sections = []
        for match in matches:
            sections.append(
                f"Article: {match['metadata']['title']}\n{match['content'][:1000]}"
            )
        knowledge = "\n\n".join(sections)

        prompt = (
            "\n"
            "Based on the following context from our knowledge base, answer the user's question.\n"
            "Context:\n"
            f"{knowledge}\n"
            f"Question: {question}\n"
            "Answer based only on the provided context. If the context doesn't contain enough information, say so.\n"
        )
        request_body = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": prompt}],
        }
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=request_body,
        )
        answer = response.json()["choices"][0]["message"]["content"]

        sources = []
        for match in matches:
            sources.append(
                {
                    "title": match["metadata"]["title"],
                    "relevance": match["similarity"],
                }
            )
        return {"answer": answer, "sources": sources}
import redis
import hashlib
class CachedSearchEngine(AISearchEngine):
    """Search engine with an optional, best-effort Redis result cache.

    Cached entries are not invalidated when documents are added; they only
    expire after ``cache_ttl`` seconds.
    """

    def __init__(self, api_key, redis_url=None):
        """Create the engine.

        Args:
            api_key: Bearer token for the embeddings API.
            redis_url: Redis connection URL; caching is disabled when falsy.
        """
        super().__init__(api_key)
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 3600  # 1 hour

    def search_with_cache(self, query, top_k=10):
        """Return search results, served from Redis when available.

        The cache is an optimisation only: if Redis is unreachable or holds
        an undecodable entry, a warning is logged and a live search runs.

        Note:
            Cached results come back through JSON, so values serialised
            with ``default=str`` are returned as strings on a cache hit.
        """
        if not self.redis_client:
            return self.search(query, top_k)

        log = logging.getLogger(__name__)

        # MD5 is used only to get a short, fixed-length key, not for security.
        cache_key = f"search:{hashlib.md5(query.encode()).hexdigest()}:{top_k}"

        try:
            cached_result = self.redis_client.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
        except (redis.RedisError, ValueError) as exc:
            log.warning("Search cache read failed for %s: %s", cache_key, exc)

        results = self.search(query, top_k)

        try:
            self.redis_client.setex(
                cache_key,
                self.cache_ttl,
                json.dumps(results, default=str),
            )
        except redis.RedisError as exc:
            log.warning("Search cache write failed for %s: %s", cache_key, exc)

        return results
class BatchSearchEngine(AISearchEngine):
    """Engine variant that embeds many documents per API request."""

    def __init__(self, api_key):
        super().__init__(api_key)

    def batch_index_documents(self, documents, batch_size=100):
        """Embed and store ``documents`` in chunks of ``batch_size``.

        Args:
            documents: Sequence of dicts with ``id`` and ``content`` keys
                and an optional ``metadata`` dict.
            batch_size: Number of documents sent per embeddings request.

        Prints a progress line after each chunk.
        """
        total = len(documents)
        endpoint = f"{self.base_url}/embeddings"
        auth_headers = {"Authorization": f"Bearer {self.api_key}"}

        for start in range(0, total, batch_size):
            chunk = documents[start:start + batch_size]

            # One request embeds every text in the chunk.
            request_body = {
                "model": "text-embedding-3-large",
                "input": [entry["content"] for entry in chunk],
            }
            reply = requests.post(endpoint, headers=auth_headers, json=request_body)
            vectors = reply.json()["data"]

            # Pair each document with the embedding at the same position.
            for entry, vector in zip(chunk, vectors):
                self.documents[entry["id"]] = {
                    "content": entry["content"],
                    "metadata": entry.get("metadata", {}),
                    "embedding": vector["embedding"],
                }

            done = min(start + batch_size, total)
            print(f"Indexed {done}/{total} documents")
class SearchAnalytics:
    """Per-query usage statistics kept in memory."""

    def __init__(self):
        """Create empty statistics stores."""
        # Maps query text -> {"count", "avg_results", "avg_response_time", "users"}.
        self.query_stats = {}
        # Not written by this class; kept for callers that read it.
        self.performance_stats = {}

    def log_query(self, query, results_count, response_time, user_id=None):
        """Record one execution of ``query``.

        Args:
            query: Query text, used as the statistics key.
            results_count: Number of results the search returned.
            response_time: Time the search took.
            user_id: Added to the query's user set when truthy.
        """
        if query not in self.query_stats:
            self.query_stats[query] = {
                "count": 0,
                "avg_results": 0,
                "avg_response_time": 0,
                "users": set(),
            }

        stats = self.query_stats[query]
        stats["count"] += 1
        count = stats["count"]

        # Incremental mean: new_avg = old_avg + (x - old_avg) / n.
        # The previous ``(avg + x) / 2`` halved the first sample and
        # over-weighted recent ones, so it was not an average at all.
        stats["avg_results"] += (results_count - stats["avg_results"]) / count
        stats["avg_response_time"] += (
            response_time - stats["avg_response_time"]
        ) / count

        if user_id:
            stats["users"].add(user_id)

    def get_popular_queries(self, limit=10):
        """Return up to ``limit`` ``(query, stats)`` pairs, most frequent first."""
        sorted_queries = sorted(
            self.query_stats.items(),
            key=lambda item: item[1]["count"],
            reverse=True,
        )
        return sorted_queries[:limit]

    def get_performance_metrics(self):
        """Return aggregate metrics, or ``{}`` when nothing has been logged.

        The two averages are unweighted means of the per-query averages:
        every distinct query counts once, however often it was run.
        """
        if not self.query_stats:
            return {}

        all_stats = list(self.query_stats.values())
        distinct = len(all_stats)
        return {
            "total_queries": sum(stats["count"] for stats in all_stats),
            "unique_queries": len(self.query_stats),
            "avg_response_time": sum(stats["avg_response_time"] for stats in all_stats) / distinct,
            "avg_results_per_query": sum(stats["avg_results"] for stats in all_stats) / distinct,
        }