diff --git a/common/common.go b/common/common.go index cb105ea..289f370 100644 --- a/common/common.go +++ b/common/common.go @@ -118,6 +118,20 @@ func IsIgnoredRequest(err error) bool { return ok } +// TooManyRequestError is the error for too many request +type TooManyRequestError struct { + Message string +} + +func (e *TooManyRequestError) Error() string { + return e.Message +} + +func IsTooManyRequestError(err error) bool { + _, ok := err.(*TooManyRequestError) + return ok +} + // Destination describes a sync service node. // Each sync service edge node (ESS) has an address that is composed of the node's ID, Type, and Organization. // An ESS node communicates with the CSS using either MQTT or HTTP. diff --git a/core/base/apiServer.go b/core/base/apiServer.go index 975f79e..1eae03b 100644 --- a/core/base/apiServer.go +++ b/core/base/apiServer.go @@ -2159,6 +2159,19 @@ func handleObjectGetData(orgID string, objectType string, objectID string, canAc } } + if common.ObjectDownloadSemaphore.TryAcquire(1) == false { + // If too many downloads are in flight, agent will get error and retry. Originally, there was a lock around the download that + // caused the downloads to be serial. It was changed to use a semaphore to allow limited concurrency. + if trace.IsLogging(logger.TRACE) { + trace.Trace("Failed to acquire semaphore for handleObjects of %s %s %s \n", orgID, objectType, objectID) + } + err := &common.TooManyRequestError{Message: "Error in handleObjects: Unable to acquire object semaphore."} + communications.SendErrorResponse(writer, err, "", 0) + return + } + + defer common.ObjectDownloadSemaphore.Release(1) + // Get range from the header "Range:bytes={startOffset}-{endOffset}" var dataReader io.Reader var eof bool diff --git a/core/communications/communicator.go b/core/communications/communicator.go index 444713e..d764c7c 100644 --- a/core/communications/communicator.go +++ b/core/communications/communicator.go @@ -147,6 +147,8 @@ func SendErrorResponse(writer http.ResponseWriter, err error, message string, st statusCode = http.StatusConflict case *common.IgnoredRequest: statusCode = http.StatusTemporaryRedirect + case *common.TooManyRequestError: + statusCode = http.StatusTooManyRequests case *Error: // Don't return an error if it's a communication error statusCode = http.StatusNoContent @@ -198,7 +200,7 @@ func IsTransportError(pResp *http.Response, err error) bool { // 503: service unavailable return true } else if pResp.StatusCode == http.StatusRequestTimeout { - // 408: request time out + // 408: request time out return true } else if pResp.StatusCode == http.StatusTooManyRequests { // 429: too many requests diff --git a/core/communications/httpCommunication.go b/core/communications/httpCommunication.go index 2843294..c3373c7 100644 --- a/core/communications/httpCommunication.go +++ b/core/communications/httpCommunication.go @@ -1687,7 +1687,7 @@ func (communication *HTTP) handleGetData(orgID string, objectType string, object if trace.IsLogging(logger.TRACE) { trace.Trace("Failed to acquire semaphore for handleGetData of %s %s %s %s \n", objectType, objectID, destType, destID) } - err := &Error{"Error in handleGetData: Unable to acquire object semaphore."} + err := &common.TooManyRequestError{Message: "Error in handleGetData: Unable to acquire object semaphore."} SendErrorResponse(writer, err, "", http.StatusTooManyRequests) return }