Skip to content

Commit

Permalink
Merge pull request #309 from gen-mind/patch/ms-teams-chats
Browse files Browse the repository at this point in the history
Patch/ms teams chats
  • Loading branch information
apaladiychuk authored Jun 27, 2024
2 parents 96e5c43 + bf4233a commit cd90210
Showing 1 changed file with 50 additions and 40 deletions.
90 changes: 50 additions & 40 deletions src/backend/core/connector/ms-teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
msTeamsFolderContent = "https://graph.microsoft.com/v1.0/groups/%s/drive/items/%s/children"

msTeamsChats = "https://graph.microsoft.com/v1.0/chats?$top=50"
msTeamsChatMessagesURL = "https://graph.microsoft.com/v1.0/chats/%s/messages"
msTeamsChatMessagesURL = "https://graph.microsoft.com/v1.0/chats/%s/messages?$top=50"

msTeamsParamTeamID = "team_id"

Expand Down Expand Up @@ -98,7 +98,7 @@ type (
}
MSTeamsResult struct {
PrevLoadTime string
Messages []byte
Messages []string
}
)

Expand Down Expand Up @@ -159,7 +159,13 @@ func (c *MSTeams) Execute(ctx context.Context, param map[string]string) chan *Re
func (c *MSTeams) execute(ctx context.Context, param map[string]string) error {

if c.param.AnalyzeChats {
if err := c.loadChats(ctx, ""); err != nil {
msDrive := microsoftcore.NewMSDrive(c.param.Files,
c.model,
c.sessionID, c.client,
"", "",
c.getFile,
)
if err := c.loadChats(ctx, msDrive, ""); err != nil {
zap.S().Errorf("error loading chats : %s ", err.Error())
//return fmt.Errorf("load chats : %s", err.Error())
}
Expand Down Expand Up @@ -242,7 +248,7 @@ func (c *MSTeams) loadChannels(ctx context.Context, teamID string) error {
Bucket: model.BucketName(c.model.User.EmbeddingModel.TenantID),
URL: "",
AppendContent: true,
Body: replies.Messages,
Body: []byte(strings.Join(replies.Messages, "\n")),
},
UpToData: false,
}
Expand Down Expand Up @@ -331,7 +337,7 @@ func (c *MSTeams) getReplies(ctx context.Context, teamID, channelID string, msg
}

}
result.Messages = []byte(strings.Join(messages, "\n"))
result.Messages = messages
state.LastCreatedDateTime = lastTime
return &result, nil
}
Expand Down Expand Up @@ -430,7 +436,7 @@ func (c *MSTeams) buildMDMessage(msg *microsoftcore.MessageBody) string {
return fmt.Sprintf(messageTemplate, userName, message)
}

func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error {
func (c *MSTeams) loadChats(ctx context.Context, msDrive *microsoftcore.MSDrive, nextLink string) error {
var response microsoftcore.MSTeamsChatResponse
url := nextLink
if url == "" {
Expand All @@ -439,15 +445,22 @@ func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error {
if err := c.requestAndParse(ctx, url, &response); err != nil {
return nil
}

for _, chat := range response.Value {
sourceID := fmt.Sprintf("chat:%s", chat.Id)
result, err := c.loadChatMessages(ctx, chat.Id)
state, ok := c.state.Chats[chat.Id]
if !ok {
state = &MSTeamMessageState{
LastCreatedDateTime: time.Time{},
}
c.state.Chats[chat.Id] = state
}

result, err := c.loadChatMessages(ctx, msDrive, state, chat.Id, fmt.Sprintf(msTeamsChatMessagesURL, chat.Id))
if err != nil {
zap.S().Errorf("error loading chat messages: %s", err.Error())
continue
}
if len(result.Messages) == 0 {
if len(result) == 0 {
continue
}
doc := &model.Document{
Expand All @@ -469,47 +482,45 @@ func (c *MSTeams) loadChats(ctx context.Context, nextLink string) error {
Name: fileName,
SourceID: sourceID,
DocumentID: doc.ID.IntPart(),
MimeType: "plain/text",
MimeType: "text/markdown",
FileType: proto.FileType_MD,
Signature: "",
Content: &Content{
Bucket: model.BucketName(c.model.User.EmbeddingModel.TenantID),
URL: "",
AppendContent: true,
Body: result.Messages,
Body: []byte(strings.Join(result, "\n")),
},
UpToData: false,
}
}
if response.NexLink != "" {
zap.S().Debugf("load next chats...")
return c.loadChats(ctx, response.NexLink)
return c.loadChats(ctx, msDrive, response.NexLink)
}
return nil
}
func (c *MSTeams) loadChatMessages(ctx context.Context, chatID string) (*MSTeamsResult, error) {
func (c *MSTeams) loadChatMessages(ctx context.Context,
msDrive *microsoftcore.MSDrive,
state *MSTeamMessageState,
chatID, url string) ([]string, error) {
var response microsoftcore.MessageResponse
if err := c.requestAndParse(ctx, fmt.Sprintf(msTeamsChatMessagesURL, chatID), &response); err != nil {
if err := c.requestAndParse(ctx, url, &response); err != nil {
return nil, err
}
state, ok := c.state.Chats[chatID]
if !ok {
state = &MSTeamMessageState{
LastCreatedDateTime: time.Time{},
}
c.state.Chats[chatID] = state
}
lastTime := state.LastCreatedDateTime
lastTime := state.LastCreatedDateTime.UTC()

var messages []string

for _, msg := range response.Value {
// do not scan message if it was scanned before or if it system message
if msg.MessageType != messageTypeMessage ||
state.LastCreatedDateTime.UTC().After(msg.CreatedDateTime.UTC()) ||
state.LastCreatedDateTime.UTC().Equal(msg.CreatedDateTime.UTC()) {
// do not scan system messages
if msg.MessageType != messageTypeMessage {
continue
}
if state.LastCreatedDateTime.UTC().After(msg.CreatedDateTime.UTC()) ||
state.LastCreatedDateTime.UTC().Equal(msg.CreatedDateTime.UTC()) {
// messages in desc order. not needed to process messages that were loaded before.
return messages, nil
}

// renew newest message time
if lastTime.UTC().Before(msg.CreatedDateTime.UTC()) {
Expand All @@ -519,27 +530,26 @@ func (c *MSTeams) loadChatMessages(ctx context.Context, chatID string) (*MSTeams
messages = append(messages, message)
}
for _, attachment := range msg.Attachments {
if err := c.loadAttachment(ctx, attachment); err != nil {
if err := c.loadAttachment(ctx, msDrive, attachment); err != nil {
zap.S().Errorf("error loading attachment: %v", err)
}
}
}
state.LastCreatedDateTime = lastTime

return &MSTeamsResult{
PrevLoadTime: state.LastCreatedDateTime.Format("2006-01-02-15-04-05"),
Messages: []byte(strings.Join(messages, "\n")),
}, nil
if response.OdataNextLink != "" {
if nested, err := c.loadChatMessages(ctx, msDrive, state, chatID, response.OdataNextLink); err == nil {
messages = append(messages, nested...)
} else {
zap.S().Errorf("error loading nested chat messages: %v", err)
}

}
state.LastCreatedDateTime = lastTime
return messages, nil
}

func (c *MSTeams) loadAttachment(ctx context.Context, attachment *microsoftcore.Attachment) error {
func (c *MSTeams) loadAttachment(ctx context.Context, msDrive *microsoftcore.MSDrive, attachment *microsoftcore.Attachment) error {

msDrive := microsoftcore.NewMSDrive(c.param.Files,
c.model,
c.sessionID, c.client,
"", "",
c.getFile,
)
if attachment.ContentType != attachmentContentTypReference {
// do not scrap replies
return nil
Expand Down

0 comments on commit cd90210

Please sign in to comment.