Skip to content

Commit

Permalink
Merge pull request #106 from LiilyZhang/v1.6
Browse files Browse the repository at this point in the history
V1.6
  • Loading branch information
linggao authored Mar 23, 2022
2 parents 566d61f + 619bb5c commit ae121e4
Show file tree
Hide file tree
Showing 36 changed files with 2,659 additions and 722 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ jobs:
path: go/src/github.com/open-horizon/edge-sync-service

# prepare the environment
- name: Set up golang 1.14.1
- name: Set up golang 1.16
uses: actions/setup-go@v2
with:
go-version: '1.14.1'
go-version: '1.16'

# build
- name: Build
Expand Down
138 changes: 132 additions & 6 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
"crypto/sha256"
"fmt"
"hash"
"net/http"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"
)

// SyncServiceError is a common error type used in the sync service
Expand Down Expand Up @@ -97,6 +102,22 @@ func IsNotFound(err error) bool {
return ok
}

// InvalidRequest is the error for invalid reguests
// swagger:ignore
type IgnoredRequest struct {
Message string
}

func (e *IgnoredRequest) Error() string {
return e.Message
}

// IsInvalidRequest returns true if the error passed in is the common.InvalidRequest error
func IsIgnoredRequest(err error) bool {
_, ok := err.(*IgnoredRequest)
return ok
}

// Destination describes a sync service node.
// Each sync service edge node (ESS) has an address that is composed of the node's ID, Type, and Organization.
// An ESS node communicates with the CSS using either MQTT or HTTP.
Expand Down Expand Up @@ -258,6 +279,84 @@ func GetHash(hashAlgo string) (hash.Hash, crypto.Hash, SyncServiceError) {
}
}

func GetStartAndEndRangeFromRangeHeader(request *http.Request) (int64, int64, SyncServiceError) {
// Get range from the "Range:bytes={startOffset}-{endOffset}"
requestRangeAll := request.Header.Get("Range")
if requestRangeAll == "" {
return -1, -1, nil
}
requestRange := requestRangeAll[6:]
ranges := strings.Split(requestRange, "-")

if len(ranges) != 2 {
return -1, -1, &InvalidRequest{Message: "Failed to parse Range header: " + requestRangeAll}
}

beginOffset, err := strconv.ParseInt(ranges[0], 10, 64)
if err != nil {
return -1, -1, &InvalidRequest{Message: "Failed to get begin offset from Range header: " + err.Error()}
}

endOffset, err := strconv.ParseInt(ranges[1], 10, 64)
if err != nil {
return -1, -1, &InvalidRequest{Message: "Failed to get end offset from Range header: " + err.Error()}
}

if beginOffset > endOffset {
return -1, -1, &InvalidRequest{Message: "Begin offset cannot be greater than end offset"}
}

return beginOffset, endOffset, nil
}

// Content-Range: bytes 1-2/*\
// Returns totalsize, startOffset, endOffset, err
func GetStartAndEndRangeFromContentRangeHeader(request *http.Request) (int64, int64, int64, SyncServiceError) {
// Get range from the "Range:bytes={startOffset}-{endOffset}"
requestContentRange := request.Header.Get("Content-Range")
if requestContentRange == "" {
return 0, -1, -1, nil
}
contentRange := strings.Replace(requestContentRange, "bytes ", "", -1)
// 1-2/30
ranges := strings.Split(contentRange, "/")

if len(ranges) != 2 {
return 0, -1, -1, &InvalidRequest{Message: "Failed to parse Content-Range header: " + requestContentRange}
}
// [1-2, 30]
totalSize, err := strconv.ParseInt(ranges[1], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get total size from Content-Range header: " + err.Error()}
}

offsets := strings.Split(ranges[0], "-")
if len(offsets) != 2 {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get offsets from Content-Range header: " + requestContentRange}
}

startOffset, err := strconv.ParseInt(offsets[0], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get start offset from Content-Range header: " + err.Error()}
}

endOffset, err := strconv.ParseInt(offsets[1], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get end offset from Content-Range header: " + err.Error()}
}

if startOffset > endOffset {
return 0, -1, -1, &InvalidRequest{Message: "Begin offset cannot be greater than end offset"}
}

return totalSize, startOffset, endOffset, nil
}

func GetMMSUploadOwnerHeader(request *http.Request) string {
uploadOwner := request.Header.Get("MMS-Upload-Owner")
return uploadOwner
}

// MetaData is the metadata that identifies and defines the sync service object.
// Every object includes metadata (mandatory) and data (optional). The metadata and data can be updated independently.
// Each sync service node (ESS) has an address that is composed of the node's ID, Type, and Organization.
Expand Down Expand Up @@ -575,6 +674,10 @@ type DestinationRequestInQueue struct {
Destination Destination
}

type ObjectInVerifyQueue struct {
Object MetaData
}

// ACLentry contains ACL information about each user
type ACLentry struct {
Username string
Expand All @@ -584,20 +687,27 @@ type ACLentry struct {

// Object status
const (
// status at sender side
NotReadyToSend = "notReady" // The object is not ready to be sent to the other side
Verifying = "verifying" // The object data is in the process of verification
VerificationFailed = "verificationFailed" // The data verification is failed
ReadyToSend = "ready" // The object is ready to be sent to the other side
PartiallyReceived = "partiallyreceived" // Received the object from the other side, waiting for its data
CompletelyReceived = "completelyReceived" // The object was received completely from the other side
ObjConsumed = "objconsumed" // The object was consumed by the app
ObjDeleted = "objdeleted" // The object was deleted by the other side
ObjReceived = "objreceived" // The object was received by the app
ConsumedByDest = "consumedByDest" // The object was consumed by the other side (ESS only)
// status at receiver side
PartiallyReceived = "partiallyreceived" // Received the object from the other side, waiting for its data
ReceiverVerifying = "receiverVerifying" // The object data at receiver side is in the process of verification
ReceiverVerificationFailed = "receiverVerificationFailed" // The data verification is failed at receiver side
CompletelyReceived = "completelyReceived" // The object was received completely from the other side
ObjConsumed = "objconsumed" // The object was consumed by the app
ObjDeleted = "objdeleted" // The object was deleted by the other side
ObjReceived = "objreceived" // The object was received by the app
ConsumedByDest = "consumedByDest" // The object was consumed by the other side (ESS only)
)

// Notification status and type
const (
Update = "update"
Updated = "updated"
HandleUpdate = "handleUpdate"
Consumed = "consumed"
AckConsumed = "ackconsumed"
ConsumedByDestination = "consumedByDest"
Expand Down Expand Up @@ -776,6 +886,15 @@ func NewLocks(name string) *Locks {
return &locks
}

// ObjectDownloadSemaphore sets the concurrent spi object download concurrency
var ObjectDownloadSemaphore *semaphore.Weighted

// InitObjectDownloadSemaphore initializes ObjectDownloadSemaphore
func InitObjectDownloadSemaphore() {
maxWorkers := runtime.GOMAXPROCS(-1) * Configuration.HTTPCSSObjDownloadConcurrencyMultiplier
ObjectDownloadSemaphore = semaphore.NewWeighted(int64(maxWorkers))
}

// ObjectLocks are locks for object and notification changes
var ObjectLocks Locks

Expand Down Expand Up @@ -942,6 +1061,13 @@ func IsValidHashAlgorithm(hashAlgorithm string) bool {
return false
}

func NeedDataVerification(metaData MetaData) bool {
if IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
return true
}
return false
}

// IsValidName checks if the string only contains letters, digits, and !@#%^*-_.~
var IsValidName = regexp.MustCompile(`^[a-zA-Z0-9|!|@|#|$|^|*|\-|_|.|~|\pL|\pN]+$`).MatchString

Expand Down
31 changes: 29 additions & 2 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type Config struct {
// For the ESS, default value is 2
ObjectQueueBufferSize uint64 `env:"OBJECT_QUEUE_BUFFER_SIZE"`

// Buffer size of Object Queue to verify object data
// Default size is 500
VerifyQueueBufferSize uint64 `env:"VERIFY_QUEUE_BUFFER_SIZE"`

// CommunicationProtocol is a comma separated list of protocols to be used for communication between CSS and ESS
// The elements of the list can be 'http', 'mqtt', and 'wiotp'
// wiotp indicates MQTT communication via the Watson IoT Platform and mqtt indicates direct MQTT communication to a broker
Expand Down Expand Up @@ -204,6 +208,14 @@ type Config struct {
// default is 120s
HTTPESSClientTimeout int `env:"HTTPESSClientTimeout"`

// HTTPESSObjClientTimeout is to specify the http client timeout for downloading models (or objects) in seconds for ESS
// default is 600s
HTTPESSObjClientTimeout int `env:"HTTPESSObjClientTimeout"`

// HTTPCSSObjDownloadConcurrencyMultiplier specifies a number to multiple the number of threads by to set allowed concurrent downloads per CSS
// default is 1
HTTPCSSObjDownloadConcurrencyMultiplier int `env:"HTTPCSSObjDownloadConcurrencyMultiplier"`

// LogLevel specifies the logging level in string format
LogLevel string `env:"LOG_LEVEL"`

Expand Down Expand Up @@ -257,6 +269,10 @@ type Config struct {
// A value of zero means ESSs are never removed
RemoveESSRegistrationTime int16 `env:"REMOVE_ESS_REGISTRATION_TIME"`

// EnableDataChunk specifies whether or not to transfer data in chunks between CSS and ESS
// It is always true for MQTT
EnableDataChunk bool `env:"ENABLE_DATA_CHUNK"`

// Maximum size of data that can be sent in one message
MaxDataChunkSize int `env:"MAX_DATA_CHUNK_SIZE"`

Expand Down Expand Up @@ -493,6 +509,7 @@ func ValidateConfig() error {
}
if mqtt {
Configuration.CommunicationProtocol = MQTTProtocol
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = WIoTP
} else {
Expand All @@ -505,6 +522,7 @@ func ValidateConfig() error {
if http {
if mqtt {
Configuration.CommunicationProtocol = HybridMQTT
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = HybridWIoTP
} else {
Expand All @@ -513,6 +531,7 @@ func ValidateConfig() error {
} else {
if mqtt {
Configuration.CommunicationProtocol = MQTTProtocol
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = WIoTP
}
Expand Down Expand Up @@ -675,6 +694,10 @@ func ValidateConfig() error {
}
}

if Configuration.VerifyQueueBufferSize == 0 {
Configuration.VerifyQueueBufferSize = 500
}

return nil
}

Expand All @@ -689,7 +712,8 @@ func SetDefaultConfig(config *Config) {
config.ListeningAddress = ""
config.SecureListeningPort = 8443
config.UnsecureListeningPort = 8080
config.LeadershipTimeout = 30
config.LeadershipTimeout = 45
config.VerifyQueueBufferSize = 500
config.AuthenticationHandler = "dummy"
config.CSSOnWIoTP = false
config.UsingEdgeConnector = false
Expand All @@ -713,7 +737,8 @@ func SetDefaultConfig(config *Config) {
config.ESSCallSPIRetryInterval = 2
config.ESSPingInterval = 1
config.RemoveESSRegistrationTime = 30
config.MaxDataChunkSize = 120 * 1024
config.EnableDataChunk = true
config.MaxDataChunkSize = 5120 * 1024
config.MaxInflightChunks = 1
config.MongoAddressCsv = "localhost:27017"
config.MongoDbName = "d_edge"
Expand All @@ -733,6 +758,8 @@ func SetDefaultConfig(config *Config) {
config.HTTPCSSUseSSL = false
config.HTTPCSSCACertificate = ""
config.HTTPESSClientTimeout = 120
config.HTTPESSObjClientTimeout = 600
config.HTTPCSSObjDownloadConcurrencyMultiplier = 1
config.MessagingGroupCacheExpiration = 60
config.ShutdownQuiesceTime = 60
config.ESSConsumedObjectsKept = 1000
Expand Down
Loading

0 comments on commit ae121e4

Please sign in to comment.