图片
传统搜索 vs 智能知识库对比图
在信息爆炸的时代,我们每天需要处理大量文档资料。传统文件管理方式存在三大痛点:
1、检索低效:无法通过语义理解快速定位内容
2、知识孤立:文件间缺乏智能关联
3、安全风险:敏感数据上传云端存在泄露隐患
本文将手把手教你用DeepSeek大模型搭建完全本地化运行的智能知识库系统,支持PDF/Word/PPT等常见格式,实现'上传即学习,提问即解答'。
具体实现是借助Flask这一轻量型的Python Web框架搭建支持文件上传和流式对话的服务,达成用户与模型之间的智能问答的交互体验,并且阐释其工作流程与工作原理,便于您对本地知识库智能问答系统得以全面掌控。
二、系统核心功能展示图片
本地知识库智能问答系统首页
1、支持PPT、Word、PDF等文件资料上传
图片
用户上传文件资料示意图
用户点击【选择文件】-> 选择目标文件 -> 点击【上传文档】,上传成功后,如图所示提示。
2、由大模型来学习用户上传文件资料,支持用户和大模型进行对话交互
图片
从截图中可以看出,大模型的回复使用到了用户所传的资料。由此可知,我们基于DeepSeek构建的本地知识库系统已完成构建,它将是你工作和学习中最强得力助手。
3、核心功能清单如下:
✅ 多格式文档解析(PDF/Word/PPT/Markdown)✅ 智能语义检索(基于FAISS向量库)✅ 流式问答交互(类似ChatGPT体验)✅ 完全本地化部署(数据不出本地)✅ 参考上下文溯源(显示答案依据)三、系统技术架构与工作原理3.1 核心技术栈分析分层架构:该系统的技术栈是采用分层的方式,即前端层(HTML/JS/CSS) -> Flask API层 -> 文本处理层 -> 向量存储层 -> Ollama服务层 -> 大模型层。
图片
技术栈示意图
图片
核心架构分层图
关键技术点分解:
层级
技术组件
关键知识点
前端交互
JavaScript/SSE
- Server-Sent Events(SSE)实时流式通信- Markdown渲染与安全过滤(XSS防护)
Highlight.js/Marked
- 代码语法高亮实现- Markdown实时解析与渲染
后端服务
Flask框架
- RESTful API设计- 文件上传处理(uuid临时文件管理)
请求处理管道
- 请求验证与异常处理- 跨组件数据传递机制
文本处理
LangChain文本分块
- 递归字符分割器(RecursiveCharacterTextSplitter)- Markdown结构化解析
多格式支持
- PDF/DOCX/PPTX解析技术- MIME类型双重验证机制
向量处理
FAISS向量数据库
- 平面索引(IndexFlatL2)- 向量相似度搜索算法
Sentence-Transformers
all-MiniLM-L6-v2模型特征提取文本向量化批处理
大模型集成
Ollama服务
- 本地模型部署与管理- 流式生成API集成
DeepSeek-R1模型
- 14B参数大模型推理- 上下文窗口管理(2048 tokens)
安全与优化
(后续实现)
防御体系
- 文件类型白名单验证- 内容哈希校验(SHA256)
性能优化
- 向量索引持久化- 流式响应压缩传输
3.2、核心工作原理图解3.2.1 系统架构流程图图片
系统架构流程图
该图展示了系统的整体架构设计。前端界面通过HTTP请求与Flask后端服务交互,分为文件上传和用户提问两条主线。文件上传路径包含文件解析、文本分块、向量编码和FAISS向量数据库存储,核心是通过文本分块技术将文档切割为语义片段,再通过Sentence-BERT模型生成向量存入FAISS索引。用户提问路径则通过向量检索找到相关内容,构建包含上下文的Prompt,调用Ollama服务的DeepSeek大模型生成流式响应。关键技术包括FAISS的近似最近邻搜索和Ollama的流式API设计,前者通过向量空间索引加速检索,后者实现类似ChatGPT的实时交互体验。
3. 2.2 详细工作流程3.2.2.1 文件上传处理流程:图片
文件上传处理流程
此图详细描述了文件上传处理流程。从用户上传文件开始,系统依次进行格式验证、生成唯一ID、临时存储、MIME类型检测,再到文本解析和分块处理。核心在于双重验证机制(扩展名+MIME类型)保障安全性,以及递归式文本分块算法(RecursiveCharacterTextSplitter)的实现。分块时采用滑动窗口策略(512字符分块+50字符重叠),既保留语义连贯性,又避免信息割裂。向量编码阶段使用轻量级的all-MiniLM-L6-v2模型,在384维空间中对文本片段进行高效编码。
3.2.2.2 知识问答处理流程图片
知识问答处理流程
此流程图聚焦问答处理逻辑。用户问题经语义编码转换为向量后,FAISS通过IVF(反向文件索引)和PQ(乘积量化)技术快速定位相似文本片段。检索到的上下文通过Prompt模板(如'基于以下内容:{context},请回答:{question}')拼接后,触发大模型生成。流式响应采用分块传输编码(Chunked Transfer Encoding),配合前端Markdown渐进式渲染(通过Marked.js库)实现打字机效果。安全过滤层使用DOMPurify防止XSS攻击,参考上下文溯源功能通过元数据关联实现。
四、本地知识库智能问答系统构建全过程4.1 环境说明(部分关键组件)软件
配置及版本说明
Windows11
处理器:AMD Ryzen 7 7840H with Radeon 780M Graphics 3.80 GHz
内存:32.0 GB
显卡:16G
Ollama
最新版本0.7.1
Python
Python 3.11.9
PyCharm
社区版PyCharm 2024.1+
FAISS
向量数据库
Flask
Python Web服务框架
4.2 部署Deepseek-r1 14b大模型这部分可参考本人另外一篇文章:DeepSeek对话系统搭建指南:从 Ollama 部署到 Flask 交互系统构建,其中由详细的基于Windows11环境下如何部署deepseek-r1的整个过程,图文并茂。当然这里也给出最简单的部署方案,如下所示:
# 安装Ollama(Windows需管理员权限)curl -fsSL https://ollama.com/install.sh | sh# 下载DeepSeek模型ollama pull deepseek-r1:14b# 启动服务(保持终端运行)ollama serve4.3 Flask搭建前后端可交互系统Flask 是一个轻量级的 Python Web 框架,它易于上手和扩展。Flask 提供了简单的路由和请求处理机制,适合快速搭建小型 Web 应用。在本文中,我们将使用 Flask 来处理用户的请求,并与 Ollama 进行交互,实现用户上传文件资料和智能问答相关功能。具体实现上,则需要在PyCharm IDE中创建工程编写代码,具体步骤如下所示:
1、创建项目工程
考虑到能让阅读这篇文章的读者即便没有太多相关技术基础也能快速在本地构建这个系统,我在整个项目结构设计上就以最简单的方式来构建,项目结构如下:
deepseek_knowledge/├── ai/│ ├── all-MiniLM-L6-v2/ # 因网络原因,已将 all-MiniLM-L6-v2下载到本地│ │ ├──pytorch_model.bin # Markdown样式│ │ ├──config.json│ │ ├──…… # 因all-MiniLM-L6-v2下内容比较多,其他省略├── models/ # 文档处理与Faiss模块│ │ ├──__init__.py│ │ ├──document.py│ │ ├──faiss_wrapper.py├── static/│ ├── css/│ │ ├── github-markdown.min.css # Markdown样式│ │ └── github.min.css # highlight.js主题│ │ └── style.css # highlight.js主题│ └── js/│ ├── marked.min.js # Markdown解析│ ├── highlight.min.js # 代码高亮│ └── purify.min.js # 安全过滤│ └── script.min.js # 安全过滤└── templates/│ └── chat.html # 对话页面├── uploads/ # 文件上传时临时存储位置├── utils/ # 文件解析模块│ │ ├──__init__.py│ │ ├──file_parser.py │ │ ├──text_processor.py├── vector_storage/ # 向量化存储目录,后续会生成.index和.json文件├── app.py├── config.py └── README.md # 对该系统的一些说明或者介绍└── requirements.txt # 工程所有依赖及其版本声明处
按照上述项目结构,在PyCharm中逐一将对应的工程及其目录和文件创建出来,完成后如下图所示:
图片
项目工程结构示意图
2、添加相关依赖
打开requirements.txt文件,添加如下所示依赖:
flask==3.0.2python-dotenv==1.0.1PyPDF2==3.0.1python-docx==1.1.0python-pptx==0.6.23markdown==3.5.2sentence-transformers==2.3.1faiss-cpu==1.8.0ollama==0.1.3langchain-text-splitters==0.2.0python-magic-bin==0.4.14numpy==1.26.4uuid==1.30mysql-connector-pythonrequests3、编写models模块下两个核心文件代码
新建document.py文件,添加如下代码:import hashlibimport loggingfrom pathlib import Pathfrom typing import Dict, Listclass DocumentProcessor: ''' 文档元数据处理器 负责处理文档相关信息,生成文档元数据并计算内容哈希值, 用于唯一标识文档内容和跟踪文档变更。 ''' def __init__(self): # 初始化日志记录器 self.logger = logging.getLogger(__name__) def process_document(self, file_path: Path, user_id: str, chunks: List[str]) -> Dict: '''生成文档元数据 读取指定路径的文档,提取关键信息并生成元数据字典。 元数据包括用户ID、文件名、文件大小、内容哈希值和分块信息。 Args: file_path: 文档文件的路径 user_id: 上传文档的用户ID chunks: 文档内容分块后的字符串列表 Returns: 包含文档元数据的字典 Raises: Exception: 处理过程中出现错误时抛出异常 ''' try: # 以二进制模式读取文件内容 with open(file_path, 'rb') as f: content = f.read() # 返回包含文档元数据的字典 return { 'user_id': user_id, # 关联的用户ID 'file_name': file_path.name, # 原始文件名 'file_size': file_path.stat().st_size, # 文件大小(字节) 'content_hash': self._generate_hash(content), # 内容哈希值 'chunks': chunks, # 文档内容分块列表 'chunk_count': len(chunks) # 分块数量 } except Exception as e: # 记录错误日志并重新抛出异常 self.logger.error(f'Metadata generation failed: {str(e)}') raise def _generate_hash(self, content: bytes) -> str: '''生成SHA256内容指纹 计算给定二进制内容的SHA256哈希值,用于唯一标识文档内容。 相同内容将生成相同的哈希值,可用于检测文档是否变更。 Args: content: 二进制文件内容 Returns: SHA256哈希值的十六进制字符串表示 ''' return hashlib.sha256(content).hexdigest()新建faiss_wrapper.py,添加如下代码:from typing import List, Dictimport faissimport jsonimport numpy as npfrom sentence_transformers import SentenceTransformerfrom config import Configimport loggingclass VectorDB: '''FAISS向量数据库封装,实现文本向量化存储与语义检索''' def __init__(self): # 初始化文本编码器(384维向量) self.encoder = SentenceTransformer(Config.EMBEDDING_MODEL) # 创建L2距离的平面索引(适合中小规模数据) self.index = faiss.IndexFlatL2(384) # 文档元数据存储 {doc_id: {user_id, vector_range, metadata}} self.metadata = {} # 持久化存储路径 self._storage_path = Config.VECTOR_STORAGE / 'knowledge_base' # 加载已有数据 self._load_storage() self.logger = logging.getLogger(__name__) def add_document(self, user_id: str, doc_id: str, chunks: List[str], metadata: Dict): ''' 添加文档到向量数据库 :param user_id: 用户隔离标识 :param doc_id: 文档唯一ID :param chunks: 文本块列表 :param metadata: 包含文本块的元数据 ''' try: # 批量编码文本向量(shape: [n_chunks, 384]) vectors = self.encoder.encode(chunks) # 记录向量存储区间 start_idx = self.index.ntotal # 将向量添加到索引 self.index.add(np.array(vectors)) # 存储元数据 self.metadata[doc_id] = { 'user_id': user_id, 'vector_range': (start_idx, self.index.ntotal), # 左闭右开区间 'metadata': metadata } self._save_storage() except Exception as e: self.logger.error(f'向量存储失败: {str(e)}') raise def search(self, user_id: str, query: str, top_k=3) -> List[str]: ''' 语义相似度搜索 :param user_id: 用户隔离过滤 :param query: 查询文本 :param top_k: 返回结果数 :return: 匹配的文本块列表 ''' try: # 编码查询文本(shape: [1, 384]) query_vec = self.encoder.encode([query]) # FAISS搜索(返回距离和索引) distances, indices = self.index.search(query_vec, top_k*2) # 扩大搜索范围 results = [] # 遍历候选索引 for idx in indices[0]: # 检查每个文档的元数据 for doc_id, meta in self.metadata.items(): # 用户隔离验证 if meta['user_id'] != user_id: continue start, end = meta['vector_range'] # 索引范围验证 if start <= idx < end: # 计算块在文档内的偏移 chunk_idx = idx - start results.append(meta['metadata']['chunks'][chunk_idx]) # 去重后返回top_k结果 return results[:top_k] except Exception as e: self.logger.error(f'向量搜索失败: {str(e)}') return [] def _save_storage(self): '''持久化存储索引和元数据''' # FAISS索引二进制存储 faiss.write_index(self.index, str(self._storage_path.with_suffix('.index'))) # 元数据JSON存储 with open(self._storage_path.with_suffix('.json'), 'w') as f: json.dump(self.metadata, f, ensure_ascii=False) def _load_storage(self): '''加载已有存储数据''' index_file = self._storage_path.with_suffix('.index') meta_file = self._storage_path.with_suffix('.json') if index_file.exists(): # 加载FAISS索引 self.index = faiss.read_index(str(index_file)) if meta_file.exists(): # 加载元数据 with open(meta_file) as f: self.metadata = json.load(f)
核心功能解析:
向量化存储:使用sentence-transformers/all-MiniLM-L6-v2模型将文本块编码为384维向量,采用FAISS的IndexFlatL2索引,适合精确的L2距离计算,批量添加支持(单次添加文档的全部文本块向量)。元数据管理:记录每个文档的向量存储区间(start_idx, end_idx),并采用用户隔离机制(通过user_id过滤搜索结果)和持久化存储(索引存为.index,元数据存为.json)。语义搜索:采用两阶段搜索(即先找top_k*2候选,再过滤用户数据)和范围验证(即确保结果索引属于有效区间)以及结果去重(即通过列表转换字典自动去重)。性能优化:采用批量编码(单次处理文档全部chunks)和内存映射(FAISS支持大文件内存映射加载)以及安全存储(即JSON序列化保证元数据可读性)。4、编写utils模块下两个核心文件代码
新建file_parser.py文件,添加如下代码:from pathlib import Pathimport magicimport loggingfrom PyPDF2 import PdfReaderfrom docx import Documentimport pptximport markdownclass FileParser: '''支持MIME验证的多格式文件解析器,实现安全可靠的文件内容提取''' def __init__(self): # 初始化MIME类型检测器(基于文件二进制内容) self.mime = magic.Magic(mime=True) self.logger = logging.getLogger(__name__) def parse(self, file_path: Path, ext: str) -> str: ''' 主解析方法(入口点) :param file_path: 文件路径对象 :param ext: 文件扩展名(小写不带点) :return: 提取的纯文本内容 ''' # 文件存在性验证 if not file_path.exists(): raise FileNotFoundError(f'文件不存在: {file_path}') # 检测真实MIME类型(比扩展名更可靠) true_type = self.mime.from_file(str(file_path)) # 双重验证:扩展名与MIME类型匹配 if not self._validate_type(ext, true_type): raise ValueError(f'文件类型不匹配: 扩展名{ext} vs 实际类型{true_type}') # 分派到具体解析器 try: if ext == 'pdf': return self._parse_pdf(file_path) elif ext == 'docx': return self._parse_docx(file_path) # ...其他格式处理... except Exception as e: self.logger.error(f'解析失败: {str(e)}') raise def _validate_type(self, ext: str, mime_type: str) -> bool: ''' 文件类型双重验证 :param ext: 文件扩展名(小写不带点) :param mime_type: 检测到的MIME类型 :return: 是否通过验证 ''' type_map = { 'pdf': 'application/pdf', # PDF标准类型 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'txt': 'text/plain', # 纯文本类型 'md': 'text/markdown' # Markdown类型 } # 验证扩展名与MIME类型映射关系 return ext in type_map and type_map[ext] == mime_type def _parse_pdf(self, path: Path) -> str: '''PDF解析实现:逐页提取文本''' text = [] with open(path, 'rb') as f: # 二进制模式读取 reader = PdfReader(f) for page in reader.pages: text.append(page.extract_text() or '') # 处理空页面 return '\n'.join(text) def _parse_docx(self, path: Path) -> str: '''DOCX解析实现:提取段落文本''' doc = Document(path) return '\n'.join([p.text for p in doc.paragraphs if p.text.strip()]) def _parse_pptx(self, path: Path) -> str: '''PPTX解析实现:提取幻灯片文本''' prs = pptx.Presentation(path) text = [] for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, 'text') and shape.text.strip(): text.append(shape.text) return '\n'.join(text) def _parse_txt(self, path: Path) -> str: '''纯文本文件解析''' with open(path, 'r', encoding='utf-8') as f: return f.read() def _parse_markdown(self, path: Path) -> str: '''Markdown解析:转换为纯文本''' with open(path, 'r', encoding='utf-8') as f: # 保留原始Markdown格式(或转换为HTML) return f.read() # 或 return markdown.markdown(f.read())新建text_processor.py文件,添加如下代码:from typing import Listfrom langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitterfrom langchain_text_splitters import Languageimport reimport loggingfrom config import Configclass TextProcessor: '''智能文本分块处理器 根据文件类型和内容特征,将文本分割成适当大小的块, 优化后续处理(如向量嵌入、索引构建)的效果。 ''' def __init__(self): # 初始化基础文本分割器 - 按字符递归分割 self.base_splitter = RecursiveCharacterTextSplitter( chunk_size=Config.CHUNK_SIZE, # 每个块的最大字符数 chunk_overlap=Config.CHUNK_OVERLAP, # 块之间的重叠字符数 # 优先使用的分隔符列表(按优先级降序) separators=['\n\n', '\n', '。', '!', '?', ';', '……', '…', '.', ' '] ) # 初始化Markdown专用分割器 - 按标题层级分割 self.markdown_splitter = MarkdownHeaderTextSplitter( headers_to_split_on=[('#', 'Header 1'), ('##', 'Header 2')] # 按一级和二级标题分割 ) self.logger = logging.getLogger(__name__) def process(self, text: str, file_type: str) -> List[str]: '''根据文件类型处理文本并分割成块 Args: text: 原始文本内容 file_type: 文件类型(如'md', 'py', 'java', 'js'等) Returns: 分割后的文本块列表 Raises: Exception: 处理失败时返回原始文本作为单个块 ''' try: # 清理文本(去除控制字符、统一换行符等) cleaned = self._clean_text(text) # 根据文件类型选择不同的分割策略 if file_type == 'md': return self._process_markdown(cleaned) elif file_type in ['py', 'java', 'js']: return self._process_code(cleaned, file_type) else: # 通用文本使用基础分割器 return self.base_splitter.split_text(cleaned) except Exception as e: # 出错时记录警告并返回原始文本作为单个块 self.logger.warning(f'Chunking failed: {str(e)}') return [text] def _clean_text(self, text: str) -> str: '''清理文本内容 Args: text: 原始文本 Returns: 清理后的文本 ''' # 将Windows风格换行符转换为Unix风格 text = re.sub(r'\r\n', '\n', text) # 移除不可见控制字符 text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text) return text.strip() def _process_markdown(self, text: str) -> List[str]: '''处理Markdown文本,按标题层级分割 Args: text: 清理后的Markdown文本 Returns: 分割后的文本块列表 ''' try: # 使用Markdown标题分割器 chunks = self.markdown_splitter.split_text(text) # 在每个块前添加元数据(标题信息) return [f'#{chunk.metadata} {chunk.page_content}' for chunk in chunks] except: # 出错时回退到基础分割器 return self.base_splitter.split_text(text) def _process_code(self, text: str, lang: str) -> List[str]: '''处理代码文本,保留代码结构完整性 Args: text: 清理后的代码文本 lang: 编程语言类型 Returns: 分割后的代码块列表 ''' # 语言映射表 lang_map = { 'py': Language.PYTHON, 'js': Language.JS, 'java': Language.JAVA } # 根据语言类型创建专用分割器 splitter = RecursiveCharacterTextSplitter.from_language( language=lang_map.get(lang, Language.PYTHON), # 默认使用Python分割器 chunk_size=Config.CHUNK_SIZE, chunk_overlap=Config.CHUNK_OVERLAP ) # 分割代码文本 return splitter.split_text(text)
5、下载前端页面所需js和css文件以及自定义style.css
为了页面样式好看些和能以流式方式进行对话,需要下载如项目结构图中所示的js和css文件以及自定义style.css(这部分可参考本人另外一篇文章:DeepSeek对话系统搭建指南:从 Ollama 部署到 Flask 交互系统构建),将这些文件按照类型分别复制粘贴到工程项目下static/js和static/css文件夹下。
6、编写自定义script.js文件
打开static/js/script.js文件,添加如下内容:
// 初始化配置marked.setOptions({ breaks: true, highlight: function(code, lang) { return hljs.highlightAuto(code).value; }});const sanitizeConfig = { ALLOWED_TAGS: ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'pre', 'code', 'blockquote', 'ul', 'ol', 'li', 'strong', 'em', 'a', 'img', 'br'], ALLOWED_ATTR: ['href', 'src', 'alt', 'title']};let isGenerating = false;let currentMessage = null;// 安全渲染函数function renderMarkdown(content) { const rawHtml = marked.parse(content); return DOMPurify.sanitize(rawHtml, sanitizeConfig);}// 创建消息元素function createMessage(text, isUser) { const div = document.createElement('div'); div.className = `message ${isUser ? 'user' : 'bot'}`; if (isUser) { div.innerHTML = ` <div class='message-header'>您</div> <div class='message-content'>${text}</div> `; } else { div.innerHTML = ` <div class='message-header'>DeepSeek</div> <div class='message-content markdown-body'>${renderMarkdown(text)}</div> `; // 触发代码高亮 div.querySelectorAll('pre code').forEach(hljs.highlightElement); } return div;}// 消息管理function appendMessage(text, isUser) { const chatBox = document.getElementById('chat-box'); const message = createMessage(text, isUser); chatBox.appendChild(message); chatBox.scrollTop = chatBox.scrollHeight; return message;}// 发送消息逻辑document.getElementById('send-btn').addEventListener('click', sendMessage);document.getElementById('input').addEventListener('keydown', (e) => { if (e.key === 'Enter' && !e.shiftKey && !isGenerating) { e.preventDefault(); sendMessage(); }});async function sendMessage() { if (isGenerating) return; const input = document.getElementById('input'); const question = input.value.trim(); if (!question) return; input.value = ''; isGenerating = true; document.getElementById('send-btn').disabled = true; document.getElementById('loading').style.display = 'block'; const userMessage = appendMessage(question, true); currentMessage = appendMessage('', false); try { const response = await fetch('/api/query', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ user_id: 'test_user', question: question }) }); const reader = response.body.getReader(); let buffer = ''; let contextDisplayed = false; while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = new TextDecoder('utf-8').decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.includes('stream_end')) continue; try { const payload = JSON.parse(line.replace('data: ', '')); if (payload.error) { currentMessage.querySelector('.message-content').innerHTML = `<span style='color:red'>错误:${payload.error}</span>`; break; } else if (payload.response) { buffer += payload.response; currentMessage.querySelector('.message-content').innerHTML = renderMarkdown(buffer); hljs.highlightAll(); if (payload.done && !contextDisplayed) { const context = payload.context.map(c => `<li>${c}</li>`).join(''); const contextHtml = ` <details> <summary>查看参考上下文</summary> <ul>${context}</ul> </details> `; currentMessage.querySelector('.message-content').innerHTML += contextHtml; contextDisplayed = true; } } } catch (err) { console.warn('解析异常:', err); } } } } catch (error) { currentMessage.querySelector('.message-content').innerHTML = `<span style='color:red'>连接错误:${error}</span>`; } finally { finalizeMessage(); }}function finalizeMessage() { isGenerating = false; document.getElementById('send-btn').disabled = false; document.getElementById('loading').style.display = 'none'; currentMessage = null;}
7、下载ai模块中的all-MiniLM-L6-v2库
由于该模块内容比较多且网上很多地方无法下载,故特别将其下载地址分享,如下所示:
https://public.ukp.informatik.tu-darmstadt.de/reimers/sentence-transformers/v0.2/all-MiniLM-L6-v2.zip下载完成后,将其解压,并整个文件夹all-MiniLM-L6-v2复制到ai目录下。
8、编写config.py文件代码
打开config.py文件,添加如下代码,该文件主要作用是配置全局的一些常量:
from pathlib import Pathimport osclass Config: # 路径配置 BASE_DIR = Path(__file__).parent.absolute() UPLOAD_FOLDER = BASE_DIR / 'uploads' VECTOR_STORAGE = BASE_DIR / 'vector_storage' # 文件处理 ALLOWED_EXTENSIONS = {'pdf', 'docx', 'pptx', 'txt', 'md', 'py'} MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 100MB # 模型参数 EMBEDDING_MODEL = 'D:/PycharmProjects/deepseek_knowledge/ai/all-MiniLM-L6-v2' CHUNK_SIZE = 512 CHUNK_OVERLAP = 50 # Ollama配置 OLLAMA_ENDPOINT = 'http://localhost:11434' OLLAMA_MODEL = 'deepseek-r1:14b' OLLAMA_TIMEOUT = 30 # MySQL配置 MYSQL_HOST = 'localhost' MYSQL_USER = 'root' MYSQL_PASSWORD = 'openGauss@1234' MYSQL_DATABASE = 'knowledge' # 上传文档的用户ID,先固定,后续在动态获取 USER_ID = 'test_user' # 密钥 SECRET_KEY = os.urandom(24) # 初始化目录 os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(VECTOR_STORAGE, exist_ok=True)
9、编写app.py文件代码
打开app.py文件,添加如下代码:
import jsonfrom flask import Flask, render_template, request, Response, jsonifyimport requestsfrom pathlib import Pathfrom config import Configfrom models.document import DocumentProcessorfrom models.faiss_wrapper import VectorDBfrom utils.file_parser import FileParserfrom utils.text_processor import TextProcessorimport uuidimport loggingimport ollamaapp = Flask(__name__, static_folder='static', static_url_path='/static')app.config.from_object(Config)# 初始化组件vector_db = VectorDB()file_parser = FileParser()doc_processor = DocumentProcessor()text_processor = TextProcessor()# 配置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)# Ollama配置OLLAMA_API = 'http://localhost:11434/api/generate'MODEL_NAME = 'deepseek-r1:14b'def format_stream_data(chunk): '''标准化数据格式''' try: data = json.loads(chunk) return json.dumps({ 'response': data.get('response', ''), 'done': data.get('done', False) }) except json.JSONDecodeError: return json.dumps({'error': 'Invalid JSON format'})@app.route('/')def index(): return render_template('chat.html')@app.route('/chat', methods=['GET'])def stream_chat(): prompt = request.args.get('prompt', '') def event_stream(): try: response = requests.post( OLLAMA_API, json={ 'model': MODEL_NAME, 'prompt': prompt, 'stream': True, 'options': { 'temperature': 0.7, 'num_ctx': 2048 } }, stream=True, timeout=30 ) response.raise_for_status() buffer = [] for line in response.iter_lines(): if line: # 标准化数据格式 formatted = format_stream_data(line) yield f'data: {formatted}\n\n' except Exception as e: error_msg = {'error': f'服务异常: {str(e)}'} yield f'data: {json.dumps(error_msg)}\n\n' finally: yield 'event: end\ndata: stream_end\n\n' return Response(event_stream(), mimetype='text/event-stream')@app.route('/api/upload', methods=['POST'])def handle_upload(): '''处理文件上传''' if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] user_id = Config.USER_ID if not file or file.filename == '': return jsonify({'error': 'Empty filename'}), 400 try: # 生成唯一ID并保留原始扩展名 file_id = str(uuid.uuid4()) original_filename = file.filename original_ext = Path(original_filename).suffix.lower() # 构建带扩展名的临时文件路径 temp_path = Config.UPLOAD_FOLDER / f'{file_id}{original_ext}' file.save(temp_path) # 打印文件信息用于调试 print(f'文件路径: {temp_path}') print(f'文件大小: {temp_path.stat().st_size} 字节') print(f'原始文件名: {original_filename}') print(f'保存的文件名: {temp_path.name}') print(f'文件扩展名: {original_ext}') # 从路径获取扩展名(此时应该可靠) file_type = temp_path.suffix.lower()[1:] if not file_type: file_type = 'unknown' # 处理极端情况(应该不会发生) # 处理流水线 print('开始解析文件内容...') raw_text = file_parser.parse(temp_path, original_ext) print(f'成功提取文本,长度: {len(raw_text)} 字符') chunks = text_processor.process(raw_text, file_type) doc_meta = doc_processor.process_document(temp_path, user_id, chunks) # 向量存储 vector_db.add_document( user_id=user_id, doc_id=file_id, chunks=chunks, metadata=doc_meta ) return jsonify({ 'status': 'success', 'document_id': file_id, 'original_filename': original_filename, 'chunk_count': len(chunks) }) except Exception as e: logger.error(f'Upload failed: {str(e)}', exc_info=True) return jsonify({'error': str(e)}), 500 finally: if temp_path.exists(): temp_path.unlink()@app.route('/api/query', methods=['POST'])def handle_query(): '''处理知识查询''' data = request.get_json() user_id = Config.USER_ID question = data.get('question', '') if not question.strip(): return jsonify({'error': 'Empty question'}), 400 try: # 检索上下文 context_chunks = vector_db.search(user_id, question, top_k=3) # 构建prompt prompt = f'Context:\n{' '.join(context_chunks)}\n\nQuestion: {question}' print(f'构建prompt:\n{prompt}') def event_stream(): client = ollama.Client(host=Config.OLLAMA_ENDPOINT) try: for part in client.generate( model=Config.OLLAMA_MODEL, prompt=prompt, options={'temperature': 0.3}, stream=True ): formatted = json.dumps({ 'response': part.get('response', ''), 'done': part.get('done', False), 'context': context_chunks }) yield f'data: {formatted}\n\n' except Exception as e: error_msg = {'error': f'服务异常: {str(e)}'} yield f'data: {json.dumps(error_msg)}\n\n' finally: yield 'event: end\ndata: stream_end\n\n' return Response(event_stream(), mimetype='text/event-stream') except ollama.ResponseError as e: logger.error(f'Model error: {e.error}') return jsonify({'error': 'Model service unavailable'}), 503 except Exception as e: logger.error(f'Query failed: {str(e)}') return jsonify({'error': str(e)}), 500if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=True)10、前端页面 (chat.html)编码
打开chat.html文件,添加如下代码:
<!DOCTYPE html><html><head> <meta charset='UTF-8'> <title>DeepSeek本地知识库系统</title> <link rel='stylesheet' href='/static/css/github-markdown.min.css'> <link rel='stylesheet' href='/static/css/github.min.css'> <link rel='stylesheet' href='/static/css/style.css'> <style> .section { margin-bottom: 30px; border: 1px solid #eee; padding: 20px; } .result { margin-top: 20px; padding: 15px; background: #f8f9fa; } </style></head><body> <div class='container'> <h1>DeepSeek本地知识库系统</h1> <div class='section'> <h2>文件上传</h2> <form id='uploadForm'> <input type='file' name='file' required> <button type='submit'>上传文档</button> </form> <div id='uploadResult' class='result'></div> </div> <h2>知识问答</h2> <div id='chat-box' class='chat-box'></div> <div class='input-area'> <textarea id='input' placeholder='输入您的问题(Shift+Enter换行)...'></textarea> <button id='send-btn'>发送</button> </div> <div id='loading' class='loading-indicator'> <div class='typing-dots'> <div class='dot'></div> <div class='dot'></div> <div class='dot'></div> </div> </div> </div> <!-- 本地依赖库 --> <script src='/static/js/marked.min.js'></script> <script src='/static/js/highlight.min.js'></script> <script src='/static/js/purify.min.js'></script> <script src='/static/js/script.js'></script> <script> // 文件上传处理 document.getElementById('uploadForm').onsubmit = async (e) => { e.preventDefault(); const formData = new FormData(); formData.append('file', e.target.file.files[0]); const resultDiv = document.getElementById('uploadResult'); resultDiv.innerHTML = '上传中...'; try { const response = await fetch('/api/upload', { method: 'POST', body: formData }); const data = await response.json(); resultDiv.innerHTML = `上传成功!文档名称: ${data.original_filename}`; } catch (error) { resultDiv.innerHTML = `上传失败: ${error}`; } }; </script></body></html>
到此,基于Flask框架搭建的本地知识库智能问答系统就完成了。
五、集成测试与验证现在需要将Ollama、Deepseek、Flask集成联调测试,验证整个对话系统是否正常工作。具体按照如下步骤的顺序进行:
1、检查并验证Ollama服务是否启动,如没有启动需要先启动,启动方式就是双击桌面上的Ollama程序图标即可,一旦启动成功,该服务会占用11434端口,可以执行cmd查看:
图片
查看Ollama运行端口示意图
2、启动Flask程序,点击PyCharm中的绿色三角按钮运行即可,正常运行如下截图所示:
图片
程序正常启动示意图
3、打开浏览器,输入:http://127.0.0.1:5000,正常情况下会看到一个对话框,如下图所示:
图片
本地知识库智能问答系统首页示意图
4、上传文件资料,分别点击【选择文件】按钮选择本地文件后,点击【上传文档】,正常如下图所示:
图片
上传文档成功后的示意图
此时,PyCharm控制台会看到如下日志输出:
图片
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报。下一篇:没有了