Kimi API 助手的氮气加速装置 —— 以 Golang 为例实践 Context Caching

进击的黑咻,15 min read

前言

众所周知,在我们的“Moonshot API 内测核心用户群”中,活跃着一个由 Kimi 开放平台驱动的机器人客服“Moonshot API 助手”(简称小助手),小助手自上线之日起,极大地解放了 Kimi 员工的生产力,帮助广大开发者解决 API 的各种疑难杂症,属实是居家旅行,开发调试的必备选择。

然而,小助手也不是完美的,群里的用户时常反馈,小助手的响应速度太慢了,经常问一个问题,需要 40s 甚至更久的时间才能获得响应,在面对复杂问题时,等待小助手的时间就已经超过了编写代码本身的时间,非常影响使用体验。

我们的长文本缓存功能(Context Caching)的第一个内测用户,就是 Moonshot API 助手,旨在解决小助手响应慢的问题,以更快地响应用户提出的问题。本期我们将使用 Golang 对 Moonshot API 助手进行改造,为其接入“氮气加速装置” ———— Context Caching。

小助手的架构设计

我们会简单地解释一下小助手的架构设计,并贴出关键代码,以便于大家更好地理解后续 Context Caching 的部分。

Moonshot API 助手没有使用任何 RAG 技术,其基于 Kimi 大模型长文本特性,将所有小助手需要掌握的知识全部置于 system prompt 中。它会将用户提出的问题作为新的 user prompt,通过 /chat/completions 接口获取回复。其接口以 Golang 的形式表示如下:

type Client interface {
	// CreateChatCompletion POST {{ $.Client.BaseUrl }}/chat/completions
	// Content-Type: application/json
	// Authorization: Bearer {{ $.Client.Key }}
	//
	// {{ $.request.ToJSON }}
	CreateChatCompletion(ctx context.Context, request *ChatCompletionRequest) (*Completion, error)
}

注:由于微信并不支持 stream,因此我们没有启用 stream 选项

其中,载入知识的部分,实现起来也非常简单,其步骤是:

  1. 获取文件内容;
  2. 通过 /files 接口上传文件;
  3. 通过 /files/{file_id} 接口获取文件内容;
  4. 将文件内容作为 system prompt;

其部分实现代码(简化版)如下:

func initKnowledge(ctx context.Context, client Client) (messages []*Message, err error) {
	file, err := os.Open("kimi-api-doc.md")
	if err != nil {
		return nil, err
	}
 
	// 事实上,每次都上传文件并抽取并不是一个合理合适的逻辑,我们建议在文件没有更新的场合,
	// 完成初次上传后,使用某种方式将抽取的文件内容缓存起来,并在下次调用 initKnowledge
	// 时直接使用缓存的文件内容;
	//
	// 你也可以存储文件 ID,并在下次调用 initKnowledge 时,使用 file_id 调用 retrieve
	// 接口获取已上传的文件内容;
	//
	// 这些方式能让你避免上传的文件数量超出我们的限制。
 
	uploadedFile, err := client.UploadFile(ctx, &UploadFileRequest{
		File:    file,
		Purpose: "file-extract",
	})
	if err != nil {
		return nil, err
	}
	fileContent, err := client.RetrieveFileContent(ctx, uploadedFile.ID)
	if err != nil {
		return nil, err
	}
	messages = []*Message{
		{Role: RoleSystem, Content: &Content{Text: string(fileContent)}},
	}
	return messages
}

在载入知识后,剩下的部分就是对 Kimi 大模型进行提问,其部分代码(简化版)如下:

func chat(ctx context.Context, input string) (output string, err error) {
	messages, err := initKnowledge(ctx, globalClient)
	if err != nil {
		return "", err
	}
	messages = append(messages, &Message{
		Role:    "user",
		Content: &Content{Text: input},
	})
	completion, err := client.CreateChatCompletion(ctx, &ChatCompletionRequest{
		Model:    "moonshot-v1-128k",
		Messages: messages,
	})
	if err != nil {
		return "", err
	}
	return completion.choices[0].Message.Content, nil
}

为小助手添加 Cache

缓存管理功能

缓存问题是计算机科学中最困难的两件事之一,因此为了更好地使用我们的缓存,我们需要对代码架构进行一些额外的设计,其中最先需要考虑的内容则是,我们需要一个额外的缓存管理模块;它不需要非常复杂,只需要实现基本的新增和查询即可,使用 Golang 表示的接口如下所示:

type ContextCacheManager interface {
	Set(ctx context.Context, key string, cache *ContextCache) error
	Get(ctx context.Context, key string) (*ContextCache, error)
}

我们推荐使用业务上的唯一标识作为缓存的 Key,在 Moonshot API 助手的场合,由于业务逻辑单一,我们就写死了一个 Key moonshot-api-doc

const knowledgeKey = "moonshot-api-doc-0624" // 后续我们会使用该 Key 来创建和获取 Cache

注:使用 Key 的意义在于,删除或重新创建缓存,会导致缓存 cache_id 发生变化,因此我们使用一个业务的 Key 来绑定 Kimi 大模型创建的 cache_id。

为了便于演示,我们使用 sqlite 作为我们缓存的存储介质(当然你可以选择其他缓存介质,例如 redis),使用 sqlite 搭配代码生成工具快速创建一个实现了 ContextCacheManager 接口的类型,如下所示(为了便于演示,将字段类型全部设置为 text):

//go:generate go run -mod=mod github.com/x5iu/defc generate --output=context_cache_manager.gen.go --features=sqlx/nort
type ContextCacheManager interface {
	// createTable Exec
	/*
		create table if not exists context_cache
		(
		    id    integer not null
					constraint context_cache_pk
						primary key autoincrement,
		    key            text    not null,
		    cache_id       text    not null,
			cache_status   text    not null,
			cache_messages text    not null
		);
	*/
	createTable(ctx context.Context) error
 
	// Set Exec arguments=args
	/*
		delete from context_cache where key = {{ $.args.Add $.key }};
		insert into context_cache (
			key,
			cache_id,
			cache_status,
			cache_messages
		) values (
			{{ $.args.Add $.key }},
			{{ $.args.Add $.cache.ID }},
			{{ $.args.Add $.cache.Status }},
			{{ $.args.Add $.cache.Messages }}
		);
	*/
	Set(ctx context.Context, key string, cache *ContextCache) error
 
	// Get Query One Const
	// select cache_id, cache_status, cache_messages from context_cache where key = ?;
	Get(ctx context.Context, key string) (*ContextCache, error)
}

顺带地,我们为 client 添加创建缓存和查询缓存的方法:

type Client interface {
	// CreateContextCache POST {{ $.Client.BaseUrl }}/caching
	// Content-Type: application/json
	// Authorization: Bearer {{ $.Client.Key }}
	//
	// {{ $.request.ToJSON }}
	CreateContextCache(ctx context.Context, request *CreateContextCacheRequest) (*ContextCache, error)
 
	// RetrieveContextCache GET {{ $.Client.BaseUrl }}/caching/{{ $.cacheID }}
	// Content-Type: application/json
	// Authorization: Bearer {{ $.Client.Key }}
	RetrieveContextCache(ctx context.Context, cacheID string) (*ContextCache, error)
}

改造 initKnowledge 函数

当我们拥有了缓存管理模块,我们就可以开始着手改造我们的 initKnowledge 函数了,我会将相关要点以注释的形式添加在代码中:

var createTableOnce sync.Once
 
func initKnowledge(
	ctx context.Context,
	client Client,
	manager ContextCacheManager, // 我们将缓存管理模块以参数的形式提供
) (messages []*Message, cacheID string, err error) { // 在返回值中额外添加了 cacheID,以表明当前知识是否已被缓存
	// 我们只需要创建一次数据库表
	createTableOnce.Do(func() {
		err = manager.createTable(ctx)
	})
	if err != nil {
		return nil, "", err
	}
 
	// 第一步,我们使用 knowledgeKey 尝试获取 sqlite 中存储的缓存,如果缓存存在,
	// 则会刷新一次缓存状态;当刷新后的缓存状态是 ready 时,我们会使用这个缓存 ID
	// 调用 `/chat/completions` 接口。
 
	var cache *ContextCache
	cache, err = manager.Get(ctx, knowledgeKey)
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return nil, "", err
	}
 
	// 我们使用缓存 ID 来简单地判断缓存是否存在
	if cache.ID != "" {
		cache, err = client.RetrieveContextCache(ctx, cache.ID)
		if err != nil {
			return nil, "", err
		}
		if err = manager.Set(ctx, knowledgeKey, cache); err != nil {
			return nil, "", err
		}
 
		// 当缓存存在且状态为 ready 时,我们会将缓存的 messages 内容、缓存的 ID
		// 作为返回值返回,以便于下一步调用
		if cache.Status == cacheStatusReady {
			return cache.Messages, cache.ID, nil
		}
	}
 
	// 当缓存不存在或状态不为 ready 时,我们需要重新读取文件内容并创建缓存;如同上文所
	// 提到的,我们建议你以某种方式缓存抽取后的文件内容或文件 ID,以避免重复上传导致文
	// 件数量超出我们的上传限制。
 
	file, err := os.Open("kimi-api-doc.md")
	if err != nil {
		return nil, "", err
	}
	uploadedFile, err := client.UploadFile(ctx, &UploadFileRequest{
		File:    file,
		Purpose: "file-extract",
	})
	if err != nil {
		return nil, "", err
	}
	fileContent, err := client.RetrieveFileContent(ctx, uploadedFile.ID)
	if err != nil {
		return nil, "", err
	}
	messages = []*Message{
		{Role: RoleSystem, Content: &Content{Text: string(fileContent)}},
	}
 
	// 在这里,我们为了避免重复创建缓存,只当缓存不存在,或缓存状态为错误的场合,重新创建缓存
	if cache.ID == "" || cache.Status == cacheStatusError {
		cache, err = client.CreateContextCache(ctx, &CreateContextCacheRequest{
			Messages: messages,
			Model:    "moonshot-v1",
			TTL:      3600,
		})
		if err != nil {
			return nil, "", err
		}
		if err = manager.Set(ctx, knowledgeKey, cache); err != nil {
			return nil, "", err
		}
	}
 
	return messages, "", nil
}
 

这样我们就完成了 initKnowledge 的改造,其基本逻辑为:

  1. 先根据 Key 尝试获取已经存在的缓存;
  2. 如果获取到缓存,则更新其状态;
  3. 如果缓存状态为 ready,则使用该缓存;
  4. 如果查询不到缓存,或缓存状态不为 ready,则重新上传并抽取知识内容;
  5. 重新获取知识内容后,创建新的缓存,存入缓存管理模块;

改造 CreateChatCompletion 接口

在这个案例中,我们使用 HTTP Headers 的方式启用缓存,我们会基于 HTTP Headers 改造我们原先的 CreateChatCompletion 接口。在此之前,我们要讨论一下,在 Golang 的编程语言环境下,如何优雅地在不破坏原有代码的基础上,为 CreateChatCompletion 接口启用缓存。 如果我不想破坏原因的数据结构,也不想改动太多接口实现的代码,那么用什么样的方式可以简单方便地为一个接口添加额外参数呢?聪明的读者们肯定已经猜到了,Golang 为我们准备了 context.Context 这样的抓手,我们可以基于 context.Context 快速传递一个不确定是否存在的参数,因此我们需要写一些额外的代码来配置 context.Context

type ContextCacheOptions struct {
	CacheID  string
	ResetTTL int
}
 
type contextKeyCacheID struct{}
 
func withCacheOptions(ctx context.Context, options *ContextCacheOptions) context.Context {
	return context.WithValue(ctx, contextKeyCacheID{}, options)
}
 
func getCacheOptions(ctx context.Context) *ContextCacheOptions {
	cv := ctx.Value(contextKeyCacheID{})
	if cv == nil {
		return nil
	}
	options, ok := cv.(*ContextCacheOptions)
	if !ok {
		return nil
	}
	return options
}

在此之后,我们便可以用 withCacheOptionsgetCacheOptions 函数来传入和取出和 Context Cache 有关的配置。

接着,我们继续改造我们的 CreateChatCompletion 接口,本次改造的目的是在 HTTP Headers 中传入 X-Msh-Context-CacheX-Msh-Context-Cache-Reset-TTL 两个请求头(关于请求头的具体含义,请参照这里 (opens in a new tab))。我们配合代码生成工具改造我们的接口,最终变成:

type Client interface {
	// CreateChatCompletion POST {{ $.Client.BaseUrl }}/chat/completions
	// Content-Type: application/json
	// Authorization: Bearer {{ $.Client.Key }}
	// {{ $options := (get_cache_options $.ctx) -}}
	// {{ if $options }}X-Msh-Context-Cache: {{ $options.CacheID }}
	// {{ end }}{{ if $options }}{{ if $options.ResetTTL }}X-Msh-Context-Cache-Reset-TTL: {{ $options.ResetTTL }}
	// {{ end }}{{ end }}
	//
	// {{ $.request.ToJSON }}
	CreateChatCompletion(ctx context.Context, request *ChatCompletionRequest) (*Completion, error)
}

此时我们做到了在保证接口兼容性的同时,为 CreateChatCompletion 接口添加启用缓存功能。

至此,我们完成了小助手的 Cache 改造,为其配置了氮气加速装置,最终的使用方式是:

func chat(ctx context.Context, input string) (output string, err error) {
	messages, cacheID, err := initKnowledge(ctx, globalClient, globalCacheManager)
	if err != nil {
		return "", err
	}
 
	// 如果有可用的 cache_id,则将 ContextCacheOptions 传入 context.Context 中,
	// 从而使 CreateChatCompletion 接口能顺利携带与 cache 相关的 HTTP Headers
	if cacheID != "" {
		ctx = withCacheOptions(ctx, &ContextCacheOptions{
			CacheID:  cacheID,
			ResetTTL: 3600,
		})
	}
 
	messages = append(messages, &Message{
		Role:    "user",
		Content: &Content{Text: input},
	})
	completion, err := client.CreateChatCompletion(ctx, &ChatCompletionRequest{
		Model:    "moonshot-v1-128k",
		Messages: messages,
	})
	if err != nil {
		return "", err
	}
	return completion.choices[0].Message.Content, nil
}

可以看到,在实际的业务代码(即 chat 函数中),只需要在缓存可用时,将缓存信息注入 context.Context 中,即可无痛启用缓存,获得爆发式地响应速度提升,完全不需要修改其他逻辑,可谓是极佳的“微创手术”。

需要注意的是,这是一个简单业务场景下的最小实现 Context Caching 的方式,在实际的工程代码中,你可能需要更多地考虑:

  1. 如何维护业务类型与缓存的关系;
  2. 缓存的存储与更新周期;
  3. 缓存失效场合下的容错逻辑;
  4. 费用审计相关。

相关代码及代码示例中所涉及的 SDK 均可在我们的 Github (opens in a new tab) 中获取。

2024 © Moonshot AI用户中心文档