
How to build an AI that can answer questions about your website


This tutorial walks through a simple example of crawling a website (in this case, the OpenAI website), turning the crawled pages into embeddings using the Embeddings API, and then creating a basic search feature that allows users to ask questions about the embedded information. This is intended to be a starting point for more sophisticated applications that make use of a custom knowledge base.

Getting started

Some basic knowledge of Python and GitHub is helpful for this tutorial. Before diving in, make sure to set up an OpenAI API key and walk through the quickstart tutorial. This will give a good intuition on how to use the API to its full potential.

Python is used as the main programming language along with the OpenAI, Pandas, transformers, NumPy, and other popular packages. If you run into any issues while working through this tutorial, please ask a question on the OpenAI Community Forum.

To start with the code, clone the full code for this tutorial on GitHub. Alternatively, follow along by copying each section into a Jupyter notebook and running the code step by step, or just read along. A good way to avoid any issues is to set up a new virtual environment and install the required packages by running the following commands:

python -m venv env

source env/bin/activate

pip install -r requirements.txt

Setting up a web crawler

The primary focus of this tutorial is the OpenAI API, so if you prefer, you can skip the context on how to create a web crawler and just download the source code. Otherwise, expand the section below to work through the implementation of the scraping mechanism.


Acquiring data in text form is the first step to using embeddings. This tutorial creates a new set of data by crawling the OpenAI website, a technique that you can also use for your own company or personal website.

While this crawler is written from scratch, open source packages like Scrapy can also help with these operations.

This crawler will start from the root URL passed in at the bottom of the code below, visit each page, find additional links, and visit those pages as well (as long as they have the same root domain). To begin, import the required packages, set up the basic URL, and define a HyperlinkParser class built on Python's HTMLParser.

import requests
import re
import urllib.request
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse
import os

# Regex pattern to match a URL
HTTP_URL_PATTERN = r'^http[s]*://.+'

domain = "openai.com" # <- put your domain to be crawled
full_url = "https://openai.com/" # <- put your domain to be crawled with https or http

# Create a class to parse the HTML and get the hyperlinks
class HyperlinkParser(HTMLParser):
    def __init__(self):
        super().__init__()
        # Create a list to store the hyperlinks
        self.hyperlinks = []

    # Override the HTMLParser's handle_starttag method to get the hyperlinks
    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)

        # If the tag is an anchor tag and it has an href attribute, add the href attribute to the list of hyperlinks
        if tag == "a" and "href" in attrs:
            self.hyperlinks.append(attrs["href"])

The next function takes a URL as an argument, opens the URL, and reads the HTML content. Then, it returns all the hyperlinks found on that page.

# Function to get the hyperlinks from a URL
def get_hyperlinks(url):

    # Try to open the URL and read the HTML
    try:
        # Open the URL and read the HTML
        with urllib.request.urlopen(url) as response:

            # If the response is not HTML, return an empty list
            if not response.info().get('Content-Type').startswith("text/html"):
                return []

            # Decode the HTML
            html = response.read().decode('utf-8')
    except Exception as e:
        print(e)
        return []

    # Create the HTML Parser and then Parse the HTML to get hyperlinks
    parser = HyperlinkParser()
    parser.feed(html)

    return parser.hyperlinks

The goal is to crawl through and index only the content that lives under the OpenAI domain. For this purpose, a function that calls the get_hyperlinks function but filters out any URLs that are not part of the specified domain is needed.

# Function to get the hyperlinks from a URL that are within the same domain
def get_domain_hyperlinks(local_domain, url):
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # If the link is a URL, check if it is within the same domain
        if re.search(HTTP_URL_PATTERN, link):
            # Parse the URL and check if the domain is the same
            url_obj = urlparse(link)
            if url_obj.netloc == local_domain:
                clean_link = link

        # If the link is not a URL, check if it is a relative link
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith("#") or link.startswith("mailto:"):
                continue
            clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            if clean_link.endswith("/"):
                clean_link = clean_link[:-1]
            clean_links.append(clean_link)

    # Return the list of hyperlinks that are within the same domain
    return list(set(clean_links))
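
For example, calling the function on the homepage prints only same-domain links in normalized form. This is an illustrative check, not part of the tutorial's pipeline, and it makes a live request:

# Illustrative only: fetch the homepage and list a few of the same-domain links found on it
links = get_domain_hyperlinks(domain, full_url)
print(len(links), "links found, for example:", links[:5])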

The crawl function is the final step in the web scraping task setup. It keeps track of the visited URLs to avoid repeating the same page, which might be linked across multiple pages on a site. It also extracts the raw text from a page without the HTML tags, and writes the text content into a local .txt file specific to the page.

def crawl(url):
    # Parse the URL and get the domain
    local_domain = urlparse(url).netloc

    # Create a queue to store the URLs to crawl
    queue = deque([url])

    # Create a set to store the URLs that have already been seen (no duplicates)
    seen = set([url])

    # Create a directory to store the text files
    if not os.path.exists("text/"):
        os.mkdir("text/")

    if not os.path.exists("text/"+local_domain+"/"):
        os.mkdir("text/" + local_domain + "/")

    # Create a directory to store the csv files
    if not os.path.exists("processed"):
        os.mkdir("processed")

    # While the queue is not empty, continue crawling
    while queue:

        # Get the next URL from the queue
        url = queue.pop()
        print(url)  # for debugging and to see the progress

        # Save text from the url to a <url>.txt file
        with open('text/'+local_domain+'/'+url[8:].replace("/", "_") + ".txt", "w", encoding="UTF-8") as f:

            # Get the text from the URL using BeautifulSoup
            soup = BeautifulSoup(requests.get(url).text, "html.parser")

            # Get the text but remove the tags
            text = soup.get_text()

            # If the crawler gets to a page that requires JavaScript, flag it so it can be reviewed later
            if ("You need to enable JavaScript to run this app." in text):
                print("Unable to parse page " + url + " due to JavaScript being required")

            # Write the text to the file in the text directory
            f.write(text)

        # Get the hyperlinks from the URL and add them to the queue
        for link in get_domain_hyperlinks(local_domain, url):
            if link not in seen:
                queue.append(link)
                seen.add(link)

crawl(full_url)

The last line of the above example runs the crawler which goes through all the accessible links and turns those pages into text files. This will take a few minutes to run depending on the size and complexity of your site.
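
Once the crawl finishes, a quick way to confirm that pages were saved is to count the text files written for the domain (a simple check, not part of the original tutorial):

# Illustrative check: how many pages were saved as text files?
saved_pages = os.listdir("text/" + domain + "/")
print(len(saved_pages), "pages saved, for example:", saved_pages[:3])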

Building an embeddings index


CSV is a common format for storing embeddings. You can use this format with Python by converting the raw text files (located in the text directory) into a Pandas data frame. Pandas is a popular open source library that helps you work with tabular data (data stored in rows and columns).

Blank, empty lines can clutter the text files and make them harder to process. A simple function can remove those lines and tidy up the files.

def remove_newlines(serie):
    serie = serie.str.replace('\n', ' ')
    serie = serie.str.replace('\\n', ' ')
    serie = serie.str.replace('  ', ' ')
    serie = serie.str.replace('  ', ' ')
    return serie
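
A tiny illustrative check of the function on a toy Pandas Series (not part of the pipeline):

import pandas as pd

# Illustrative only: newlines and doubled spaces collapse to single spaces
print(remove_newlines(pd.Series(["first line\nsecond  line"])).iloc[0])
# -> first line second line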

Converting the text to CSV requires looping through the text files in the text directory created earlier. After opening each file, remove the extra spacing and append the modified text to a list. Then, add the text with the newlines removed to an empty Pandas data frame and write the data frame out to a CSV file.

Extra spacing and newlines can clutter the text and complicate the embeddings process. The code used here helps remove some of them, but you may find third-party libraries or other methods useful for getting rid of more unnecessary characters.

import pandas as pd

# Create a list to store the text files
texts=[]

# Get all the text files in the text directory
for file in os.listdir("text/" + domain + "/"):

# Open the file and read the text
with open("text/" + domain + "/" + file, "r", encoding="UTF-8") as f:
text = f.read()

# Omit the first 11 lines and the last 4 lines, then replace -, _, and #update with spaces.
texts.append((file[11:-4].replace('-',' ').replace('_', ' ').replace('#update',''), text))

# Create a dataframe from the list of texts
df = pd.DataFrame(texts, columns = ['fname', 'text'])

# Set the text column to be the raw text with the newlines removed
df['text'] = df.fname + ". " + remove_newlines(df.text)
df.to_csv('processed/scraped.csv')
df.head()

The next step after saving the raw text into a CSV file is tokenization. This process splits the input text into tokens by breaking down the sentences and words. A visual demonstration of this can be seen by checking out the Tokenizer in the docs.

A useful rule of thumb is that one token generally corresponds to ~4 characters of text for common English text. This translates to roughly ¾ of a word (so 100 tokens ~= 75 words).
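
As a rough illustration of this rule of thumb (exact counts depend on the tokenizer and the text; this snippet is not part of the tutorial's pipeline):

import tiktoken

# cl100k_base is the encoding used by text-embedding-ada-002
enc = tiktoken.get_encoding("cl100k_base")
sample = "A useful rule of thumb is that one token is roughly four characters."
print(len(sample), "characters ->", len(enc.encode(sample)), "tokens")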

The API has a limit on the maximum number of input tokens for embeddings. To stay below that limit, the text in the CSV file needs to be broken down into multiple rows. The existing length of each row will be recorded first to identify which rows need to be split.

import tiktoken

# Load the cl100k_base tokenizer which is designed to work with the ada-002 model
tokenizer = tiktoken.get_encoding("cl100k_base")

df = pd.read_csv('processed/scraped.csv', index_col=0)
df.columns = ['title', 'text']

# Tokenize the text and save the number of tokens to a new column
df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))

# Visualize the distribution of the number of tokens per row using a histogram
df.n_tokens.hist()

[Histogram: distribution of the number of tokens per row]

The newest embeddings model can handle inputs with up to 8191 input tokens, so most of the rows would not need any chunking, but this may not be the case for every subpage scraped, so the next code chunk will split the longer rows into smaller chunks.

max_tokens = 500

# Function to split the text into chunks of a maximum number of tokens
def split_into_many(text, max_tokens = max_tokens):

    # Split the text into sentences
    sentences = text.split('. ')

    # Get the number of tokens for each sentence
    n_tokens = [len(tokenizer.encode(" " + sentence)) for sentence in sentences]

    chunks = []
    tokens_so_far = 0
    chunk = []

    # Loop through the sentences and tokens joined together in a tuple
    for sentence, token in zip(sentences, n_tokens):

        # If the number of tokens so far plus the number of tokens in the current sentence is greater
        # than the max number of tokens, then add the chunk to the list of chunks and reset
        # the chunk and tokens so far
        if tokens_so_far + token > max_tokens:
            chunks.append(". ".join(chunk) + ".")
            chunk = []
            tokens_so_far = 0

        # If the number of tokens in the current sentence is greater than the max number of
        # tokens, go to the next sentence
        if token > max_tokens:
            continue

        # Otherwise, add the sentence to the chunk and add the number of tokens to the total
        chunk.append(sentence)
        tokens_so_far += token + 1

    # Add the last chunk so the trailing sentences are not dropped
    if chunk:
        chunks.append(". ".join(chunk) + ".")

    return chunks
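
A quick, illustrative sanity check of the splitting function on an artificially long text (this assumes the tokenizer and max_tokens defined above and is not part of the pipeline):

# Illustrative only: split a long synthetic text and look at the resulting chunk sizes
long_text = ". ".join(["This is a filler sentence used only to test the chunking logic"] * 400)
test_chunks = split_into_many(long_text)
print(len(test_chunks), "chunks, largest chunk in tokens:", max(len(tokenizer.encode(c)) for c in test_chunks))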


shortened = []

# Loop through the dataframe
for row in df.iterrows():

    # If the text is None, go to the next row
    if row[1]['text'] is None:
        continue

    # If the number of tokens is greater than the max number of tokens, split the text into chunks
    if row[1]['n_tokens'] > max_tokens:
        shortened += split_into_many(row[1]['text'])

    # Otherwise, add the text to the list of shortened texts
    else:
        shortened.append( row[1]['text'] )

Visualizing the updated histogram again can help to confirm whether the rows were successfully split into shortened sections.

df = pd.DataFrame(shortened, columns = ['text'])
df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))
df.n_tokens.hist()

[Histogram: number of tokens per row after splitting]

The content is now broken down into smaller chunks, and a simple request can be sent to the OpenAI API specifying the use of the new text-embedding-ada-002 model to create the embeddings:

import openai

df['embeddings'] = df.text.apply(lambda x: openai.Embedding.create(input=x, engine='text-embedding-ada-002')['data'][0]['embedding'])

df.to_csv('processed/embeddings.csv')
df.head()

This should take about 3-5 minutes, but after that you will have your embeddings ready to use!

Building a question answer system with your embeddings


The embeddings are ready, and the final step of this process is to create a simple question and answer system. This will take a user's question, create an embedding of it, and compare it with the existing embeddings to retrieve the most relevant text from the scraped website. The text-davinci-003 model will then generate a natural sounding answer based on the retrieved text.


Turning the embeddings into a NumPy array is the first step, which will provide more flexibility in how to use them given the many functions available that operate on NumPy arrays. It will also flatten the dimension to 1-D, which is the required format for many subsequent operations.

import numpy as np
from openai.embeddings_utils import distances_from_embeddings

df=pd.read_csv('processed/embeddings.csv', index_col=0)
df['embeddings'] = df['embeddings'].apply(eval).apply(np.array)

df.head()

Now that the data is ready, the question needs to be converted to an embedding with a simple function. This is important because the embedding search compares vectors of numbers (which are the transformation of the original text) using cosine distance. Vectors whose cosine distance is small are likely related and may contain the answer to the question. The OpenAI Python package has a built-in distances_from_embeddings function which is useful here.
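
For intuition, cosine distance on two toy vectors can be computed directly with NumPy (purely illustrative; the function below applies the same idea across the whole dataframe):

# Illustrative only: cosine distance = 1 - cosine similarity
a = np.array([1.0, 0.0, 1.0])
b = np.array([0.9, 0.1, 1.1])
cosine_distance = 1 - np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
print(cosine_distance)  # close to 0 because the vectors point in nearly the same direction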

def create_context(
    question, df, max_len=1800, size="ada"
):
    """
    Create a context for a question by finding the most similar context from the dataframe
    """

    # Get the embeddings for the question
    q_embeddings = openai.Embedding.create(input=question, engine='text-embedding-ada-002')['data'][0]['embedding']

    # Get the distances from the embeddings
    df['distances'] = distances_from_embeddings(q_embeddings, df['embeddings'].values, distance_metric='cosine')

    returns = []
    cur_len = 0

    # Sort by distance and add the text to the context until the context is too long
    for i, row in df.sort_values('distances', ascending=True).iterrows():

        # Add the length of the text to the current length
        cur_len += row['n_tokens'] + 4

        # If the context is too long, break
        if cur_len > max_len:
            break

        # Else add it to the text that is being returned
        returns.append(row["text"])

    # Return the context
    return "\n\n###\n\n".join(returns)

The text was broken up into smaller sets of tokens, so looping through in ascending order of distance and continuing to add text is a crucial step to ensure a full answer. The max_len can also be modified to something smaller if more content than desired is returned.
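
To see what the retrieval step actually returns, the context for a sample question can be printed directly (the question here is just an example; this check is not part of the tutorial's code):

# Illustrative only: inspect the retrieved context for an example question
sample_context = create_context("What is ChatGPT?", df)
print(sample_context[:500])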

The previous step only retrieved chunks of text that are semantically related to the question, so they might contain the answer, but there is no guarantee of it. The chance of finding an answer can be further increased by returning the top 5 most likely results.

The answer prompt will then try to extract the relevant facts from the retrieved contexts in order to formulate a coherent answer. If there is no relevant answer, the prompt will return "I don't know".

A realistic sounding answer to the question can be created with the completions endpoint, using text-davinci-003:

def answer_question(
    df,
    model="text-davinci-003",
    question="Am I allowed to publish model outputs to Twitter, without a human review?",
    max_len=1800,
    size="ada",
    debug=False,
    max_tokens=150,
    stop_sequence=None
):
    """
    Answer a question based on the most similar context from the dataframe texts
    """
    context = create_context(
        question,
        df,
        max_len=max_len,
        size=size,
    )
    # If debug, print the raw model response
    if debug:
        print("Context:\n" + context)
        print("\n\n")

    try:
        # Create a completions using the question and context
        response = openai.Completion.create(
            prompt=f"Answer the question based on the context below, and if the question can't be answered based on the context, say \"I don't know\"\n\nContext: {context}\n\n---\n\nQuestion: {question}\nAnswer:",
            temperature=0,
            max_tokens=max_tokens,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            stop=stop_sequence,
            model=model,
        )
        return response["choices"][0]["text"].strip()
    except Exception as e:
        print(e)
        return ""

It is done! A working Q/A system that has the knowledge embedded from the OpenAI website is now ready. A few quick tests can be run to see the quality of the output:

answer_question(df, question="What day is it?", debug=False)

answer_question(df, question="What is our newest embeddings model?")

answer_question(df, question="What is ChatGPT?")

The responses will look something like the following:

"I don't know."

'The newest embeddings model is text-embedding-ada-002.'

'ChatGPT is a model trained to interact in a conversational way. It is able to answer followup questions, admit its mistakes, challenge incorrect premises, and reject inappropriate requests.'

If the system is not able to answer a question that it is expected to, it is worth searching through the raw text files to see if the information that is expected to be known actually ended up being embedded. The crawl process that was run initially was set up to skip sites outside of the original domain provided, so it might not have knowledge of subdomains if there is a subdomain setup.
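
A minimal way to do that search is to scan the scraped text files for a keyword (a simple sketch, assuming the domain variable and text directory from the crawling step; the keyword is just an example):

# Illustrative sketch: list the scraped files that mention a given keyword
keyword = "ChatGPT"
for fname in os.listdir("text/" + domain + "/"):
    with open("text/" + domain + "/" + fname, "r", encoding="UTF-8") as f:
        if keyword in f.read():
            print(fname)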

Currently, the dataframe is being passed in each time to answer a question. For more production-grade workflows, a vector database solution should be used instead of storing the embeddings in a CSV file, but the current approach is a great option for prototyping.