Confluence企业技术文档同步Dify工具
工具代码
import base64
import os
import re
import urllib.parse
import requests
from markdownify import markdownify as md
# -------------------------------
# Confluence configuration (supplied via environment variables)
# -------------------------------
CONFLUENCE_CONFIG = {
    "api_token": os.getenv("CONFLUENCE_API_TOKEN"),
    "email": os.getenv("CONFLUENCE_EMAIL"),
    "url": os.getenv("CONFLUENCE_BASE_URL"),  # e.g., https://your-domain.atlassian.net/wiki
    "space_key": os.getenv("CONFLUENCE_SPACE_KEY", "").upper(),  # normalized to upper-case for comparison
    "space_id": "",    # filled in at runtime by main() from get_confluence_space_id()
    "space_name": "",  # filled in at runtime by main() from get_confluence_space_id()
}
# -------------------------------
# Dify configuration (supplied via environment variables)
# -------------------------------
DIFY_CONFIG = {
    "api_key": os.getenv("DIFY_API_KEY"),
    "endpoint": os.getenv("DIFY_API_ENDPOINT"),  # e.g., https://api.dify.ai/v1
    "dataset_id": "",  # filled in at runtime by main() from get_dataset_id()
}
# Format a <![CDATA[...]]> code block as an HTML <pre> element.
def convert_code_block(match):
    """Wrap the CDATA payload (regex group 2) in <pre>, entity-escaping
    angle brackets so markdownify treats the code as literal text."""
    content = match.group(2)
    # BUG FIX: the replacements were self-no-ops ("<" -> "<", ">" -> ">");
    # code content must be entity-escaped before being embedded back into
    # HTML, otherwise markdownify parses it as markup.
    content = content.replace("<", "&lt;").replace(">", "&gt;")
    return f"<pre>{content}</pre>"


# Clean Confluence-specific storage-format HTML into plain HTML suitable
# for markdownify.
def pre_processing(text: str) -> str:
    """Apply ordered regex rewrites to Confluence storage HTML.

    Panel/parameter titles become <strong>, entity-escaped angle brackets
    become markdown-escaped literals, Confluence-only wrapper tags are
    stripped, task lists become markdown checkboxes, and CDATA code blocks
    are converted last so the rules above cannot corrupt code content.
    """
    patterns = {
        r'<ac:adf-attribute key="panel-type">(.*?)</ac:adf-attribute>': r'<strong>\1:</strong>',
        r'<ac:parameter ac:name="title">(.*?)</ac:parameter>': r'<strong>\1</strong>',
        # BUG FIX: these two rules previously matched raw "<" / ">", which
        # mangled every tag before the rules below could run (and broke the
        # CDATA regex as well). They must match the HTML entities, turning
        # pre-escaped angle brackets into markdown-escaped literals.
        r'&lt;': r'\<',
        r'&gt;': r'\>',
        r'<ac:parameter.*?/ac:parameter>': r'',
        r'<ac:adf-content.*?/ac:adf-content>': r'',
        # BUG FIX: the replacement used \" which re.sub leaves alone as a
        # literal backslash + quote; a plain quote is intended.
        r'<p>.*?</p><ac:structured-macro ac:name="toc': r'<ac:structured-macro ac:name="toc',
        r'<ac:task-id.*?/ac:task-id>(\n)?': r'',
        r'<ac:task-uuid.*?/ac:task-uuid>(\n)?': r'',
        r'<ac:task-status>complete</ac:task-status>(\n)?': r'- [x] ',
        r'<ac:task-status>incomplete</ac:task-status>(\n)?': r'- [ ] ',
        r'</ac:task-body>\n': r'</ac:task-body>',
        r'</ac:task>\n': r'</ac:task>',
    }
    # Dict order is significant: rules are applied top to bottom.
    for pattern, replacement in patterns.items():
        text = re.sub(pattern, replacement, text)
    # Convert code macros last; their payload lives inside CDATA markers.
    return re.sub(r"(<!\[CDATA\[)(.*?)(]]>)", convert_code_block, text, flags=re.DOTALL)
# Convert a pre-processed Confluence page to Markdown and append a link
# back to the source page.
def clean_html_to_markdown(page: dict) -> str:
    """Render one Confluence page dict as Markdown with a Source footer."""
    raw_html = page["body"]["storage"]["value"]
    markdown_body = md(pre_processing(raw_html), heading_style="ATX")
    page_title = page["title"].strip()
    page_url = CONFLUENCE_CONFIG["url"] + page["_links"]["webui"]
    return (
        f"# {page_title}\n\n"
        f"{markdown_body}\n\n"
        f"**Source:** [{page_title}]({page_url})"
    )
# Resolve the configured space key to its Space ID and display name.
def get_confluence_space_id():
    """Return (space_id, space_name) for CONFLUENCE_CONFIG['space_key'].

    Raises ValueError when the key is not among the listed spaces.
    """
    cfg = CONFLUENCE_CONFIG
    credentials = f"{cfg['email']}:{cfg['api_token']}".encode()
    request_headers = {
        "Authorization": f"Basic {base64.b64encode(credentials).decode()}",
        "Accept": "application/json",
    }
    response = requests.get(f"{cfg['url']}/api/v2/spaces?limit=250", headers=request_headers)
    response.raise_for_status()
    matches = [s for s in response.json().get("results", []) if s["key"] == cfg["space_key"]]
    if matches:
        space = matches[0]
        print(f"Found Space: {space['name']} ({space['id']})")
        return space["id"], space["name"]
    raise ValueError(f"Space key '{cfg['space_key']}' not found.")
# Fetch every page of the space, following cursor-based pagination
# (200 pages per request).
def get_all_confluence_pages():
    """Return all current pages of the configured space, bodies included."""
    cfg = CONFLUENCE_CONFIG
    token = base64.b64encode(f"{cfg['email']}:{cfg['api_token']}".encode()).decode()
    request_headers = {"Authorization": f"Basic {token}", "Accept": "application/json"}
    endpoint = f"{cfg['url']}/api/v2/spaces/{cfg['space_id']}/pages"
    query = {"limit": 200, "body-format": "storage", "status": "current"}
    pages = []
    while True:
        response = requests.get(endpoint, headers=request_headers, params=query)
        response.raise_for_status()
        payload = response.json()
        pages.extend(payload.get("results", []))
        # The v2 API signals more results via a "next" link carrying a cursor.
        next_link = payload.get("_links", {}).get("next")
        cursor = None
        if next_link:
            cursor = urllib.parse.parse_qs(urllib.parse.urlparse(next_link).query).get("cursor", [None])[0]
        if not cursor:
            break
        print(f"Next page with cursor: {cursor}")
        query["cursor"] = cursor
    return pages
# List the datasets that already exist in Dify.
def get_dify_datasets():
    """Return the Dify dataset list, or [] when the request fails."""
    auth_header = {"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"}
    try:
        response = requests.get(f"{DIFY_CONFIG['endpoint']}/datasets", headers=auth_header)
        response.raise_for_status()
        return response.json().get("data", [])
    except Exception as exc:
        # Best-effort: an unreachable Dify just yields an empty list.
        print(f"Failed to get Dify datasets: {exc}")
        return []
# Find the dataset named after the Confluence space, creating it if absent.
def get_dataset_id():
    """Return the ID of the dataset matching the space name (case-insensitive)."""
    dataset_name = CONFLUENCE_CONFIG["space_name"]
    wanted = dataset_name.lower()
    existing = next(
        (ds for ds in get_dify_datasets() if ds["name"].lower() == wanted),
        None,
    )
    if existing is not None:
        print(f"Dataset exists: {dataset_name} (ID: {existing['id']})")
        return existing["id"]
    return create_dify_dataset(dataset_name)
# Create a new Dify dataset and switch on its built-in metadata.
def create_dify_dataset(name: str) -> str:
    """Create dataset *name* in Dify and return its ID."""
    request_headers = {
        "Authorization": f"Bearer {DIFY_CONFIG['api_key']}",
        "Content-Type": "application/json"
    }
    body = {
        "name": name,
        "description": f"Confluence Space - {name}",
        "permission": "all_team_members",
        "indexing_technique": "high_quality"
    }
    response = requests.post(f"{DIFY_CONFIG['endpoint']}/datasets", headers=request_headers, json=body)
    response.raise_for_status()
    dataset = response.json()
    # Enable built-in metadata (optional; result intentionally ignored).
    requests.post(
        f"{DIFY_CONFIG['endpoint']}/datasets/{dataset['id']}/metadata/built-in/enable",
        headers=request_headers,
    )
    print(f"Created dataset: {dataset['name']} (ID: {dataset['id']})")
    return dataset["id"]
# Push one document (title + markdown text) into the Dify dataset.
def push_to_dify(doc_name: str, doc_text: str) -> bool:
    """Create a Dify document from text; return True on success.

    Uses hierarchical (parent/child) chunking with a full-document parent.
    """
    url = f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/document/create-by-text"
    headers = {
        "Authorization": f"Bearer {DIFY_CONFIG['api_key']}",
        "Content-Type": "application/json"
    }
    payload = {
        "name": doc_name,
        "text": doc_text,
        "indexing_technique": "high_quality",
        "doc_form": "hierarchical_model",
        "process_rule": {
            "mode": "hierarchical",
            "rules": {
                "pre_processing_rules": [{"id": "remove_extra_spaces", "enabled": True}],
                "parent_mode": "full-doc",
                "segmentation": {"max_tokens": 4000},
                # NOTE(review): "\\n" sends a literal backslash-n separator;
                # confirm against the Dify API whether "\n" was intended.
                "subchunk_segmentation": {"separator": "\\n", "max_tokens": 4000}
            }
        }
    }
    try:
        resp = requests.post(url, headers=headers, json=payload)
        resp.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        # BUG FIX: bool(Response) is Response.ok, so the old truthiness test
        # (`e.response and ...`) was always False for the 4xx/5xx responses
        # raised by raise_for_status — the detailed status/text was never
        # printed. Compare against None instead.
        if getattr(e, 'response', None) is not None:
            err_msg = f"{e.response.status_code} {e.response.text}"
        else:
            err_msg = str(e)
        print(f"Push failed: {err_msg}")
        return False
# Remove a single document from the Dify dataset.
def del_dify_doc(doc_id):
    """Delete document *doc_id*; return True only when Dify answers 200."""
    url = f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/documents/{doc_id}"
    auth_header = {"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"}
    try:
        return requests.delete(url, headers=auth_header).status_code == 200
    except Exception as exc:
        print(f"Delete failed: {exc}")
        return False
# Page through every document currently stored in the Dify dataset.
def get_dify_docs():
    """Return all documents of the current dataset (100 per request)."""
    collected = []
    page_number = 1
    while True:
        try:
            payload = requests.get(
                f"{DIFY_CONFIG['endpoint']}/datasets/{DIFY_CONFIG['dataset_id']}/documents",
                headers={"Authorization": f"Bearer {DIFY_CONFIG['api_key']}"},
                params={"page": page_number, "limit": 100}
            ).json()
            collected.extend(payload.get("data", []))
            # Dify signals further pages via "has_more".
            if not payload.get("has_more"):
                break
            page_number += 1
        except Exception as exc:
            print(f"Failed to fetch Dify documents: {exc}")
            break
    return collected
# Delete Dify documents whose titles no longer exist in Confluence
# (case-insensitive comparison on the document name).
def clean_docs(confluence_titles):
    """Remove dataset documents not present in *confluence_titles*."""
    print("\nCleaning outdated documents...")
    keep = {title.strip().lower() for title in confluence_titles}
    stale = [
        document for document in get_dify_docs()
        if document.get("name", "").strip().lower() not in keep
    ]
    for document in stale:
        if del_dify_doc(document["id"]):
            print(f"Deleted: {document['name']}")
    print(f"Removed {len(stale)} stale documents.")
# -------------------------------
# Entry point: full sync pipeline
# -------------------------------
def main():
    """Sync every page of one Confluence space into a Dify dataset.

    Steps: resolve the space, resolve/create the dataset, pull all pages,
    filter out empty and overview pages, upload the rest, then delete Dify
    documents whose titles no longer exist in Confluence.
    """
    print("Start syncing Confluence to Dify...\n")
    # The space must be resolved first: the dataset is named after it.
    CONFLUENCE_CONFIG["space_id"], CONFLUENCE_CONFIG["space_name"] = get_confluence_space_id()
    DIFY_CONFIG["dataset_id"] = get_dataset_id()
    pages = get_all_confluence_pages()
    print(f"Total pages retrieved: {len(pages)}")
    filtered_pages = []
    skipped, updated, failed = 0, 0, 0
    for page in pages:
        # Skip pages whose storage body is missing or empty.
        if not page.get("body", {}).get("storage", {}).get("value"):
            print(f"Skipped empty: {page['title']}")
            skipped += 1
            continue
        # Skip the space overview page (webui link ends in /overview).
        if page["_links"]["webui"].endswith("/overview"):
            print(f"Skipped overview: {page['title']}")
            skipped += 1
            continue
        filtered_pages.append(page)
    print("\nUploading to Dify...")
    for page in filtered_pages:
        title = page["title"].strip()
        content = clean_html_to_markdown(page)
        if push_to_dify(title, content):
            updated += 1
            print(f"Uploaded: {title}")
        else:
            failed += 1
            print(f"Failed: {title}")
    print(f"\nSummary: {updated} uploaded, {skipped} skipped, {failed} failed.")
    # NOTE(review): uploads always create new documents; re-running appears
    # to produce duplicates with the same name, since clean_docs only deletes
    # titles that vanished from Confluence — confirm intended behavior.
    clean_docs({page["title"] for page in filtered_pages})
    print("\nSync complete.")


if __name__ == "__main__":
    main()
作为 CronJob 运行在 K8s 里
# CronJob running the Confluence-to-Dify sync script at minute 5 of every
# hour from 08:00 to 20:00 (Asia/Shanghai). The script is mounted from a
# ConfigMap; credentials come from a Secret via envFrom.
# (Indentation restored: the pasted manifest had its YAML nesting stripped.)
apiVersion: batch/v1
kind: CronJob
metadata:
  name: sync-confluence-space-<space-key>
  namespace: <your-namespace>
spec:
  schedule: "5 8-20 * * *"
  timeZone: Asia/Shanghai
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: sync-confluence
              # NOTE(review): untagged "python" image floats to latest;
              # pin a version (e.g. python:3.12-slim) for reproducible runs.
              image: python
              imagePullPolicy: IfNotPresent
              workingDir: /sync
              command: ["/bin/bash", "-c"]
              args:
                - |
                  pip install --no-cache-dir markdownify requests \
                  && python -u sync-confluence.py
              env:
                - name: CONFLUENCE_SPACE_KEY
                  value: "<space-key>"
              envFrom:
                # Presumably supplies the remaining CONFLUENCE_* and DIFY_*
                # variables the script reads — verify the Secret's keys.
                - secretRef:
                    name: confluence-sync-secret
              volumeMounts:
                - name: sync-script
                  mountPath: /sync
          volumes:
            - name: sync-script
              configMap:
                name: sync-confluence-script
                # NOTE(review): 0777 is broader than needed; read/execute
                # (0555) suffices for a script run via "python".
                defaultMode: 0777
          restartPolicy: OnFailure
          nodeSelector:
            nodeGroup: <internal-node-group>