Skip to content

Commit

Permalink
Issue2543 & Issue91 - Implementing chunk data transfer (upload/downl…
Browse files Browse the repository at this point in the history
…oad) and data transfer resuming featrue for sync service using http protocal

Signed-off-by: zhangl <zhangl@us.ibm.com>
  • Loading branch information
LiilyZhang committed Jan 10, 2022
1 parent c649cf7 commit 03ac552
Show file tree
Hide file tree
Showing 27 changed files with 2,057 additions and 486 deletions.
99 changes: 99 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
"crypto/sha256"
"fmt"
"hash"
"net/http"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"
)

// SyncServiceError is a common error type used in the sync service
Expand Down Expand Up @@ -258,6 +263,79 @@ func GetHash(hashAlgo string) (hash.Hash, crypto.Hash, SyncServiceError) {
}
}

func GetStartAndEndRangeFromRangeHeader(request *http.Request) (int64, int64, SyncServiceError) {
// Get range from the "Range:bytes={startOffset}-{endOffset}"
requestRangeAll := request.Header.Get("Range")
if requestRangeAll == "" {
return -1, -1, nil
}
requestRange := requestRangeAll[6:]
ranges := strings.Split(requestRange, "-")

if len(ranges) != 2 {
return -1, -1, &InvalidRequest{Message: "Failed to parse Range header: " + requestRangeAll}
}

beginOffset, err := strconv.ParseInt(ranges[0], 10, 64)
if err != nil {
return -1, -1, &InvalidRequest{Message: "Failed to get begin offset from Range header: " + err.Error()}
}

endOffset, err := strconv.ParseInt(ranges[1], 10, 64)
if err != nil {
return -1, -1, &InvalidRequest{Message: "Failed to get end offset from Range header: " + err.Error()}
}

if beginOffset > endOffset {
return -1, -1, &InvalidRequest{Message: "Begin offset cannot be greater than end offset"}
}

return beginOffset, endOffset, nil
}

// Content-Range: bytes 1-2/*\
// Returns totalsize, startOffset, endOffset, err
func GetStartAndEndRangeFromContentRangeHeader(request *http.Request) (int64, int64, int64, SyncServiceError) {
// Get range from the "Range:bytes={startOffset}-{endOffset}"
requestContentRange := request.Header.Get("Content-Range")
if requestContentRange == "" {
return 0, -1, -1, nil
}
contentRange := strings.Replace(requestContentRange, "bytes ", "", -1)
// 1-2/30
ranges := strings.Split(contentRange, "/")

if len(ranges) != 2 {
return 0, -1, -1, &InvalidRequest{Message: "Failed to parse Content-Range header: " + requestContentRange}
}
// [1-2, 30]
totalSize, err := strconv.ParseInt(ranges[1], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get total size from Content-Range header: " + err.Error()}
}

offsets := strings.Split(ranges[0], "-")
if len(offsets) != 2 {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get offsets from Content-Range header: " + requestContentRange}
}

startOffset, err := strconv.ParseInt(offsets[0], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get start offset from Content-Range header: " + err.Error()}
}

endOffset, err := strconv.ParseInt(offsets[1], 10, 64)
if err != nil {
return 0, -1, -1, &InvalidRequest{Message: "Failed to get end offset from Content-Range header: " + err.Error()}
}

if startOffset > endOffset {
return 0, -1, -1, &InvalidRequest{Message: "Begin offset cannot be greater than end offset"}
}

return totalSize, startOffset, endOffset, nil
}

// MetaData is the metadata that identifies and defines the sync service object.
// Every object includes metadata (mandatory) and data (optional). The metadata and data can be updated independently.
// Each sync service node (ESS) has an address that is composed of the node's ID, Type, and Organization.
Expand Down Expand Up @@ -403,6 +481,10 @@ type MetaData struct {
// Optional field, default is false (not visiable to all users)
Public bool `json:"public" bson:"public"`

// DataVerified is an internal field set by ESS after ESS downloads data from CSS or by CSS after ESS uploads data
// Data can be obtained only when DataVerified field is true
DataVerified bool `json:"dataVerified" bson:"data-verified"`

// OwnerID is an internal field indicating who creates the object
// This field should not be set by users
OwnerID string `json:"ownerID" bson:"owner-id"`
Expand Down Expand Up @@ -598,6 +680,7 @@ const (
const (
Update = "update"
Updated = "updated"
HandleUpdate = "handleUpdate"
Consumed = "consumed"
AckConsumed = "ackconsumed"
ConsumedByDestination = "consumedByDest"
Expand Down Expand Up @@ -776,6 +859,15 @@ func NewLocks(name string) *Locks {
return &locks
}

// ObjectDownloadSemaphore sets the concurrent spi object download concurrency
var ObjectDownloadSemaphore *semaphore.Weighted

// InitObjectDownloadSemaphore initializes ObjectDownloadSemaphore
func InitObjectDownloadSemaphore() {
maxWorkers := runtime.GOMAXPROCS(-1) * Configuration.HTTPCSSObjDownloadConcurrencyMultiplier
ObjectDownloadSemaphore = semaphore.NewWeighted(int64(maxWorkers))
}

// ObjectLocks are locks for object and notification changes
var ObjectLocks Locks

Expand Down Expand Up @@ -942,6 +1034,13 @@ func IsValidHashAlgorithm(hashAlgorithm string) bool {
return false
}

func NeedDataVerification(metaData MetaData) bool {
if IsValidHashAlgorithm(metaData.HashAlgorithm) && metaData.PublicKey != "" && metaData.Signature != "" {
return true
}
return false
}

// IsValidName checks if the string only contains letters, digits, and !@#%^*-_.~
var IsValidName = regexp.MustCompile(`^[a-zA-Z0-9|!|@|#|$|^|*|\-|_|.|~|\pL|\pN]+$`).MatchString

Expand Down
20 changes: 19 additions & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ type Config struct {
// default is 120s
HTTPESSClientTimeout int `env:"HTTPESSClientTimeout"`

// HTTPESSObjClientTimeout is to specify the http client timeout for downloading models (or objects) in seconds for ESS
// default is 600s
HTTPESSObjClientTimeout int `env:"HTTPESSObjClientTimeout"`

// HTTPCSSObjDownloadConcurrencyMultiplier specifies a number to multiple the number of threads by to set allowed concurrent downloads per CSS
// default is 1
HTTPCSSObjDownloadConcurrencyMultiplier int `env:"HTTPCSSObjDownloadConcurrencyMultiplier"`

// LogLevel specifies the logging level in string format
LogLevel string `env:"LOG_LEVEL"`

Expand Down Expand Up @@ -257,6 +265,10 @@ type Config struct {
// A value of zero means ESSs are never removed
RemoveESSRegistrationTime int16 `env:"REMOVE_ESS_REGISTRATION_TIME"`

// EnableDataChunk specifies whether or not to transfer data in chunks between CSS and ESS
// It is always true for MQTT
EnableDataChunk bool `env:"ENABLE_DATA_CHUNK"`

// Maximum size of data that can be sent in one message
MaxDataChunkSize int `env:"MAX_DATA_CHUNK_SIZE"`

Expand Down Expand Up @@ -493,6 +505,7 @@ func ValidateConfig() error {
}
if mqtt {
Configuration.CommunicationProtocol = MQTTProtocol
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = WIoTP
} else {
Expand All @@ -505,6 +518,7 @@ func ValidateConfig() error {
if http {
if mqtt {
Configuration.CommunicationProtocol = HybridMQTT
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = HybridWIoTP
} else {
Expand All @@ -513,6 +527,7 @@ func ValidateConfig() error {
} else {
if mqtt {
Configuration.CommunicationProtocol = MQTTProtocol
Configuration.EnableDataChunk = true
} else if wiotp {
Configuration.CommunicationProtocol = WIoTP
}
Expand Down Expand Up @@ -713,7 +728,8 @@ func SetDefaultConfig(config *Config) {
config.ESSCallSPIRetryInterval = 2
config.ESSPingInterval = 1
config.RemoveESSRegistrationTime = 30
config.MaxDataChunkSize = 120 * 1024
config.EnableDataChunk = true
config.MaxDataChunkSize = 5120 * 1024
config.MaxInflightChunks = 1
config.MongoAddressCsv = "localhost:27017"
config.MongoDbName = "d_edge"
Expand All @@ -733,6 +749,8 @@ func SetDefaultConfig(config *Config) {
config.HTTPCSSUseSSL = false
config.HTTPCSSCACertificate = ""
config.HTTPESSClientTimeout = 120
config.HTTPESSObjClientTimeout = 600
config.HTTPCSSObjDownloadConcurrencyMultiplier = 1
config.MessagingGroupCacheExpiration = 60
config.ShutdownQuiesceTime = 60
config.ESSConsumedObjectsKept = 1000
Expand Down
Loading

0 comments on commit 03ac552

Please sign in to comment.