diff --git a/.travis.yml b/.travis.yml index 64cb671..bb6791a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ before_install: install: - go get -t ./... + - go get github.com/mattn/goveralls script: - make test diff --git a/Makefile b/Makefile index cd89597..8deb0d9 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,13 @@ test_local: test_codecov: ./coveralls.sh +test_loop: + for i in $$( seq 100 ); \ + do echo "******* Run $$i"; echo; \ + go test -v -short -p=1 -run Agg -count 10 ./services/ > run.log || \ + ( cat run.log; exit 1 ) || exit 1; \ + done + test: test_fmt test_lint test_codecov -local: test_fmt test_lint test_local \ No newline at end of file +local: test_fmt test_lint test_local diff --git a/README.md b/README.md index 7c337b6..8b66c14 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -[![Build Status](https://travis-ci.org/lca1/medco-unlynx.svg?branch=master)](https://travis-ci.org/lca1/medco-unlynx) -[![Go Report Card](https://goreportcard.com/badge/github.com/lca1/medco-unlynx)](https://goreportcard.com/report/github.com/lca1/medco-unlynx) -[![Coverage Status](https://coveralls.io/repos/github/lca1/medco-unlynx/badge.svg?branch=master)](https://coveralls.io/github/lca1/medco-unlynx?branch=master) +[![Build Status](https://travis-ci.org/ldsec/medco-unlynx.svg?branch=master)](https://travis-ci.org/ldsec/medco-unlynx) +[![Go Report Card](https://goreportcard.com/badge/github.com/ldsec/medco-unlynx)](https://goreportcard.com/report/github.com/ldsec/medco-unlynx) +[![Coverage Status](https://coveralls.io/repos/github/ldsec/medco-unlynx/badge.svg?branch=master)](https://coveralls.io/github/ldsec/medco-unlynx?branch=master) ## Documentation MedCo documentation is centralized on the following website: diff --git a/app/data_decryption.go b/app/data_decryption.go index cb247c9..f11b119 100644 --- a/app/data_decryption.go +++ b/app/data_decryption.go @@ -2,9 +2,9 @@ package main import ( "errors" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/log" - "gopkg.in/urfave/cli.v1" "io" "os" "strconv" diff --git a/app/data_encryption.go b/app/data_encryption.go index 8a6dded..4d2ee5a 100644 --- a/app/data_encryption.go +++ b/app/data_encryption.go @@ -2,10 +2,10 @@ package main import ( "errors" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/app" "go.dedis.ch/onet/v3/log" - "gopkg.in/urfave/cli.v1" "io" "os" "strconv" diff --git a/app/generate_tagging_secrets.go b/app/generate_tagging_secrets.go index dad20c3..004905c 100644 --- a/app/generate_tagging_secrets.go +++ b/app/generate_tagging_secrets.go @@ -2,12 +2,12 @@ package main import ( "errors" - "github.com/lca1/medco-unlynx/services" - libunlynx "github.com/lca1/unlynx/lib" + "github.com/ldsec/medco-unlynx/services" + libunlynx "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/kyber/v3" "go.dedis.ch/onet/v3/app" "go.dedis.ch/onet/v3/log" - "gopkg.in/urfave/cli.v1" "os" "path" "strconv" diff --git a/app/get_aggregate_key.go b/app/get_aggregate_key.go index 53ae7c8..885ce95 100644 --- a/app/get_aggregate_key.go +++ b/app/get_aggregate_key.go @@ -3,9 +3,9 @@ package main import ( "encoding/base64" "errors" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/app" "go.dedis.ch/onet/v3/log" - "gopkg.in/urfave/cli.v1" "os" "path" ) diff --git a/app/key_generation.go b/app/key_generation.go index 0e20feb..b7b2780 100644 --- a/app/key_generation.go +++ b/app/key_generation.go @@ -2,9 +2,9 @@ package main import ( "errors" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/log" - "gopkg.in/urfave/cli.v1" "io" "os" ) diff --git a/app/mapping_table_generation.go b/app/mapping_table_generation.go new file mode 100644 index 0000000..269d7f3 --- /dev/null +++ b/app/mapping_table_generation.go @@ -0,0 +1,109 @@ +package main + +import ( + "errors" + "github.com/urfave/cli" + "go.dedis.ch/kyber/v3" + "go.dedis.ch/kyber/v3/suites" + "go.dedis.ch/onet/v3/log" + "os" + "strconv" +) + +func mappingTableGenFromApp(c *cli.Context) error { + + var PointToInt = make(map[string]int64, 0) + var suite = suites.MustFind("Ed25519") + + // cli arguments + outputFile := c.String("outputFile") // mandatory + outputFormat := c.String("outputFormat") // typescript + nbMappings := c.Int64("nbMappings") // optional default to 1000 + checkNeg := c.Bool("checkNeg") // optional default to false + + var Bi kyber.Point + B := suite.Point().Base() + var m int64 + + // generate mapping in memory + for Bi, m = suite.Point().Null(), 0; m < nbMappings; Bi, m = Bi.Add(Bi, B), m+1 { + PointToInt[Bi.String()] = m + + if checkNeg { + neg := suite.Point().Mul(suite.Scalar().SetInt64(int64(-m)), B) + PointToInt[neg.String()] = -m + } + } + + // open file + file, err := os.Create(outputFile) + if err != nil { + return cli.NewExitError(err, 1) + } + defer file.Close() + + // write mapping to disk + switch outputFormat { + case "typescript": + err = writeMapToTSFile(file, PointToInt) + case "go": + err = writeMapToGoFile(file, PointToInt) + default: + err = errors.New("format selected is incorrect: " + outputFormat) + } + + if err != nil { + return cli.NewExitError(err, 2) + } + + log.Info("Successfully generated mapping file with ", nbMappings, " mappings to ", outputFile) + return nil +} + +func writeMapToTSFile(file *os.File, pointToInt map[string]int64) (err error) { + //export let PointToInt: Record = { + // "edc876d6831fd2105d0b4389ca2e283166469289146e2ce06faefe98b22548df": 5, + // "f47e49f9d07ad2c1606b4d94067c41f9777d4ffda709b71da1d88628fce34d85": 6, + //} + + _, err = file.WriteString("export let PointToInt: Record = {\n") + if err != nil { + return + } + for k, v := range pointToInt { + _, err = file.WriteString("\t" + `"` + k + `": ` + strconv.FormatInt(v, 10) + ",\n") + if err != nil { + return + } + } + _, err = file.WriteString("};") + if err != nil { + return + } + return +} + +func writeMapToGoFile(file *os.File, pointToInt map[string]int64) (err error) { + //package main + //var PointToInt = map[string]int64{ + // "00022ddff3737fda59ef096dae2ea2876a5893510442fde25cb37486ed8b97c3": 7414, + //} + + _, err = file.WriteString("package main \nvar PointToInt = map[string]int64{\n") + if err != nil { + return + } + + for k, v := range pointToInt { + _, err = file.WriteString("\t" + `"` + k + `": ` + strconv.FormatInt(v, 10) + ",\n") + if err != nil { + return + } + } + + _, err = file.WriteString("}") + if err != nil { + return + } + return +} diff --git a/app/medco.go b/app/medco.go index 569c047..7b3e160 100644 --- a/app/medco.go +++ b/app/medco.go @@ -4,11 +4,11 @@ import ( "errors" "os" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/app" "go.dedis.ch/onet/v3/log" "go.dedis.ch/onet/v3/network" - "gopkg.in/urfave/cli.v1" ) const ( @@ -24,6 +24,9 @@ const ( optionConfig = "config" optionConfigShort = "c" + optionTimeout = "timeout" + optionTimeoutShort = "t" + optionGroupFile = "file" optionGroupFileShort = "f" @@ -92,11 +95,42 @@ func main() { }, } + mappingTableGenFlags := []cli.Flag{ + cli.StringFlag{ + Name: "outputFile", + Usage: "Path to the file that will be generated.", + Required: true, + }, + cli.StringFlag{ + Name: "outputFormat", + Usage: "Format of the output file. Value: go|typescript. Default: typescript.", + Required: false, + Value: "typescript", + }, + cli.Int64Flag{ + Name: "nbMappings", + Usage: "Number of mappings to generate. Default: 1000.", + Required: false, + Value: 1000, + }, + cli.BoolFlag{ + Name: "checkNeg", + Usage: "Whether to check for negative values. Default: false.", + Required: false, + }, + } + serverFlags := []cli.Flag{ cli.StringFlag{ Name: optionConfig + ", " + optionConfigShort, Usage: "Configuration file of the server", }, + cli.Int64Flag{ + Name: optionTimeout + ", " + optionTimeoutShort, + Usage: "Communication timeout (in minutes)", + Required: false, + Value: 20, + }, } nonInteractiveSetupFlags := []cli.Flag{ @@ -172,6 +206,16 @@ func main() { }, // CLIENT END: KEY GENERATION ------------ + // BEGIN CLIENT: MAPPING TABLE GENERATION ---------- + { + Name: "mappingtablegen", + Aliases: []string{"m"}, + Usage: "Generate a point-integer mapping table.", + Action: mappingTableGenFromApp, + Flags: mappingTableGenFlags, + }, + // CLIENT END: MAPPING TABLE GENERATION ------------ + // BEGIN SERVER -------- { Name: "server", diff --git a/app/non_interactive_setup.go b/app/non_interactive_setup.go index a2d0bcf..ec6add3 100644 --- a/app/non_interactive_setup.go +++ b/app/non_interactive_setup.go @@ -2,13 +2,13 @@ package main import ( "errors" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" + "github.com/urfave/cli" "go.dedis.ch/kyber/v3/util/encoding" "go.dedis.ch/kyber/v3/util/key" "go.dedis.ch/onet/v3/app" "go.dedis.ch/onet/v3/log" "go.dedis.ch/onet/v3/network" - "gopkg.in/urfave/cli.v1" ) // NonInteractiveSetup is used to setup the cothority node for unlynx in a non-interactive way (and without error checks) diff --git a/app/server.go b/app/server.go index 20a8cd4..9f144ca 100644 --- a/app/server.go +++ b/app/server.go @@ -1,20 +1,24 @@ package main import ( - "gopkg.in/urfave/cli.v1" + servicesmedco "github.com/ldsec/medco-unlynx/services" + "time" // Empty imports to have the init-functions called which should // register the protocol - _ "github.com/lca1/medco-unlynx/services" - _ "github.com/lca1/unlynx/protocols" + _ "github.com/ldsec/medco-unlynx/services" + _ "github.com/ldsec/unlynx/protocols" + "github.com/urfave/cli" "go.dedis.ch/onet/v3/app" ) func runServer(ctx *cli.Context) error { // first check the options config := ctx.String("config") + timeout := ctx.Int64("timeout") app.RunServer(config) + servicesmedco.TimeoutService = time.Duration(timeout) * time.Minute return nil } diff --git a/coveralls.sh b/coveralls.sh index 73d825d..8774868 100755 --- a/coveralls.sh +++ b/coveralls.sh @@ -15,7 +15,7 @@ echo "mode: atomic" > profile.cov for dir in ${DIR_SOURCE}; do echo ${dir} if ! echo ${DIR_EXCLUDE} | grep -q ${dir}; then - go test -short -p=1 -covermode=atomic -coverprofile=${dir}/profile.tmp ${dir} + go test -v -short -p=1 -covermode=atomic -coverprofile=${dir}/profile.tmp ${dir} if [ $? -ne 0 ]; then all_tests_passed=false diff --git a/deployment/Dockerfile b/deployment/Dockerfile index 0e6d984..0264f3f 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.12 as build +FROM golang:1.13 as build COPY ./ /src WORKDIR /src @@ -21,7 +21,7 @@ RUN CGO_ENABLED=0 go build -v ./... && \ mv /go/bin/app /go/bin/medco-unlynx # ------------------------------------------- -FROM golang:1.12-alpine as release +FROM golang:1.13-alpine as release # run time environment variables ENV NODE_IDX="0" \ diff --git a/go.mod b/go.mod index 83ca80a..a628a9e 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,23 @@ -module github.com/lca1/medco-unlynx +module github.com/ldsec/medco-unlynx + +go 1.13 require ( github.com/BurntSushi/toml v0.3.1 github.com/btcsuite/goleveldb v1.0.0 + github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/fanliao/go-concurrentMap v0.0.0-20141114143905-7d2d7a5ea67b - github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect - github.com/lca1/unlynx v1.3.1 - github.com/satori/go.uuid v1.2.0 - github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect - github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect - github.com/stretchr/testify v1.3.0 - go.dedis.ch/kyber/v3 v3.0.3 - go.dedis.ch/onet/v3 v3.0.14 - golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f // indirect - golang.org/x/net v0.0.0-20190522155817-f3200d17e092 // indirect - golang.org/x/sys v0.0.0-20190528183647-3626398d7749 // indirect - golang.org/x/text v0.3.2 // indirect - gopkg.in/urfave/cli.v1 v1.20.0 + github.com/gorilla/websocket v1.4.1 // indirect + github.com/ldsec/unlynx v1.4.0 + github.com/smartystreets/goconvey v1.6.4 // indirect + github.com/stretchr/testify v1.4.0 + github.com/urfave/cli v1.22.2 + go.dedis.ch/kyber/v3 v3.0.12 + go.dedis.ch/onet/v3 v3.1.0 + golang.org/x/crypto v0.0.0-20200210222208-86ce3cb69678 // indirect + golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 // indirect + golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + gopkg.in/yaml.v2 v2.2.7 // indirect ) diff --git a/go.sum b/go.sum index 32a770d..327ed88 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,13 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= -github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/btcsuite/goleveldb v1.0.0 h1:Tvd0BfvqX9o823q1j2UZ/epQo09eJh6dTcRp79ilIN4= github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I= github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= +github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -22,10 +23,10 @@ github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e h1:KhcknUwkWHKZ github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRidnzC1Oq86fpX1q/iEv2KJdrCtttYjT4= -github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -33,8 +34,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lca1/unlynx v1.3.1 h1:l/sLi4DGL9Rlvk6ySzR13VYjgbubY1tMHPyXGvd2zZU= -github.com/lca1/unlynx v1.3.1/go.mod h1:HOR8K2JLrylJjIDdMdFev5rE5q6XGifs0OnH7rvreqY= +github.com/ldsec/unlynx v1.4.0 h1:z4HIW4fPhvmext9ZyOQvXJvCTSw0wqPXoH/Q2541kfU= +github.com/ldsec/unlynx v1.4.0/go.mod h1:0UCnU0SmdPDjU5SDAhv7/YoEnmznwwShKI2SrelAYO4= github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -42,74 +43,87 @@ github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/r0fls/gostats v0.0.0-20180711082619-e793b1fda35c/go.mod h1:2mJY7Hx2k1GaMAmiAoyy090oY3RTSk3kkaaTieLq7wc= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 h1:hBSHahWMEgzwRyS6dRpxY0XyjZsHyQ61s084wo5PJe0= -github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= -github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= +github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/urfave/cli v1.22.2 h1:gsqYFH8bb9ekPA12kRo0hfjngWQjkJPlN9R0N78BoUo= +github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= -go.dedis.ch/kyber/v3 v3.0.0-pre2/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= -go.dedis.ch/kyber/v3 v3.0.0 h1:XuefPFGJKPyfPBD6kXctbLb4smT9Il5HmUn303JRr08= -go.dedis.ch/kyber/v3 v3.0.0/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= -go.dedis.ch/kyber/v3 v3.0.2 h1:dhYLJksmOau7TYf1JS0iTpW6Bus+mtqxJBbM0Q/E9HU= -go.dedis.ch/kyber/v3 v3.0.2/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= -go.dedis.ch/kyber/v3 v3.0.3 h1:XdqCfDQSuSJli/KhO5jnUFfcr8GWfMimwnwScOH5Lg8= -go.dedis.ch/kyber/v3 v3.0.3/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= -go.dedis.ch/onet/v3 v3.0.0 h1:haBqkwkNNu/jHz7+PoZJS+xbtA5JheQvt0SSIjGyk5I= -go.dedis.ch/onet/v3 v3.0.0/go.mod h1:xqmP2+NvxeNzgmNj/4hf56EZm3KT0Qksz98miZw5G3A= -go.dedis.ch/onet/v3 v3.0.14 h1:KvYzHbhb74WufinPXoeICjD4/pJ4Jduw9r0enj/YzQI= -go.dedis.ch/onet/v3 v3.0.14/go.mod h1:8zNbWaMMpTiBfa8gftpoe726sivOF1ymxqkiPuvpdFA= +go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= +go.dedis.ch/kyber/v3 v3.0.5 h1:BpjX6vY1R3b7TnJ0mUnCFRVXEJThSAj1zQzmNh4v+70= +go.dedis.ch/kyber/v3 v3.0.5/go.mod h1:V1z0JihG9+dUEUCKLI9j9tjnlIflBw3wx8UOg0g3Pnk= +go.dedis.ch/kyber/v3 v3.0.9/go.mod h1:rhNjUUg6ahf8HEg5HUvVBYoWY4boAafX8tYxX+PS+qg= +go.dedis.ch/kyber/v3 v3.0.12 h1:15d61EyBcBoFIS97kS2c/Vz4o3FR8ALnZ2ck9J/ebYM= +go.dedis.ch/kyber/v3 v3.0.12/go.mod h1:kXy7p3STAurkADD+/aZcsznZGKVHEqbtmdIzvPfrs1U= +go.dedis.ch/onet/v3 v3.0.24 h1:DjPcMDgQgQdxi6Z6vHt3BSuKtZioDUHpIUEC9wQ9NmI= +go.dedis.ch/onet/v3 v3.0.24/go.mod h1:JhOZn9nJgpvxQWiY7Uebjj1AdXTW0ksQyq8RocRhwPk= +go.dedis.ch/onet/v3 v3.1.0 h1:KwofJGuw9T+051ejuWWxA6K1N6blNvhuwz5neasWg8Y= +go.dedis.ch/onet/v3 v3.1.0/go.mod h1:dDV7bRKbT3LJ7enHBcys2O0jXEY2jTUB4dq8HtekGbA= go.dedis.ch/protobuf v1.0.5/go.mod h1:eIV4wicvi6JK0q/QnfIEGeSFNG0ZeB24kzut5+HaRLo= -go.dedis.ch/protobuf v1.0.6 h1:E61p2XjYbYrTf3WeXE8M8Ui5WA3hX/NgbHHi5D0FLxI= -go.dedis.ch/protobuf v1.0.6/go.mod h1:YHYXW6dQ9p2iJ3f+2fxKnOpjGx0MvL4cwpg1RVNXaV8= -go.etcd.io/bbolt v1.3.0 h1:oY10fI923Q5pVCVt1GBTZMn8LHo5M+RCInFpeMnV4QI= -go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= -go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.dedis.ch/protobuf v1.0.7/go.mod h1:pv5ysfkDX/EawiPqcW3ikOxsL5t+BqnV6xHSmE79KI4= +go.dedis.ch/protobuf v1.0.8 h1:lmyHigYqVxoTN1V0adoGPvqSdjycAMK0XmTFjP893mA= +go.dedis.ch/protobuf v1.0.8/go.mod h1:pv5ysfkDX/EawiPqcW3ikOxsL5t+BqnV6xHSmE79KI4= +go.dedis.ch/protobuf v1.0.11 h1:FTYVIEzY/bfl37lu3pR4lIj+F9Vp1jE8oh91VmxKgLo= +go.dedis.ch/protobuf v1.0.11/go.mod h1:97QR256dnkimeNdfmURz0wAMNVbd1VmLXhG1CrTYrJ4= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b h1:Elez2XeF2p9uyVj0yEUDqQ56NFcDtcBNkYP7yv8YbUE= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo= -golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200210222208-86ce3cb69678 h1:wCWoJcFExDgyYx2m2hpHgwz8W3+FPdfldvIgzqDIhyg= +golang.org/x/crypto v0.0.0-20200210222208-86ce3cb69678/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco= -golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 h1:e6HwijUxhDe+hPNjZQQn9bA5PW3vNmnN64U2ZW759Lk= +golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e h1:3GIlrlVLfkoipSReOMNAgApI0ajnalyLa/EZHHca/XI= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190528183647-3626398d7749 h1:oG2HS+e2B9VqK95y67B5MgJIJhOPY27/m5uJKJhHzus= -golang.org/x/sys v0.0.0-20190528183647-3626398d7749/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/satori/go.uuid.v1 v1.2.0 h1:AH9uksa7bGe9rluapecRKBCpZvxaBEyu0RepitcD0Hw= gopkg.in/satori/go.uuid.v1 v1.2.0/go.mod h1:kjjdhYBBaa5W5DYP+OcVG3fRM6VWu14hqDYST4Zvw+E= gopkg.in/tylerb/graceful.v1 v1.2.15 h1:1JmOyhKqAyX3BgTXMI84LwT6FOJ4tP2N9e2kwTCM0nQ= gopkg.in/tylerb/graceful.v1 v1.2.15/go.mod h1:yBhekWvR20ACXVObSSdD3u6S9DeSylanL2PAbAC/uJ8= -gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= -gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= rsc.io/goversion v1.2.0 h1:SPn+NLTiAG7w30IRK/DKp1BjvpWabYgxlLp/+kx5J8w= rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo= diff --git a/protocols/propagate.go b/protocols/propagate.go new file mode 100644 index 0000000..6238fb7 --- /dev/null +++ b/protocols/propagate.go @@ -0,0 +1,339 @@ +package protocols + +import ( + "golang.org/x/xerrors" + "reflect" + "strings" + "sync" + "time" + + "go.dedis.ch/onet/v3" + "go.dedis.ch/onet/v3/log" + "go.dedis.ch/onet/v3/network" +) + +func init() { + network.RegisterMessage(PropagateSendData{}) + network.RegisterMessage(PropagateReply{}) +} + +// How long to wait before timing out on waiting for the time-out. +const initialWait = 100000 * time.Millisecond + +// Propagate is a protocol that sends some data to all attached nodes +// and waits for confirmation before returning. +type Propagate struct { + *onet.TreeNodeInstance + onDataToChildren PropagationOneMsg + onDataToRoot PropagationOneMsgSend + onDoneCb PropagationMultiMsg + sd *PropagateSendData + ChannelSD chan struct { + *onet.TreeNode + PropagateSendData + } + ChannelReply chan struct { + *onet.TreeNode + PropagateReply + } + + allowedFailures int + sync.Mutex + closing chan bool +} + +// PropagateSendData is the message to pass the data to the children +type PropagateSendData struct { + // Data is the data to transmit to the children + Data []byte + // How long the root will wait for the children before + // timing out. + Timeout time.Duration +} + +// PropagateReply is sent from the children back to the root +type PropagateReply struct { + // Data is the data to transmit to the root + Data []byte + // Level is how many children replied + Level int +} + +// PropagationFunc starts the propagation protocol and blocks until all children +// minus the exception stored the new value or the timeout has been reached. +// The return value is the number of nodes that acknowledged having +// stored the new value or an error if the protocol couldn't start. +type PropagationFunc func(el *onet.Roster, msg network.Message, + timeout time.Duration) ([]network.Message, error) + +// PropagationOneMsg is the function that will store the new data. +type PropagationOneMsg func(network.Message) error + +// PropagationOneMsgSend is the function that will store the new data. +type PropagationOneMsgSend func() network.Message + +// PropagationMultiMsg is the function that will store the new data. +type PropagationMultiMsg func([]network.Message) + +// propagationContext is used for testing. +type propagationContext interface { + ProtocolRegister(name string, protocol onet.NewProtocol) (onet.ProtocolID, error) + ServerIdentity() *network.ServerIdentity + CreateProtocol(name string, t *onet.Tree) (onet.ProtocolInstance, error) +} + +// NewPropagationProtocol creates a new protocl for propagation. +func NewPropagationProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstance, error) { + p := &Propagate{ + sd: &PropagateSendData{[]byte{}, initialWait}, + TreeNodeInstance: n, + closing: make(chan bool), + allowedFailures: (len(n.Roster().List) - 1) / 3, + } + for _, h := range []interface{}{&p.ChannelSD, &p.ChannelReply} { + if err := p.RegisterChannel(h); err != nil { + return nil, err + } + } + return p, nil +} + +// NewPropagationFunc registers a new protocol name with the context c and will +// set f as handler for every new instance of that protocol. +// The protocol will fail if more than thresh nodes per subtree fail to respond. +// If thresh == -1, the threshold defaults to len(n.Roster().List-1)/3. Thus, for a roster of +// 5, t = int(4/3) = 1, e.g. 1 node out of the 5 can fail. +func NewPropagationFunc(c propagationContext, name string, + thresh int) (PropagationFunc, error) { + return NewPropagationFuncTest(c, name, thresh, nil, nil) +} + +// NewPropagationFuncTest takes two callbacks for easier testing without +// a `service.NewProtocl` +func NewPropagationFuncTest(c propagationContext, name string, thresh int, + onDataToChildren PropagationOneMsg, + onDataToRoot PropagationOneMsgSend) (PropagationFunc, error) { + pid, err := c.ProtocolRegister(name, func(n *onet.TreeNodeInstance) (onet. + ProtocolInstance, error) { + pi, err := NewPropagationProtocol(n) + if err != nil { + return nil, xerrors.Errorf("couldn't create protocol: %+v", err) + } + proto := pi.(*Propagate) + proto.onDataToChildren = onDataToChildren + proto.onDataToRoot = onDataToRoot + return pi, err + }) + + log.Lvl3("Registering new propagation for", c.ServerIdentity(), + name, pid) + return func(el *onet.Roster, msg network.Message, + to time.Duration) ([]network.Message, error) { + rooted := el.NewRosterWithRoot(c.ServerIdentity()) + if rooted == nil { + return nil, xerrors.New("we're not in the roster") + } + tree := rooted.GenerateNaryTree(len(el.List)) + if tree == nil { + return nil, xerrors.New("Didn't find root in tree") + } + log.Lvl3(el.List[0].Address, "Starting to propagate", reflect.TypeOf(msg)) + pi, err := c.CreateProtocol(name, tree) + if err != nil { + return nil, err + } + proto := pi.(*Propagate) + proto.Lock() + t := thresh + if t == -1 { + t = (len(el.List) - 1) / 3 + } + proto.allowedFailures = t + + if msg != nil { + d, err := network.Marshal(msg) + if err != nil { + proto.Unlock() + return nil, err + } + proto.sd.Data = d + } + proto.sd.Timeout = to + + done := make(chan []network.Message) + proto.onDoneCb = func(msg []network.Message) { + done <- msg + } + proto.Unlock() + + if err := proto.Start(); err != nil { + return nil, err + } + select { + case replies := <-done: + return replies, nil + case <-proto.closing: + return nil, nil + } + }, err +} + +// Start will contact everyone and make the connections +func (p *Propagate) Start() error { + log.Lvl4("going to contact", p.Root().ServerIdentity) + return p.SendTo(p.Root(), p.sd) +} + +// Dispatch can handle timeouts +func (p *Propagate) Dispatch() error { + process := true + var received int + var rcvMsgs [][]byte + log.Lvl4(p.ServerIdentity(), "Start dispatch") + defer p.Done() + defer func() { + if p.IsRoot() { + if p.onDoneCb != nil { + var rcvNetMsgs []network.Message + for _, data := range rcvMsgs { + if data != nil && len(data) > 0 { + _, netMsg, err := network.Unmarshal(data, p.Suite()) + if err != nil { + log.Warnf("Got error while unmarshaling: %+v", err) + } else { + rcvNetMsgs = append(rcvNetMsgs, netMsg) + } + } + } + p.onDoneCb(rcvNetMsgs) + } + } + }() + + var gotSendData bool + var errs []error + subtreeCount := p.TreeNode().SubtreeCount() + + for process { + p.Lock() + timeout := p.sd.Timeout + log.Lvl4("Got timeout", timeout, "from SendData") + p.Unlock() + select { + case msg := <-p.ChannelSD: + if gotSendData { + log.Error("already got msg") + continue + } + gotSendData = true + log.Lvl3(p.ServerIdentity(), "Got data from", msg.ServerIdentity, "and setting timeout to", msg.Timeout) + p.sd.Timeout = msg.Timeout + if p.onDataToChildren != nil { + _, netMsg, err := network.Unmarshal(msg.Data, p.Suite()) + if err != nil { + log.Lvlf2("Unmarshal failed with %v", err) + } else { + err := p.onDataToChildren(netMsg) + if err != nil { + log.Lvlf2("Propagation callback failed: %v", err) + } + } + } + if !p.IsRoot() { + log.Lvl3(p.ServerIdentity(), "Sending to parent") + var data []byte + if p.onDataToRoot != nil { + var err error + data, err = network.Marshal(p.onDataToRoot()) + if err != nil { + return xerrors.Errorf("couldn't marshal message: %+v", + err) + } + } + if err := p.SendToParent( + &PropagateReply{Data: data}); err != nil { + return err + } + } + if p.IsLeaf() { + process = false + } else { + log.Lvl3(p.ServerIdentity(), "Sending to children") + if errs = p.SendToChildrenInParallel(&msg.PropagateSendData); len(errs) != 0 { + var errsStr []string + for _, e := range errs { + errsStr = append(errsStr, e.Error()) + } + if len(errs) > p.allowedFailures { + return xerrors.New(strings.Join(errsStr, "\n")) + } + log.Lvl2("Error while sending to children:", errsStr) + } + } + case rep := <-p.ChannelReply: + if !gotSendData { + log.Error("got response before send") + continue + } + received++ + log.Lvl4(p.ServerIdentity(), "received:", received, subtreeCount) + if !p.IsRoot() { + if err := p.SendToParent(&PropagateReply{ + Data: rep.Data}); err != nil { + return err + } + } else { + rcvMsgs = append(rcvMsgs, rep.Data) + } + // Only wait for the number of children that successfully received our message. + if received == subtreeCount-len(errs) && received >= subtreeCount-p.allowedFailures { + process = false + } + case <-time.After(timeout): + if received < subtreeCount-p.allowedFailures { + _, _, err := network.Unmarshal(p.sd.Data, p.Suite()) + return xerrors.Errorf("Timeout of %s reached, "+ + "got %v but need %v, err: %+v", + timeout, received, subtreeCount-p.allowedFailures, err) + } + process = false + case <-p.closing: + process = false + p.onDoneCb = nil + } + } + log.Lvl3(p.ServerIdentity(), "done, isroot:", p.IsRoot()) + return nil +} + +// RegisterOnDone takes a function that will be called once the data has been +// sent to the whole tree. It receives the number of nodes that replied +// successfully to the propagation. +func (p *Propagate) RegisterOnDone(fn PropagationMultiMsg) { + p.onDoneCb = fn +} + +// RegisterOnDataToChildren takes a function that will be called for that node if it +// needs to update its data. +func (p *Propagate) RegisterOnDataToChildren(fn PropagationOneMsg) { + p.onDataToChildren = fn +} + +// RegisterOnDataToRoot takes a function that will be called for that node if it +// needs to update its data. +func (p *Propagate) RegisterOnDataToRoot(fn PropagationOneMsgSend) { + p.onDataToRoot = fn +} + +// Config stores the basic configuration for that protocol. +func (p *Propagate) Config(d []byte, timeout time.Duration) { + p.sd.Data = d + p.sd.Timeout = timeout +} + +// Shutdown informs the Dispatch method to stop +// waiting. +func (p *Propagate) Shutdown() error { + close(p.closing) + return nil +} diff --git a/protocols/propagate_test.go b/protocols/propagate_test.go new file mode 100644 index 0000000..1735dc0 --- /dev/null +++ b/protocols/propagate_test.go @@ -0,0 +1,100 @@ +package protocols + +import ( + "bytes" + "github.com/stretchr/testify/require" + "go.dedis.ch/kyber/v3/suites" + "golang.org/x/xerrors" + "reflect" + "sync" + "testing" + "time" + + "go.dedis.ch/onet/v3" + "go.dedis.ch/onet/v3/log" + "go.dedis.ch/onet/v3/network" +) + +type propagateMsg struct { + Data []byte +} + +func init() { + network.RegisterMessage(propagateMsg{}) +} + +func TestPropagation(t *testing.T) { + propagate(t, + []int{3, 10, 14, 4, 8, 8}, + []int{0, 0, 0, 1, 3, 6}) +} + +var tSuite = suites.MustFind("Ed25519") + +// Tests an n-node system +func propagate(t *testing.T, nbrNodes, nbrFailures []int) { + for i, n := range nbrNodes { + local := onet.NewLocalTest(tSuite) + servers, el, _ := local.GenTree(n, true) + var recvCount int + var iMut sync.Mutex + msg := &propagateMsg{[]byte("propagate")} + propFuncs := make([]PropagationFunc, n) + + // setup the servers + var err error + for n, server := range servers { + pc := &PC{server, local.Overlays[server.ServerIdentity.ID]} + propFuncs[n], err = NewPropagationFuncTest(pc, "Propagate", + nbrFailures[i], + func(m network.Message) error { + if bytes.Equal(msg.Data, m.(*propagateMsg).Data) { + iMut.Lock() + recvCount++ + iMut.Unlock() + return nil + } + + t.Error("Didn't receive correct data") + return xerrors.New("Didn't receive correct data") + }, + func() network.Message { + return &propagateMsg{Data: []byte{1, 2, 3}} + }) + require.NoError(t, err) + } + + // shut down some servers to simulate failure + for k := 0; k < nbrFailures[i]; k++ { + err = servers[len(servers)-1-k].Close() + require.NoError(t, err) + } + + // start the propagation + log.Lvl2("Starting to propagate", reflect.TypeOf(msg)) + datas, err := propFuncs[0](el, msg, + 1*time.Second) + require.NoError(t, err) + require.Equal(t, n, recvCount+nbrFailures[i], "Didn't get data-request") + require.Equal(t, n-1, len(datas)+nbrFailures[i], "Not all nodes replied") + + local.CloseAll() + log.AfterTest(t) + } +} + +type PC struct { + C *onet.Server + O *onet.Overlay +} + +func (pc *PC) ProtocolRegister(name string, protocol onet.NewProtocol) (onet.ProtocolID, error) { + return pc.C.ProtocolRegister(name, protocol) +} +func (pc *PC) ServerIdentity() *network.ServerIdentity { + return pc.C.ServerIdentity + +} +func (pc *PC) CreateProtocol(name string, t *onet.Tree) (onet.ProtocolInstance, error) { + return pc.O.CreateProtocol(name, t, onet.NilServiceID) +} diff --git a/services/api.go b/services/api.go index b6a1bbd..a23f778 100644 --- a/services/api.go +++ b/services/api.go @@ -1,8 +1,7 @@ package servicesmedco import ( - "github.com/lca1/unlynx/lib" - "github.com/satori/go.uuid" + "github.com/ldsec/unlynx/lib" "go.dedis.ch/kyber/v3" "go.dedis.ch/kyber/v3/util/key" "go.dedis.ch/onet/v3" @@ -41,17 +40,14 @@ func (c *API) SendSurveyDDTRequestTerms(entities *onet.Roster, surveyID SurveyID start := time.Now() log.Lvl2("Client", c.ClientID, "is creating a DDT survey with ID:", surveyID) - rndUUID := uuid.NewV4() sdq := SurveyDDTRequest{ - SurveyID: SurveyID(rndUUID.String()), + SurveyID: surveyID, Roster: *entities, Proofs: proofs, Testing: testing, // query parameters to DDT Terms: terms, - - IntraMessage: false, } resp := ResultDDT{} @@ -59,8 +55,8 @@ func (c *API) SendSurveyDDTRequestTerms(entities *onet.Roster, surveyID SurveyID if err != nil { return nil, nil, TimeResults{}, err } - resp.TR.MapTR[DDTRequestTime] = time.Since(start) - return &surveyID, resp.Result, resp.TR, nil + resp.TR[DDTRequestTime] = time.Since(start) + return &surveyID, resp.Result, TimeResults{resp.TR}, nil } // SendSurveyKSRequest performs key switching in a list of values @@ -86,12 +82,14 @@ func (c *API) SendSurveyKSRequest(entities *onet.Roster, surveyID SurveyID, cPK } // SendSurveyShuffleRequest performs shuffling + key switching on a list of values -func (c *API) SendSurveyShuffleRequest(entities *onet.Roster, surveyID SurveyID, cPK kyber.Point, value libunlynx.CipherText, proofs bool) (*SurveyID, libunlynx.CipherText, TimeResults, error) { +func (c *API) SendSurveyShuffleRequest(entities *onet.Roster, surveyID SurveyID, cPK kyber.Point, value *libunlynx.CipherText, proofs bool) (*SurveyID, libunlynx.CipherText, TimeResults, error) { start := time.Now() log.Lvl2("Client", c.ClientID, "is creating a Shuffle survey with ID:", surveyID) target := make(libunlynx.CipherVector, 0) - target = append(target, value) + if value != nil { + target = append(target, *value) + } ssr := SurveyShuffleRequest{ SurveyID: surveyID, Roster: *entities, diff --git a/services/service.go b/services/service.go index ef8648a..c30f9db 100644 --- a/services/service.go +++ b/services/service.go @@ -2,59 +2,51 @@ package servicesmedco import ( "encoding/base64" + "fmt" "github.com/BurntSushi/toml" "github.com/btcsuite/goleveldb/leveldb/errors" "github.com/fanliao/go-concurrentMap" - "github.com/lca1/unlynx/lib" - "github.com/lca1/unlynx/lib/tools" - "github.com/lca1/unlynx/protocols" + "github.com/ldsec/medco-unlynx/protocols" + "github.com/ldsec/unlynx/lib" + "github.com/ldsec/unlynx/protocols" "go.dedis.ch/kyber/v3" "go.dedis.ch/kyber/v3/util/random" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" "go.dedis.ch/onet/v3/network" + "golang.org/x/xerrors" "os" "strings" "sync" "time" ) -// MsgTypes defines the Message Type ID for all the service's intra-messages. -type MsgTypes struct { - msgSurveyDDTRequestTerms network.MessageTypeID - msgSurveyTagGenerated network.MessageTypeID - msgSurveyKSRequest network.MessageTypeID - msgSurveyShuffleRequest network.MessageTypeID - msgSurveyShuffleGenerated network.MessageTypeID - msgSurveyAggRequest network.MessageTypeID - msgSurveyAggGenerated network.MessageTypeID -} - -var msgTypes = MsgTypes{} - func init() { _, err := onet.RegisterNewService(Name, NewService) log.ErrFatal(err) - // messages for DDT Request - msgTypes.msgSurveyDDTRequestTerms = network.RegisterMessage(&SurveyDDTRequest{}) - msgTypes.msgSurveyTagGenerated = network.RegisterMessage(&SurveyTagGenerated{}) - network.RegisterMessage(&ResultDDT{}) - - // messages for the other requests - msgTypes.msgSurveyKSRequest = network.RegisterMessage(&SurveyKSRequest{}) - msgTypes.msgSurveyShuffleRequest = network.RegisterMessage(&SurveyShuffleRequest{}) - msgTypes.msgSurveyShuffleGenerated = network.RegisterMessage(&SurveyShuffleGenerated{}) - msgTypes.msgSurveyAggRequest = network.RegisterMessage(&SurveyAggRequest{}) - msgTypes.msgSurveyAggGenerated = network.RegisterMessage(&SurveyAggGenerated{}) - network.RegisterMessage(&Result{}) + // Register SurveyShuffleRequest for propagation-protocol + network.RegisterMessage(&SurveyShuffleRequest{}) + + // Default timeout just for all tests to work + TimeoutService = 20 * time.Minute } +// TimeoutService is the communication idle timeout +var TimeoutService time.Duration + +var propagateShuffleFromChildren = "PropShuffleFromChildren" +var propagateShuffleToChildren = "PropShuffleToChildren" + // Service defines a service in unlynx type Service struct { *onet.ServiceProcessor - MapSurveyTag *concurrent.ConcurrentMap + Timeout time.Duration + + shuffleGetData protocols.PropagationFunc + shufflePutData protocols.PropagationFunc + MapSurveyKS *concurrent.ConcurrentMap MapSurveyShuffle *concurrent.ConcurrentMap MapSurveyAgg *concurrent.ConcurrentMap @@ -64,207 +56,99 @@ type Service struct { // NewService constructor which registers the needed messages. func NewService(c *onet.Context) (onet.Service, error) { newUnLynxInstance := &Service{ + Timeout: TimeoutService, ServiceProcessor: onet.NewServiceProcessor(c), - MapSurveyTag: concurrent.NewConcurrentMap(), MapSurveyKS: concurrent.NewConcurrentMap(), MapSurveyShuffle: concurrent.NewConcurrentMap(), MapSurveyAgg: concurrent.NewConcurrentMap(), Mutex: &sync.Mutex{}, } - - if cerr := newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleSurveyDDTRequestTerms); cerr != nil { - log.Error("Wrong Handler.", cerr) - return nil, cerr - } - if cerr := newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleSurveyKSRequest); cerr != nil { - log.Error("Wrong Handler.", cerr) - return nil, cerr + var err error + newUnLynxInstance.shuffleGetData, err = + protocols.NewPropagationFunc(newUnLynxInstance, propagateShuffleFromChildren, -1) + if err != nil { + return nil, fmt.Errorf("couldn't create propagation function: %+v", err) } - if cerr := newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleSurveyShuffleRequest); cerr != nil { - log.Error("Wrong Handler.", cerr) - return nil, cerr + newUnLynxInstance.shufflePutData, err = + protocols.NewPropagationFunc(newUnLynxInstance, propagateShuffleToChildren, -1) + if err != nil { + return nil, fmt.Errorf("couldn't create propagation function: %+v", err) } - if cerr := newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleSurveyAggRequest); cerr != nil { + + if cerr := newUnLynxInstance.RegisterHandlers( + newUnLynxInstance.HandleSurveyDDTRequestTerms, + newUnLynxInstance.HandleSurveyKSRequest, + newUnLynxInstance.HandleSurveyShuffleRequest, + newUnLynxInstance.HandleSurveyAggRequest); cerr != nil { log.Error("Wrong Handler.", cerr) return nil, cerr } - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyDDTRequestTerms) - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyTagGenerated) - - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyKSRequest) - - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyShuffleRequest) - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyShuffleGenerated) - - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyAggRequest) - c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyAggGenerated) - return newUnLynxInstance, nil } -// Process implements the processor interface and is used to recognize messages broadcasted between servers -func (s *Service) Process(msg *network.Envelope) { - if msg.MsgType.Equal(msgTypes.msgSurveyDDTRequestTerms) { - tmp := (msg.Msg).(*SurveyDDTRequest) - _, err := s.HandleSurveyDDTRequestTerms(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyTagGenerated) { - tmp := (msg.Msg).(*SurveyTagGenerated) - _, err := s.HandleSurveyTagGenerated(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyKSRequest) { - tmp := (msg.Msg).(*SurveyKSRequest) - _, err := s.HandleSurveyKSRequest(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyShuffleRequest) { - tmp := (msg.Msg).(*SurveyShuffleRequest) - _, err := s.HandleSurveyShuffleRequest(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyShuffleGenerated) { - tmp := (msg.Msg).(*SurveyShuffleGenerated) - _, err := s.HandleSurveyShuffleGenerated(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyAggRequest) { - tmp := (msg.Msg).(*SurveyAggRequest) - _, err := s.HandleSurveyAggRequest(tmp) - if err != nil { - log.Error(err) - } - } else if msg.MsgType.Equal(msgTypes.msgSurveyAggGenerated) { - tmp := (msg.Msg).(*SurveyAggGenerated) - _, err := s.HandleSurveyAggGenerated(tmp) - if err != nil { - log.Error(err) - } - } else { - log.Error("Cannot identify the intra-message") - } -} - -// Request Handlers -//______________________________________________________________________________________________________________________ - -// HandleSurveyTagGenerated handles triggers the SurveyDDTChannel -func (s *Service) HandleSurveyTagGenerated(recq *SurveyTagGenerated) (network.Message, error) { - surveyTag, err := s.getSurveyTag(recq.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - surveyTag.SurveyChannel <- 1 - return nil, nil -} - // HandleSurveyDDTRequestTerms handles the reception of the query terms to be deterministically tagged func (s *Service) HandleSurveyDDTRequestTerms(sdq *SurveyDDTRequest) (network.Message, error) { + // sanitize params + if err := emptySurveyID(sdq.SurveyID); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if err := emptyRoster(sdq.Roster); err != nil { + return nil, xerrors.Errorf("%+v", err) + } // if this server is the one receiving the request from the client - if !sdq.IntraMessage { - log.Lvl2(s.ServerIdentity().String(), " received a SurveyDDTRequestTerms:", sdq.SurveyID) - - if len(sdq.Terms) == 0 { - log.Lvl2(s.ServerIdentity(), " for survey", sdq.SurveyID, "has no data to det tag") - return &ResultDDT{}, nil - } - - // initialize timers - mapTR := make(map[string]time.Duration) - err := s.putSurveyTag(sdq.SurveyID, - SurveyTag{ - SurveyID: sdq.SurveyID, - Request: *sdq, - SurveyChannel: make(chan int, 100), - TR: TimeResults{mapTR}, - }) - if err != nil { - log.Error(err) - return nil, err - } + log.Lvl2(s.ServerIdentity().String(), " received a SurveyDDTRequestTerms:", sdq.SurveyID) - // signal the other nodes that they need to prepare to execute a DDT (no need to send the terms - // we only need the message source so that they know which node requested the DDT and fetch the secret accordingly) - err = libunlynxtools.SendISMOthers(s.ServiceProcessor, &sdq.Roster, - &SurveyDDTRequest{ - SurveyID: sdq.SurveyID, - Roster: sdq.Roster, - IntraMessage: true, - MessageSource: s.ServerIdentity(), - Proofs: sdq.Proofs, - Testing: sdq.Testing, - }) - if err != nil { - log.Error(err) - return nil, err - } - - surveyTag, err := s.getSurveyTag(sdq.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - - // waits for all other nodes to receive the survey - counter := len(sdq.Roster.List) - 1 - for counter > 0 { - counter = counter - <-surveyTag.SurveyChannel - } - - deterministicTaggingResult, execTime, communicationTime, err := s.TaggingPhase(sdq.SurveyID, &sdq.Roster) - if err != nil { - log.Error(err) - return nil, err - } + if len(sdq.Terms) == 0 { + return nil, xerrors.Errorf(s.ServerIdentity().String() + " for survey" + string(sdq.SurveyID) + "has no data to det tag") + } - // convert the result to of the tagging for something close to the response of i2b2 (array of tagged terms) - listTaggedTerms := make([]libunlynx.GroupingKey, 0) - for _, el := range deterministicTaggingResult { - listTaggedTerms = append(listTaggedTerms, libunlynx.GroupingKey(el.String())) - } + // initialize timers + mapTR := make(map[string]time.Duration) - surveyTag, err = s.getSurveyTag(sdq.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - surveyTag.TR.MapTR[TaggingTimeExec] = execTime - surveyTag.TR.MapTR[TaggingTimeCommunication] = communicationTime - return &ResultDDT{Result: listTaggedTerms, TR: surveyTag.TR}, nil + request := SurveyDDTRequest{ + SurveyID: sdq.SurveyID, + Proofs: sdq.Proofs, + Testing: sdq.Testing, + Terms: sdq.Terms, + MessageSource: s.ServerIdentity(), } - log.Lvl2(s.ServerIdentity().String(), " is notified of survey:", sdq.SurveyID) - - err := s.putSurveyTag(sdq.SurveyID, SurveyTag{ - SurveyID: sdq.SurveyID, - Request: *sdq, - }) + deterministicTaggingResult, execTime, communicationTime, + err := s.TaggingPhase(&request, &sdq.Roster) if err != nil { log.Error(err) return nil, err } - // sends a signal to unlock waiting channel - err = s.SendRaw(sdq.MessageSource, &SurveyTagGenerated{SurveyID: sdq.SurveyID}) - if err != nil { - log.Error("sending error ", err) - return nil, err + // convert the result to of the tagging for something close to the response of i2b2 (array of tagged terms) + listTaggedTerms := make([]libunlynx.GroupingKey, 0) + for _, el := range deterministicTaggingResult { + listTaggedTerms = append(listTaggedTerms, libunlynx.GroupingKey(el.String())) } - return nil, nil + mapTR[TaggingTimeExec] = execTime + mapTR[TaggingTimeCommunication] = communicationTime + return &ResultDDT{Result: listTaggedTerms, TR: mapTR}, nil } // HandleSurveyKSRequest handles the reception of the aggregate local result to be key switched func (s *Service) HandleSurveyKSRequest(skr *SurveyKSRequest) (network.Message, error) { + // sanitize params + if err := emptySurveyID(skr.SurveyID); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if err := emptyRoster(skr.Roster); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if skr.ClientPubKey == nil { + return nil, xerrors.Errorf("no target public key") + } + if skr.KSTarget == nil && len(skr.KSTarget) == 0 { + return nil, xerrors.Errorf(s.ServerIdentity().String() + " for survey" + string(skr.SurveyID) + "has no data to key switch") + } + log.Lvl2(s.ServerIdentity().String(), " received a SurveyKSRequest:", skr.SurveyID) mapTR := make(map[string]time.Duration) @@ -274,84 +158,97 @@ func (s *Service) HandleSurveyKSRequest(skr *SurveyKSRequest) (network.Message, TR: TimeResults{MapTR: mapTR}, }) if err != nil { - log.Error(err) - return nil, err + s.deleteSurveyKS(skr.SurveyID) + return nil, xerrors.Errorf("%+v", err) } // key switch the results keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(skr.SurveyID, KSRequestName, &skr.Roster) if err != nil { - log.Error("key switching error:", err) - return nil, err + s.deleteSurveyKS(skr.SurveyID) + return nil, xerrors.Errorf("key switching error: %+v", err) } surveyKS, err := s.getSurveyKS(skr.SurveyID) if err != nil { - log.Error(err) - return nil, err + s.deleteSurveyKS(skr.SurveyID) + return nil, xerrors.Errorf("%+v", err) } surveyKS.TR.MapTR[KSTimeExec] = execTime surveyKS.TR.MapTR[KSTimeCommunication] = communicationTime + + // remove query from map + _, err = s.deleteSurveyKS(skr.SurveyID) + if err != nil { + return nil, xerrors.Errorf("%+v", err) + } + return &Result{Result: keySwitchingResult, TR: surveyKS.TR}, nil } // HandleSurveyShuffleRequest handles the reception of the aggregate local result to be shared/shuffled/switched func (s *Service) HandleSurveyShuffleRequest(ssr *SurveyShuffleRequest) (network.Message, error) { - var root bool - if s.ServerIdentity().String() == ssr.Roster.List[0].String() { - root = true - } else { - root = false + + // sanitize params + if err := emptySurveyID(ssr.SurveyID); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if err := emptyRoster(ssr.Roster); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if ssr.ClientPubKey == nil { + return nil, xerrors.Errorf("no target public key") } - log.Lvl2(s.ServerIdentity().String(), " received a SurveyShuffleRequest:", ssr.SurveyID, "(root =", root, "- intra =", ssr.IntraMessage, ")") + root := s.ServerIdentity().String() == ssr.Roster.List[0].String() - // (root = true - intra = false ) - if !ssr.IntraMessage && root { + log.Lvl2(s.ServerIdentity().String(), " received a SurveyShuffleRequest:", ssr.SurveyID, "(root =", root, ")") - mapTR := make(map[string]time.Duration) - err := s.putSurveyShuffle(ssr.SurveyID, SurveyShuffle{ - SurveyID: ssr.SurveyID, - Request: *ssr, - SurveyChannel: make(chan int, 100), - TR: TimeResults{MapTR: mapTR}}) - if err != nil { - log.Error(err) - return nil, err - } + if root { + //Message sent to root node: + //1. collect encrypted data from children + //2. run shuffling protocol + //3. distributed shuffled data to children + //4. start key-switching + //5. return data to client - // send signal to unlock the other nodes - err = libunlynxtools.SendISMOthers(s.ServiceProcessor, &ssr.Roster, &SurveyShuffleGenerated{SurveyID: ssr.SurveyID}) - if err != nil { - log.Error("broadcasting error ", err) - return nil, err + if ssr.ShuffleTarget == nil || len(ssr.ShuffleTarget) == 0 { + return nil, xerrors.Errorf(s.ServerIdentity().String() + " for survey" + string(ssr.SurveyID) + "has no data to shuffle") } - surveyShuffle, err := s.getSurveyShuffle(ssr.SurveyID) + childrenMsgs, err := s.shuffleGetData(&ssr.Roster, + &ProtocolConfig{SurveyID: ssr.SurveyID}, s.Timeout) if err != nil { - return nil, err + return nil, fmt.Errorf("couldn't get children data: %+v", err) } - // wait until you've got all the aggregate results from the other nodes - counter := len(ssr.Roster.List) - 1 - for counter > 0 { - counter = counter - <-surveyShuffle.SurveyChannel + mapTR := make(map[string]time.Duration) + surveyShuffle := SurveyShuffle{ + SurveyID: ssr.SurveyID, + Request: *ssr, + SurveyChannel: make(chan int, 100), + TR: TimeResults{MapTR: mapTR}, + } + for _, msg := range childrenMsgs { + req, ok := msg.(*SurveyShuffleRequest) + if !ok { + return nil, xerrors.New("couldn't convert msg to Request") + } + surveyShuffle.Request.ShuffleTarget = append(surveyShuffle. + Request.ShuffleTarget, req.ShuffleTarget...) } - surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) + err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) if err != nil { - return nil, err - } - if len(surveyShuffle.Request.ShuffleTarget) <= 1 { - return nil, errors.New("no data to shuffle") + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("%+v", err) } // shuffle the results shufflingResult, execTime, communicationTime, err := s.ShufflingPhase(ssr.SurveyID, &ssr.Roster) - if err != nil { - log.Error("shuffling error", err) - return nil, err + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("shuffling error: %+v", err) } shufflingFinalResult := make(libunlynx.CipherVector, 0) @@ -365,13 +262,12 @@ func (s *Service) HandleSurveyShuffleRequest(ssr *SurveyShuffleRequest) (network err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) if err != nil { - log.Error(err) - return nil, err + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("%+v", err) } // send the shuffled results to all the other nodes ssr.KSTarget = shufflingFinalResult - ssr.IntraMessage = true ssr.MessageSource = s.ServerIdentity() // let's delete what we don't need (less communication time) @@ -380,226 +276,143 @@ func (s *Service) HandleSurveyShuffleRequest(ssr *SurveyShuffleRequest) (network // signal the other nodes that they need to prepare to execute a key switching // basically after shuffling the results the root server needs to send them back // to the remaining nodes for key switching - err = libunlynxtools.SendISMOthers(s.ServiceProcessor, &ssr.Roster, ssr) + _, err = s.shufflePutData(&ssr.Roster, ssr, s.Timeout) if err != nil { - log.Error("broadcasting error ", err) + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, fmt.Errorf("couldn't send data to children: %+v", err) } // key switch the results keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(ssr.SurveyID, ShuffleRequestName, &ssr.Roster) if err != nil { - log.Error("key switching error", err) - return nil, err + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("key switching error: %+v", err) } // get server index - index := 0 - for i, r := range ssr.Roster.List { - if r.String() == s.ServerIdentity().String() { - index = i - break - } + index, _ := ssr.Roster.Search(s.ServerIdentity().ID) + if index < 0 { + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.New("couldn't find this node in the roster") } - surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } surveyShuffle.TR.MapTR[KSTimeExec] = execTime surveyShuffle.TR.MapTR[KSTimeCommunication] = communicationTime - return &Result{Result: libunlynx.CipherVector{keySwitchingResult[index]}, TR: surveyShuffle.TR}, nil - - //(root = false - intra = false ) - } else if !ssr.IntraMessage && !root { // if message sent by client and not a root node - - mapTR := make(map[string]time.Duration) - err := s.putSurveyShuffle(ssr.SurveyID, SurveyShuffle{ - SurveyID: ssr.SurveyID, - Request: *ssr, - SurveyChannel: make(chan int, 100), - FinalResultsChannel: make(chan int, 100), - TR: TimeResults{MapTR: mapTR}}) + // remove query from map + _, err = s.deleteSurveyShuffle(ssr.SurveyID) if err != nil { - log.Error(err) - return nil, err + return nil, xerrors.Errorf("%+v", err) } - ssr.IntraMessage = true - ssr.MessageSource = s.ServerIdentity() + return &Result{Result: libunlynx.CipherVector{keySwitchingResult[index]}, TR: surveyShuffle.TR}, nil - surveyShuffle, err := s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } + } + //if message sent to children node: + //1. Send encrypted data to root node + //2. participate in shuffling + //3. receive shuffled data from root node + //4. start key-switching + //5. return data to client - // wait for root to be ready to send the local aggregate result - <-surveyShuffle.SurveyChannel + mapTR := make(map[string]time.Duration) + surveyShuffle := SurveyShuffle{ + SurveyID: ssr.SurveyID, + Request: *ssr, + SurveyChannel: make(chan int, 100), + FinalResultsChannel: make(chan int, 100), + TR: TimeResults{MapTR: mapTR}, + } + err := s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) + if err != nil { + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("%+v", err) + } - // send your local aggregate result to the root server (index 0) - err = s.SendRaw(ssr.Roster.List[0], ssr) - if err != nil { - log.Error(s.ServerIdentity().String()+"could not send its aggregate value", err) - return nil, err - } + ssr.MessageSource = s.ServerIdentity() + // wait for root to be ready to send the local aggregate result + select { + case <-surveyShuffle.SurveyChannel: + // update the local survey with the shuffled results surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) if err != nil { - log.Error(err) - return nil, err + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("%+v", err) } - //waits for the final results to be ready - <-surveyShuffle.FinalResultsChannel - // get server index - index := 0 - for i, r := range ssr.Roster.List { - if r.String() == s.ServerIdentity().String() { - index = i - break - } - } - - surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) + // key switch the results + keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(ssr.SurveyID, ShuffleRequestName, &ssr.Roster) if err != nil { - log.Error(err) - return nil, err + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.Errorf("key switching error: %+v", err) } - return &Result{Result: libunlynx.CipherVector{surveyShuffle.Request.KSTarget[index]}, TR: surveyShuffle.TR}, nil - - // (root = true - intra = true ) - } else if ssr.IntraMessage && root { // if message sent by another node and is root - - // the other nodes sent their local aggregation values - s.Mutex.Lock() - surveyShuffle, err := s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - surveyShuffle.Request.ShuffleTarget = append(surveyShuffle.Request.ShuffleTarget, ssr.ShuffleTarget...) - err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) - if err != nil { - log.Error(err) - return nil, err - } - s.Mutex.Unlock() + surveyShuffle.TR.MapTR[KSTimeExec] = execTime + surveyShuffle.TR.MapTR[KSTimeCommunication] = communicationTime - surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err + // get server index + index, _ := ssr.Roster.Search(s.ServerIdentity().ID) + if index < 0 { + s.deleteSurveyShuffle(ssr.SurveyID) + return nil, xerrors.New("couldn't find this node in the roster") } - surveyShuffle.SurveyChannel <- 1 - - // (root = false - intra = true ) - } else { // if message sent by another node and not root - // update the local survey with the shuffled results - s.Mutex.Lock() - surveyShuffle, err := s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - surveyShuffle.Request.KSTarget = ssr.KSTarget - err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) + // remove query from map + _, err = s.deleteSurveyShuffle(ssr.SurveyID) if err != nil { - log.Error(err) - return nil, err + return nil, xerrors.Errorf("%+v", err) } - s.Mutex.Unlock() - // key switch the results - keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(ssr.SurveyID, ShuffleRequestName, &ssr.Roster) - if err != nil { - log.Error("key switching error", err) - return nil, err - } + return &Result{Result: libunlynx.CipherVector{keySwitchingResult[index]}, + TR: surveyShuffle.TR}, nil - s.Mutex.Lock() - surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID) - if err != nil { - log.Error(err) - return nil, err - } - surveyShuffle.Request.KSTarget = keySwitchingResult - surveyShuffle.TR.MapTR[KSTimeExec] = execTime - surveyShuffle.TR.MapTR[KSTimeCommunication] = communicationTime - err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) + case <-time.After(s.Timeout): + // remove query from map + _, err = s.deleteSurveyShuffle(ssr.SurveyID) if err != nil { - log.Error(err) - return nil, err + return nil, xerrors.Errorf("%+v", err) } - s.Mutex.Unlock() - surveyShuffle.FinalResultsChannel <- 1 + return nil, xerrors.Errorf(s.ServerIdentity().String(), "didn't get a reply from the nodes in time.") } - return nil, nil } -// HandleSurveyShuffleGenerated handles triggers the SurveyChannel in the Shuffle Request -func (s *Service) HandleSurveyShuffleGenerated(recq *SurveyShuffleGenerated) (network.Message, error) { - var el interface{} - el = nil - for el == nil { - el, _ = s.MapSurveyShuffle.Get((string)(recq.SurveyID)) - if el != nil { - break - } - time.Sleep(time.Millisecond * 100) +// HandleSurveyAggRequest handles the reception of the aggregate local result to be shared/shuffled/switched +func (s *Service) HandleSurveyAggRequest(sar *SurveyAggRequest) (network.Message, error) { + // sanitize params + if err := emptyRoster(sar.Roster); err != nil { + return nil, xerrors.Errorf("%+v", err) } - - surveyShuffle, err := s.getSurveyShuffle(recq.SurveyID) - if err != nil { - log.Error(err) - return nil, err + if err := emptySurveyID(sar.SurveyID); err != nil { + return nil, xerrors.Errorf("%+v", err) + } + if sar.AggregateTarget.K == nil || sar.AggregateTarget.C == nil { + return nil, xerrors.Errorf(s.ServerIdentity().String() + " for survey" + string(sar.SurveyID) + "has no data to aggregate") + } + if sar.ClientPubKey == nil { + return nil, xerrors.Errorf("no target public key") } - surveyShuffle.SurveyChannel <- 1 - return nil, nil -} -// HandleSurveyAggRequest handles the reception of the aggregate local result to be shared/shuffled/switched -func (s *Service) HandleSurveyAggRequest(sar *SurveyAggRequest) (network.Message, error) { log.Lvl2(s.ServerIdentity().String(), " received a SurveyAggRequest:", sar.SurveyID) mapTR := make(map[string]time.Duration) - err := s.putSurveyAgg(sar.SurveyID, SurveyAgg{ - SurveyID: sar.SurveyID, - Request: *sar, - SurveyChannel: make(chan int, 100), - TR: TimeResults{MapTR: mapTR}}) - - if err != nil { - log.Error(err) - return nil, err - } - - // send signal to unlock the other nodes - err = libunlynxtools.SendISMOthers(s.ServiceProcessor, &sar.Roster, &SurveyAggGenerated{SurveyID: sar.SurveyID}) - if err != nil { - log.Error("broadcasting error ", err) + surveyAgg := SurveyAgg{ + SurveyID: sar.SurveyID, + Request: *sar, + TR: TimeResults{MapTR: mapTR}, } - - surveyAgg, err := s.getSurveyAgg(sar.SurveyID) + err := s.putSurveyAgg(sar.SurveyID, surveyAgg) if err != nil { - return nil, err - } - - // wait until you've got all the aggregate results from the other nodes - counter := len(sar.Roster.List) - 1 - for counter > 0 { - counter = counter - <-surveyAgg.SurveyChannel + s.deleteSurveyAgg(sar.SurveyID) + return nil, xerrors.Errorf("%+v", err) } // collectively aggregate the results aggregationResult, aggrTime, err := s.CollectiveAggregationPhase(sar.SurveyID, &sar.Roster) if err != nil { - log.Error("aggregation error", err) - return nil, err + s.deleteSurveyAgg(sar.SurveyID) + return nil, xerrors.Errorf("aggregation error: %+v", err) } surveyAgg.Request.KSTarget = aggregationResult @@ -607,38 +420,26 @@ func (s *Service) HandleSurveyAggRequest(sar *SurveyAggRequest) (network.Message err = s.putSurveyAgg(sar.SurveyID, surveyAgg) if err != nil { - return nil, err + s.deleteSurveyAgg(sar.SurveyID) + return nil, xerrors.Errorf("%+v", err) } // key switch the results keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(sar.SurveyID, AggRequestName, &sar.Roster) if err != nil { - log.Error("key switching error", err) - return nil, err + s.deleteSurveyAgg(sar.SurveyID) + return nil, xerrors.Errorf("key switching error: %+v", err) } surveyAgg.TR.MapTR[KSTimeExec] = execTime surveyAgg.TR.MapTR[KSTimeCommunication] = communicationTime - return &Result{Result: keySwitchingResult, TR: surveyAgg.TR}, nil -} - -// HandleSurveyAggGenerated handles triggers the SurveyChannel -func (s *Service) HandleSurveyAggGenerated(recq *SurveyAggGenerated) (network.Message, error) { - var el interface{} - el = nil - for el == nil { - el, _ = s.MapSurveyAgg.Get((string)(recq.SurveyID)) - if el != nil { - break - } - time.Sleep(time.Millisecond * 100) - } - surveyAgg, err := s.getSurveyAgg(recq.SurveyID) + // remove query from map + _, err = s.deleteSurveyAgg(sar.SurveyID) if err != nil { - return nil, err + return nil, xerrors.Errorf("%+v", err) } - surveyAgg.SurveyChannel <- 1 - return nil, nil + + return &Result{Result: keySwitchingResult, TR: surveyAgg.TR}, nil } // Protocol Handlers @@ -689,26 +490,32 @@ func (s *Service) whatRequest(target string) (bool, libunlynx.CipherVector, kybe } // NewProtocol creates a protocol instance executed by all nodes -func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfig) (onet.ProtocolInstance, error) { - if err := tn.SetConfig(conf); err != nil { - return nil, err - } - +func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, + conf *onet.GenericConfig) (onet.ProtocolInstance, error) { var pi onet.ProtocolInstance var err error - target := SurveyID(string(conf.Data)) + var target SurveyID + var protoConf ProtocolConfig + + if conf != nil && conf.Data != nil { + protoConf, err = unmarshalProtocolConfig(conf.Data) + if err != nil { + return nil, err + } + target = protoConf.getTarget() + } switch tn.ProtocolName() { case protocolsunlynx.DeterministicTaggingProtocolName: - surveyTag, err := s.getSurveyTag(target) + _, sti, err := network.Unmarshal(protoConf.Data, libunlynx.SuiTe) if err != nil { - log.Error(err) - return nil, err + log.Fatal(err) + return nil, fmt.Errorf("couldn't unmarshal: %+v", err) } + surveyRequest := sti.(*SurveyDDTRequest) pi, err = protocolsunlynx.NewDeterministicTaggingProtocol(tn) if err != nil { - log.Error(err) return nil, err } hashCreation := pi.(*protocolsunlynx.DeterministicTaggingProtocol) @@ -716,41 +523,56 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi var serverIDMap *network.ServerIdentity if tn.IsRoot() { - dataToDDT := surveyTag.Request.Terms + dataToDDT := surveyRequest.Terms hashCreation.TargetOfSwitch = &dataToDDT + surveyRequest.Terms = libunlynx.CipherVector{} + + pc, err := newProtocolConfig(surveyRequest.SurveyID, "", + surveyRequest) + if err != nil { + return nil, fmt.Errorf("couldn't update protocolConfig: %+v", + err) + } + newConfig, err := pc.getConfig() + if err != nil { + return nil, fmt.Errorf("couldn't set config again: %+v", err) + } + conf = &newConfig + serverIDMap = s.ServerIdentity() } else { - serverIDMap = surveyTag.Request.MessageSource + serverIDMap = surveyRequest.MessageSource } s.Mutex.Lock() var aux kyber.Scalar - if surveyTag.Request.Testing { - aux, err = CheckDDTSecrets(DDTSecretsPath+"_"+s.ServerIdentity().Address.Host()+":"+s.ServerIdentity().Address.Port()+".toml", serverIDMap.Address, nil) + if surveyRequest.Testing { + path := DDTSecretsPath + "_" + s.ServerIdentity().Address.Host() + ":" + s.ServerIdentity().Address.Port() + ".toml" + aux, err = CheckDDTSecrets(path, serverIDMap.Address, nil) if err != nil || aux == nil { + log.Fatal(err) return nil, errors.New("Error while reading the DDT secrets from file") } } else { aux, err = CheckDDTSecrets(os.Getenv("UNLYNX_DDT_SECRETS_FILE_PATH"), serverIDMap.Address, nil) if err != nil || aux == nil { + log.Fatal(err) return nil, errors.New("Error while reading the DDT secrets from file") } } hashCreation.SurveySecretKey = &aux - hashCreation.Proofs = surveyTag.Request.Proofs + hashCreation.Proofs = surveyRequest.Proofs s.Mutex.Unlock() case protocolsunlynx.ShufflingProtocolName: surveyShuffle, err := s.getSurveyShuffle(target) if err != nil { - log.Error(err) return nil, err } pi, err = protocolsunlynx.NewShufflingProtocol(tn) if err != nil { - log.Error(err) return nil, err } @@ -766,7 +588,6 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi case protocolsunlynx.KeySwitchingProtocolName: pi, err = protocolsunlynx.NewKeySwitchingProtocol(tn) if err != nil { - log.Error(err) return nil, err } @@ -776,7 +597,6 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi //define which map to retrieve the values to key switch proofs, data, cPubKey, err := s.whatRequest(string(target)) if err != nil { - log.Error(err) return nil, err } @@ -786,16 +606,26 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi tmp := cPubKey keySwitch.TargetPublicKey = &tmp } + case protocolsunlynx.CollectiveAggregationProtocolName: - surveyAgg, err := s.getSurveyAgg(target) - if err != nil { - log.Error(err) - return nil, err + var surveyAgg SurveyAgg + maxLoop := int(s.Timeout.Minutes()) * 60 + for i := 1; i <= maxLoop; i++ { + surveyAgg, err = s.getSurveyAgg(target) + if err != nil { + log.Lvl3(s.ServerIdentity(), "Waiting for data to arrive for survey", target) + if i == maxLoop { + return nil, xerrors.New( + "didn't get data within time - aborting") + } + time.Sleep(1 * time.Second) + } else { + break + } } pi, err = protocolsunlynx.NewCollectiveAggregationProtocol(tn) if err != nil { - log.Error(err) return nil, err } @@ -805,34 +635,109 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi data := make([]libunlynx.CipherText, 0) data = append(data, surveyAgg.Request.AggregateTarget) aggr.SimpleData = &data + + case propagateShuffleFromChildren: + pi, err = protocols.NewPropagationProtocol(tn) + if err != nil { + return nil, xerrors.Errorf("couldn't create protocol: %+v", err) + } + prop := pi.(*protocols.Propagate) + surveyIDChan := make(chan SurveyID) + prop.RegisterOnDataToChildren(func(msg network.Message) error { + pc, ok := msg.(*ProtocolConfig) + if !ok { + return errors.New("didn't get ProtocolConfig") + } + surveyIDChan <- pc.SurveyID + return nil + }) + surveyShuffleChan := make(chan SurveyShuffle) + prop.RegisterOnDataToRoot(func() network.Message { + select { + case ss := <-surveyShuffleChan: + return &ss.Request + case <-time.After(s.Timeout): + return errors.New(s.ServerIdentity().String() + "didn't get the data from the nodes in time.") + } + }) + go func() { + select { + case surveyID := <-surveyIDChan: + for { + surveyShuffle, err := s.getSurveyShuffle(surveyID) + if err != nil { + time.Sleep(100 * time.Millisecond) + } else { + surveyShuffleChan <- surveyShuffle + break + } + } + case <-time.After(s.Timeout): + log.Error(s.ServerIdentity().String() + "didn't get the survey notification in time.") + return + } + }() + + case propagateShuffleToChildren: + pi, err = protocols.NewPropagationProtocol(tn) + if err != nil { + return nil, xerrors.Errorf("couldn't create new protocol: %+v", + err) + } + prop := pi.(*protocols.Propagate) + prop.RegisterOnDataToChildren(func(msg network.Message) error { + ssr, ok := msg.(*SurveyShuffleRequest) + if !ok { + return xerrors.New("didn't receive SurveyShuffleRequest" + + " message") + } + surveyShuffle, err := s.getSurveyShuffle(ssr.SurveyID) + if err != nil { + return xerrors.Errorf("couldn't get survey: %+v", err) + } + surveyShuffle.Request.KSTarget = ssr.KSTarget + err = s.putSurveyShuffle(ssr.SurveyID, surveyShuffle) + if err != nil { + return xerrors.Errorf( + "couldn't store new surveyShuffle: %+v", err) + } + + surveyShuffle.SurveyChannel <- 1 + return nil + }) + default: return nil, errors.New("Service attempts to start an unknown protocol: " + tn.ProtocolName() + ".") } + if err := tn.SetConfig(conf); err != nil { + return nil, xerrors.Errorf("couldn't set config: %+v", err) + } + return pi, nil } // StartProtocol starts a specific protocol (Shuffling, KeySwitching, etc.) -func (s *Service) StartProtocol(name, typeQ string, targetSurvey SurveyID, roster *onet.Roster) (onet.ProtocolInstance, error) { +func (s *Service) StartProtocol(name, typeQ string, pc ProtocolConfig, + roster *onet.Roster) (onet.ProtocolInstance, error) { tree := roster.GenerateNaryTreeWithRoot(2, s.ServerIdentity()) tn := s.NewTreeNodeInstance(tree, tree.Root, name) - var confData string if name == protocolsunlynx.KeySwitchingProtocolName { - confData = string(targetSurvey) + "/" + typeQ - } else { - confData = string(targetSurvey) + pc.TypeQ = typeQ } - conf := onet.GenericConfig{Data: []byte(confData)} + conf, err := pc.getConfig() + if err != nil { + return nil, fmt.Errorf("couldn't get config: %+v", err) + } pi, err := s.NewProtocol(tn, &conf) if err != nil || pi == nil { - return nil, err + return nil, fmt.Errorf("couldn't start new protocol: %+v", err) } err = s.RegisterProtocolInstance(pi) if err != nil { - log.Error(err) return nil, err } @@ -854,60 +759,81 @@ func (s *Service) StartProtocol(name, typeQ string, targetSurvey SurveyID, roste //______________________________________________________________________________________________________________________ // TaggingPhase performs the private grouping on the currently collected data. -func (s *Service) TaggingPhase(targetSurvey SurveyID, roster *onet.Roster) ([]libunlynx.DeterministCipherText, time.Duration, time.Duration, error) { +func (s *Service) TaggingPhase(targetSurvey *SurveyDDTRequest, + roster *onet.Roster) ([]libunlynx.DeterministCipherText, time.Duration, time.Duration, error) { start := time.Now() - pi, err := s.StartProtocol(protocolsunlynx.DeterministicTaggingProtocolName, "", targetSurvey, roster) + pc, err := newProtocolConfig(targetSurvey.SurveyID, "", targetSurvey) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, fmt.Errorf("couldn't get protoConfig: %+v", err) + } + pi, err := s.StartProtocol(protocolsunlynx. + DeterministicTaggingProtocolName, "", pc, roster) + if err != nil { + return nil, 0, 0, fmt.Errorf("couldn't start protocol: %+v", err) + } + select { + case deterministicTaggingResult := <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel: + execTime := pi.(*protocolsunlynx.DeterministicTaggingProtocol).ExecTime + return deterministicTaggingResult, execTime, time.Since(start) - execTime, nil + case <-time.After(s.Timeout): + return nil, 0, 0, fmt.Errorf("couldn't finish tagging protocol in time") } - deterministicTaggingResult := <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel - - execTime := pi.(*protocolsunlynx.DeterministicTaggingProtocol).ExecTime - return deterministicTaggingResult, execTime, time.Since(start) - execTime, nil } // CollectiveAggregationPhase performs a collective aggregation between the participating nodes func (s *Service) CollectiveAggregationPhase(targetSurvey SurveyID, roster *onet.Roster) (libunlynx.CipherText, time.Duration, error) { start := time.Now() - pi, err := s.StartProtocol(protocolsunlynx.CollectiveAggregationProtocolName, "", targetSurvey, roster) + pi, err := s.StartProtocol(protocolsunlynx.CollectiveAggregationProtocolName, "", + ProtocolConfig{targetSurvey, "", nil}, roster) if err != nil { return libunlynx.CipherText{}, 0, err } - aggregationResult := <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel - - // in the resulting map there is only one element - var finalResult libunlynx.CipherText - for _, v := range aggregationResult.GroupedData { - finalResult = v.AggregatingAttributes[0] - break + select { + case aggregationResult := <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel: + // in the resulting map there is only one element + var finalResult libunlynx.CipherText + for _, v := range aggregationResult.GroupedData { + finalResult = v.AggregatingAttributes[0] + break + } + return finalResult, time.Since(start), nil + case <-time.After(s.Timeout): + return libunlynx.CipherText{}, 0, fmt.Errorf("couldn't finish collective aggregation protocol in time") } - return finalResult, time.Since(start), nil } // ShufflingPhase performs the shuffling aggregated results from each of the nodes func (s *Service) ShufflingPhase(targetSurvey SurveyID, roster *onet.Roster) ([]libunlynx.CipherVector, time.Duration, time.Duration, error) { start := time.Now() - pi, err := s.StartProtocol(protocolsunlynx.ShufflingProtocolName, "", targetSurvey, roster) + pi, err := s.StartProtocol(protocolsunlynx.ShufflingProtocolName, "", + ProtocolConfig{targetSurvey, "", nil}, roster) if err != nil { return nil, 0, 0, err } - shufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel - - execTime := pi.(*protocolsunlynx.ShufflingProtocol).ExecTime - return shufflingResult, execTime, time.Since(start) - execTime, nil + select { + case shufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel: + execTime := pi.(*protocolsunlynx.ShufflingProtocol).ExecTime + return shufflingResult, execTime, time.Since(start) - execTime, nil + case <-time.After(s.Timeout): + return nil, 0, 0, fmt.Errorf("couldn't finish shuffling protocol in time") + } } // KeySwitchingPhase performs the switch to the querier key on the currently aggregated data. func (s *Service) KeySwitchingPhase(targetSurvey SurveyID, typeQ string, roster *onet.Roster) (libunlynx.CipherVector, time.Duration, time.Duration, error) { start := time.Now() - pi, err := s.StartProtocol(protocolsunlynx.KeySwitchingProtocolName, typeQ, targetSurvey, roster) + pi, err := s.StartProtocol(protocolsunlynx.KeySwitchingProtocolName, typeQ, + ProtocolConfig{targetSurvey, "", nil}, roster) if err != nil { return nil, 0, 0, err } - keySwitchedAggregatedResponses := <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel - - execTime := pi.(*protocolsunlynx.KeySwitchingProtocol).ExecTime - return keySwitchedAggregatedResponses, execTime, time.Since(start) - execTime, nil + select { + case keySwitchedAggregatedResponses := <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel: + execTime := pi.(*protocolsunlynx.KeySwitchingProtocol).ExecTime + return keySwitchedAggregatedResponses, execTime, time.Since(start) - execTime, nil + case <-time.After(s.Timeout): + return nil, 0, 0, fmt.Errorf("couldn't finish key switching protocol in time") + } } // Support functions @@ -940,9 +866,7 @@ func createTOMLSecrets(path string, id network.Address, secret kyber.Scalar) (ky secret = libunlynx.SuiTe.Scalar().Pick(random.New()) } b, err := secret.MarshalBinary() - if err != nil { - log.Error(err) return nil, err } @@ -952,7 +876,6 @@ func createTOMLSecrets(path string, id network.Address, secret kyber.Scalar) (ky err = encoder.Encode(&endR) if err != nil { - log.Error(err) return nil, err } @@ -969,7 +892,6 @@ func addTOMLSecret(path string, content privateTOML) error { err = encoder.Encode(&content) if err != nil { - log.Error(err) return err } @@ -986,7 +908,6 @@ func CheckDDTSecrets(path string, id network.Address, secret kyber.Scalar) (kybe contents := privateTOML{} if _, err := toml.DecodeFile(path, &contents); err != nil { - log.Error(err) return nil, err } @@ -996,13 +917,11 @@ func CheckDDTSecrets(path string, id network.Address, secret kyber.Scalar) (kybe b, err := base64.URLEncoding.DecodeString(el.Secret) if err != nil { - log.Error(err) return nil, err } err = secret.UnmarshalBinary(b) if err != nil { - log.Error(err) return nil, err } @@ -1018,7 +937,6 @@ func CheckDDTSecrets(path string, id network.Address, secret kyber.Scalar) (kybe b, err := secret.MarshalBinary() if err != nil { - log.Error(err) return nil, err } @@ -1031,3 +949,17 @@ func CheckDDTSecrets(path string, id network.Address, secret kyber.Scalar) (kybe return secret, nil } + +func emptySurveyID(id SurveyID) error { + if id == "" { + return errors.New("survey id is empty") + } + return nil +} + +func emptyRoster(roster onet.Roster) error { + if len(roster.List) == 0 { + return errors.New("roster is empty") + } + return nil +} diff --git a/services/service_test.go b/services/service_test.go index 8007687..8d99fab 100644 --- a/services/service_test.go +++ b/services/service_test.go @@ -1,8 +1,8 @@ package servicesmedco_test import ( - "github.com/lca1/medco-unlynx/services" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/medco-unlynx/services" + "github.com/ldsec/unlynx/lib" "github.com/stretchr/testify/assert" "go.dedis.ch/kyber/v3" "go.dedis.ch/onet/v3" @@ -15,7 +15,7 @@ import ( func getParam(nbServers int) (*onet.Roster, *onet.LocalTest) { - log.SetDebugVisible(1) + log.SetDebugVisible(2) local := onet.NewLocalTest(libunlynx.SuiTe) // generate 3 hosts, they don't connect, they process messages, and they // don't register the tree or entity list @@ -46,9 +46,10 @@ func getQueryParams(nbQp int, encKey kyber.Point) libunlynx.CipherVector { func TestServiceDDT(t *testing.T) { // test with 10 servers - el, local := getParam(10) - // test with 10 concurrent clients - clients := getClients(10, el) + nbrServers := 3 + el, local := getParam(nbrServers) + // test with as many clients as servers + clients := getClients(nbrServers, el) // the first two threads execute the same operation (repetition) to check that in the end it yields the same result clients[1] = clients[0] // test the query DDT with 500 query terms @@ -60,12 +61,25 @@ func TestServiceDDT(t *testing.T) { results := make(map[string][]libunlynx.GroupingKey) + // sanitization tests + // no SurveyID + _, _, _, err := clients[0].SendSurveyDDTRequestTerms(el, "", qt, proofs, true) + assert.Error(t, err) + // no Roster + emptyRoster := *el + emptyRoster.List = nil + _, _, _, err = clients[0].SendSurveyDDTRequestTerms(&emptyRoster, "testDDTSurvey_", qt, proofs, true) + assert.Error(t, err) + // no terms to tag + _, _, _, err = clients[0].SendSurveyDDTRequestTerms(el, "testDDTSurvey_", nil, proofs, true) + wg := libunlynx.StartParallelize(len(clients)) var mutex = sync.Mutex{} for i, client := range clients { go func(i int, client *servicesmedco.API) { defer wg.Done() + var err error _, res, tr, err := client.SendSurveyDDTRequestTerms(el, servicesmedco.SurveyID("testDDTSurvey_"+client.ClientID), qt, proofs, true) mutex.Lock() results["testDDTSurvey_"+client.ClientID] = res @@ -87,10 +101,11 @@ func TestServiceDDT(t *testing.T) { func TestServiceKS(t *testing.T) { // test with 10 servers - el, local := getParam(10) - // test with 10 concurrent clients - nbHosts := 10 - clients := getClients(nbHosts, el) + nbrServers := 3 + el, local := getParam(nbrServers) + // test with as many clients as servers + nbrClients := nbrServers + clients := getClients(nbrClients, el) defer local.CloseAll() proofs := false @@ -98,9 +113,9 @@ func TestServiceKS(t *testing.T) { secKeys := make([]kyber.Scalar, 0) pubKeys := make([]kyber.Point, 0) targetData := make(libunlynx.CipherVector, 0) - results := make([][]int64, nbHosts) + results := make([][]int64, nbrClients) - for i := 0; i < nbHosts; i++ { + for i := 0; i < nbrClients; i++ { _, sK, pK := libunlynx.GenKeys(1) secKeys = append(secKeys, sK[0]) pubKeys = append(pubKeys, pK[0]) @@ -108,7 +123,23 @@ func TestServiceKS(t *testing.T) { targetData = append(targetData, *libunlynx.EncryptInt(el.Aggregate, int64(i))) } - wg := libunlynx.StartParallelize(nbHosts) + // sanitization tests + // no SurveyID + _, _, _, err := clients[0].SendSurveyKSRequest(el, "", pubKeys[0], targetData, proofs) + assert.Error(t, err) + // no Roster + emptyRoster := *el + emptyRoster.List = nil + _, _, _, err = clients[0].SendSurveyKSRequest(&emptyRoster, "testKSRequest", pubKeys[0], targetData, proofs) + assert.Error(t, err) + // no target pubKey + _, _, _, err = clients[0].SendSurveyKSRequest(el, "testKSRequest", nil, targetData, proofs) + assert.Error(t, err) + // no terms to key switch + _, _, _, err = clients[0].SendSurveyKSRequest(el, "testKSRequest", pubKeys[0], nil, proofs) + assert.Error(t, err) + + wg := libunlynx.StartParallelize(nbrClients) var mutex = sync.Mutex{} for i, client := range clients { go func(i int, client *servicesmedco.API) { @@ -133,7 +164,7 @@ func TestServiceKS(t *testing.T) { // Check result for _, res := range results { - for i := 0; i < nbHosts; i++ { + for i := 0; i < nbrClients; i++ { assert.Equal(t, res[i], int64(i)) } } @@ -141,10 +172,11 @@ func TestServiceKS(t *testing.T) { func TestServiceAgg(t *testing.T) { // test with 10 servers - el, local := getParam(10) - // test with 10 concurrent clients - nbHosts := 10 - clients := getClients(nbHosts, el) + nbrServers := 3 + el, local := getParam(nbrServers) + // test with as many clients as servers + nbrClients := nbrServers + clients := getClients(nbrClients, el) defer local.CloseAll() proofs := false @@ -152,21 +184,38 @@ func TestServiceAgg(t *testing.T) { secKeys := make([]kyber.Scalar, 0) pubKeys := make([]kyber.Point, 0) targetData := *libunlynx.EncryptInt(el.Aggregate, int64(1)) - results := make([]int64, nbHosts) + results := make([]int64, nbrClients) - for i := 0; i < nbHosts; i++ { + for i := 0; i < nbrClients; i++ { _, sK, pK := libunlynx.GenKeys(1) secKeys = append(secKeys, sK[0]) pubKeys = append(pubKeys, pK[0]) } - wg := libunlynx.StartParallelize(nbHosts) + // sanitization tests + // no SurveyID + _, _, _, err := clients[0].SendSurveyAggRequest(el, "", pubKeys[0], targetData, proofs) + assert.Error(t, err) + // no Roster + emptyRoster := *el + emptyRoster.List = nil + _, _, _, err = clients[0].SendSurveyAggRequest(&emptyRoster, "testAggRequest", pubKeys[0], targetData, proofs) + assert.Error(t, err) + // no target pubKey + _, _, _, err = clients[0].SendSurveyAggRequest(el, "testAggRequest", nil, targetData, proofs) + assert.Error(t, err) + // no terms to aggregate + emptyData := libunlynx.CipherText{} + _, _, _, err = clients[0].SendSurveyAggRequest(el, "testAggRequest", pubKeys[0], emptyData, proofs) + assert.Error(t, err) + + wg := libunlynx.StartParallelize(nbrClients) var mutex = sync.Mutex{} for i, client := range clients { go func(i int, client *servicesmedco.API) { defer wg.Done() - _, res, tr, err := client.SendSurveyAggRequest(el, servicesmedco.SurveyID("testAggRequest"), pubKeys[i], targetData, proofs) + _, res, tr, err := client.SendSurveyAggRequest(el, "testAggRequest", pubKeys[i], targetData, proofs) if err != nil { t.Fatal("Client", client.ClientID, " service did not start: ", err) } @@ -181,15 +230,16 @@ func TestServiceAgg(t *testing.T) { // Check result for _, res := range results { - assert.Equal(t, res, int64(10)) + assert.Equal(t, res, int64(nbrServers)) } } func TestServiceShuffle(t *testing.T) { // test with 10 servers - el, local := getParam(10) - // test with 10 concurrent clients - nbHosts := 10 + nbrServers := 3 + el, local := getParam(nbrServers) + // test with as many clients as servers + nbHosts := nbrServers clients := getClients(nbHosts, el) defer local.CloseAll() @@ -208,13 +258,31 @@ func TestServiceShuffle(t *testing.T) { targetData = append(targetData, *libunlynx.EncryptInt(el.Aggregate, int64(i))) } + // sanitization tests + // no SurveyID + _, _, _, err := clients[0].SendSurveyShuffleRequest(el, "", pubKeys[0], &targetData[0], proofs) + assert.Error(t, err) + // no Roster + emptyRoster := *el + emptyRoster.List = nil + _, _, _, err = clients[0].SendSurveyShuffleRequest(&emptyRoster, "testShuffleRequest", pubKeys[0], &targetData[0], proofs) + assert.Error(t, err) + // no target pubKey + _, _, _, err = clients[0].SendSurveyShuffleRequest(el, "testShuffleRequest", nil, &targetData[0], proofs) + assert.Error(t, err) + // no terms to aggregate + _, _, _, err = clients[0].SendSurveyShuffleRequest(el, "testShuffleRequest", pubKeys[0], nil, proofs) + assert.Error(t, err) + wg := libunlynx.StartParallelize(nbHosts) var mutex = sync.Mutex{} for i, client := range clients { go func(i int, client *servicesmedco.API) { defer wg.Done() - _, res, tr, err := client.SendSurveyShuffleRequest(el, servicesmedco.SurveyID("testShuffleRequest"), pubKeys[i], targetData[i], proofs) + var err error + _, res, tr, err := client.SendSurveyShuffleRequest(el, + servicesmedco.SurveyID("testShuffleRequest"), pubKeys[i], &targetData[i], proofs) if err != nil { t.Fatal("Client", client.ClientID, " service did not start: ", err) } @@ -222,7 +290,7 @@ func TestServiceShuffle(t *testing.T) { mutex.Lock() results[i] = libunlynx.DecryptInt(secKeys[i], res) mutex.Unlock() - log.Lvl1("Time:", tr.MapTR) + log.Lvl1(i, "Time:", tr.MapTR) }(i, client) } libunlynx.EndParallelize(wg) diff --git a/services/structs.go b/services/structs.go index 7f87e0f..51efe6b 100644 --- a/services/structs.go +++ b/services/structs.go @@ -1,14 +1,20 @@ package servicesmedco import ( + "fmt" "github.com/btcsuite/goleveldb/leveldb/errors" - "github.com/lca1/unlynx/lib" + "github.com/ldsec/unlynx/lib" "go.dedis.ch/kyber/v3" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/network" "time" ) +func init() { + network.RegisterMessage(ProtocolConfig{}) + network.RegisterMessage(SurveyDDTRequest{}) +} + // Name is the registered name for the medco service. const Name = "medco" @@ -52,7 +58,7 @@ const ( // ResultDDT will contain final results of the DDT of the query terms. type ResultDDT struct { Result []libunlynx.GroupingKey - TR TimeResults + TR map[string]time.Duration } // Result will contain the final results for the other queries @@ -64,6 +70,14 @@ type Result struct { // SurveyID unique ID for each survey. type SurveyID string +// ProtocolConfig holds the configuration that will be passed from node to +// node. It replaces the map and calling all nodes. +type ProtocolConfig struct { + SurveyID SurveyID + TypeQ string + Data []byte +} + // SurveyDDTRequest is the message used trigger the DDT of the query parameters type SurveyDDTRequest struct { SurveyID SurveyID @@ -74,7 +88,6 @@ type SurveyDDTRequest struct { Terms libunlynx.CipherVector // query terms // message handling - IntraMessage bool MessageSource *network.ServerIdentity } @@ -99,7 +112,6 @@ type SurveyShuffleRequest struct { KSTarget libunlynx.CipherVector // the final results to be key switched // message handling - IntraMessage bool MessageSource *network.ServerIdentity } @@ -114,14 +126,6 @@ type SurveyAggRequest struct { KSTarget libunlynx.CipherText // the final aggregated result to be key switched } -// SurveyTag is the struct that we persist in the service that contains all the data for the DDT protocol -type SurveyTag struct { - SurveyID SurveyID - Request SurveyDDTRequest - SurveyChannel chan int // To wait for the survey to be created before the DDT protocol - TR TimeResults -} - // SurveyKS is the struct that we persist in the service that contains all the data for the Key Switch request phase type SurveyKS struct { SurveyID SurveyID @@ -140,15 +144,9 @@ type SurveyShuffle struct { // SurveyAgg is the struct that we persist in the service that contains all the data for the Aggregation request phase type SurveyAgg struct { - SurveyID SurveyID - Request SurveyAggRequest - SurveyChannel chan int // To wait for all the aggregate results to be received by the root node - TR TimeResults -} - -// SurveyTagGenerated is used to ensure that all servers get the survey before starting the DDT protocol -type SurveyTagGenerated struct { SurveyID SurveyID + Request SurveyAggRequest + TR TimeResults } // SurveyShuffleGenerated is used to ensure that the root server creates the survey before all the other nodes send it their results @@ -161,15 +159,37 @@ type SurveyAggGenerated struct { SurveyID SurveyID } -func (s *Service) getSurveyTag(sid SurveyID) (SurveyTag, error) { - surv, err := s.MapSurveyTag.Get(string(sid)) +func (s *Service) deleteSurveyKS(sid SurveyID) (SurveyKS, error) { + surv, err := s.MapSurveyKS.Remove(string(sid)) + if err != nil { + return SurveyKS{}, errors.New("Error" + err.Error() + "while deleting surveyID: " + string(sid)) + } + if surv == nil { + return SurveyKS{}, errors.New("No entry in map with surveyID: " + string(sid)) + } + return surv.(SurveyKS), nil +} + +func (s *Service) deleteSurveyShuffle(sid SurveyID) (SurveyShuffle, error) { + surv, err := s.MapSurveyShuffle.Remove(string(sid)) + if err != nil { + return SurveyShuffle{}, errors.New("Error" + err.Error() + "while deleting surveyID: " + string(sid)) + } + if surv == nil { + return SurveyShuffle{}, errors.New("No entry in map with surveyID: " + string(sid)) + } + return surv.(SurveyShuffle), nil +} + +func (s *Service) deleteSurveyAgg(sid SurveyID) (SurveyAgg, error) { + surv, err := s.MapSurveyAgg.Remove(string(sid)) if err != nil { - return SurveyTag{}, errors.New("Error" + err.Error() + "while getting surveyID: " + string(sid)) + return SurveyAgg{}, errors.New("Error" + err.Error() + "while deleting surveyID: " + string(sid)) } if surv == nil { - return SurveyTag{}, errors.New("Empty map entry while getting surveyID: " + string(sid)) + return SurveyAgg{}, errors.New("No entry in map with surveyID: " + string(sid)) } - return surv.(SurveyTag), nil + return surv.(SurveyAgg), nil } func (s *Service) getSurveyKS(sid SurveyID) (SurveyKS, error) { @@ -205,11 +225,6 @@ func (s *Service) getSurveyAgg(sid SurveyID) (SurveyAgg, error) { return surv.(SurveyAgg), nil } -func (s *Service) putSurveyTag(sid SurveyID, surv SurveyTag) error { - _, err := s.MapSurveyTag.Put(string(sid), surv) - return err -} - func (s *Service) putSurveyKS(sid SurveyID, surv SurveyKS) error { _, err := s.MapSurveyKS.Put(string(sid), surv) return err @@ -224,3 +239,32 @@ func (s *Service) putSurveyAgg(sid SurveyID, surv SurveyAgg) error { _, err := s.MapSurveyAgg.Put(string(sid), surv) return err } + +func unmarshalProtocolConfig(buf []byte) (pc ProtocolConfig, err error) { + _, pcInt, err := network.Unmarshal(buf, libunlynx.SuiTe) + if err != nil { + return + } + pc = *pcInt.(*ProtocolConfig) + return +} + +func newProtocolConfig(sid SurveyID, tq string, data interface{}) ( + pc ProtocolConfig, err error) { + pc.SurveyID = sid + pc.TypeQ = tq + pc.Data, err = network.Marshal(data) + return +} + +func (pc ProtocolConfig) getConfig() (gc onet.GenericConfig, err error) { + gc.Data, err = network.Marshal(&pc) + return +} + +func (pc ProtocolConfig) getTarget() SurveyID { + if pc.TypeQ == "" { + return pc.SurveyID + } + return SurveyID(fmt.Sprintf("%s/%s", pc.SurveyID, pc.TypeQ)) +}