chore: implement memo property runner

This commit is contained in:
Steven 2024-08-20 08:07:48 +08:00
parent f4d6675363
commit d1280bc04f
6 changed files with 216 additions and 131 deletions

View file

@ -6,7 +6,6 @@ import (
"context"
"fmt"
"log/slog"
"slices"
"time"
"unicode/utf8"
@ -28,6 +27,7 @@ import (
"github.com/usememos/memos/plugin/webhook"
v1pb "github.com/usememos/memos/proto/gen/api/v1"
storepb "github.com/usememos/memos/proto/gen/store"
memoproperty "github.com/usememos/memos/server/runner/memo_property"
"github.com/usememos/memos/store"
)
@ -60,8 +60,9 @@ func (s *APIV1Service) CreateMemo(ctx context.Context, request *v1pb.CreateMemoR
}
if len(create.Content) > contentLengthLimit {
return nil, status.Errorf(codes.InvalidArgument, "content too long (max %d characters)", contentLengthLimit)
}
property, err := getMemoPropertyFromContent(create.Content)
property, err := memoproperty.GetMemoPropertyFromContent(create.Content)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get memo property: %v", err)
}
@ -247,7 +248,7 @@ func (s *APIV1Service) UpdateMemo(ctx context.Context, request *v1pb.UpdateMemoR
}
update.Content = &request.Memo.Content
property, err := getMemoPropertyFromContent(*update.Content)
property, err := memoproperty.GetMemoPropertyFromContent(*update.Content)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get memo property: %v", err)
}
@ -610,7 +611,7 @@ func (s *APIV1Service) RebuildMemoProperty(ctx context.Context, request *v1pb.Re
}
for _, memo := range memos {
property, err := getMemoPropertyFromContent(memo.Content)
property, err := memoproperty.GetMemoPropertyFromContent(memo.Content)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get memo property: %v", err)
}
@ -691,14 +692,14 @@ func (s *APIV1Service) RenameMemoTag(ctx context.Context, request *v1pb.RenameMe
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to parse memo: %v", err)
}
TraverseASTNodes(nodes, func(node ast.Node) {
memoproperty.TraverseASTNodes(nodes, func(node ast.Node) {
if tag, ok := node.(*ast.Tag); ok && tag.Content == request.OldTag {
tag.Content = request.NewTag
}
})
content := restore.Restore(nodes)
property, err := getMemoPropertyFromContent(content)
property, err := memoproperty.GetMemoPropertyFromContent(content)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get memo property: %v", err)
}
@ -1127,56 +1128,6 @@ func findMemoField(callExpr *expr.Expr_Call, filter *MemoFilter) {
}
}
func getMemoPropertyFromContent(content string) (*storepb.MemoPayload_Property, error) {
nodes, err := parser.Parse(tokenizer.Tokenize(content))
if err != nil {
return nil, errors.Wrap(err, "failed to parse content")
}
property := &storepb.MemoPayload_Property{}
TraverseASTNodes(nodes, func(node ast.Node) {
switch n := node.(type) {
case *ast.Tag:
tag := n.Content
if !slices.Contains(property.Tags, tag) {
property.Tags = append(property.Tags, tag)
}
case *ast.Link, *ast.AutoLink:
property.HasLink = true
case *ast.TaskList:
property.HasTaskList = true
if !n.Complete {
property.HasIncompleteTasks = true
}
case *ast.Code, *ast.CodeBlock:
property.HasCode = true
}
})
return property, nil
}
func TraverseASTNodes(nodes []ast.Node, fn func(ast.Node)) {
for _, node := range nodes {
fn(node)
switch n := node.(type) {
case *ast.Paragraph:
TraverseASTNodes(n.Children, fn)
case *ast.Heading:
TraverseASTNodes(n.Children, fn)
case *ast.Blockquote:
TraverseASTNodes(n.Children, fn)
case *ast.OrderedList:
TraverseASTNodes(n.Children, fn)
case *ast.UnorderedList:
TraverseASTNodes(n.Children, fn)
case *ast.TaskList:
TraverseASTNodes(n.Children, fn)
case *ast.Bold:
TraverseASTNodes(n.Children, fn)
}
}
}
// DispatchMemoCreatedWebhook dispatches webhook when memo is created.
func (s *APIV1Service) DispatchMemoCreatedWebhook(ctx context.Context, memo *v1pb.Memo) error {
return s.dispatchMemoRelatedWebhook(ctx, memo, "memos.memo.created")

View file

@ -0,0 +1,120 @@
package memoproperty
import (
"context"
"log/slog"
"slices"
"time"
"github.com/pkg/errors"
"github.com/usememos/gomark/ast"
"github.com/usememos/gomark/parser"
"github.com/usememos/gomark/parser/tokenizer"
storepb "github.com/usememos/memos/proto/gen/store"
"github.com/usememos/memos/store"
)
type Runner struct {
Store *store.Store
}
func NewRunner(store *store.Store) *Runner {
return &Runner{
Store: store,
}
}
// Schedule runner every 12 hours.
const runnerInterval = time.Hour * 12
func (r *Runner) Run(ctx context.Context) {
ticker := time.NewTicker(runnerInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.RunOnce(ctx)
case <-ctx.Done():
return
}
}
}
func (r *Runner) RunOnce(ctx context.Context) {
emptyPayload := "{}"
memos, err := r.Store.ListMemos(ctx, &store.FindMemo{
PayloadFind: &store.FindMemoPayload{
Raw: &emptyPayload,
},
})
if err != nil {
slog.Error("failed to list memos", "err", err)
return
}
for _, memo := range memos {
property, err := GetMemoPropertyFromContent(memo.Content)
if err != nil {
slog.Error("failed to get memo property", "err", err)
continue
}
memo.Payload.Property = property
if err := r.Store.UpdateMemo(ctx, &store.UpdateMemo{
ID: memo.ID,
Payload: memo.Payload,
}); err != nil {
slog.Error("failed to update memo", "err", err)
}
}
}
func GetMemoPropertyFromContent(content string) (*storepb.MemoPayload_Property, error) {
nodes, err := parser.Parse(tokenizer.Tokenize(content))
if err != nil {
return nil, errors.Wrap(err, "failed to parse content")
}
property := &storepb.MemoPayload_Property{}
TraverseASTNodes(nodes, func(node ast.Node) {
switch n := node.(type) {
case *ast.Tag:
tag := n.Content
if !slices.Contains(property.Tags, tag) {
property.Tags = append(property.Tags, tag)
}
case *ast.Link, *ast.AutoLink:
property.HasLink = true
case *ast.TaskList:
property.HasTaskList = true
if !n.Complete {
property.HasIncompleteTasks = true
}
case *ast.Code, *ast.CodeBlock:
property.HasCode = true
}
})
return property, nil
}
func TraverseASTNodes(nodes []ast.Node, fn func(ast.Node)) {
for _, node := range nodes {
fn(node)
switch n := node.(type) {
case *ast.Paragraph:
TraverseASTNodes(n.Children, fn)
case *ast.Heading:
TraverseASTNodes(n.Children, fn)
case *ast.Blockquote:
TraverseASTNodes(n.Children, fn)
case *ast.OrderedList:
TraverseASTNodes(n.Children, fn)
case *ast.UnorderedList:
TraverseASTNodes(n.Children, fn)
case *ast.TaskList:
TraverseASTNodes(n.Children, fn)
case *ast.Bold:
TraverseASTNodes(n.Children, fn)
}
}
}

View file

@ -1,4 +1,4 @@
package s3objectpresigner
package s3presign
import (
"context"
@ -12,25 +12,45 @@ import (
"github.com/usememos/memos/store"
)
// nolint
type S3ObjectPresigner struct {
type Runner struct {
Store *store.Store
}
func NewS3ObjectPresigner(store *store.Store) *S3ObjectPresigner {
return &S3ObjectPresigner{
func NewRunner(store *store.Store) *Runner {
return &Runner{
Store: store,
}
}
func (p *S3ObjectPresigner) CheckAndPresign(ctx context.Context) {
workspaceStorageSetting, err := p.Store.GetWorkspaceStorageSetting(ctx)
// Schedule runner every 12 hours.
const runnerInterval = time.Hour * 12
func (r *Runner) Run(ctx context.Context) {
ticker := time.NewTicker(runnerInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.RunOnce(ctx)
case <-ctx.Done():
return
}
}
}
func (r *Runner) RunOnce(ctx context.Context) {
r.CheckAndPresign(ctx)
}
func (r *Runner) CheckAndPresign(ctx context.Context) {
workspaceStorageSetting, err := r.Store.GetWorkspaceStorageSetting(ctx)
if err != nil {
return
}
s3StorageType := storepb.ResourceStorageType_S3
resources, err := p.Store.ListResources(ctx, &store.FindResource{
resources, err := r.Store.ListResources(ctx, &store.FindResource{
GetBlob: false,
StorageType: &s3StorageType,
})
@ -73,7 +93,7 @@ func (p *S3ObjectPresigner) CheckAndPresign(ctx context.Context) {
}
s3ObjectPayload.S3Config = s3Config
s3ObjectPayload.LastPresignedTime = timestamppb.New(time.Now())
if err := p.Store.UpdateResource(ctx, &store.UpdateResource{
if err := r.Store.UpdateResource(ctx, &store.UpdateResource{
ID: resource.ID,
Reference: &presignURL,
Payload: &storepb.ResourcePayload{
@ -86,21 +106,3 @@ func (p *S3ObjectPresigner) CheckAndPresign(ctx context.Context) {
}
}
}
func (p *S3ObjectPresigner) Start(ctx context.Context) {
p.CheckAndPresign(ctx)
// Schedule runner every 24 hours.
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
p.CheckAndPresign(ctx)
}
}

View file

@ -1,4 +1,5 @@
package versionchecker
// Packge version provides a runner to check the latest version of the application.
package version
import (
"bytes"
@ -16,50 +17,50 @@ import (
"github.com/usememos/memos/store"
)
// nolint
type VersionChecker struct {
type Runner struct {
Store *store.Store
Profile *profile.Profile
}
func NewVersionChecker(store *store.Store, profile *profile.Profile) *VersionChecker {
return &VersionChecker{
func NewRunner(store *store.Store, profile *profile.Profile) *Runner {
return &Runner{
Store: store,
Profile: profile,
}
}
func (*VersionChecker) GetLatestVersion() (string, error) {
response, err := http.Get("https://www.usememos.com/api/version")
if err != nil {
return "", errors.Wrap(err, "failed to make http request")
}
defer response.Body.Close()
// Schedule checker every 8 hours.
const runnerInterval = time.Hour * 8
buf := &bytes.Buffer{}
_, err = buf.ReadFrom(response.Body)
if err != nil {
return "", errors.Wrap(err, "fail to read response body")
}
func (r *Runner) Run(ctx context.Context) {
ticker := time.NewTicker(runnerInterval)
defer ticker.Stop()
version := ""
if err = json.Unmarshal(buf.Bytes(), &version); err != nil {
return "", errors.Wrap(err, "fail to unmarshal get version response")
for {
select {
case <-ticker.C:
r.RunOnce(ctx)
case <-ctx.Done():
return
}
}
return version, nil
}
func (c *VersionChecker) Check(ctx context.Context) {
latestVersion, err := c.GetLatestVersion()
func (r *Runner) RunOnce(ctx context.Context) {
r.Check(ctx)
}
func (r *Runner) Check(ctx context.Context) {
latestVersion, err := r.GetLatestVersion()
if err != nil {
return
}
if !version.IsVersionGreaterThan(latestVersion, version.GetCurrentVersion(c.Profile.Mode)) {
if !version.IsVersionGreaterThan(latestVersion, version.GetCurrentVersion(r.Profile.Mode)) {
return
}
versionUpdateActivityType := store.ActivityTypeVersionUpdate
list, err := c.Store.ListActivities(ctx, &store.FindActivity{
list, err := r.Store.ListActivities(ctx, &store.FindActivity{
Type: &versionUpdateActivityType,
})
if err != nil {
@ -89,12 +90,12 @@ func (c *VersionChecker) Check(ctx context.Context) {
},
},
}
if _, err := c.Store.CreateActivity(ctx, activity); err != nil {
if _, err := r.Store.CreateActivity(ctx, activity); err != nil {
return
}
hostUserRole := store.RoleHost
users, err := c.Store.ListUsers(ctx, &store.FindUser{
users, err := r.Store.ListUsers(ctx, &store.FindUser{
Role: &hostUserRole,
})
if err != nil {
@ -105,7 +106,7 @@ func (c *VersionChecker) Check(ctx context.Context) {
}
hostUser := users[0]
if _, err := c.Store.CreateInbox(ctx, &store.Inbox{
if _, err := r.Store.CreateInbox(ctx, &store.Inbox{
SenderID: store.SystemBotID,
ReceiverID: hostUser.ID,
Status: store.UNREAD,
@ -118,20 +119,22 @@ func (c *VersionChecker) Check(ctx context.Context) {
}
}
func (c *VersionChecker) Start(ctx context.Context) {
c.Check(ctx)
// Schedule checker every 8 hours.
ticker := time.NewTicker(8 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
c.Check(ctx)
func (*Runner) GetLatestVersion() (string, error) {
response, err := http.Get("https://www.usememos.com/api/version")
if err != nil {
return "", errors.Wrap(err, "failed to make http request")
}
defer response.Body.Close()
buf := &bytes.Buffer{}
_, err = buf.ReadFrom(response.Body)
if err != nil {
return "", errors.Wrap(err, "fail to read response body")
}
version := ""
if err = json.Unmarshal(buf.Bytes(), &version); err != nil {
return "", errors.Wrap(err, "fail to unmarshal get version response")
}
return version, nil
}

View file

@ -1,4 +1,4 @@
package versionchecker
package version
import (
"testing"
@ -7,6 +7,6 @@ import (
)
func TestGetLatestVersion(t *testing.T) {
_, err := NewVersionChecker(nil, nil).GetLatestVersion()
_, err := NewRunner(nil, nil).GetLatestVersion()
require.NoError(t, err)
}

View file

@ -21,8 +21,9 @@ import (
apiv1 "github.com/usememos/memos/server/router/api/v1"
"github.com/usememos/memos/server/router/frontend"
"github.com/usememos/memos/server/router/rss"
s3objectpresigner "github.com/usememos/memos/server/service/s3_object_presigner"
versionchecker "github.com/usememos/memos/server/service/version_checker"
memoproperty "github.com/usememos/memos/server/runner/memo_property"
s3presign "github.com/usememos/memos/server/runner/s3_presign"
"github.com/usememos/memos/server/runner/version"
"github.com/usememos/memos/store"
)
@ -140,8 +141,16 @@ func (s *Server) Shutdown(ctx context.Context) {
}
func (s *Server) StartBackgroundRunners(ctx context.Context) {
go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx)
go s3objectpresigner.NewS3ObjectPresigner(s.Store).Start(ctx)
s3presignRunner := s3presign.NewRunner(s.Store)
s3presignRunner.RunOnce(ctx)
versionRunner := version.NewRunner(s.Store, s.Profile)
versionRunner.RunOnce(ctx)
memopropertyRunner := memoproperty.NewRunner(s.Store)
memopropertyRunner.RunOnce(ctx)
go s3presignRunner.Run(ctx)
go versionRunner.Run(ctx)
go memopropertyRunner.Run(ctx)
}
func (s *Server) getOrUpsertWorkspaceBasicSetting(ctx context.Context) (*storepb.WorkspaceBasicSetting, error) {