Skill

Access Notion Data for AI Applications

A Python toolkit that retrieves information from Notion pages using the Notion API.

Works with Notion

52
Spark score
out of 100
Updated 7 days ago
Version 0.2.90

Add to Favorites

Why it matters

Integrate your Notion workspace with AI models to retrieve and process information. This toolkit enables AI to access and utilize your Notion pages as a knowledge base.

Outcomes

What it gets done

01

Connect to Notion API using your token.

02

Retrieve information from specified Notion pages.

03

Index Notion content for efficient AI querying.

04

Facilitate AI-driven summarization and analysis of Notion data.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/camel-notiontoolkit | bash

Capabilities

What this skill does

Extract

Pulls structured data fields from unstructured text.

Query a database

Writes and executes SQL or NoSQL queries on databases.

RAG index

Chunks, embeds, and indexes documents for semantic retrieval.

Summarize

Condenses long documents or threads into key takeaways.

Overview

Notion Toolkit

What it does

A toolkit for retrieving information from Notion pages via the Notion API.

How it connects

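Use it when you need to retrieve information from Notion pages. It authenticates against the Notion API with an integration token, passed as `notion_token` or read from the `NOTION_TOKEN` environment variable.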

Source code

import os
from typing import Any, Callable, List, Optional, cast

from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit
from camel.utils import MCPServer, api_keys_required

def get_plain_text_from_rich_text(rich_text: List[dict]) -> str:
    r"""Extracts plain text from a list of rich text elements.

    Args:
        rich_text (List[dict]): A list of dictionaries representing rich
            text elements. Each dictionary is expected to carry its text
            under the "plain_text" key. Elements without that key, or
            whose value is :obj:`None`, contribute an empty string. A
            falsy argument (e.g. :obj:`None` or an empty list) yields an
            empty result.

    Returns:
        str: The plain text of all elements concatenated in order, with
            no separator inserted between elements.
    """
    # `or ""` (rather than a `.get` default) also covers an explicit
    # `"plain_text": None`, which would otherwise make `str.join` raise.
    return "".join(
        element.get("plain_text") or "" for element in rich_text or []
    )

def get_media_source_text(block: dict) -> str:
    r"""Extracts the source URL and optional caption from a
    Notion media block.

    The URL is looked up, in order, under ``external.url``, ``file.url``
    and ``url`` of the block's type-specific payload (the value stored
    under the key named by ``block["type"]``). When none of them is
    present, a placeholder string naming the block type is used instead.

    Args:
        block (dict): A dictionary representing a Notion media block.

    Returns:
        str: ``"<caption>: <source>"`` when the block has a non-empty
            caption, otherwise just the source.
    """
    # `or` fallbacks (instead of `.get` defaults) also cover keys that are
    # present but explicitly set to None.
    block_type = block.get("type") or "Unknown Type"
    block_content = block.get(block_type) or {}

    # Extract source URL based on available types
    source = (
        (block_content.get("external") or {}).get("url")
        or (block_content.get("file") or {}).get("url")
        or block_content.get(
            "url", f"[Missing case for media block types]: {block_type}"
        )
    )

    # Extract caption if available
    caption_elements = block_content.get("caption") or []
    if caption_elements:
        caption = get_plain_text_from_rich_text(caption_elements)
        return f"{caption}: {source}"

    return source

@MCPServer()
class NotionToolkit(BaseToolkit):
    r"""A toolkit for retrieving information from the user's notion pages.

    Attributes:
        notion_token (Optional[str], optional): The notion_token used to
            interact with notion APIs. (default: :obj:`None`)
        notion_client (Client): The `notion_client.Client` instance used
            for all calls to the notion APIs.
    """

    @api_keys_required(
        [
            ("notion_token", 'NOTION_TOKEN'),
        ]
    )
    def __init__(
        self,
        notion_token: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> None:
        r"""Initializes the NotionToolkit.

        Args:
            notion_token (Optional[str], optional): The optional notion_token
                used to interact with notion APIs. When omitted, the
                `NOTION_TOKEN` environment variable is used instead.
                (default: :obj:`None`)
            timeout (Optional[float], optional): Forwarded unchanged to
                the base toolkit initializer. (default: :obj:`None`)
        """
        super().__init__(timeout=timeout)
        # Imported lazily so the module can be imported without the
        # `notion_client` package being installed.
        from notion_client import Client

        self.notion_token = notion_token or os.environ.get("NOTION_TOKEN")
        self.notion_client = Client(auth=self.notion_token)

    def _collect_paginated(
        self, endpoint: Callable[..., Any], **params: Any
    ) -> List[dict]:
        r"""Collects the ``results`` of every page of a paginated endpoint.

        Args:
            endpoint (Callable[..., Any]): A client method that accepts a
                `start_cursor` keyword argument and returns a mapping with
                `results`, `has_more` and `next_cursor` entries.
            **params (Any): Extra keyword arguments passed to every call.

        Returns:
            List[dict]: The concatenated `results` of all fetched pages.
        """
        collected: List[dict] = []
        cursor = None

        while True:
            response = cast(dict, endpoint(start_cursor=cursor, **params))
            collected.extend(response["results"])

            cursor = response.get("next_cursor")
            # Also stop when no cursor is supplied: continuing with
            # `start_cursor=None` would request the first page again and
            # never terminate.
            if not response.get("has_more") or not cursor:
                break

        return collected

    @staticmethod
    def _extract_page_title(page: dict) -> Optional[str]:
        r"""Returns the content of the first text fragment of a page title.

        Args:
            page (dict): A page object as returned by the search endpoint.

        Returns:
            Optional[str]: The title text, or :obj:`None` if the page has
                no title property or no text fragment in it.
        """
        properties = page.get("properties") or {}

        # The title property is keyed "title" on some pages but may carry
        # any name, so look it up by its property type first.
        title_property = next(
            (
                prop
                for prop in properties.values()
                if isinstance(prop, dict) and prop.get("type") == "title"
            ),
            None,
        )
        if title_property is None:
            candidate = properties.get("title")
            title_property = candidate if isinstance(candidate, dict) else {}

        return next(
            (
                fragment.get("text", {}).get("content")
                for fragment in title_property.get("title") or []
                if fragment.get("type") == "text"
            ),
            None,
        )

    def list_all_users(self) -> List[dict]:
        r"""Lists all users via the Notion integration.

        Returns:
            List[dict]: A list of user objects with type, name, and
                workspace. `type` and `name` are :obj:`None` when the
                user object does not provide them; `workspace` is an
                empty string when no workspace name is available.
        """
        all_users_info = self._collect_paginated(
            self.notion_client.users.list
        )

        return [
            {
                "type": user.get("type"),
                "name": user.get("name"),
                # The workspace name lives in the type-specific payload,
                # i.e. under the key named by the user's own "type".
                "workspace": (user.get(user.get("type")) or {}).get(
                    "workspace_name", ""
                ),
            }
            for user in all_users_info
        ]

    def list_all_pages(self) -> List[dict]:
        r"""Lists all pages in the Notion workspace.

        Returns:
            List[dict]: A list of page objects with title and id. The
                title is :obj:`None` when no text title can be found.
        """
        all_pages_info = self._collect_paginated(
            self.notion_client.search,
            filter={"property": "object", "value": "page"},
        )

        return [
            {
                "id": page.get("id"),
                "title": self._extract_page_title(page),
            }
            for page in all_pages_info
        ]

    def get_notion_block_text_content(self, block_id: str) -> str:
        r"""Retrieves the text content of a Notion block.

        Args:
            block_id (str): The ID of the Notion block to retrieve.

        Returns:
            str: The text content of a Notion block, containing all
                the sub blocks, joined with single spaces.
        """
        blocks = self._collect_paginated(
            self.notion_client.blocks.children.list, block_id=block_id
        )

        return " ".join(
            self.get_text_from_block(sub_block) for sub_block in blocks
        )

    def get_text_from_block(self, block: dict) -> str:
        r"""Extracts plain text from a Notion block based on its type.

        Blocks that carry a `rich_text` entry yield its plain text (an
        empty string for an empty line). Other block types yield a short
        type-specific description. If the block has children, their text
        is fetched recursively and appended, separated by a space.

        Args:
            block (dict): A dictionary representing a Notion block.

        Returns:
            str: A string containing the extracted plain text of the
                block and of its children, if any.
        """
        block_type = block.get("type")
        block_content = block.get(block_type) or {}
        rich_text = (
            block_content.get("rich_text")
            if isinstance(block_content, dict)
            else None
        )

        if rich_text is not None:
            # Checked against None rather than for truthiness so that an
            # empty line (empty `rich_text` list) yields an empty string
            # instead of falling through to the placeholder cases below.
            text = get_plain_text_from_rich_text(rich_text)
        elif block_type == "unsupported":
            text = "[Unsupported block type]"
        elif block_type == "bookmark":
            text = block["bookmark"]["url"]
        elif block_type == "child_database":
            text = block["child_database"]["title"]
            # Use other API endpoints for full database data
        elif block_type == "child_page":
            text = block["child_page"]["title"]
        elif block_type in ("embed", "video", "file", "image", "pdf"):
            text = get_media_source_text(block)
        elif block_type == "equation":
            text = block["equation"]["expression"]
        elif block_type == "link_preview":
            text = block["link_preview"]["url"]
        elif block_type == "synced_block":
            synced_from = block["synced_block"].get("synced_from")
            if synced_from:
                # The source ID is stored under the key named by the
                # reference's own "type".
                source_id = synced_from.get(synced_from.get("type"))
                text = (
                    f"This block is synced with a block with ID: {source_id}"
                )
            else:
                text = "Source sync block that another block is synced with."
        elif block_type == "table":
            text = f"Table width: {block['table']['table_width']}"
            # Fetch children for full table data
        elif block_type == "table_of_contents":
            text = f"ToC color: {block['table_of_contents']['color']}"
        elif block_type in ("breadcrumb", "column_list", "divider"):
            text = "No text available"
        else:
            text = "[Needs case added]"

        # Query children for blocks with children
        if block.get("has_children"):
            children_text = self.get_notion_block_text_content(block["id"])
            # Join only the non-empty parts so no stray leading or
            # trailing space is produced.
            text = " ".join(
                part for part in (text, children_text) if part
            )

        return text

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects
                representing the functions in the toolkit.
        """
        return [
            FunctionTool(self.list_all_pages),
            FunctionTool(self.list_all_users),
            FunctionTool(self.get_notion_block_text_content),
        ]

Discussion

Questions & comments · 0

Sign In Sign in to leave a comment.