diff --git a/common/common.go b/common/common.go index fa4a7f6..b27f986 100644 --- a/common/common.go +++ b/common/common.go @@ -561,6 +561,20 @@ type NotificationInfo struct { MetaData *MetaData } +type ObjectInQueue struct { + NotificationAction string // Notification status and type + NotificationType string + Object MetaData + Destinations []StoreDestinationStatus //use this list if NotificationType is common.TypeDestination +} + +type DestinationRequestInQueue struct { + Action string + Status string + Object MetaData + Destination Destination +} + // ACLentry contains ACL information about each user type ACLentry struct { Username string @@ -611,6 +625,7 @@ const ( Feedback = "feedback" Error = "error" Ping = "ping" + ReceiverError = "receiverError" ) // Indication whether the object has been delivered to the destination @@ -686,6 +701,17 @@ const ( DeleteAction = "delete" ) +// DestinationUpdateRequestInQueue Action +const ( +// Update = "update" +) + +// NotificationType of object sent to objectWorkQueue +const ( + TypeDestination = "destination" + TypeObject = "object" +) + // Resend flag options const ( ResendAll = iota diff --git a/common/config.go b/common/config.go index 05c35d7..d0a1afc 100644 --- a/common/config.go +++ b/common/config.go @@ -112,6 +112,11 @@ type Config struct { // dummy - for the dummyAuthenticate Authentication handler AuthenticationHandler string `env:"AUTHENTICATION_HANDLER"` + // Buffer size of Object Queue to send objects for notification handling + // For the CSS, default value is 1000 + // For the ESS, default value is 2 + ObjectQueueBufferSize uint64 `env:"OBJECT_QUEUE_BUFFER_SIZE"` + // CommunicationProtocol is a comma separated list of protocols to be used for communication between CSS and ESS // The elements of the list can be 'http', 'mqtt', and 'wiotp' // wiotp indicates MQTT communication via the Watson IoT Platform and mqtt indicates direct MQTT communication to a broker @@ -195,6 +200,10 @@ type Config struct { // Default value: none HTTPCSSCACertificate string `env:"HTTP_CSS_CA_CERTIFICATE"` + // HTTPESSClientTimeout is to specify the http client timeout in seconds for ESS + // default is 120s + HTTPESSClientTimeout int `env:"HTTPESSClientTimeout"` + // LogLevel specifies the logging level in string format LogLevel string `env:"LOG_LEVEL"` @@ -232,6 +241,13 @@ type Config struct { // Other notifications are resent with frequency equal to ResendInterval*6 ResendInterval int16 `env:"RESEND_INTERVAL"` + // ESSCallSPIRetryInterval specifies the frequence in seconds of resend failed updated notifications. + // Default is 2s + ESSCallSPIRetryInterval int32 `env:"ESS_CALL_SPI_RETRY_INTERVAL"` + + // ESSSPIMaxRetry specifies the number of retry if ESS receives notification transparent error + ESSSPIMaxRetry int `env:"ESSSPIMaxRetry"` + // ESSPingInterval specifies the frequency in hours of ping messages that ESS sends to CSS ESSPingInterval int16 `env:"ESS_PING_INTERVAL"` @@ -651,6 +667,14 @@ func ValidateConfig() error { } } + if Configuration.ObjectQueueBufferSize == 0 { + if Configuration.NodeType == CSS { + Configuration.ObjectQueueBufferSize = 1000 + } else { + Configuration.ObjectQueueBufferSize = 2 + } + } + return nil } @@ -684,7 +708,9 @@ func SetDefaultConfig(config *Config) { config.MaxCompressedlLogTraceFilesNumber = 50 config.LogTraceDestination = "file" config.LogTraceMaintenanceInterval = 60 - config.ResendInterval = 5 + config.ResendInterval = 10 + config.ESSSPIMaxRetry = 1000 + config.ESSCallSPIRetryInterval = 2 config.ESSPingInterval = 1 config.RemoveESSRegistrationTime = 30 config.MaxDataChunkSize = 120 * 1024 @@ -698,7 +724,7 @@ func SetDefaultConfig(config *Config) { config.MongoCACertificate = "" config.MongoAllowInvalidCertificates = false config.MongoSessionCacheSize = 1 - config.MongoSleepTimeBetweenRetry = 2000 + config.MongoSleepTimeBetweenRetry = 4000 config.DatabaseConnectTimeout = 300 config.StorageMaintenanceInterval = 30 config.ObjectActivationInterval = 30 @@ -706,6 +732,7 @@ func SetDefaultConfig(config *Config) { config.HTTPPollingInterval = 10 config.HTTPCSSUseSSL = false config.HTTPCSSCACertificate = "" + config.HTTPESSClientTimeout = 120 config.MessagingGroupCacheExpiration = 60 config.ShutdownQuiesceTime = 60 config.ESSConsumedObjectsKept = 1000 diff --git a/core/base/apiModule.go b/core/base/apiModule.go index d7beccb..37371d7 100644 --- a/core/base/apiModule.go +++ b/core/base/apiModule.go @@ -244,8 +244,6 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) apiObjectLocks.Lock(lockIndex) - defer apiObjectLocks.Unlock(lockIndex) - common.ObjectLocks.Lock(lockIndex) if metaData.NoData { @@ -268,6 +266,7 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com dataVf.RemoveTempData(orgID, objectType, objectID, "") common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } dataVf.RemoveTempData(orgID, objectType, objectID, "") @@ -278,9 +277,17 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com } metaData.ChunkSize = common.Configuration.MaxDataChunkSize + _, existingObjStatus, _ := store.RetrieveObjectAndStatus(orgID, objectType, objectID) + if existingObjStatus != "" && existingObjStatus != common.ReadyToSend && existingObjStatus != common.NotReadyToSend { + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + return &common.InvalidRequest{Message: "Can't update object of the receiving side"} + } + deletedDestinations, err := store.StoreObject(metaData, data, status) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } @@ -288,6 +295,7 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com if status == common.NotReadyToSend || metaData.Inactive { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return nil } @@ -295,32 +303,40 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com updatedMetaData, err := store.RetrieveObject(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } - var deleteNotificationsInfo []common.NotificationInfo + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + //var deleteNotificationsInfo []common.NotificationInfo if len(deletedDestinations) != 0 { - deleteNotificationsInfo, err = communications.PrepareNotificationsForDestinations(*updatedMetaData, deletedDestinations, common.Delete) - if err != nil { - common.ObjectLocks.Unlock(lockIndex) - return err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObject. Send object to objectQueue for delete%s %s\n", objectType, objectID) } - } - updateNotificationsInfo, err := communications.PrepareObjectNotifications(*updatedMetaData) - common.ObjectLocks.Unlock(lockIndex) + objectInQueue := common.ObjectInQueue{NotificationAction: common.Delete, NotificationType: common.TypeDestination, Object: *updatedMetaData, Destinations: deletedDestinations} + objectQueue.SendObjectToQueue(objectInQueue) - if err != nil { - return err + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObject. Continue to updateDestination %s %s\n", objectType, objectID) + } } - if deleteNotificationsInfo != nil { - if err := communications.SendNotifications(deleteNotificationsInfo); err != nil { - return err - } + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObject. Send object to objectQueue for update%s %s\n", objectType, objectID) + } + + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeObject, Object: *updatedMetaData, Destinations: []common.StoreDestinationStatus{}} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObject. Return response for UpdateObject %s %s\n", objectType, objectID) } - return communications.SendNotifications(updateNotificationsInfo) + return nil } // GetObjectStatus sends the status of the object to the app @@ -518,25 +534,27 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader lockIndex := common.HashStrings(orgID, objectType, objectID) apiObjectLocks.Lock(lockIndex) - defer apiObjectLocks.Unlock(lockIndex) - common.ObjectLocks.Lock(lockIndex) metaData, status, err := store.RetrieveObjectAndStatus(orgID, objectType, objectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, err } if metaData == nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, nil } if status != common.ReadyToSend && status != common.NotReadyToSend { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, &common.InvalidRequest{Message: "Can't update data of the receiving side"} } if metaData.NoData { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, &common.InvalidRequest{Message: "Can't update data, the NoData flag is set to true"} } @@ -554,6 +572,7 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader } dataVf.RemoveTempData(orgID, objectType, objectID, "") common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, &common.InvalidRequest{Message: "Failed to verify and store data, Error: " + err.Error()} } if trace.IsLogging(logger.DEBUG) { @@ -567,11 +586,13 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader if err := dataVf.StoreVerifiedData(orgID, objectType, objectID, ""); err != nil { dataVf.RemoveTempData(orgID, objectType, objectID, "") common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, err } } else { if exists, err := store.StoreObjectData(orgID, objectType, objectID, dataReader); err != nil || !exists { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, err } } @@ -579,6 +600,7 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader if metaData.SourceDataURI != "" { if err = store.UpdateObjectSourceDataURI(orgID, objectType, objectID, ""); err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, err } } @@ -589,25 +611,31 @@ func PutObjectData(orgID string, objectType string, objectID string, dataReader updatedMetaData, err = store.RetrieveObject(orgID, objectType, objectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return false, err } if updatedMetaData.Inactive { // Don't send inactive objects to the other side common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return true, nil } - notificationsInfo, err := communications.PrepareObjectNotifications(*updatedMetaData) - common.ObjectLocks.Unlock(lockIndex) - if err != nil { - return true, err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In PutObjectData. Send object to objectQueue %s %s\n", objectType, objectID) } + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeObject, Object: *updatedMetaData, Destinations: []common.StoreDestinationStatus{}} + objectQueue.SendObjectToQueue(objectInQueue) + if trace.IsLogging(logger.DEBUG) { - trace.Debug("In PutObjectData. done with storing data for object %s %s\n", objectType, objectID) + trace.Debug("In PutObjectData. Return response for PutObjectData %s %s\n", objectType, objectID) } - return true, communications.SendNotifications(notificationsInfo) + return true, nil } // ObjectConsumed is used when an app indicates that it consumed the object @@ -662,8 +690,9 @@ func ObjectConsumed(orgID string, objectType string, objectID string) common.Syn return err } - notificationsInfo, err := communications.PrepareObjectStatusNotification(*metaData, common.Consumed) common.ObjectLocks.Unlock(lockIndex) + notificationsInfo, err := communications.PrepareObjectStatusNotification(*metaData, common.Consumed) + if err != nil { return err } @@ -814,8 +843,8 @@ func ObjectDeleted(userID string, orgID string, objectType string, objectID stri } common.ObjectLocks.Unlock(lockIndex) } else if c == 0 { - notificationsInfo, err := communications.PrepareObjectStatusNotification(*metaData, common.Deleted) common.ObjectLocks.Unlock(lockIndex) + notificationsInfo, err := communications.PrepareObjectStatusNotification(*metaData, common.Deleted) if err != nil { return err } @@ -914,17 +943,17 @@ func DeleteObject(orgID string, objectType string, objectID string) common.SyncS lockIndex := common.HashStrings(orgID, objectType, objectID) apiObjectLocks.Lock(lockIndex) - defer apiObjectLocks.Unlock(lockIndex) - common.ObjectLocks.Lock(lockIndex) metaData, status, err := store.RetrieveObjectAndStatus(orgID, objectType, objectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } if metaData == nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Object not found"} } if status != common.NotReadyToSend && status != common.ReadyToSend { @@ -932,31 +961,43 @@ func DeleteObject(orgID string, objectType string, objectID string) common.SyncS // ESS is not allowed to remove such objects if common.Configuration.NodeType == common.ESS { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Can't delete object on the receiving side for ESS"} } // CSS removes them without notifying the other side err = storage.DeleteStoredObject(store, *metaData) common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } if err := storage.DeleteStoredData(store, *metaData); err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } if err := store.MarkObjectDeleted(orgID, objectType, objectID); err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } + common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) // Notify the receivers of the object that it was deleted - notificationsInfo, err := communications.PrepareDeleteNotifications(*metaData) - common.ObjectLocks.Unlock(lockIndex) - if err != nil { - return err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In DeleteObject. Send object to objectQueue %s %s\n", objectType, objectID) + } + + objectInQueue := common.ObjectInQueue{NotificationAction: common.Delete, NotificationType: common.TypeObject, Object: *metaData, Destinations: []common.StoreDestinationStatus{}} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In DeleteObject. Return response for DeleteObject %s %s\n", objectType, objectID) } - return communications.SendNotifications(notificationsInfo) + return nil } // ActivateObject activates an inactive object @@ -970,39 +1011,50 @@ func ActivateObject(orgID string, objectType string, objectID string) common.Syn lockIndex := common.HashStrings(orgID, objectType, objectID) apiObjectLocks.Lock(lockIndex) - defer apiObjectLocks.Unlock(lockIndex) common.ObjectLocks.Lock(lockIndex) metaData, status, err := store.RetrieveObjectAndStatus(orgID, objectType, objectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } if metaData == nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Object not found"} } if status != common.NotReadyToSend && status != common.ReadyToSend { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return &common.InvalidRequest{Message: "Can't activate object on the receiving side"} } if err := store.ActivateObject(orgID, objectType, objectID); err != nil { common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return err } if status == common.ReadyToSend { - notificationsInfo, err := communications.PrepareObjectNotifications(*metaData) + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In ActivateObject. Send object to objectQueue %s %s\n", objectType, objectID) + } common.ObjectLocks.Unlock(lockIndex) - if err != nil { - return err + apiObjectLocks.Unlock(lockIndex) + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeObject, Object: *metaData, Destinations: []common.StoreDestinationStatus{}} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In ActivateObject. Return response for ActivateObject %s %s\n", objectType, objectID) } - return communications.SendNotifications(notificationsInfo) + return nil } common.ObjectLocks.Unlock(lockIndex) + apiObjectLocks.Unlock(lockIndex) return nil } @@ -1189,32 +1241,33 @@ func UpdateObjectDestinations(orgID string, objectType string, objectID string, return err } - var deleteNotificationsInfo, updateNotificationsInfo []common.NotificationInfo + apiObjectLocks.Unlock(lockIndex) + if len(deletedDestinations) != 0 { - deleteNotificationsInfo, err = communications.PrepareNotificationsForDestinations(*metaData, deletedDestinations, common.Delete) - if err != nil { - apiObjectLocks.Unlock(lockIndex) - return err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObjectDestinations. Send object to objectQueue for delete%s %s\n", objectType, objectID) } - } - if len(addedDestinations) != 0 && status == common.ReadyToSend { - updateNotificationsInfo, err = communications.PrepareNotificationsForDestinations(*metaData, addedDestinations, common.Update) - if err != nil { - apiObjectLocks.Unlock(lockIndex) - return err + objectInQueue := common.ObjectInQueue{NotificationAction: common.Delete, NotificationType: common.TypeDestination, Object: *metaData, Destinations: deletedDestinations} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObjectDestinations. Continue to add destinations\n") } } - apiObjectLocks.Unlock(lockIndex) - if len(deleteNotificationsInfo) != 0 { - if err := communications.SendNotifications(deleteNotificationsInfo); err != nil { - return err + if len(addedDestinations) != 0 && status == common.ReadyToSend { + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObjectDestinations. Send object to objectQueue for update%s %s\n", objectType, objectID) } - } - if len(updateNotificationsInfo) != 0 { - if err := communications.SendNotifications(updateNotificationsInfo); err != nil { - return err + + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeDestination, Object: *metaData, Destinations: addedDestinations} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In UpdateObjectDestinations. Continue to return response %s %s\n", objectType, objectID) } } @@ -1249,21 +1302,21 @@ func AddObjectDestinations(orgID string, objectType string, objectID string, des return err } - var updateNotificationsInfo []common.NotificationInfo + apiObjectLocks.Unlock(lockIndex) if len(addedDestinations) != 0 && status == common.ReadyToSend { - updateNotificationsInfo, err = communications.PrepareNotificationsForDestinations(*metaData, addedDestinations, common.Update) - if err != nil { - apiObjectLocks.Unlock(lockIndex) - return err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In AddObjectDestinations. Send object to objectQueue for update%s %s\n", objectType, objectID) } - } - apiObjectLocks.Unlock(lockIndex) - if len(updateNotificationsInfo) != 0 { - if err := communications.SendNotifications(updateNotificationsInfo); err != nil { - return err + objectInQueue := common.ObjectInQueue{NotificationAction: common.Update, NotificationType: common.TypeDestination, Object: *metaData, Destinations: addedDestinations} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In AddObjectDestinations. Continue to return response %s %s\n", objectType, objectID) } } + if trace.IsLogging(logger.DEBUG) { trace.Debug("In add destinations, added destination for object %s %s: \n", objectType, objectID) for _, added := range addedDestinations { @@ -1290,19 +1343,19 @@ func DeleteObjectDestinations(orgID string, objectType string, objectID string, return err } - var deleteNotificationsInfo []common.NotificationInfo + apiObjectLocks.Unlock(lockIndex) + if len(deletedDestinations) != 0 { - deleteNotificationsInfo, err = communications.PrepareNotificationsForDestinations(*metaData, deletedDestinations, common.Delete) - if err != nil { - apiObjectLocks.Unlock(lockIndex) - return err + // Should be in antoher thread + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In DeleteObjectDestinations. Send object to objectQueue for delete%s %s\n", objectType, objectID) } - } - apiObjectLocks.Unlock(lockIndex) - if len(deleteNotificationsInfo) != 0 { - if err := communications.SendNotifications(deleteNotificationsInfo); err != nil { - return err + objectInQueue := common.ObjectInQueue{NotificationAction: common.Delete, NotificationType: common.TypeDestination, Object: *metaData, Destinations: deletedDestinations} + objectQueue.SendObjectToQueue(objectInQueue) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In DeleteObjectDestinations. Continue to return response %s %s\n", objectType, objectID) } } diff --git a/core/base/apiModule_test.go b/core/base/apiModule_test.go index 5b267b1..09699ab 100644 --- a/core/base/apiModule_test.go +++ b/core/base/apiModule_test.go @@ -15,6 +15,7 @@ import ( "os" "strings" "testing" + "time" "github.com/open-horizon/edge-sync-service/common" "github.com/open-horizon/edge-sync-service/core/communications" @@ -22,6 +23,8 @@ import ( "github.com/open-horizon/edge-sync-service/core/storage" ) +const MAX_RETRY = 100 + func setupDB(dbType string) { if dbType == common.Mongo { common.Configuration.MongoDbName = "d_test_db" @@ -29,6 +32,7 @@ func setupDB(dbType string) { } else if dbType == common.Bolt { dir, _ := os.Getwd() common.Configuration.PersistenceRootPath = dir + "/persist" + fmt.Printf("common.Configuration.PersistenceRootPath: %s\n", common.Configuration.PersistenceRootPath) boltStore := &storage.BoltStorage{} boltStore.Cleanup(true) store = boltStore @@ -73,12 +77,31 @@ func setupDataSignature(data []byte, hashAlgo string) (string, string, error) { return publicKeyString, signatureString, nil } +func setupObjectQueue() { + objectQueue = communications.NewObjectWorkQueue(40) +} + +func teardownObjectQueue() { + if objectQueue != nil { + fmt.Println("close the objectQueue") + objectQueue.Close() + } +} + +func TestSetupForApiModule(t *testing.T) { + setupObjectQueue() +} + func TestObjectAPI(t *testing.T) { + fmt.Println("Mongo") setupDB(common.Mongo) testObjectAPI(store, t) + fmt.Println("Mongo Done") + fmt.Println("Bolt") setupDB(common.Bolt) testObjectAPI(store, t) + fmt.Println("BoltDB Done") } func testObjectAPI(store storage.Storage, t *testing.T) { @@ -289,55 +312,56 @@ func testObjectAPI(store storage.Storage, t *testing.T) { } validObjects := []struct { - orgID string - objectType string - objectID string - metaData common.MetaData - data []byte - expectedStatus string - expectedConsumers int - newData []byte - expectedDestNumber int - updateDests bool + orgID string + objectType string + objectID string + metaData common.MetaData + data []byte + expectedStatus string + expectedConsumers int + newData []byte + expectedDestNumber int + updateDests bool + destNeedToBeUpdated bool }{ {"myorg777", "type1", "1", common.MetaData{ObjectID: "1", ObjectType: "type1", DestOrgID: "myorg777", DestID: "dev1", DestType: "device"}, - nil, common.NotReadyToSend, 1, []byte("new"), 1, false}, + nil, common.NotReadyToSend, 1, []byte("new"), 1, false, false}, {"myorg777", "type1", "2", common.MetaData{ObjectID: "2", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: -1, NoData: true, Link: "abc", DestID: "dev1", DestType: "device"}, - []byte("abc"), common.ReadyToSend, math.MaxInt32, []byte("new"), 1, false}, + []byte("abc"), common.ReadyToSend, math.MaxInt32, []byte("new"), 1, false, false}, {"myorg777", "type1", "3", common.MetaData{ObjectID: "3", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, Link: "abc", DestID: "dev2", DestType: "device"}, - nil, common.ReadyToSend, 3, []byte("new"), 0, false}, + nil, common.ReadyToSend, 3, []byte("new"), 0, false, false}, {"myorg777", "type1", "31", common.MetaData{ObjectID: "31", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, MetaOnly: true, DestID: "dev2", DestType: "device"}, - nil, common.NotReadyToSend, 3, []byte("new"), 0, false}, + nil, common.NotReadyToSend, 3, []byte("new"), 0, false, false}, {"myorg777", "type1", "4", common.MetaData{ObjectID: "4", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1"}, - []byte("abc"), common.ReadyToSend, 3, []byte("new"), 1, false}, + []byte("abc"), common.ReadyToSend, 3, []byte("new"), 1, false, false}, {"myorg777", "type1", "5", common.MetaData{ObjectID: "5", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1", Inactive: true}, - []byte("abc"), common.ReadyToSend, 3, []byte("new"), 1, false}, + []byte("abc"), common.ReadyToSend, 3, []byte("new"), 1, false, false}, {"myorg777", "type1", "6", common.MetaData{ObjectID: "6", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 1, DestinationsList: dests}, - []byte("abc"), common.ReadyToSend, 1, []byte("new"), 3, false}, + []byte("abc"), common.ReadyToSend, 1, []byte("new"), 3, false, true}, {"myorg777", "type1", "6", common.MetaData{ObjectID: "6", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 1, DestType: "device3", DestID: "dev1", MetaOnly: true}, - nil, common.ReadyToSend, 1, []byte("new"), 1, true}, + nil, common.ReadyToSend, 1, []byte("new"), 1, true, false}, {"myorg777", "type1", "7", common.MetaData{ObjectID: "7", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1", HashAlgorithm: common.Sha1, PublicKey: publicKeySha1, Signature: signatureSha1}, - dataToSign, common.ReadyToSend, 3, nil, 1, false}, + dataToSign, common.ReadyToSend, 3, nil, 1, false, false}, {"myorg777", "type1", "8", common.MetaData{ObjectID: "8", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1", HashAlgorithm: common.Sha1, PublicKey: publicKeySha1, Signature: signatureSha1, NoData: true}, - dataToSign, common.ReadyToSend, 3, nil, 1, false}, + dataToSign, common.ReadyToSend, 3, nil, 1, false, false}, {"myorg777", "type1", "71", common.MetaData{ObjectID: "71", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1", HashAlgorithm: common.Sha256, PublicKey: publicKeySha256, Signature: signatureSha256}, - dataToSign, common.ReadyToSend, 3, nil, 1, false}, + dataToSign, common.ReadyToSend, 3, nil, 1, false, false}, {"myorg777", "type1", "81", common.MetaData{ObjectID: "81", ObjectType: "type1", DestOrgID: "myorg777", ExpectedConsumers: 3, DestType: "device", DestID: "dev1", HashAlgorithm: common.Sha256, PublicKey: publicKeySha256, Signature: signatureSha256, NoData: true}, - dataToSign, common.ReadyToSend, 3, nil, 1, false}, + dataToSign, common.ReadyToSend, 3, nil, 1, false, false}, } destination1 := common.Destination{DestOrgID: "myorg777", DestType: "device", DestID: "dev1", Communication: common.MQTTProtocol} @@ -408,39 +432,34 @@ func testObjectAPI(store storage.Storage, t *testing.T) { if !row.metaData.NoData && row.data != nil && dataReader == nil { t.Errorf("Failed to fetch object's data (objectID = %s)", row.objectID) } + } + fmt.Println("Sleep 5s before checking notification for UpdateObject") + time.Sleep(time.Duration(5) * time.Second) + + // check notification for UpdateObject + for _, row := range validObjects { + metaData, _, err := store.RetrieveObjectAndStatus(row.orgID, row.objectType, row.objectID) + if err != nil { + t.Errorf("Failed to fetch updated object (objectID = %s).", row.objectID) + } // Check the created notification if row.expectedStatus == common.ReadyToSend && !metaData.Inactive { if destination1.DestID == metaData.DestID { - notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, metaData.DestType, metaData.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } - if (metaData.DestType == destination1.DestType || metaData.DestType == "") && - (metaData.DestID == destination1.DestID || metaData.DestID == "") { - if notification == nil { - t.Errorf("No notification record (objectID = %s)", row.objectID) - } else { - if notification.Status != common.Update { - t.Errorf("Wrong notification status: %s instead of update (objectID = %s)", notification.Status, row.objectID) - } - if notification.InstanceID != metaData.InstanceID { - t.Errorf("Wrong instance ID in notification: %d instead of %d (objectID = %s)", notification.InstanceID, - metaData.InstanceID, row.objectID) - } + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, metaData.DestType, metaData.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) } - } - } else if metaData.DestinationsList != nil { - if destinations, err := store.GetObjectDestinations(*metaData); err != nil { - t.Errorf("GetObjectDestinations failed. Error: %s", err.Error()) - } else { - for _, d := range destinations { - notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, d.DestType, d.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } + if (metaData.DestType == destination1.DestType || metaData.DestType == "") && + (metaData.DestID == destination1.DestID || metaData.DestID == "") { if notification == nil { - t.Errorf("No notification record (objectID = %s)", row.objectID) + if i == MAX_RETRY-1 { + t.Errorf("No notification record (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } } else { if notification.Status != common.Update { t.Errorf("Wrong notification status: %s instead of update (objectID = %s)", notification.Status, row.objectID) @@ -452,41 +471,89 @@ func testObjectAPI(store storage.Storage, t *testing.T) { } } } + + } else if metaData.DestinationsList != nil { + if destinations, err := store.GetObjectDestinations(*metaData); err != nil { + t.Errorf("GetObjectDestinations failed. Error: %s", err.Error()) + } else { + for _, d := range destinations { + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, d.DestType, d.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) + } else if notification == nil { + if i == MAX_RETRY-1 { + t.Errorf("No notification record (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } + } else { + if notification.Status != common.Update { + t.Errorf("Wrong notification status: %s instead of update (objectID = %s)", notification.Status, row.objectID) + } + if notification.InstanceID != metaData.InstanceID { + t.Errorf("Wrong instance ID in notification: %d instead of %d (objectID = %s)", notification.InstanceID, + metaData.InstanceID, row.objectID) + } + } + + } + } + } } } if row.updateDests { // There should be delete notifications for destinations 1-3 - notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination1.DestType, destination1.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } else { - if notification == nil { - t.Errorf("No delete notification record (objectID = %s)", row.objectID) + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination1.DestType, destination1.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) + } else if notification == nil { + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } } else { if notification.Status != common.Delete { t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, row.objectID) } } + } - notification, err = store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination2.DestType, destination2.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } else { - if notification == nil { - t.Errorf("No delete notification record (objectID = %s)", row.objectID) + + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination2.DestType, destination2.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) + } else if notification == nil { + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } } else { if notification.Status != common.Delete { t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, row.objectID) } } } - notification, err = store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination3.DestType, destination3.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } else { - if notification == nil { - t.Errorf("No delete notification record (objectID = %s)", row.objectID) + + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, destination3.DestType, destination3.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) + } else if notification == nil { + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } } else { if notification.Status != common.Delete { t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, row.objectID) @@ -537,11 +604,21 @@ func testObjectAPI(store storage.Storage, t *testing.T) { } } } + } - // Put data - instance := metaData.InstanceID - + metaInstanceIdMap := make(map[string]int64, 0) + for _, row := range validObjects { if row.newData != nil { + metaData, _, err := store.RetrieveObjectAndStatus(row.orgID, row.objectType, row.objectID) + if err != nil { + t.Errorf("Failed to fetch updated object (objectID = %s).", row.objectID) + } + + // Put data + instance := metaData.InstanceID + key := fmt.Sprintf("%s/%s/%s", row.orgID, row.objectType, row.objectID) + metaInstanceIdMap[key] = instance + ok, err := PutObjectData(row.orgID, row.objectType, row.objectID, bytes.NewReader(row.newData)) if err != nil { if !row.metaData.NoData { @@ -571,36 +648,79 @@ func testObjectAPI(store storage.Storage, t *testing.T) { t.Errorf("Instance ID was not updated: %d should be greater than %d (objectID = %s)", metaData.InstanceID, instance, row.objectID) } + } - if destination1.DestID == metaData.DestID { - notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, metaData.DestType, metaData.DestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) - } + } + } + + } + } + + // check notificagtion for PutObjectData + fmt.Println("Sleep 5s before checking notification for PutObjectData") + time.Sleep(time.Duration(5) * time.Second) + for _, row := range validObjects { + if row.newData != nil { + metaData, _, err := store.RetrieveObjectAndStatus(row.orgID, row.objectType, row.objectID) + if err != nil { + t.Errorf("Failed to fetch updated object (objectID = %s).", row.objectID) + } + // Put data + key := fmt.Sprintf("%s/%s/%s", row.orgID, row.objectType, row.objectID) + instance := metaInstanceIdMap[key] + + if !row.metaData.NoData { + if destination1.DestID == metaData.DestID { + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, metaData.DestType, metaData.DestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) + } - if !metaData.Inactive && (metaData.DestType == destination1.DestType || metaData.DestType == "") && - (metaData.DestID == destination1.DestID || metaData.DestID == "") { - if notification == nil { + if !metaData.Inactive && (metaData.DestType == destination1.DestType || metaData.DestType == "") && + (metaData.DestID == destination1.DestID || metaData.DestID == "") { + if notification == nil { + if i == MAX_RETRY-1 { t.Errorf("No notification record after data update (objectID = %s)", row.objectID) } else { - if notification.Status != common.Update { + //time.Sleep(time.Duration(1) * time.Second) + continue + } + } else { + if notification.Status != common.Update { + if i == MAX_RETRY-1 { t.Errorf("Wrong notification status after data update: %s instead of update (objectID = %s)", notification.Status, row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue } - if row.expectedStatus == common.ReadyToSend && notification.InstanceID <= instance { + + } + //fmt.Printf("for object %s/%s/%s, notification.InstanceID: %d, instance: %d\n", row.orgID, row.objectType, row.objectID, notification.InstanceID, instance) + if row.expectedStatus == common.ReadyToSend && notification.InstanceID <= instance { + if i == MAX_RETRY-1 { t.Errorf("Wrong instance ID in notification after data update: %d should be greater than %d (objectID = %s)", notification.InstanceID, instance, row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue } + } - } else if metaData.Inactive && notification != nil { - t.Errorf("Found a notification record after data update with an inactive object (objectID = %s)", row.objectID) } + } else if metaData.Inactive && notification != nil { + t.Errorf("Found a notification record after data update with an inactive object (objectID = %s)", row.objectID) } } } + } + } + } + for _, row := range validObjects { // Mark consumed (should fail) if err := ObjectConsumed(row.orgID, row.objectType, row.objectID); err == nil { t.Errorf("objectConsumed marked the sender's object as consumed (objectID = %s)", row.objectID) @@ -614,8 +734,21 @@ func testObjectAPI(store storage.Storage, t *testing.T) { // Activate if err := ActivateObject(row.orgID, row.objectType, row.objectID); err != nil { t.Errorf("Failed to activate object (objectID = %s). Error: %s", row.objectID, err.Error()) - } else { - if destination1.DestID == metaData.DestID { + } + + } + + fmt.Println("Sleep 5s before checking notification for Activate") + time.Sleep(time.Duration(5) * time.Second) + // Check notification for Activate + for _, row := range validObjects { + metaData, _, err := store.RetrieveObjectAndStatus(row.orgID, row.objectType, row.objectID) + if err != nil { + t.Errorf("Failed to fetch updated object (objectID = %s).", row.objectID) + } + + if destination1.DestID == metaData.DestID { + for i := 0; i < MAX_RETRY; i++ { notification, err := store.RetrieveNotificationRecord(row.orgID, row.objectType, row.objectID, metaData.DestType, metaData.DestID) if err != nil && !storage.IsNotFound(err) { t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", row.objectID, err.Error()) @@ -624,7 +757,12 @@ func testObjectAPI(store storage.Storage, t *testing.T) { if (metaData.DestType == destination1.DestType || metaData.DestType == "") && (metaData.DestID == destination1.DestID || metaData.DestID == "") { if notification == nil { - t.Errorf("No notification record after object activation (objectID = %s)", row.objectID) + if i == MAX_RETRY-1 { + t.Errorf("No notification record after object activation (objectID = %s)", row.objectID) + } else { + //time.Sleep(time.Duration(1) * time.Second) + continue + } } else { if notification.Status != common.Update { t.Errorf("Wrong notification status after object activation: %s instead of update (objectID = %s)", row.objectID, @@ -632,14 +770,19 @@ func testObjectAPI(store storage.Storage, t *testing.T) { } } } + } } + } + for _, row := range validObjects { // Destinations list for the object - if dests, err := GetObjectDestinationsStatus(row.orgID, row.objectType, row.objectID); err != nil { - t.Errorf("Error in getObjectDestinationsStatus (objectID = %s). Error: %s", row.objectID, err.Error()) - } else if len(dests) != row.expectedDestNumber { - t.Errorf("Wrong number of destinations: %d instead of %d (objectID = %s).", len(dests), row.expectedDestNumber, row.objectID) + if !row.destNeedToBeUpdated { + if dests, err := GetObjectDestinationsStatus(row.orgID, row.objectType, row.objectID); err != nil { + t.Errorf("Error in getObjectDestinationsStatus (objectID = %s). Error: %s", row.objectID, err.Error()) + } else if len(dests) != row.expectedDestNumber { + t.Errorf("Wrong number of destinations: %d instead of %d (objectID = %s).", len(dests), row.expectedDestNumber, row.objectID) + } } } @@ -649,11 +792,15 @@ func testObjectAPI(store storage.Storage, t *testing.T) { } func TestESSObjectDeletedAPI(t *testing.T) { + fmt.Println("Bolt") setupDB(common.Bolt) testESSObjectDeletedAPI(store, t) + fmt.Println("Bolt done") + fmt.Println("In memory") setupDB(common.InMemory) testESSObjectDeletedAPI(store, t) + fmt.Println("In memory done") } func testESSObjectDeletedAPI(store storage.Storage, t *testing.T) { @@ -819,6 +966,11 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { t.Errorf("UpdateObject failed to update (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } + } + + fmt.Println("Sleep 5s before checking notification for UpdateObject") + time.Sleep(time.Duration(5) * time.Second) + for _, test := range tests { dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) if err != nil { t.Errorf("GetObjectDestinationsStatus failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) @@ -833,29 +985,49 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { t.Errorf("DeleteNotificationRecords failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } - err = UpdateObjectDestinations(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, dests2) - if err != nil { + if err = UpdateObjectDestinations(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, dests2); err != nil { t.Errorf("UpdateObjectDestinations failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) - } else { - // There should be delete notifications for destination1 + } + + } + + fmt.Println("Sleep 5s before checking notification for UpdateObjectDestinations") + time.Sleep(time.Duration(5) * time.Second) + for _, test := range tests { + // There should be delete notifications for destination1 + for i := 0; i < MAX_RETRY; i++ { notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination1.DestType, destination1.DestID) if err != nil { t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } else { if notification == nil { if test.metaData.NoData == true { - t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } } else if test.metaData.NoData == false { - t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } else { if notification.Status != common.Delete { - t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, test.metaData.ObjectID) + } else { + continue + } } } } - if test.expectedDestNumber > 1 { - // There should be delete notifications for destination2 + } + if test.expectedDestNumber > 1 { + // There should be delete notifications for destination2 + for i := 0; i < MAX_RETRY; i++ { notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination2.DestType, destination2.DestID) if err != nil { @@ -863,25 +1035,46 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { } else { if notification == nil { if test.metaData.NoData == true { - t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } } else if test.metaData.NoData == false { - t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } else { if notification.Status != common.Delete { - t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Wrong notification status: %s instead of delete (objectID = %s)", notification.Status, test.metaData.ObjectID) + } else { + continue + } } } } - // There should be delete notifications for destination3 - notification, err = store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, + } + } + + if test.expectedDestNumber > 1 { + // There should be delete notifications for destination3 + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination3.DestType, destination3.DestID) if err != nil { t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } else { if notification == nil { if test.metaData.NoData == true { - t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("No delete notification record (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } } else if test.metaData.NoData == false { t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) @@ -892,75 +1085,121 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { } } } - // Look for update notification for destination4 - notification, err = store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, + } + + // Look for update notification for destination4 + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination4.DestType, destination4.DestID) if err != nil { t.Errorf("An error occurred in notification fetch (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } else { if notification == nil { if test.expectedDestNumber < 4 && test.metaData.NoData == true { - t.Errorf("No notification record (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("No notification record (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } } else if test.metaData.NoData == false { - t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Notification record created for not ready to send object (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } else { if test.expectedDestNumber == 4 && test.metaData.NoData == true { - t.Errorf("Notification record created for already existing destination (objectID = %s)", test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Notification record created for already existing destination (objectID = %s)", test.metaData.ObjectID) + } else { + continue + } } else if notification.Status != common.Update { - t.Errorf("Wrong notification status: %s instead of update (objectID = %s)", notification.Status, test.metaData.ObjectID) + if i == MAX_RETRY-1 { + t.Errorf("Wrong notification status: %s instead of update (objectID = %s)", notification.Status, test.metaData.ObjectID) + } else { + continue + } } } } + } + } - t.Logf("Start testing add object destinations") - err = store.DeleteNotificationRecords(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, "", "") + t.Logf("Start testing add object destinations") + for _, test := range tests { + err := store.DeleteNotificationRecords(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, "", "") if err != nil { t.Errorf("DeleteNotificationRecords failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } err = AddObjectDestinations(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destsToAdd) if err != nil { t.Errorf("AddObjectDestinations failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) - } else { - // The destinations should be: device3:dev1, device:dev1, device2:dev, device2:dev1 - dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) - if err != nil { - t.Errorf("GetObjectDestinationsStatus failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) - } else if len(dests) != len(destsToAdd) { - t.Errorf("GetObjectDestinationsStatus returned wrong number of destinations: %d instead of %d (objectID = %s)", - len(dests), len(destsToAdd), test.metaData.ObjectID) - } else if !destsEquals(dests, destsToAdd) { - t.Errorf("GetObjectDestinationsStatus returned list of destinations: %s, expect %s (objectID = %s)", - printDestinationsStatus(dests), destsToAdd, test.metaData.ObjectID) - } else { - // check notification record - // Look for update notification for device:dev1, device2:dev, device2:dev1 - // destination4 is already existed, should not receive update notification - for _, destToAdd := range destsToAdd { - parts := strings.Split(destToAdd, ":") - destToAddDestType := parts[0] - destToAddDestID := parts[1] - - notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, - destToAddDestType, destToAddDestID) - if err != nil { - t.Errorf("An error occurred in notification fetch (objectID = %s, destinationType = %s, destinationID = %s). Error: %s", test.metaData.ObjectID, destToAddDestType, destToAddDestID, err.Error()) - } else { - if notification == nil { - if destToAddDestType == destination4.DestType && destToAddDestID == destination4.DestID || !test.metaData.NoData { - // It is expected - } else { + } + } + + for _, test := range tests { + // The destinations should be: device3:dev1, device:dev1, device2:dev, device2:dev1 + dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) + if err != nil { + t.Errorf("GetObjectDestinationsStatus failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) + } else if len(dests) != len(destsToAdd) { + t.Errorf("GetObjectDestinationsStatus returned wrong number of destinations: %d instead of %d (objectID = %s)", + len(dests), len(destsToAdd), test.metaData.ObjectID) + } else if !destsEquals(dests, destsToAdd) { + t.Errorf("GetObjectDestinationsStatus returned list of destinations: %s, expect %s (objectID = %s)", + printDestinationsStatus(dests), destsToAdd, test.metaData.ObjectID) + } + } + + fmt.Println("Sleep 5s before checking notification for AddObjectDestinations") + time.Sleep(time.Duration(5) * time.Second) + for _, test := range tests { + // check notification record + // Look for update notification for device:dev1, device2:dev, device2:dev1 + // destination4 is already existed, should not receive update notification + for i := 0; i < MAX_RETRY; i++ { + for _, destToAdd := range destsToAdd { + parts := strings.Split(destToAdd, ":") + destToAddDestType := parts[0] + destToAddDestID := parts[1] + + notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, + destToAddDestType, destToAddDestID) + if err != nil { + t.Errorf("An error occurred in notification fetch (objectID = %s, destinationType = %s, destinationID = %s). Error: %s", test.metaData.ObjectID, destToAddDestType, destToAddDestID, err.Error()) + } else { + if notification == nil { + if destToAddDestType == destination4.DestType && destToAddDestID == destination4.DestID || !test.metaData.NoData { + // It is expected + } else { + if i == MAX_RETRY-1 { t.Errorf("No notification record (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, destToAddDestType, destToAddDestID) + } else { + continue } - } else if test.metaData.NoData == false { + } + } else if test.metaData.NoData == false { + if i == MAX_RETRY-1 { t.Errorf("Notification record created for not ready to send object (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, destToAddDestType, destToAddDestID) } else { - // destination4 is already existed, should not receive update notification - if destToAddDestType == destination4.DestType && destToAddDestID == destination4.DestID && test.expectedDestNumber == 4 && test.metaData.NoData == true { + continue + } + } else { + // destination4 is already existed, should not receive update notification + if destToAddDestType == destination4.DestType && destToAddDestID == destination4.DestID && test.expectedDestNumber == 4 && test.metaData.NoData == true { + if i == MAX_RETRY-1 { t.Errorf("Notification record created for already existing destination (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, destToAddDestType, destToAddDestID) - } else if notification.Status != common.Update { + } else { + continue + } + } else if notification.Status != common.Update { + if i == MAX_RETRY-1 { t.Errorf("Wrong notification status: %s instead of update (objectID = %s, destinationType = %s, destinationID = %s)", notification.Status, test.metaData.ObjectID, destToAddDestType, destToAddDestID) + } else { + continue } } } @@ -968,13 +1207,18 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { } } - t.Logf("Start testing delete object destinations") - err = UpdateObject(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, test.metaData, nil) + } + + t.Logf("Start testing delete object destinations") + for _, test := range tests { + err := UpdateObject(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, test.metaData, nil) if err != nil { t.Errorf("UpdateObject failed to update (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } + } - dests, err = GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) + for _, test := range tests { + dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) if err != nil { t.Errorf("GetObjectDestinationsStatus failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) } else if len(dests) != test.expectedDestNumber { @@ -995,13 +1239,6 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { t.Errorf("Expect to see error when destinations to delete contains invalid destination") } // notification should not have record for noType:noID - parts := strings.Split(destsToDeleteToFail[0], ":") - notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, parts[0], parts[1]) - if err != nil { - t.Errorf("An error occurred fetch notification for deleted destination (objectID = %s, destinationType = %s, destinationID = %s). Error: %s", test.metaData.ObjectID, parts[0], parts[1], err.Error()) - } else if notification != nil { - t.Errorf("Notification delete record should not be created for non-exist destination (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, parts[0], parts[1]) - } // Valid case // NoData == true ----> should have delete notification @@ -1010,60 +1247,95 @@ func testObjectDestinationsAPI(store storage.Storage, t *testing.T) { err = DeleteObjectDestinations(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destsToDelete) if err != nil { t.Errorf("DeleteObjectDestinations failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) - } else { - // The destinations after deletion is: (destsRemain) "device2:dev", "device2:dev1", "device3:dev1" - // delete (destsToDelete) "device:dev1" - dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) + } + } + + fmt.Println("Sleep 5s before checking notification for DeleteObjectDestinations") + time.Sleep(time.Duration(5) * time.Second) + for _, test := range tests { + // The destinations after deletion is: (destsRemain) "device2:dev", "device2:dev1", "device3:dev1" + // delete (destsToDelete) "device:dev1" + dests, err := GetObjectDestinationsStatus(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID) + if err != nil { + t.Errorf("GetObjectDestinationsStatus after deleting destinations failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) + } else if len(dests) != len(destsRemain[test.metaData.ObjectID]) { + t.Errorf("GetObjectDestinationsStatus returned wrong number of destinations: %d instead of %d (objectID = %s)", + len(dests), len(destsRemain[test.metaData.ObjectID]), test.metaData.ObjectID) + } else if !destsEquals(dests, destsRemain[test.metaData.ObjectID]) { + t.Errorf("GetObjectDestinationsStatus returned list of destinations: %s, expect %s (objectID = %s)", + printDestinationsStatus(dests), destsRemain[test.metaData.ObjectID], test.metaData.ObjectID) + } + } + + fmt.Println("Sleep 5s before checking notification for DeleteObjectDestinations") + time.Sleep(time.Duration(5) * time.Second) + for _, test := range tests { + // check notification record + // Look for delete notification for device:dev1(destination1) + for i := 0; i < MAX_RETRY; i++ { + notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination1.DestType, destination1.DestID) if err != nil { - t.Errorf("GetObjectDestinationsStatus after deleting destinations failed (objectID = %s). Error: %s", test.metaData.ObjectID, err.Error()) - } else if len(dests) != len(destsRemain[test.metaData.ObjectID]) { - t.Errorf("GetObjectDestinationsStatus returned wrong number of destinations: %d instead of %d (objectID = %s)", - len(dests), len(destsRemain[test.metaData.ObjectID]), test.metaData.ObjectID) - } else if !destsEquals(dests, destsRemain[test.metaData.ObjectID]) { - t.Errorf("GetObjectDestinationsStatus returned list of destinations: %s, expect %s (objectID = %s)", - printDestinationsStatus(dests), destsRemain[test.metaData.ObjectID], test.metaData.ObjectID) + t.Errorf("An error occurred fetch notification for deleted destination (objectID = %s, destinationType = %s, destinationID = %s). Error: %s", test.metaData.ObjectID, destination1.DestType, destination1.DestID, err.Error()) } else { - // check notification record - // Look for delete notification for device:dev1(destination1) - notification, err := store.RetrieveNotificationRecord(test.metaData.DestOrgID, test.metaData.ObjectType, test.metaData.ObjectID, destination1.DestType, destination1.DestID) - if err != nil { - t.Errorf("An error occurred fetch notification for deleted destination (objectID = %s, destinationType = %s, destinationID = %s). Error: %s", test.metaData.ObjectID, destination1.DestType, destination1.DestID, err.Error()) - } else { - if notification == nil { - if test.metaData.NoData == true { + if notification == nil { + if test.metaData.NoData == true { + if i == MAX_RETRY-1 { t.Errorf("No delete notification record (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, destination1.DestType, destination1.DestID) + } else { + continue } - } else if test.metaData.NoData == false { + } + } else if test.metaData.NoData == false { + if i == MAX_RETRY-1 { t.Errorf("Notification record created for not ready to send object (objectID = %s, destinationType = %s, destinationID = %s)", test.metaData.ObjectID, destination1.DestType, destination1.DestID) } else { - if notification.Status != common.Delete { + continue + } + } else { + if notification.Status != common.Delete { + if i == MAX_RETRY-1 { t.Errorf("Wrong notification status: %s instead of delete (objectID = %s, destinationType = %s, destinationID = %s)", notification.Status, test.metaData.ObjectID, destination1.DestType, destination1.DestID) + } else { + continue } } } - } - } - } - } func TestObjectWithPolicyAPI(t *testing.T) { - common.Configuration.MongoDbName = "d_test_db" - store = &storage.MongoStorage{} + // // common.Configuration.MongoDbName = "d_test_db" + // // store = &storage.MongoStorage{} + // setupDB(common.Mongo) + // testObjectWithPolicyAPI(store, t) + + // // dir, _ := os.Getwd() + // // common.Configuration.PersistenceRootPath = dir + "/persist" + // // boltStore := &storage.BoltStorage{} + // // boltStore.Cleanup(true) + // // store = boltStore + // setupDB(common.Bolt) + // testObjectWithPolicyAPI(store, t) + + setupDB(common.Mongo) testObjectWithPolicyAPI(store, t) - dir, _ := os.Getwd() - common.Configuration.PersistenceRootPath = dir + "/persist" - boltStore := &storage.BoltStorage{} - boltStore.Cleanup(true) - store = boltStore + setupDB(common.Bolt) testObjectWithPolicyAPI(store, t) } func testObjectWithPolicyAPI(store storage.Storage, t *testing.T) { + + communications.Store = store + common.InitObjectLocks() + + if err := store.Init(); err != nil { + t.Errorf("Failed to initialize storage driver. Error: %s\n", err.Error()) + } + defer store.Stop() + tests := []struct { metaData common.MetaData recieved bool @@ -1136,12 +1408,6 @@ func testObjectWithPolicyAPI(store storage.Storage, t *testing.T) { Communication: common.HTTPProtocol}, } - communications.Store = store - if err := store.Init(); err != nil { - t.Errorf("Failed to initialize storage driver. Error: %s\n", err.Error()) - } - defer store.Stop() - communications.Comm = &communications.TestComm{} if err := communications.Comm.StartCommunication(); err != nil { t.Errorf("Failed to start test communication. Error: %s", err.Error()) @@ -1310,6 +1576,13 @@ func testObjectWithPolicyAPI(store storage.Storage, t *testing.T) { t.Errorf("Received %d objects with a destination policy. Expected %d.\n", len(policyInfo), 1) } + + time.Sleep(time.Duration(10) * time.Second) + +} + +func TestTearDownForApiModule(t *testing.T) { + teardownObjectQueue() } func destsEquals(dests1 []common.DestinationsStatus, dests2 []string) bool { diff --git a/core/base/apiServer_test.go b/core/base/apiServer_test.go index 4bce7a0..21dd2a0 100644 --- a/core/base/apiServer_test.go +++ b/core/base/apiServer_test.go @@ -1414,6 +1414,7 @@ func testAPIServerSetup(nodeType string, storageType string) string { } common.Configuration.NodeType = nodeType + objectQueue = communications.NewObjectWorkQueue(40) return "" } diff --git a/core/base/base.go b/core/base/base.go index 0625f08..b93a538 100644 --- a/core/base/base.go +++ b/core/base/base.go @@ -43,6 +43,9 @@ var started bool var waitersForStartChannel chan chan int +var objectQueue *communications.ObjectWorkQueue +var destReqQueue *communications.DestinationRequestQueue + func init() { blockChannel = make(chan int, 1) waitersForStartChannel = make(chan chan int, 40) @@ -151,6 +154,19 @@ func Start(swaggerFile string, registerHandlers bool) common.SyncServiceError { common.InitObjectLocks() + // storage, lock should be setup before initialize objectQueue + queueBufferSize := common.Configuration.ObjectQueueBufferSize + objectQueue = communications.NewObjectWorkQueue(queueBufferSize) + if trace.IsLogging(logger.INFO) { + trace.Info("ObjectQueue initialzed with buffer size %d", queueBufferSize) + } + + destReqQueue = communications.NewDestinationRequestQueue(queueBufferSize) + if trace.IsLogging(logger.INFO) { + trace.Info("DestinationRequestQueue initialzed with buffer size %d", queueBufferSize) + } + communications.DestReqQueue = destReqQueue + go func() { common.GoRoutineStarted() keepRunning := true @@ -320,6 +336,26 @@ func Stop(quiesceTime int, unregisterSelf bool) { stopHTTPServing() + if objectQueue != nil { + if trace.IsLogging(logger.INFO) { + trace.Info("Closing objectQueue...") + } + objectQueue.Close() + if trace.IsLogging(logger.INFO) { + trace.Info("ObjectQueue closed") + } + } + + if destReqQueue != nil { + if trace.IsLogging(logger.INFO) { + trace.Info("Closing destReqQueue...") + } + destReqQueue.Close() + if trace.IsLogging(logger.INFO) { + trace.Info("DestReqQueue closed") + } + } + communication.StopCommunication() security.Stop() diff --git a/core/communications/communicator.go b/core/communications/communicator.go index 86ffe12..06578fa 100644 --- a/core/communications/communicator.go +++ b/core/communications/communicator.go @@ -3,6 +3,7 @@ package communications import ( "bytes" "net/http" + "strings" "time" "github.com/open-horizon/edge-sync-service/common" @@ -108,6 +109,8 @@ var Store storage.Storage // Comm is the selected communications struct var Comm Communicator +var DestReqQueue *DestinationRequestQueue + // SendErrorResponse common code to send HTTP error codes func SendErrorResponse(writer http.ResponseWriter, err error, message string, statusCode int) { if statusCode == 0 { @@ -142,6 +145,35 @@ func SendErrorResponse(writer http.ResponseWriter, err error, message string, st } } +func IsTransportError(pResp *http.Response, err error) bool { + if err != nil { + if strings.Contains(err.Error(), ": EOF") { + return true + } + + l_error_string := strings.ToLower(err.Error()) + if strings.Contains(l_error_string, "time") && strings.Contains(l_error_string, "out") { + return true + } else if strings.Contains(l_error_string, "connection") && (strings.Contains(l_error_string, "refused") || strings.Contains(l_error_string, "reset")) { + return true + } + } + + if pResp != nil { + if pResp.StatusCode == http.StatusBadGateway { + // 502: bad gateway error + return true + } else if pResp.StatusCode == http.StatusGatewayTimeout { + // 504: gateway timeout + return true + } else if pResp.StatusCode == http.StatusServiceUnavailable { + //503: service unavailable + return true + } + } + return false +} + func destinationExists(orgID string, destType string, destID string) bool { exists, err := Store.DestinationExists(orgID, destType, destID) if err != nil { diff --git a/core/communications/destinationRequestQueue.go b/core/communications/destinationRequestQueue.go new file mode 100644 index 0000000..f7f916d --- /dev/null +++ b/core/communications/destinationRequestQueue.go @@ -0,0 +1,109 @@ +package communications + +import ( + "github.com/open-horizon/edge-sync-service/common" + "github.com/open-horizon/edge-utilities/logger" + "github.com/open-horizon/edge-utilities/logger/log" + "github.com/open-horizon/edge-utilities/logger/trace" +) + +type DestinationRequestQueue struct { + destinationRequestQueue chan common.DestinationRequestInQueue + bufferSize uint64 +} + +func NewDestinationRequestQueue(bufferSize uint64) *DestinationRequestQueue { + q := &DestinationRequestQueue{ + destinationRequestQueue: make(chan common.DestinationRequestInQueue, bufferSize), + bufferSize: bufferSize, + } + + go q.run() + return q +} + +func (q *DestinationRequestQueue) run() { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Check object queue to process destination update request") + } + + for { + select { + case i, ok := <-q.destinationRequestQueue: + if ok { + meta := i.Object + if trace.IsLogging(logger.TRACE) { + trace.Trace("Get an object %s/%s/%s from destination request queue, update destination %s %s to %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID, i.Status) + } + switch i.Action { + case common.Update: + requeue := false + lockIndex := common.HashStrings(meta.DestOrgID, meta.ObjectType, meta.ObjectID) + common.ObjectLocks.Lock(lockIndex) + + // 1. retrieve notification + notification, err := Store.RetrieveNotificationRecord(meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + if err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to retrieve notification for %s %s %s %s %s, resend request to queue", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + requeue = true + } else if notification == nil { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Get nil notification for %s %s %s %s %s, ignore destination update request", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + } else if existingMeta, _, err := Store.RetrieveObjectAndStatus(meta.DestOrgID, meta.ObjectType, meta.ObjectID); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to retrieve metadata for %s %s %s %s %s, resend request to queue", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + requeue = true + } else if existingMeta == nil { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Get nil metadata for %s %s %s %s %s, ignore destination update request", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + } else if existingMeta.InstanceID != meta.InstanceID { + if trace.IsLogging(logger.TRACE) { + trace.Trace("ExistingMeta.InstanceID(%d) != meta.InstanceID(%d) for %s %s %s %s %s, ignore destination update request", existingMeta.InstanceID, meta.InstanceID, meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + } else if notification.Status != common.Data && notification.Status != common.Updated && notification.Status != common.Update && notification.Status != common.UpdatePending && notification.Status != common.ReceivedByDestination { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Notification status (%s) is not one of (data, updated, update, updatePending, receivedByDest) for %s %s %s %s %s, ignore destination update request", notification.Status, meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + } else if _, err = Store.UpdateObjectDeliveryStatus(i.Status, "", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to update status to %s of destiantion %s %s %s %s %s, resend request to queue", i.Status, meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + requeue = true + } else { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Updated destination status to %s, for %s %s %s %s %s", i.Status, meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + } + + common.ObjectLocks.Unlock(lockIndex) + if requeue { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Put request back to queue, for %s %s %s %s %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destination.DestType, i.Destination.DestID) + } + q.destinationRequestQueue <- i + } + } + + } else { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Nothing from destination request queue") + } + } + + } + + } +} + +func (q *DestinationRequestQueue) Close() { + close(q.destinationRequestQueue) +} + +func (q *DestinationRequestQueue) SendDestReqToQueue(destReqInQueue common.DestinationRequestInQueue) { + q.destinationRequestQueue <- destReqInQueue +} diff --git a/core/communications/httpCommunication.go b/core/communications/httpCommunication.go index 9dbe7c8..cc8b622 100644 --- a/core/communications/httpCommunication.go +++ b/core/communications/httpCommunication.go @@ -60,7 +60,10 @@ func (communication *HTTP) StartCommunication() common.SyncServiceError { http.Handle(pingURL, http.StripPrefix(pingURL, http.HandlerFunc(communication.handlePing))) http.Handle(objectRequestURL, http.StripPrefix(objectRequestURL, http.HandlerFunc(communication.handleObjects))) } else { - communication.httpClient = http.Client{Transport: &http.Transport{}} + communication.httpClient = http.Client{ + Transport: &http.Transport{}, + Timeout: time.Second * time.Duration(common.Configuration.HTTPESSClientTimeout), + } if common.Configuration.HTTPCSSUseSSL && len(common.Configuration.HTTPCSSCACertificate) > 0 { var caFile string if strings.HasPrefix(common.Configuration.HTTPCSSCACertificate, "/") { @@ -146,10 +149,16 @@ func (communication *HTTP) HandleRegAck() { } func (communication *HTTP) createError(response *http.Response, action string) common.SyncServiceError { - message := fmt.Sprintf("Failed to %s. Received code: %d %s.", action, response.StatusCode, response.Status) - contents, err := ioutil.ReadAll(response.Body) - if err == nil { - message += " Error: " + string(contents) + message := "" + if response == nil { + message = fmt.Sprintf("Failed to %s. Received nil response.", action) + } else { + message = fmt.Sprintf("Failed to %s. Received code: %d %s.", action, response.StatusCode, response.Status) + contents, err := ioutil.ReadAll(response.Body) + if err == nil { + message += " Error: " + string(contents) + } + } if log.IsLogging(logger.ERROR) { log.Error(message) @@ -239,6 +248,7 @@ func (communication *HTTP) SendNotificationMessage(notificationTopic string, des trace.Debug("In SendNotificationMessage for %s. notificationTopic: %s destType: %s destID: %s\n", common.Configuration.NodeType, notificationTopic, destType, destID) } + // CSS if common.Configuration.NodeType == common.CSS { // Create pending notification to be sent as a response to a GET request var status string @@ -265,83 +275,111 @@ func (communication *HTTP) SendNotificationMessage(notificationTopic string, des return Store.UpdateNotificationRecord(notification) } + // ESS url := buildObjectURL(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, instanceID, dataID, notificationTopic) var request *http.Request + var response *http.Response var err error - if notificationTopic == common.Update || notificationTopic == common.Delete || notificationTopic == common.Deleted { - if metaData == nil { - return &Error{"No meta data"} - } - body, err := json.MarshalIndent(metaData, "", " ") - if err != nil { - return &Error{"Failed to marshal payload. Error: " + err.Error()} - } - request, err = http.NewRequest("PUT", url, bytes.NewReader(body)) - if err != nil { - return &Error{"Failed to create HTTP request. Error: " + err.Error()} - } - request.ContentLength = int64(len(body)) - } else { - request, err = http.NewRequest("PUT", url, nil) - if err != nil { - return &Error{"Failed to create HTTP request. Error: " + err.Error()} - } - } - security.AddIdentityToSPIRequest(request, url) - request.Close = true + for i := 0; i < common.Configuration.ESSSPIMaxRetry; i++ { + if notificationTopic == common.Update || notificationTopic == common.Delete || notificationTopic == common.Deleted { + if metaData == nil { + return &Error{"No meta data"} + } + body, err := json.MarshalIndent(metaData, "", " ") + if err != nil { + return &Error{"Failed to marshal payload. Error: " + err.Error()} + } - response, err := communication.requestWrapper.do(request) - if response != nil && response.Body != nil { - defer response.Body.Close() - } - if err != nil { - return &Error{"Failed to send HTTP request. Error: " + err.Error()} - } - if response.StatusCode == http.StatusNoContent { - switch notificationTopic { - case common.Update: - // Push the data - if metaData.Link == "" && !metaData.NoData && !metaData.MetaOnly { - if err = communication.pushData(metaData); err != nil { - return err - } + request, err = http.NewRequest("PUT", url, bytes.NewReader(body)) + if err != nil { + return &Error{"Failed to create HTTP request. Error: " + err.Error()} } - // Mark updated - if err = handleObjectUpdated(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, - destType, destID, instanceID, dataID); err != nil { - return err + request.ContentLength = int64(len(body)) + } else { + request, err = http.NewRequest("PUT", url, nil) + if err != nil { + return &Error{"Failed to create HTTP request. Error: " + err.Error()} } - case common.Delete: - return handleAckDelete(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, - destType, destID, instanceID, dataID) - case common.Deleted: - return handleAckObjectDeleted(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, - destType, destID, instanceID) - case common.Consumed: - return handleAckConsumed(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) - case common.Received: - return handleAckObjectReceived(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) } - return nil - } else if response.StatusCode == http.StatusConflict { - if trace.IsLogging(logger.TRACE) { - trace.Trace("A notification of type %s was ignored by the other side (object %s:%s, instance id = %d)\n", notificationTopic, - metaData.ObjectType, metaData.ObjectID, instanceID) + security.AddIdentityToSPIRequest(request, url) + request.Close = true + + response, err = communication.requestWrapper.do(request) + if response != nil && response.Body != nil { + defer response.Body.Close() } - // We don't resend ignored notifications - switch notificationTopic { - case common.Deleted: - return handleAckObjectDeleted(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID) - case common.Consumed: - return handleAckConsumed(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) - case common.Received: - return handleAckObjectReceived(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) + + if IsTransportError(response, err) { + respCode := 0 + errMsg := "" + if response != nil { + respCode = response.StatusCode + } + if err != nil { + errMsg = err.Error() + } + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendNotificationMessage, receive transport error %s from topic: (%d) %s, response code is %d, maxRetry: %d, retry...", errMsg, i, notificationTopic, respCode, common.Configuration.ESSSPIMaxRetry) + } + + time.Sleep(time.Duration(common.Configuration.ESSCallSPIRetryInterval) * time.Second) + continue + } else if err != nil { + return &Error{"Failed to send HTTP request. Error: " + err.Error()} + } else if response == nil { + return &Error{"Received nil response from HTTP request. Error: " + err.Error()} + } else { + if response.StatusCode == http.StatusNoContent { + switch notificationTopic { + case common.Update: + // Push the data + if metaData.Link == "" && !metaData.NoData && !metaData.MetaOnly { + if err = communication.pushData(metaData); err != nil { + return err + } + } + // Mark updated + if err = handleObjectUpdated(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, + destType, destID, instanceID, dataID); err != nil { + return err + } + case common.Delete: + return handleAckDelete(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, + destType, destID, instanceID, dataID) + case common.Deleted: + return handleAckObjectDeleted(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, + destType, destID, instanceID) + case common.Consumed: + return handleAckConsumed(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) + case common.Received: + return handleAckObjectReceived(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) + } + return nil + } else if response.StatusCode == http.StatusConflict { + if trace.IsLogging(logger.TRACE) { + trace.Trace("A notification of type %s was ignored by the other side (object %s:%s, instance id = %d)\n", notificationTopic, + metaData.ObjectType, metaData.ObjectID, instanceID) + } + // We don't resend ignored notifications + switch notificationTopic { + case common.Deleted: + return handleAckObjectDeleted(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID) + case common.Consumed: + return handleAckConsumed(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) + case common.Received: + return handleAckObjectReceived(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, destType, destID, instanceID, dataID) + } + return nil + } } - return nil } - + // reach here if still see 504 timeout + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendNotificationMessage, out of retry for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } return communication.createError(response, "send notification "+notificationTopic) } @@ -600,10 +638,36 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo trace.Trace("In http.GetData %s %s", metaData.ObjectType, metaData.ObjectID) } + // For debugging + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In http.GetData, retrieve notification %s, %s. %s, %s, %s", metaData.DestID, metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID) + + if n, err := Store.RetrieveNotificationRecord(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID); err != nil { + trace.Debug("Error when retrieve notification record, %s", err.Error()) + } else if n == nil { + trace.Debug("In GetData: nil notifications") + } else { + trace.Debug("In GetData: notification status %s", n.Status) + } + trace.Debug("In http.GetData, updating notification %s, %s. %s, %s, %s to getdata status", metaData.DestID, metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID) + } + if err := updateGetDataNotification(metaData, metaData.OriginType, metaData.OriginID, offset); err != nil { return err } + // now the ESS notification status is "getdata" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Checking notifications after updating notification status") + if n, err := Store.RetrieveNotificationRecord(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID); err != nil { + trace.Debug("Error when retrieve notification record, %s\n", err.Error()) + } else if n == nil { + trace.Debug("Nil notifications") + } else { + trace.Debug("Notification status is %s after updating", n.Status) + } + } + url := buildObjectURL(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.InstanceID, metaData.DataID, common.Data) request, err := http.NewRequest("GET", url, nil) if err != nil { @@ -623,7 +687,8 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo return &common.NotFound{} } if response.StatusCode != http.StatusOK { - return ¬ificationHandlerError{"Error in GetData: failed to receive data from the other side"} + msg := fmt.Sprintf("Error in GetData: failed to receive data from the other side. Error code: %d, ", response.StatusCode) + return ¬ificationHandlerError{msg} } lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) @@ -633,8 +698,8 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" { dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) if dataVerified, err := dataVf.VerifyDataSignature(response.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil { - if trace.IsLogging(logger.ERROR) { - trace.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID) + if log.IsLogging(logger.ERROR) { + log.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID) } dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI) common.ObjectLocks.Unlock(lockIndex) @@ -664,18 +729,27 @@ func (communication *HTTP) GetData(metaData common.MetaData, offset int64) commo } } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Updating ESS object status to completelyReceived for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } if err := Store.UpdateObjectStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, common.CompletelyReceived); err != nil { common.ObjectLocks.Unlock(lockIndex) return &Error{fmt.Sprintf("Error in GetData: %s\n", err)} } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("ESS object status updated to completelyReceived for %s %s %s", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } handleDataReceived(metaData) - notificationsInfo, err := PrepareObjectStatusNotification(metaData, common.Received) common.ObjectLocks.Unlock(lockIndex) + + notificationsInfo, err := PrepareObjectStatusNotification(metaData, common.Received) if err != nil { return err } + + // Send "received" notification if err := SendNotifications(notificationsInfo); err != nil { return err } @@ -743,18 +817,28 @@ func (communication *HTTP) Poll() bool { if trace.IsLogging(logger.TRACE) { trace.Trace("Polled the CSS, received %d objects.\n", len(payload)) } + for _, message := range payload { switch message.Type { case common.Update: - if err = handleUpdate(message.MetaData, 1); err != nil && !isIgnoredByHandler(err) { - if log.IsLogging(logger.ERROR) { - log.Error("Failed to handle update. Error: %s\n", err) - } - if common.IsNotFound(err) { - deleteObjectInfo("", "", "", message.MetaData.OriginType, message.MetaData.OriginID, - &message.MetaData, true) - } else if err = communication.SendErrorMessage(err, &message.MetaData, true); err != nil && log.IsLogging(logger.ERROR) { - log.Error("Failed to send error message. Error: %s\n", err) + if err = handleUpdate(message.MetaData, 1); err != nil { + if isIgnoredByHandler(err) { + if log.IsLogging(logger.DEBUG) { + log.Error("Ignore handler error, ignore for %s %s %s %d", message.MetaData.DestOrgID, message.MetaData.ObjectType, message.MetaData.ObjectID, message.MetaData.InstanceID) + } + } else { + if log.IsLogging(logger.ERROR) { + log.Error("Failed to handle update for %s %s %s %d. Error: %s\n", message.MetaData.DestOrgID, message.MetaData.ObjectType, message.MetaData.ObjectID, message.MetaData.InstanceID, err) + } + if common.IsNotFound(err) { + if log.IsLogging(logger.DEBUG) { + log.Error("Not found error, delete object info for %s %s %s %d", message.MetaData.DestOrgID, message.MetaData.ObjectType, message.MetaData.ObjectID, message.MetaData.InstanceID) + } + deleteObjectInfo("", "", "", message.MetaData.OriginType, message.MetaData.OriginID, + &message.MetaData, true) + } else if err = communication.SendErrorMessage(err, &message.MetaData, true); err != nil && log.IsLogging(logger.ERROR) { + log.Error("Failed to send error message. Error: %s\n", err) + } } } case common.Consumed: @@ -892,7 +976,6 @@ func (communication *HTTP) handleObjects(writer http.ResponseWriter, request *ht case common.Received: err = handleObjectReceived(orgID, objectType, objectID, destType, destID, instanceID, dataID) case common.Feedback: - payload := feedbackMessage{} if err = json.NewDecoder(request.Body).Decode(&payload); err == nil { err = handleFeedback(orgID, objectType, objectID, destType, destID, instanceID, dataID, payload.Code, payload.RetryInterval, payload.Reason) @@ -946,6 +1029,9 @@ func (communication *HTTP) handleObjects(writer http.ResponseWriter, request *ht writer.WriteHeader(http.StatusBadRequest) return } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In handleObjects: GET request: %s", action) + } communication.handleGetData(orgID, objectType, objectID, destType, destID, instanceID, dataID, writer, request) } else { writer.WriteHeader(http.StatusMethodNotAllowed) @@ -972,8 +1058,8 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object if common.IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" { dataVf = dataVerifier.NewDataVerifier(metaData.HashAlgorithm, metaData.PublicKey, metaData.Signature) if dataVerified, err := dataVf.VerifyDataSignature(request.Body, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI); !dataVerified || err != nil { - if trace.IsLogging(logger.ERROR) { - trace.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID) + if log.IsLogging(logger.ERROR) { + log.Error("Failed to verify data for object %s %s, remove temp data\n", metaData.ObjectType, metaData.ObjectID) } dataVf.RemoveTempData(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestinationDataURI) common.ObjectLocks.Unlock(lockIndex) @@ -1002,8 +1088,9 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object if metaData, err := Store.RetrieveObject(orgID, objectType, objectID); err == nil && metaData != nil { handleDataReceived(*metaData) - notificationsInfo, err := PrepareObjectStatusNotification(*metaData, common.Received) common.ObjectLocks.Unlock(lockIndex) + notificationsInfo, err := PrepareObjectStatusNotification(*metaData, common.Received) + if err != nil { return err } @@ -1021,10 +1108,48 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object func (communication *HTTP) handleGetData(orgID string, objectType string, objectID string, destType string, destID string, instanceID int64, dataID int64, writer http.ResponseWriter, request *http.Request) { + + updateNotificationRecord := false + if trace.IsLogging(logger.TRACE) { + trace.Trace("Handling object get data of %s %s %s %s \n", objectType, objectID, destType, destID) + } lockIndex := common.HashStrings(orgID, objectType, objectID) common.ObjectLocks.Lock(lockIndex) defer common.ObjectLocks.Unlock(lockIndex) + if trace.IsLogging(logger.DEBUG) { + trace.Trace("Handling object get data, retrieve notification record for %s %s %s %s %s\n", orgID, objectType, objectID, destType, destID) + } + notification, err := Store.RetrieveNotificationRecord(orgID, objectType, objectID, destType, destID) + if err != nil { + SendErrorResponse(writer, err, "", 0) + } else if notification == nil { + err = &Error{"Error in handleGetData: no notification to update."} + SendErrorResponse(writer, err, "", 0) + } else if notification.InstanceID != instanceID { + if log.IsLogging(logger.ERROR) { + log.Error("Handling object get data, notification.InstanceID(%d) != metaData,InstanceID(%d), notification status(%s) for %s %s %s %s %s\n", notification.InstanceID, instanceID, notification.Status, orgID, objectType, objectID, destType, destID) + } + + err = &ignoredByHandler{"Error in handleGetData: notification.InstanceID != instanceID or notification status is not updated."} + SendErrorResponse(writer, err, "", 0) + } else if notification.Status == common.Updated || notification.Status == common.Update || notification.Status == common.UpdatePending { + // notification.InstanceID == instanceID + updateNotificationRecord = true + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In handleGetData: notification (status: %s) is updated status, for %s %s %s %s %s, set updateNotificationRecord to %t \n", notification.Status, orgID, objectType, objectID, destType, destID, updateNotificationRecord) + } + } else { + // notification status "error" cannot update notification status to "data" + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In handleGetData: notification (status: %s) is not in updated status, for %s %s %s %s %s, set updateNotificationRecord to %t \n", notification.Status, orgID, objectType, objectID, destType, destID, updateNotificationRecord) + } + } + + if trace.IsLogging(logger.DEBUG) { + trace.Trace("Handling object get data, retrieve object data for %s %s\n", objectType, objectID) + } + if dataReader, err := Store.RetrieveObjectData(orgID, objectType, objectID); err != nil { SendErrorResponse(writer, err, "", 0) } else { @@ -1039,9 +1164,30 @@ func (communication *HTTP) handleGetData(orgID string, objectType string, object if err := Store.CloseDataReader(dataReader); err != nil { SendErrorResponse(writer, err, "", 0) } - notification := common.Notification{ObjectID: objectID, ObjectType: objectType, - DestOrgID: orgID, DestID: destID, DestType: destType, Status: common.Data, InstanceID: instanceID, DataID: dataID} - Store.UpdateNotificationRecord(notification) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Handling object get data, update notification for %s %s %s %s, status: %s\n", objectType, objectID, destType, destID, common.Data) + } + // update notification only if current notification.InstanceID == metadata.InstanceID && current notification.status == "updated" + if updateNotificationRecord { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Handling object get data, update notification status to data for %s %s %s %s %s\n", orgID, objectType, objectID, destType, destID) + } + notification := common.Notification{ObjectID: objectID, ObjectType: objectType, + DestOrgID: orgID, DestID: destID, DestType: destType, Status: common.Data, InstanceID: instanceID, DataID: dataID} + if err = Store.UpdateNotificationRecord(notification); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Handling object get data, failed to update notification for %s %s %s %s with status: %s\n", objectType, objectID, destType, destID, common.Data) + } + } else { + if trace.IsLogging(logger.DEBUG) { + log.Debug("Handling object get data, update notification for %s %s %s %s with status %s is done\n", objectType, objectID, destType, destID, common.Data) + } + } + } else { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Handling object get data, return without update notification status to data for %s %s %s %s %s, set updateNotificationRecord to %t \n", orgID, objectType, objectID, destType, destID, updateNotificationRecord) + } + } } } } @@ -1164,39 +1310,123 @@ func (communication *HTTP) SendFeedbackMessage(code int, retryInterval int32, re return nil } + // code: 500, retry interval: 0, reason: "Error in GetData: failed to receive data from the other side" + if trace.IsLogging(logger.DEBUG) { + trace.Trace("SendFeedbackMessage: update notification record status to %s for object %s %s %s\n", common.ReceiverError, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) + notification := common.Notification{ObjectID: metaData.ObjectID, ObjectType: metaData.ObjectType, + DestOrgID: metaData.DestOrgID, DestID: metaData.OriginID, DestType: metaData.OriginType, + Status: common.ReceiverError, InstanceID: metaData.InstanceID, DataID: metaData.DataID} + + // Store the notification records in storage as part of the object + if err := Store.UpdateNotificationRecord(notification); err != nil { + common.ObjectLocks.Unlock(lockIndex) + if log.IsLogging(logger.ERROR) { + log.Error("In SendFeedbackMessage, failed to update notification record status to %s\n", common.ReceiverError) + } + return err + } + + common.ObjectLocks.Unlock(lockIndex) + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("SendFeedbackMessage: call feedback SPI %s %s %s instanceID: %d, dataID: %d\n", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.InstanceID, metaData.DataID) + } url := buildObjectURL(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.InstanceID, metaData.DataID, common.Feedback) var request *http.Request + var response *http.Response + var err error body, err := json.MarshalIndent(feedbackMessage{code, retryInterval, reason}, "", " ") if err != nil { return &Error{"Failed to marshal payload. Error: " + err.Error()} } - request, err = http.NewRequest("PUT", url, bytes.NewReader(body)) - if err != nil { - return &Error{"Failed to create HTTP request. Error: " + err.Error()} - } - request.ContentLength = int64(len(body)) + for i := 0; i < common.Configuration.ESSSPIMaxRetry; i++ { + request, err = http.NewRequest("PUT", url, bytes.NewReader(body)) + if err != nil { + return &Error{"Failed to create HTTP request. Error: " + err.Error()} + } + request.ContentLength = int64(len(body)) - security.AddIdentityToSPIRequest(request, url) + security.AddIdentityToSPIRequest(request, url) + response, err = communication.requestWrapper.do(request) + if response != nil && response.Body != nil { + defer response.Body.Close() + } - response, err := communication.requestWrapper.do(request) - if response != nil && response.Body != nil { - defer response.Body.Close() + if IsTransportError(response, err) { + respCode := 0 + errMsg := "" + if response != nil { + respCode = response.StatusCode + } + if err != nil { + errMsg = err.Error() + } + + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendFeedbackMessage: i: %d, receive %d from feedback spi, error: %s \n", i, respCode, errMsg) + } + if n, _ := Store.RetrieveNotificationRecord(notification.DestOrgID, notification.ObjectType, notification.ObjectID, + notification.DestType, notification.DestID); n != nil && n.Status == common.ReceiverError { + // retry /feedback when ESS doesn't receive new changes, + err = communication.createError(response, "send feedback") + time.Sleep(time.Duration(common.Configuration.ESSCallSPIRetryInterval) * time.Second) + continue + } else { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendFeedbackMessage: receive %d from feedback spi for %s %s %s, but notification is nil or status is not receiverError \n", response.StatusCode, metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + if n == nil { + trace.Debug("notification is nil\n") + } else if n.Status != common.ReceiverError { + trace.Debug("notification status (%s) is not receiverError\n", n.Status) + } + } + + err = communication.createError(response, "send feedback") + return err + } + } else if err != nil { + return &Error{"Failed to send HTTP request. Error: " + err.Error()} + } else if response == nil { + return &Error{"Received nil response from feedback HTTP request. Error: " + err.Error()} + } else if response.StatusCode == http.StatusNoContent { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendFeedbackMessage, i: %d, received %d from feedback spi \n", i, response.StatusCode) + } + return nil + } else { + // receive 409, ... + err = communication.createError(response, "send feedback") + return err + } } + if err != nil { - return &Error{"Failed to send HTTP request. Error: " + err.Error()} - } - if response.StatusCode == http.StatusNoContent { - return nil + // reach here if still receive 504 and out of retry + if log.IsLogging(logger.ERROR) { + log.Error("SendFeedbackMessage out of retry for %s %s %s\n", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + return err } - return communication.createError(response, "send feedback") + if trace.IsLogging(logger.DEBUG) { + trace.Debug("SendFeedbackMessage return with no error %s %s %s\n", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + } + return nil } // SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS func (communication *HTTP) SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In SendErrorMessage for %s, %s, %s, %s, %s, error is: %s\n", metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.DestType, metaData.DestID, err.Error()) + } + if common.Configuration.NodeType != common.ESS { // In HTTP the CSS sends error message in HTTP response return nil diff --git a/core/communications/httpCommunications_test.go b/core/communications/httpCommunications_test.go index 80bd9a1..8d0d20e 100644 --- a/core/communications/httpCommunications_test.go +++ b/core/communications/httpCommunications_test.go @@ -32,6 +32,7 @@ func TestHTTPCommUpdatedObjects(t *testing.T) { } defer Store.Stop() defer security.Stop() + defer DestReqQueue.Close() destination := common.Destination{DestOrgID: "myorg000", DestID: "dev1", DestType: "httpDevice", Communication: common.HTTPProtocol} if err := Store.StoreDestination(destination); err != nil { @@ -130,6 +131,7 @@ func TestHttpCommCssMisc(t *testing.T) { } defer Store.Stop() defer security.Stop() + defer DestReqQueue.Close() destination := common.Destination{DestOrgID: common.Configuration.OrgID, DestType: "httpDevice", DestID: "dev1", Communication: common.HTTPProtocol} if err := Store.StoreDestination(destination); err != nil { @@ -200,6 +202,7 @@ func TestHTTPCommEssSendObjects(t *testing.T) { } defer Store.Stop() defer security.Stop() + defer DestReqQueue.Close() testObjects := []httpTestEssSendObjectInfo{ {common.MetaData{ObjectID: "1", ObjectType: "type2", DestOrgID: "myorg000", OriginID: "dev1", OriginType: "httpDevice"}, @@ -288,6 +291,7 @@ func TestEssHTTPComm(t *testing.T) { t.Errorf(status) } defer Store.Stop() + defer DestReqQueue.Close() ctx.subTest = "register" err = httpComm.Register() @@ -584,6 +588,9 @@ func testHTTPCommSetup(nodeType string) string { return fmt.Sprintf("Failed to start HTTP communication. Error: %s", err.Error()) } Comm = httpComm + + DestReqQueue = NewDestinationRequestQueue(40) + return "" } diff --git a/core/communications/notification.go b/core/communications/notification.go index bf19d85..968f0ab 100644 --- a/core/communications/notification.go +++ b/core/communications/notification.go @@ -22,41 +22,50 @@ func (e *invalidNotification) Error() string { } // PrepareObjectNotifications sends notifications to object’s destinations -// This function should not acquire an object lock (common.ObjectLocks) as the caller has already acquired one. func PrepareObjectNotifications(metaData common.MetaData) ([]common.NotificationInfo, common.SyncServiceError) { + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) + destinations, err := Store.GetObjectDestinations(metaData) if err == nil { err = Store.UpdateObjectDelivering(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) if err != nil && log.IsLogging(logger.ERROR) { log.Error("Failed to update object's delivery status. Error: " + err.Error()) } + common.ObjectLocks.Unlock(lockIndex) return PrepareUpdateNotification(metaData, destinations) } + common.ObjectLocks.Unlock(lockIndex) return nil, nil } // PrepareDeleteNotifications prepares the delete notification message -// This function should not acquire an object lock (common.ObjectLocks) as the caller has already acquired one. func PrepareDeleteNotifications(metaData common.MetaData) ([]common.NotificationInfo, common.SyncServiceError) { + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) destinations, err := Store.GetObjectDestinations(metaData) if err != nil { + common.ObjectLocks.Unlock(lockIndex) return nil, err } - + common.ObjectLocks.Unlock(lockIndex) return prepareNotifications(common.Delete, metaData, destinations) } // PrepareNotificationsForDestinations prepares notification messages for the destinations if necessary -// This function should not acquire an object lock (common.ObjectLocks) as the caller has already acquired one. func PrepareNotificationsForDestinations(metaData common.MetaData, destinations []common.StoreDestinationStatus, topic string) ([]common.NotificationInfo, common.SyncServiceError) { dests := make([]common.Destination, 0) + for _, dest := range destinations { + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) // If topic is delete (delete destination), and destination status is pending => notification will NOT be prepared for this destination if topic != common.Delete || (dest.Status == common.Delivering || dest.Status == common.Delivered || dest.Status == common.Consumed || dest.Status == common.Error) { dests = append(dests, dest.Destination) } + common.ObjectLocks.Unlock(lockIndex) } return prepareNotifications(topic, metaData, dests) } @@ -66,15 +75,25 @@ func prepareNotifications(topic string, metaData common.MetaData, destinations [ // Create an initial notification record for each destination for _, destination := range destinations { + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) notification := common.Notification{ObjectID: metaData.ObjectID, ObjectType: metaData.ObjectType, DestOrgID: metaData.DestOrgID, DestID: destination.DestID, DestType: destination.DestType, Status: topic, InstanceID: metaData.InstanceID, DataID: metaData.DataID} + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, prepareNotifications update notification for %s %s %s %s with status %s\n", metaData.ObjectType, metaData.ObjectID, destination.DestType, destination.DestID, topic) + } // Store the notification records in storage as part of the object if err := Store.UpdateNotificationRecord(notification); err != nil { + common.ObjectLocks.Unlock(lockIndex) return nil, err } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, prepareNotifications, update notification for %s %s %s %s with status %s is done\n", metaData.ObjectType, metaData.ObjectID, destination.DestType, destination.DestID, topic) + } + // Set the DestID in case the object was for destinations of a type or a destinations list metaData.DestType = destination.DestType metaData.DestID = destination.DestID @@ -82,30 +101,40 @@ func prepareNotifications(topic string, metaData common.MetaData, destinations [ notificationInfo := common.NotificationInfo{NotificationTopic: topic, DestType: metaData.DestType, DestID: metaData.DestID, InstanceID: metaData.InstanceID, DataID: metaData.DataID, MetaData: &metaData} result = append(result, notificationInfo) + common.ObjectLocks.Unlock(lockIndex) } return result, nil } // PrepareUpdateNotification prepares the notification message from object's meta data -// This function should not acquire an object lock (common.ObjectLocks) as the caller has already acquired one. func PrepareUpdateNotification(metaData common.MetaData, destinations []common.Destination) ([]common.NotificationInfo, common.SyncServiceError) { return prepareNotifications(common.Update, metaData, destinations) } // PrepareObjectStatusNotification sends an object status message to the other side -// This function should not acquire an object lock (common.ObjectLocks) as the caller has already acquired one. func PrepareObjectStatusNotification(metaData common.MetaData, status string) ([]common.NotificationInfo, common.SyncServiceError) { + lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + common.ObjectLocks.Lock(lockIndex) notification := common.Notification{ObjectID: metaData.ObjectID, ObjectType: metaData.ObjectType, DestOrgID: metaData.DestOrgID, DestID: metaData.OriginID, DestType: metaData.OriginType, Status: status, InstanceID: metaData.InstanceID, DataID: metaData.DataID} + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, PrepareObjectStatusNotification, update notification for %s %s %s %s with status %s\n", metaData.ObjectType, metaData.ObjectID, notification.DestType, notification.DestID, notification.Status) + } // Store the notification records in storage as part of the object if err := Store.UpdateNotificationRecord(notification); err != nil { + common.ObjectLocks.Unlock(lockIndex) return nil, err } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, PrepareObjectStatusNotification, update notification for %s %s %s %s with status %s is done\n", metaData.ObjectType, metaData.ObjectID, notification.DestType, notification.DestID, notification.Status) + } notificationInfo := common.NotificationInfo{NotificationTopic: status, DestType: metaData.OriginType, DestID: metaData.OriginID, InstanceID: metaData.InstanceID, DataID: metaData.DataID, MetaData: &metaData} + + common.ObjectLocks.Unlock(lockIndex) return []common.NotificationInfo{notificationInfo}, nil } @@ -121,6 +150,7 @@ func SendNotifications(notifications []common.NotificationInfo) common.SyncServi } func resendNotificationsForDestination(dest common.Destination, resendReceivedObjects bool) common.SyncServiceError { + // Update, Received, GetData, Error, Delete, Deleted notifications, err := Store.RetrieveNotifications(dest.DestOrgID, dest.DestType, dest.DestID, resendReceivedObjects) if err != nil { message := fmt.Sprintf("Error in resendNotificationsForDestination. Error: %s\n", err) @@ -130,18 +160,38 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb return &invalidNotification{message} } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, resendNotificationsForDestination: get %d notification to resend\n", len(notifications)) + } + if len(notifications) > 0 { for _, notification := range notifications { // Retrieve the notification in case it was changed since the call to RetrieveNotifications + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, Retrieve the notification in case it was changed since the call to RetrieveNotifications\n") + } lockIndex := common.HashStrings(notification.DestOrgID, notification.ObjectType, notification.ObjectID) common.ObjectLocks.Lock(lockIndex) n, _ := Store.RetrieveNotificationRecord(notification.DestOrgID, notification.ObjectType, notification.ObjectID, notification.DestType, notification.DestID) if n == nil || n.Status != notification.Status || n.ResendTime != notification.ResendTime { + if trace.IsLogging(logger.DEBUG) { + if n == nil { + trace.Debug("In notification.go, resendNotificationsForDestination: n is nil, continue") + } else if n.Status != notification.Status { + trace.Debug("In notification.go, resendNotificationsForDestination: n.Status: %s, notificaiton.Status: %s, continue", n.Status, notification.Status) + } else { + trace.Debug("In notification.go, resendNotificationsForDestination: n.ResendTime: %d, notificaiton.ResendTime: %d, continue", n.ResendTime, notification.ResendTime) + } + } common.ObjectLocks.Unlock(lockIndex) continue } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, resendNotificationsForDestination: n.status is %s, retrive object and status...", n.Status) + } + metaData, status, err := Store.RetrieveObjectAndStatus(n.DestOrgID, n.ObjectType, n.ObjectID) if err != nil { message := fmt.Sprintf("Error in resendNotificationsForDestination. Error: %s\n", err) @@ -153,11 +203,17 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb } if metaData == nil { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, resendNotificationsForDestination: metaData is nil, delete notification record and continue") + } Store.DeleteNotificationRecords(n.DestOrgID, n.ObjectType, n.ObjectID, "", "") common.ObjectLocks.Unlock(lockIndex) continue } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, resendNotificationsForDestination: update notification resendtime") + } if err := Store.UpdateNotificationResendTime(*n); err != nil { if log.IsLogging(logger.ERROR) { log.Error(err.Error()) @@ -166,8 +222,16 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb continue } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, resendNotificationsForDestination: metaData.InstanceID: %d, notification.InstanceID: %d", metaData.InstanceID, n.InstanceID) + } + switch n.Status { case common.Getdata: + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, notification getdata status for destination, need to resend object %s %s to destination %s %s", n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } + if status != common.PartiallyReceived { common.ObjectLocks.Unlock(lockIndex) continue @@ -181,20 +245,30 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb } if err = Comm.GetData(*metaData, offset); err != nil { if common.IsNotFound(err) { + if log.IsLogging(logger.ERROR) { + log.Error("Resending GetData, get notFound error for offset %d of %s:%s:%s, deleting object Info...", offset, n.DestOrgID, n.ObjectType, n.ObjectID) + } deleteObjectInfo("", "", "", n.DestType, n.DestID, metaData, true) } break } } Comm.UnlockDataChunks(lockIndex, metaData) - + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, notification getdata status for destination, resend object %s %s to destination %s %s done", n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } case common.ReceivedByDestination: fallthrough + case common.CompletelyReceived: + fallthrough case common.Data: if dest.DestType == "" { common.ObjectLocks.Unlock(lockIndex) continue } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, notification data status for destination, need to resend object %s %s to destination %s %s\n", n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } // We get here only when an ESS without persistent storage reconnects, // and the CSS has a notification with "data" or "received by destination" status. // Send update notification for this object. @@ -207,10 +281,41 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb metaData.DestType = n.DestType metaData.DestID = n.DestID err = Comm.SendNotificationMessage(common.Update, dest.DestType, dest.DestID, metaData.InstanceID, metaData.DataID, metaData) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, done with resend objects for notification with data status, metaData.DestType: %s, metaData.DestID: %s\n", metaData.DestType, metaData.DestID) + } + case common.Error: + // resend only when error notification instance ID == metadata instanceID + if metaData.InstanceID == n.InstanceID { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, notification error status for destination, need to resend object %s %s to destination %s %s\n", n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } + n.Status = common.Update + n.ResendTime = 0 + if err := Store.UpdateNotificationRecord(*n); err != nil && log.IsLogging(logger.ERROR) { + log.Error("Failed to update notification record. Error: " + err.Error()) + } + common.ObjectLocks.Unlock(lockIndex) + metaData.DestType = n.DestType + metaData.DestID = n.DestID + err = Comm.SendNotificationMessage(common.Update, n.DestType, n.DestID, metaData.InstanceID, metaData.DataID, metaData) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, done with resend objects for notification with error, metaData.DestType: %s, metaData.DestID: %s", metaData.DestType, metaData.DestID) + } + } else { + common.ObjectLocks.Unlock(lockIndex) + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, error notification.InstanceID(%d) != metadata.InstanceID(%d), ignore resend object %s %s to destination %s %s", n.InstanceID, metaData.InstanceID, n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } + } + default: common.ObjectLocks.Unlock(lockIndex) metaData.DestType = n.DestType metaData.DestID = n.DestID + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In notification.go, notification %s status for destination, need to resend object %s %s to destination %s %s", n.Status, n.ObjectType, n.ObjectID, n.DestType, n.DestID) + } err = Comm.SendNotificationMessage(n.Status, n.DestType, n.DestID, n.InstanceID, n.DataID, metaData) } if err != nil { @@ -220,6 +325,7 @@ func resendNotificationsForDestination(dest common.Destination, resendReceivedOb } return &invalidNotification{message} } + } } @@ -276,9 +382,9 @@ func ActivateObjects() { } common.ObjectLocks.Unlock(lockIndex) } else if status == common.ReadyToSend { + common.ObjectLocks.Unlock(lockIndex) object.Inactive = false notificationsInfo, err := PrepareObjectNotifications(object) - common.ObjectLocks.Unlock(lockIndex) if err == nil { if err := SendNotifications(notificationsInfo); err != nil && log.IsLogging(logger.ERROR) { log.Error("Error in ActivateObjects: %s\n", err) diff --git a/core/communications/notificationHandler.go b/core/communications/notificationHandler.go index 7fcc38c..2b29005 100644 --- a/core/communications/notificationHandler.go +++ b/core/communications/notificationHandler.go @@ -137,10 +137,7 @@ func handleRegisterNew(dest common.Destination, persistentStorage bool) common.S destinations := make([]common.Destination, 1) destinations[0] = dest for _, metaData := range objects { - lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) - common.ObjectLocks.Lock(lockIndex) notificationsInfo, err := PrepareUpdateNotification(metaData, destinations) - common.ObjectLocks.Unlock(lockIndex) if err == nil { if err := SendNotifications(notificationsInfo); err != nil { return err @@ -282,10 +279,7 @@ func handleRegAck() { destinations, _ := Store.GetObjectDestinations(objects[0]) for _, metaData := range objects { - lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) - common.ObjectLocks.Lock(lockIndex) notificationsInfo, err := PrepareUpdateNotification(metaData, destinations) - common.ObjectLocks.Unlock(lockIndex) if err == nil { if err := SendNotifications(notificationsInfo); err != nil { if trace.IsLogging(logger.ERROR) { @@ -307,7 +301,7 @@ func handleRegAck() { // Handle a notification about object update func handleUpdate(metaData common.MetaData, maxInflightChunks int) common.SyncServiceError { if trace.IsLogging(logger.TRACE) { - trace.Trace("Handling update of %s %s\n", metaData.ObjectType, metaData.ObjectID) + trace.Trace("Handling update of %s %s, instanceID: %d\n", metaData.ObjectType, metaData.ObjectID, metaData.InstanceID) } lockIndex := common.HashStrings(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) @@ -315,27 +309,53 @@ func handleUpdate(metaData common.MetaData, maxInflightChunks int) common.SyncSe notificationDataID := int64(-1) if notification, err := Store.RetrieveNotificationRecord(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, - metaData.OriginType, metaData.OriginID); err == nil && notification != nil { + metaData.OriginType, metaData.OriginID); err == nil && notification != nil && notification.Status != common.ReceiverError { if notification.InstanceID >= metaData.InstanceID { // This object has been sent already, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring object update of %s %s\n", metaData.ObjectType, metaData.ObjectID) + trace.Trace("Ignoring object update of %s %s %s %s, notification status: %s, notification.InstanceID: %d, send notification to other side\n", metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID, notification.Status, notification.InstanceID) } - common.ObjectLocks.Unlock(lockIndex) - - // Send ack to prevent resends of this notification - Comm.SendNotificationMessage(common.Updated, metaData.OriginType, metaData.OriginID, metaData.InstanceID, metaData.DataID, - &metaData) + existingMetaData, status, _ := Store.RetrieveObjectAndStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + if existingMetaData != nil && notification.InstanceID == metaData.InstanceID && (status == common.CompletelyReceived || status == common.ObjReceived) { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("The new object polled %s %s %s %s, existingMetaData.InstanceID: %d, object status %s is already completelyReceived, send %s notification to the other side", metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID, existingMetaData.InstanceID, status, common.Received) + } + // The new object polled is already completelyReceived or objReceived + common.ObjectLocks.Unlock(lockIndex) + Comm.SendNotificationMessage(common.Received, metaData.OriginType, metaData.OriginID, metaData.InstanceID, metaData.DataID, + &metaData) + } else { + if trace.IsLogging(logger.DEBUG) { + if existingMetaData == nil { + trace.Debug("ExistingMetaData is nil") + } else { + trace.Debug("The new object polled %s %s %s %s, existingMetaData status is %s, existingMetaData.InstanceID: %d, send %s notification to the other side\n", metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID, status, existingMetaData.InstanceID, common.Updated) + } + } - return &ignoredByHandler{} + common.ObjectLocks.Unlock(lockIndex) + // Send ack to prevent resends of this notification + Comm.SendNotificationMessage(common.Updated, metaData.OriginType, metaData.OriginID, metaData.InstanceID, metaData.DataID, + &metaData) + } + return &ignoredByHandler{"Ignore object update"} + } + if trace.IsLogging(logger.DEBUG) { + trace.Debug("notification.InstanceID(%d) < metaData.InstanceID(%d), delete local notification record\n", notification.InstanceID, metaData.InstanceID) } Store.DeleteNotificationRecords(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID, metaData.OriginType, metaData.OriginID) removeNotificationChunksInfo(metaData, metaData.OriginType, metaData.OriginID) notificationDataID = notification.DataID + } else if notification != nil { + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Notification status %s, notificaiton.InstanceID: %d, metadata.InstanceID: %d\n", notification.Status, notification.InstanceID, metaData.InstanceID) + } } + // for receive resend error notification from CSS, the ESS notification status is still "receiverError" + if trace.IsLogging(logger.TRACE) { trace.Trace("Finish process notification, then set status to partiallyReceived of %s %s\n", metaData.ObjectType, metaData.ObjectID) } @@ -364,7 +384,14 @@ func handleUpdate(metaData common.MetaData, maxInflightChunks int) common.SyncSe trace.Debug("existingLastDestinationPolicyServices length: %d\n", len(existingLastDestinationPolicyServices)) } - // Store the object + // check if object is already exist, and if it is created from other side, return with error + _, existingObjStatus, _ := Store.RetrieveObjectAndStatus(metaData.DestOrgID, metaData.ObjectType, metaData.ObjectID) + if existingObjStatus == common.ReadyToSend || existingObjStatus == common.NotReadyToSend { + common.ObjectLocks.Unlock(lockIndex) + return ¬ificationHandlerError{fmt.Sprintf("Error in handleUpdate: cannot update object from the receiver side.\n")} + } + + // Store the object. Now change the receiver status to "PartiallyReceived" or "CompletelyReceived" if _, err := Store.StoreObject(metaData, nil, status); err != nil { common.ObjectLocks.Unlock(lockIndex) return ¬ificationHandlerError{fmt.Sprintf("Error in handleUpdate: failed to store object. Error: %s\n", err)} @@ -413,17 +440,15 @@ func handleUpdate(metaData common.MetaData, maxInflightChunks int) common.SyncSe } + common.ObjectLocks.Unlock(lockIndex) if status == common.CompletelyReceived { notificationsInfo, err := PrepareObjectStatusNotification(metaData, common.Received) - common.ObjectLocks.Unlock(lockIndex) if err != nil { return err } return SendNotifications(notificationsInfo) } - common.ObjectLocks.Unlock(lockIndex) - // Call Notification module to send notification to object’s sender if err := Comm.SendNotificationMessage(common.Updated, metaData.OriginType, metaData.OriginID, metaData.InstanceID, metaData.DataID, &metaData); err != nil { @@ -460,22 +485,27 @@ func handleObjectUpdated(orgID string, objectType string, objectID string, destT common.ObjectLocks.Lock(lockIndex) defer common.ObjectLocks.Unlock(lockIndex) + if trace.IsLogging(logger.DEBUG) { + trace.Trace("In notificationHandler.go, handleObjectUpdated: retrieve notification record for %s %s %s %s %s\n", orgID, objectType, objectID, destType, destID) + } notification, err := Store.RetrieveNotificationRecord(orgID, objectType, objectID, destType, destID) if err != nil || notification == nil { return ¬ificationHandlerError{"Error in handleObjectUpdated: no notification to update."} } if notification.InstanceID != instanceID || (notification.Status != common.Update && notification.Status != common.UpdatePending) { - // This notification doesn't match the existing notification record, ignore + // This notification doesn't match the existing notification record, ignore => object has updates (update meta/put data/delete .....) if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring object updated of %s %s\n", objectType, objectID) + trace.Trace("Ignoring object updated of %s %s %s %s, notification.InstanceID: %d, instanceID: %d, notification status: %s\n", objectType, objectID, destType, destID, notification.InstanceID, instanceID, notification.Status) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore object updated"} } + if trace.IsLogging(logger.DEBUG) { + trace.Trace("In notificationHandler.go, handleObjectUpdated: update notification record to updated for %s %s %s %s %s\n", orgID, objectType, objectID, destType, destID) + } Store.UpdateNotificationRecord( common.Notification{ObjectID: objectID, ObjectType: objectType, DestOrgID: orgID, DestID: destID, DestType: destType, Status: common.Updated, InstanceID: instanceID, DataID: dataID}) - return nil } @@ -510,14 +540,15 @@ func handleObjectConsumed(orgID string, objectType string, objectID string, dest // Something went wrong: we can't retrieve the notification or the object, or the received notification doesn't // match the existing notification record if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring object consumed of %s %s\n", objectType, objectID) + trace.Trace("Ignoring object consumed of %s %s %s %s\n", objectType, objectID, destType, destID) } common.ObjectLocks.Unlock(lockIndex) + //common.NotificationLocks.Unlock(lockNotificationIndex) // Send ack to prevent future resends of this notification Comm.SendNotificationMessage(common.AckConsumed, destType, destID, instanceID, dataID, &common.MetaData{ObjectType: objectType, ObjectID: objectID, DestOrgID: orgID, DestType: destType, DestID: destID, OriginType: common.Configuration.DestinationType, OriginID: common.Configuration.DestinationID, InstanceID: instanceID, DataID: dataID}) - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore object consumed"} } if common.Configuration.NodeType == common.ESS { @@ -607,9 +638,9 @@ func handleAckConsumed(orgID string, objectType string, objectID string, destTyp if notification.InstanceID != instanceID || (notification.Status != common.Consumed && notification.Status != common.ConsumedPending) { // This notification doesn't match the existing notification record, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring ack consumed of %s %s\n", objectType, objectID) + trace.Trace("Ignoring ack consumed of %s %s %s %s\n", objectType, objectID, destType, destID) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore Ack consumed"} } // Mark the notification as ackconsumed @@ -651,6 +682,7 @@ func handleObjectReceived(orgID string, objectType string, objectID string, dest common.ObjectLocks.Unlock(lockIndex) return ¬ificationHandlerError{fmt.Sprintf("Error in handleObjectReceived: failed to retrieve notification record. Error: %s\n", err)} } + metaData, err := Store.RetrieveObject(orgID, objectType, objectID) if err != nil { common.ObjectLocks.Unlock(lockIndex) @@ -662,21 +694,44 @@ func handleObjectReceived(orgID string, objectType string, objectID string, dest // Something went wrong: we can't retrieve the notification or the object, or the received notification doesn't // match the existing notification record if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring object received of %s %s\n", objectType, objectID) + trace.Trace("Ignoring object received of %s %s for destination %s %s\n", objectType, objectID, destType, destID) } common.ObjectLocks.Unlock(lockIndex) // Send ack to prevent future resends of this notification - Comm.SendNotificationMessage(common.AckReceived, destType, destID, instanceID, dataID, - &common.MetaData{ObjectType: objectType, ObjectID: objectID, DestOrgID: orgID, DestType: destType, DestID: destID, - OriginType: common.Configuration.DestinationType, OriginID: common.Configuration.DestinationID, InstanceID: instanceID, DataID: dataID}) - return &ignoredByHandler{} + if notification != nil && metaData != nil && notification.InstanceID == instanceID && notification.Status == common.Error { + // dont send ackReceived + if trace.IsLogging(logger.TRACE) { + trace.Trace("Ignoring object received of %s %s for destination %s %s because notification.Status %s is not error\n", objectType, objectID, destType, destID, notification.Status) + } + } else { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Ignoring object received of %s %s for destination %s %s, send %s notification to other side\n", objectType, objectID, destType, destID, common.AckReceived) + } + Comm.SendNotificationMessage(common.AckReceived, destType, destID, instanceID, dataID, + &common.MetaData{ObjectType: objectType, ObjectID: objectID, DestOrgID: orgID, DestType: destType, DestID: destID, + OriginType: common.Configuration.DestinationType, OriginID: common.Configuration.DestinationID, InstanceID: instanceID, DataID: dataID}) + } + + return &ignoredByHandler{"Ignore object received"} } // Mark that the object was delivered to this destination + if trace.IsLogging(logger.DEBUG) { + trace.Debug("Update object status to delivery for %s %s %s %s %s\n", orgID, objectType, objectID, destType, destID) + } _, err = Store.UpdateObjectDeliveryStatus(common.Delivered, "", orgID, objectType, objectID, destType, destID) if err != nil && log.IsLogging(logger.ERROR) { - log.Error("Error in handleObjectReceived: failed to mark object as delivered to the destination. Error: %s\n", err) + log.Error("Error in handleObjectReceived: failed to mark object (%s %s %s) as delivered to the destination(%s %s). Error: %s. Sending destination update request to destRequestQueue", orgID, objectType, objectID, destType, destID, err) + // put this request in queue + destinationUpdateRequestInQueue := common.DestinationRequestInQueue{ + Action: common.Update, + Status: common.Delivered, + Object: *metaData, + Destination: common.Destination{DestType: destType, DestID: destID}, + } + DestReqQueue.SendDestReqToQueue(destinationUpdateRequestInQueue) } + // Mark the corresponding update notification as "received by destination" if err := Store.UpdateNotificationRecord( common.Notification{ObjectID: objectID, ObjectType: objectType, @@ -714,9 +769,9 @@ func handleAckObjectReceived(orgID string, objectType string, objectID string, d if notification.InstanceID != instanceID || (notification.Status != common.Received && notification.Status != common.ReceivedPending) { // This notification doesn't match the existing notification record, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring ack received of %s %s\n", objectType, objectID) + trace.Trace("Ignoring ack received of %s %s %s %s, notification InstanceID: %d, InstanceID: %d, notification status: %s\n", objectType, objectID, destType, destID, notification.InstanceID, instanceID, notification.Status) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore object ack received"} } // Mark the notification as ackreceived @@ -811,9 +866,9 @@ func handleAckDelete(orgID string, objectType string, objectID string, destType (notification.Status != common.Delete && notification.Status != common.DeletePending && notification.Status != common.Deleted) { // This notification doesn't match the existing notification record, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring ack delete of %s %s\n", objectType, objectID) + trace.Trace("Ignoring ack delete of %s %s %s %s\n", objectType, objectID, destType, destID) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore object ack delete"} } // Mark the notification as ackdelete @@ -859,12 +914,13 @@ func handleObjectDeleted(metaData common.MetaData) common.SyncServiceError { // Something went wrong: we can't retrieve the notification or the object, or the received notification doesn't // match the existing notification record if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring object deleted of %s %s\n", metaData.ObjectType, metaData.ObjectID) + trace.Trace("Ignoring object deleted of %s %s %s %s\n", metaData.ObjectType, metaData.ObjectID, metaData.DestType, metaData.DestID) } + common.ObjectLocks.Unlock(lockIndex) // Send ack to prevent future resends of this notification Comm.SendNotificationMessage(common.AckDeleted, metaData.DestType, metaData.DestID, metaData.InstanceID, metaData.DataID, &metaData) - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore object deleted"} } // Delete the notification @@ -902,9 +958,9 @@ func handleAckObjectDeleted(orgID string, objectType string, objectID string, de if notification.InstanceID != instanceID || (notification.Status != common.Deleted && notification.Status != common.DeletedPending) { // This notification doesn't match the existing notification record, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring ack object deleted of %s %s\n", objectType, objectID) + trace.Trace("Ignoring ack object deleted of %s %s %s %s\n", objectType, objectID, destType, destID) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore ack object deleted"} } // Delete the notification @@ -976,12 +1032,12 @@ func handleFeedback(orgID string, objectType string, objectID string, destType s if err != nil || notification == nil { return ¬ificationHandlerError{"Error in handleFeedback: no notification to update."} } - if notification.InstanceID != instanceID { + if notification.InstanceID != instanceID || notification.Status == common.ReceivedByDestination || notification.Status == common.Error { // This notification doesn't match the existing notification record, ignore if trace.IsLogging(logger.TRACE) { - trace.Trace("Ignoring feedback of %s %s\n", objectType, objectID) + trace.Trace("Ignoring feedback of %s %s %s %s, notification.InstanceID: %d, instanceID: %d, notification.Status: %s\n", objectType, objectID, destType, destID, notification.InstanceID, instanceID, notification.Status) } - return &ignoredByHandler{} + return &ignoredByHandler{"Ignore feedback"} } if code == common.InvalidObject { @@ -1002,13 +1058,33 @@ func handleFeedback(orgID string, objectType string, objectID string, destType s status = common.Update resendTime = time.Now().Unix() + int64(retryInterval) } - // Mark the corresponding notification as error + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In handleFeedback: update notification record for %s %s %s %s %s: status: %s", orgID, objectType, objectID, destType, destID, status) + } + + // Mark the corresponding notification as error or update if err := Store.UpdateNotificationRecord( common.Notification{ObjectID: objectID, ObjectType: objectType, DestOrgID: orgID, DestID: destID, DestType: destType, Status: status, InstanceID: instanceID, ResendTime: resendTime, DataID: dataID}, ); err != nil { + if log.IsLogging(logger.ERROR) { + log.Error("Error in handleFeedback: failed to update notification record. Error: %s\n", err) + } return ¬ificationHandlerError{fmt.Sprintf("Error in handleFeedback: failed to update notification record. Error: %s\n", err)} + } else { + // only for debugging + if trace.IsLogging(logger.DEBUG) { + trace.Debug("In handleFeedback, notification status for %s %s %s %s %s should be updated to %s, retrieve notification again for debugging", orgID, objectType, objectID, destType, destID, status) + notification, err = Store.RetrieveNotificationRecord(orgID, objectType, objectID, destType, destID) + if err != nil { + trace.Debug("In handleFeedback, error retrieve notification %s %s %s %s %s for debugging", orgID, objectType, objectID, destType, destID) + } else if notification == nil { + trace.Debug("In handleFeedback, nil notification %s %s %s %s %s for debugging", orgID, objectType, objectID, destType, destID) + } else { + trace.Debug("In handleFeedback, retrieve notification retrieved %s %s %s %s %s for debugging, status is %s", orgID, objectType, objectID, destType, destID, notification.Status) + } + } } } } @@ -1090,8 +1166,9 @@ func handleData(dataMessage []byte) (*common.MetaData, common.SyncServiceError) common.ObjectLocks.Unlock(lockIndex) return metaData, ¬ificationHandlerError{fmt.Sprintf("Error in handleData: %s\n", err)} } - notificationsInfo, err := PrepareObjectStatusNotification(*metaData, common.Received) common.ObjectLocks.Unlock(lockIndex) + notificationsInfo, err := PrepareObjectStatusNotification(*metaData, common.Received) + if err != nil { return metaData, err } @@ -1117,6 +1194,7 @@ func handleData(dataMessage []byte) (*common.MetaData, common.SyncServiceError) return metaData, nil } +// handleGetData for mqtt func handleGetData(metaData common.MetaData, offset int64) common.SyncServiceError { if trace.IsLogging(logger.TRACE) { trace.Trace("Handling data request for %s %s (offset %d)\n", metaData.ObjectType, metaData.ObjectID, offset) diff --git a/core/communications/notificationHandler_test.go b/core/communications/notificationHandler_test.go index aa73573..d380af6 100644 --- a/core/communications/notificationHandler_test.go +++ b/core/communications/notificationHandler_test.go @@ -43,6 +43,10 @@ func TestNotificationHandler(t *testing.T) { } common.Configuration.NodeType = common.CSS + + DestReqQueue = NewDestinationRequestQueue(40) + defer DestReqQueue.Close() + if err := Store.StoreDestination(dest); err != nil { t.Errorf("Failed to store destination. Error: %s", err.Error()) } diff --git a/core/communications/notification_test.go b/core/communications/notification_test.go index a4831eb..c327610 100644 --- a/core/communications/notification_test.go +++ b/core/communications/notification_test.go @@ -256,7 +256,7 @@ func TestActivateObjects(t *testing.T) { t.Errorf("RetrieveObjects returned %d objects instead of 1\n", len(objects)) } - time.Sleep(4 * time.Second) + time.Sleep(3 * time.Second) ActivateObjects() diff --git a/core/communications/objectWorkQueue.go b/core/communications/objectWorkQueue.go new file mode 100644 index 0000000..ed5545e --- /dev/null +++ b/core/communications/objectWorkQueue.go @@ -0,0 +1,93 @@ +package communications + +import ( + "github.com/open-horizon/edge-sync-service/common" + "github.com/open-horizon/edge-utilities/logger" + "github.com/open-horizon/edge-utilities/logger/trace" +) + +type ObjectWorkQueue struct { + objectQueue chan common.ObjectInQueue + bufferSize uint64 +} + +func NewObjectWorkQueue(bufferSize uint64) *ObjectWorkQueue { + q := &ObjectWorkQueue{ + objectQueue: make(chan common.ObjectInQueue, bufferSize), + bufferSize: bufferSize, + } + + go q.run() + return q +} + +func (q *ObjectWorkQueue) run() { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Check object queue to process notifications") + } + for { + select { + case i, ok := <-q.objectQueue: + if ok { + meta := i.Object + if trace.IsLogging(logger.TRACE) { + trace.Trace("Get an object %s/%s/%s from object Queue, NotificationType: %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.NotificationAction) + } + var notificationsInfo []common.NotificationInfo + + switch i.NotificationAction { + case common.Update: + if i.NotificationType == common.TypeObject { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Prepare update notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + notificationsInfo, _ = PrepareObjectNotifications(meta) + } else if i.NotificationType == common.TypeDestination { + if trace.IsLogging(logger.TRACE) { + trace.Trace("For object %s/%s/%s, prepare update destination notifications for destinations %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destinations) + } + if len(i.Destinations) > 0 { + notificationsInfo, _ = PrepareNotificationsForDestinations(meta, i.Destinations, common.Update) + } + } + + case common.Delete: + if i.NotificationType == common.TypeObject { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Prepare delete notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + notificationsInfo, _ = PrepareDeleteNotifications(meta) + } else if i.NotificationType == common.TypeDestination { + if trace.IsLogging(logger.TRACE) { + trace.Trace("For object %s/%s/%s, prepare delete destination notifications for destinations %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, i.Destinations) + } + if len(i.Destinations) > 0 { + notificationsInfo, _ = PrepareNotificationsForDestinations(meta, i.Destinations, common.Delete) + } + } + } + + SendNotifications(notificationsInfo) + + if trace.IsLogging(logger.TRACE) { + trace.Trace("Sent notifications for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID) + } + + } else { + if trace.IsLogging(logger.TRACE) { + trace.Trace("Nothing from object Queue") + } + + } + } + } + +} + +func (q *ObjectWorkQueue) Close() { + close(q.objectQueue) +} + +func (q *ObjectWorkQueue) SendObjectToQueue(objectInQueue common.ObjectInQueue) { + q.objectQueue <- objectInQueue +} diff --git a/core/storage/mongoStorage.go b/core/storage/mongoStorage.go index 65381cc..f6215f4 100644 --- a/core/storage/mongoStorage.go +++ b/core/storage/mongoStorage.go @@ -1739,7 +1739,8 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, bson.M{"notification.status": common.Consumed}, bson.M{"notification.status": common.Getdata}, bson.M{"notification.status": common.Delete}, - bson.M{"notification.status": common.Deleted}}}}} + bson.M{"notification.status": common.Deleted}, + bson.M{"notification.status": common.Error}}}}} } else { if retrieveReceived { query = bson.M{"$or": []bson.M{ diff --git a/core/storage/storage.go b/core/storage/storage.go index 9e900b6..8328872 100644 --- a/core/storage/storage.go +++ b/core/storage/storage.go @@ -421,7 +421,7 @@ func createDestinationCollectionID(orgID string, destType string, destID string) func resendNotification(notification common.Notification, retrieveReceived bool) bool { s := notification.Status - return (s == common.Update || s == common.Consumed || s == common.Getdata || s == common.Delete || s == common.Deleted || s == common.Received || + return (s == common.Update || s == common.Consumed || s == common.Getdata || s == common.Delete || s == common.Deleted || s == common.Received || s == common.Error || (retrieveReceived && (s == common.Data || s == common.ReceivedByDestination))) } diff --git a/samples/send-receive-files/css-http.conf b/samples/send-receive-files/css-http.conf index ce64e67..0763a10 100644 --- a/samples/send-receive-files/css-http.conf +++ b/samples/send-receive-files/css-http.conf @@ -115,6 +115,11 @@ CommunicationProtocol http # Environment variable: PERSISTENCE_ROOT_PATH PersistenceRootPath ./persist +# Buffer size of Object Queue to send objects for notification handling +# For the CSS, default value is 1000 +# For the ESS, default value is 2 +# ObjectQueueBufferSize + ################################################################################# ### MQTT Communication Settings diff --git a/samples/send-receive-files/ess-http.conf b/samples/send-receive-files/ess-http.conf index f27bd73..a9f1c5b 100644 --- a/samples/send-receive-files/ess-http.conf +++ b/samples/send-receive-files/ess-http.conf @@ -115,6 +115,11 @@ CommunicationProtocol http # Environment variable: PERSISTENCE_ROOT_PATH PersistenceRootPath ./persist +# Buffer size of Object Queue to send objects for notification handling +# For the CSS, default value is 1000 +# For the ESS, default value is 2 +# ObjectQueueBufferSize + ################################################################################# ### MQTT Communication Settings