Confluence企业技术文档同步Dify工具

工具代码

import base64
import os
import re
import urllib.parse
import requests
from markdownify import markdownify as md

# -------------------------------
# Confluence configuration (supplied via environment variables)
# -------------------------------
CONFLUENCE_CONFIG = {
    "api_token": os.getenv("CONFLUENCE_API_TOKEN"),
    "email": os.getenv("CONFLUENCE_EMAIL"),
    "url": os.getenv("CONFLUENCE_BASE_URL"),  # e.g., https://your-domain.atlassian.net/wiki
    "space_key": os.getenv("CONFLUENCE_SPACE_KEY", "").upper(),
    "space_id": "",  # resolved at runtime by get_confluence_space_id()
    "space_name": "",  # resolved at runtime by get_confluence_space_id()
}

# -------------------------------
# Dify configuration (supplied via environment variables)
# -------------------------------
DIFY_CONFIG = {
    "api_key": os.getenv("DIFY_API_KEY"),
    "endpoint": os.getenv("DIFY_API_ENDPOINT"),  # e.g., https://api.dify.ai/v1
    "dataset_id": "",  # resolved at runtime by get_dataset_id()
}


# Format code inside <![CDATA[...]]> sections as HTML <pre> blocks so that
# markdownify renders them as code.
def convert_code_block(match):
    """re.sub callback: wrap the CDATA payload (group 2) in ``<pre>`` tags.

    HTML-escaped angle brackets are decoded back to literal characters so
    code samples display as written.
    """
    content = match.group(2)
    # Decode entity-escaped angle brackets inside code content.
    content = content.replace("&lt;", "<").replace("&gt;", ">")
    return f"<pre>{content}</pre>"


# Clean Confluence storage-format HTML into plain HTML suitable for markdownify.
def pre_processing(text: str) -> str:
    """Strip/rewrite Confluence-specific macros and escape stray entities.

    Applied before markdownify: panel titles become <strong>, task-list
    macros become GitHub-style checkboxes, TOC boilerplate is dropped, and
    leftover ``&lt;``/``&gt;`` entities are backslash-escaped so markdownify
    does not re-interpret them as HTML tags.
    """
    # Handle CDATA code blocks first, so their decoded content is not
    # touched by the entity-escaping patterns below.
    text = re.sub(r"(<!\[CDATA\[)(.*?)(]]>)", convert_code_block, text, flags=re.DOTALL)

    patterns = {
        r'<ac:adf-attribute key="panel-type">(.*?)</ac:adf-attribute>': r'<strong>\1:</strong>',
        r'<ac:parameter ac:name="title">(.*?)</ac:parameter>': r'<strong>\1</strong>',
        # Escape entity-encoded angle brackets for Markdown (prevents
        # markdownify from treating them as real HTML tags).
        r'&lt;': r'\<',
        r'&gt;': r'\>',
        r'<ac:parameter.*?/ac:parameter>': r'',
        r'<ac:adf-content.*?/ac:adf-content>': r'',
        # Drop the intro paragraph that precedes a TOC macro.
        r'<p>.*?</p><ac:structured-macro ac:name="toc': r'<ac:structured-macro ac:name="toc',
        r'<ac:task-id.*?/ac:task-id>(\n)?': r'',
        r'<ac:task-uuid.*?/ac:task-uuid>(\n)?': r'',
        r'<ac:task-status>complete</ac:task-status>(\n)?': r'- [x] ',
        r'<ac:task-status>incomplete</ac:task-status>(\n)?': r'- [ ] ',
        r'</ac:task-body>\n': r'</ac:task-body>',
        r'</ac:task>\n': r'</ac:task>',
    }
    # Dicts preserve insertion order, so substitutions apply top-to-bottom.
    for pattern, repl in patterns.items():
        text = re.sub(pattern, repl, text)

    return text


# Convert a page's storage-format HTML to Markdown, appending a source link.
def clean_html_to_markdown(page: dict) -> str:
    """Return the page as Markdown: H1 title, body, and a source-link footer."""
    raw_html = page["body"]["storage"]["value"]
    markdown_body = md(pre_processing(raw_html), heading_style="ATX")
    page_title = page["title"].strip()
    source_url = f"{CONFLUENCE_CONFIG['url']}{page['_links']['webui']}"
    return (
        f"# {page_title}\n\n"
        f"{markdown_body}\n\n"
        f"**Source:** [{page_title}]({source_url})"
    )


# Resolve the configured space key to its Space ID and display name.
def get_confluence_space_id():
    """Look up the configured space key via the Confluence v2 spaces API.

    Returns:
        tuple: (space_id, space_name) for the matching space.

    Raises:
        ValueError: if the key is not among the returned spaces.
        requests.HTTPError: on a non-2xx API response.
    """
    url = f"{CONFLUENCE_CONFIG['url']}/api/v2/spaces?limit=250"
    credentials = f"{CONFLUENCE_CONFIG['email']}:{CONFLUENCE_CONFIG['api_token']}"
    auth = base64.b64encode(credentials.encode()).decode()
    headers = {"Authorization": f"Basic {auth}", "Accept": "application/json"}

    # Timeout keeps the CronJob from hanging forever on a stalled connection.
    resp = requests.get(url, headers=headers, timeout=30)
    resp.raise_for_status()

    for space in resp.json().get("results", []):
        if space["key"] == CONFLUENCE_CONFIG["space_key"]:
            print(f"Found Space: {space['name']} ({space['id']})")
            return space["id"], space["name"]

    # NOTE(review): only the first page (limit=250) is searched; sites with
    # more spaces would need to follow _links.next here.
    raise ValueError(f"Space key '{CONFLUENCE_CONFIG['space_key']}' not found.")


# Fetch every page of the space, following cursor pagination (200 per request).
def get_all_confluence_pages():
    """Return all current pages in the configured space, bodies in storage format.

    Raises:
        requests.HTTPError: on a non-2xx API response.
    """
    all_pages = []
    url = f"{CONFLUENCE_CONFIG['url']}/api/v2/spaces/{CONFLUENCE_CONFIG['space_id']}/pages"
    credentials = f"{CONFLUENCE_CONFIG['email']}:{CONFLUENCE_CONFIG['api_token']}"
    auth = base64.b64encode(credentials.encode()).decode()
    headers = {"Authorization": f"Basic {auth}", "Accept": "application/json"}
    params = {"limit": 200, "body-format": "storage", "status": "current"}

    while True:
        # Timeout keeps the CronJob from hanging forever on a stalled connection.
        resp = requests.get(url, headers=headers, params=params, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        all_pages.extend(data.get("results", []))

        # The v2 API signals more results via a _links.next URL that carries
        # an opaque "cursor" query parameter.
        next_link = data.get("_links", {}).get("next")
        if not next_link:
            break
        cursor = urllib.parse.parse_qs(urllib.parse.urlparse(next_link).query).get("cursor", [None])[0]
        if not cursor:
            break
        print(f"Next page with cursor: {cursor}")
        params["cursor"] = cursor

    return all_pages


# List the datasets (knowledge bases) already present in Dify.
def get_dify_datasets():
    """Return all datasets visible to the API key, or [] on failure.

    Best-effort: network/HTTP/JSON errors are logged and swallowed so the
    caller can fall through to creating a new dataset.
    """
    headers = {"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"}
    try:
        resp = requests.get(
            f"{DIFY_CONFIG['endpoint']}/datasets",
            headers=headers,
            timeout=30,  # keep the CronJob from hanging on a stalled connection
        )
        resp.raise_for_status()
        return resp.json().get("data", [])
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON response body from .json().
        print(f"Failed to get Dify datasets: {e}")
        return []


# Find the dataset named after the Confluence space, or create it.
def get_dataset_id():
    """Return the id of the dataset matching the space name (case-insensitive),
    creating a new dataset when none exists."""
    target = CONFLUENCE_CONFIG["space_name"]
    match = next(
        (ds for ds in get_dify_datasets() if ds["name"].lower() == target.lower()),
        None,
    )
    if match is not None:
        print(f"Dataset exists: {target} (ID: {match['id']})")
        return match["id"]
    return create_dify_dataset(target)


# Create a new Dify dataset and enable its built-in metadata.
def create_dify_dataset(name: str) -> str:
    """Create a dataset named after the Confluence space and return its id.

    Raises:
        requests.HTTPError: if dataset creation fails.
    """
    url = f"{DIFY_CONFIG['endpoint']}/datasets"
    headers = {
        "Authorization": f"Bearer {DIFY_CONFIG['api_key']}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": name,
        "description": f"Confluence Space - {name}",
        "permission": "all_team_members",
        "indexing_technique": "high_quality"
    }
    resp = requests.post(url, headers=headers, json=payload, timeout=30)
    resp.raise_for_status()
    ds = resp.json()

    # Enable built-in metadata (optional) — failure here must not abort the
    # sync, so connection errors are logged rather than raised.
    meta_url = f"{DIFY_CONFIG['endpoint']}/datasets/{ds['id']}/metadata/built-in/enable"
    try:
        requests.post(meta_url, headers=headers, timeout=30)
    except requests.exceptions.RequestException as e:
        print(f"Could not enable built-in metadata: {e}")

    print(f"Created dataset: {ds['name']} (ID: {ds['id']})")
    return ds["id"]


# Push a single document into the Dify dataset.
def push_to_dify(doc_name: str, doc_text: str) -> bool:
    """Create a document from text via Dify's create-by-text endpoint.

    Args:
        doc_name: document name (the Confluence page title).
        doc_text: Markdown content to index.

    Returns:
        True on success, False on any request failure (logged, not raised).
    """
    url = f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/document/create-by-text"
    headers = {
        "Authorization": f"Bearer {DIFY_CONFIG['api_key']}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": doc_name,
        "text": doc_text,
        "indexing_technique": "high_quality",
        "doc_form": "hierarchical_model",
        "process_rule": {
            "mode": "hierarchical",
            "rules": {
                "pre_processing_rules": [{"id": "remove_extra_spaces", "enabled": True}],
                "parent_mode": "full-doc",
                "segmentation": {"max_tokens": 4000},
                "subchunk_segmentation": {"separator": "\\n", "max_tokens": 4000}
            }
        }
    }

    try:
        # Timeout keeps the CronJob from hanging on a stalled connection.
        resp = requests.post(url, headers=headers, json=payload, timeout=60)
        resp.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        # Response.__bool__ is False for 4xx/5xx statuses, so the original
        # `if ... and e.response` never took the detailed branch for HTTP
        # errors; compare against None instead.
        err_resp = getattr(e, "response", None)
        err_msg = f"{err_resp.status_code} {err_resp.text}" if err_resp is not None else str(e)
        print(f"Push failed: {err_msg}")
        return False


# Delete a single document from the Dify dataset.
def del_dify_doc(doc_id):
    """Delete the given Dify document; return True on success, False otherwise."""
    try:
        resp = requests.delete(
            f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/documents/{doc_id}",
            headers={"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"},
            timeout=30,  # keep the CronJob from hanging on a stalled connection
        )
        # DELETE endpoints may answer 204 No Content; treat both as success.
        return resp.status_code in (200, 204)
    except requests.exceptions.RequestException as e:
        print(f"Delete failed: {e}")
        return False


# Fetch every document currently stored in the Dify dataset (paginated).
def get_dify_docs():
    """Return all document records in the configured dataset.

    Pagination stops on the first failed request; partial results are
    returned rather than raising, matching the best-effort cleanup flow.
    """
    docs = []
    page = 1
    while True:
        try:
            resp = requests.get(
                f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/documents",
                headers={"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"},
                params={"page": page, "limit": 100},
                timeout=30,  # keep the CronJob from hanging on a stalled connection
            )
            # Surface HTTP errors instead of silently parsing an error body
            # as an empty page.
            resp.raise_for_status()
            data = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Failed to fetch Dify documents: {e}")
            break
        docs.extend(data.get("data", []))
        if not data.get("has_more"):
            break
        page += 1
    return docs


# Remove Dify documents whose titles no longer exist in Confluence
# (matched case-insensitively by name).
def clean_docs(confluence_titles):
    """Delete every Dify document not named after a current Confluence page."""
    print("\nCleaning outdated documents...")
    keep = {title.strip().lower() for title in confluence_titles}

    stale = [
        doc for doc in get_dify_docs()
        if doc.get("name", "").strip().lower() not in keep
    ]

    for doc in stale:
        if del_dify_doc(doc["id"]):
            print(f"Deleted: {doc['name']}")
    print(f"Removed {len(stale)} stale documents.")


# -------------------------------
# Entry point: full sync pipeline
# -------------------------------
def main():
    """Sync all Confluence pages of one space into a matching Dify dataset.

    Steps: resolve space -> resolve/create dataset -> fetch pages ->
    filter empty/overview pages -> upload as Markdown -> prune stale docs.
    """
    print("Start syncing Confluence to Dify...\n")
    space_id, space_name = get_confluence_space_id()
    CONFLUENCE_CONFIG["space_id"] = space_id
    CONFLUENCE_CONFIG["space_name"] = space_name
    DIFY_CONFIG["dataset_id"] = get_dataset_id()

    pages = get_all_confluence_pages()
    print(f"Total pages retrieved: {len(pages)}")

    skipped = 0
    filtered_pages = []
    for page in pages:
        body_value = page.get("body", {}).get("storage", {}).get("value")
        if not body_value:
            print(f"Skipped empty: {page['title']}")
            skipped += 1
        elif page["_links"]["webui"].endswith("/overview"):
            # Space overview pages are navigation shells, not documentation.
            print(f"Skipped overview: {page['title']}")
            skipped += 1
        else:
            filtered_pages.append(page)

    print("\nUploading to Dify...")
    updated = failed = 0
    for page in filtered_pages:
        title = page["title"].strip()
        if push_to_dify(title, clean_html_to_markdown(page)):
            updated += 1
            print(f"Uploaded: {title}")
        else:
            failed += 1
            print(f"Failed: {title}")

    print(f"\nSummary: {updated} uploaded, {skipped} skipped, {failed} failed.")
    clean_docs({page["title"] for page in filtered_pages})
    print("\nSync complete.")


# Script entry point.
if __name__ == "__main__":
    main()

作为 CronJob 运行在 Kubernetes(k8s)里

# Kubernetes CronJob that runs the sync script on a schedule.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: sync-confluence-space-<space-key>
  namespace: <your-namespace>
spec:
  # At minute 5 of every hour between 08:00 and 20:00 local time.
  schedule: "5 8-20 * * *"
  # NOTE(review): spec.timeZone requires a reasonably recent Kubernetes
  # (stable in batch/v1 from 1.27) — confirm cluster version.
  timeZone: Asia/Shanghai
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: sync-confluence
              # NOTE(review): unpinned "python" tag — pin a version
              # (e.g. python:3.12-slim) for reproducible runs.
              image: python
              imagePullPolicy: IfNotPresent
              workingDir: /sync
              command: ["/bin/bash", "-c"]
              # Dependencies are installed on every run; consider baking
              # them into a custom image to speed up and harden the job.
              args:
                - |
                  pip install --no-cache-dir markdownify requests \
                  && python -u sync-confluence.py
              env:
                # Selects which Confluence space this CronJob instance syncs.
                - name: CONFLUENCE_SPACE_KEY
                  value: "<space-key>"
              # Secret supplies the remaining CONFLUENCE_* and DIFY_* env vars
              # read by the script.
              envFrom:
                - secretRef:
                    name: confluence-sync-secret
              volumeMounts:
                - name: sync-script
                  mountPath: /sync
          volumes:
            # The Python script is delivered via ConfigMap.
            - name: sync-script
              configMap:
                name: sync-confluence-script
                # NOTE(review): 0777 is broader than needed; read/execute
                # (0555) would suffice.
                defaultMode: 0777
          restartPolicy: OnFailure
          nodeSelector:
            nodeGroup: <internal-node-group>