author    Flavio Crisciani <flavio.crisciani@docker.com>  2018-05-24 15:22:58 -0700
committer Flavio Crisciani <flavio.crisciani@docker.com>  2018-05-29 08:03:32 -0700
commit    1784a46e64b3ac5d152fd59c7a110f8d78570317 (patch)
tree      1cd07059d89438843f8b4de551a8630a614f153d /libnetwork/cmd
parent    55567d88e463f2cfddce2ee1bdbde5f50e09a623 (diff)
Enable network-db test image creation

- Updated the Makefile
- Moved binaries into the bin/ directory

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Diffstat (limited to 'libnetwork/cmd')
-rw-r--r--  libnetwork/cmd/networkdb-test/Dockerfile                 |   9
-rw-r--r--  libnetwork/cmd/networkdb-test/README                     |  15
-rw-r--r--  libnetwork/cmd/networkdb-test/dbclient/ndbClient.go      | 693
-rw-r--r--  libnetwork/cmd/networkdb-test/dbserver/ndbServer.go      | 111
-rw-r--r--  libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go | 120
-rw-r--r--  libnetwork/cmd/networkdb-test/testMain.go                |  24
6 files changed, 972 insertions(+), 0 deletions(-)
diff --git a/libnetwork/cmd/networkdb-test/Dockerfile b/libnetwork/cmd/networkdb-test/Dockerfile
new file mode 100644
index 0000000000..f30a6b423c
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/Dockerfile
@@ -0,0 +1,9 @@
+FROM alpine
+
+RUN apk --no-cache add curl
+
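+# testMain is the test binary cross-built on the host for linux
+# (see the README: env GOOS=linux go build -v testMain.go)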
+COPY testMain /app/
+
+WORKDIR /app
+
+ENTRYPOINT ["/app/testMain"]
diff --git a/libnetwork/cmd/networkdb-test/README b/libnetwork/cmd/networkdb-test/README
new file mode 100644
index 0000000000..18227f94aa
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/README
@@ -0,0 +1,15 @@
+SERVER
+
+cd cmd/networkdb-test
+env GOOS=linux go build -v testMain.go && docker build -t dockereng/e2e-networkdb .
+(only for the testkit case) docker push dockereng/e2e-networkdb
+
+Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 dockereng/e2e-networkdb server 8000
+
+CLIENT
+
+cd cmd/networkdb-test
+Join cluster: docker run -it --network net1 dockereng/e2e-networkdb client join testdb 8000
+Join network: docker run -it --network net1 dockereng/e2e-networkdb client join-network testdb 8000 test
+Run test: docker run -it --network net1 dockereng/e2e-networkdb client write-delete-unique-keys testdb 8000 test tableBla 3 10
+Check table: curl "localhost:32768/gettable?nid=test&tname=table_name" (32768 being the host port published for the task)
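+
+Note: net1 is assumed to be an attachable overlay network created beforehand
+on a Swarm-enabled engine, for example:
+  docker network create --driver overlay --attachable net1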
diff --git a/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go
new file mode 100644
index 0000000000..e2574fc3cd
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go
@@ -0,0 +1,693 @@
+package dbclient
+
+import (
+ "context"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/sirupsen/logrus"
+)
+
+var servicePort string
+
+const totalWrittenKeys string = "totalKeys"
+
+type resultTuple struct {
+ id string
+ result int
+}
+
+func httpGetFatalError(ip, port, path string) {
+ body, err := httpGet(ip, port, path)
+ if err != nil || !strings.Contains(string(body), "OK") {
+ log.Fatalf("[%s] error %s %s", path, err, body)
+ }
+}
+
+func httpGet(ip, port, path string) ([]byte, error) {
+ resp, err := http.Get("http://" + ip + ":" + port + path)
+ if err != nil {
+ logrus.Errorf("httpGet error:%s", err)
+ return nil, err
+ }
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ return body, err
+}
+
+func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
+ httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
+
+ if doneCh != nil {
+ doneCh <- resultTuple{id: ip, result: 0}
+ }
+}
+
+func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
+ httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
+
+ if doneCh != nil {
+ doneCh <- resultTuple{id: ip, result: 0}
+ }
+}
+
+func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
+ httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
+
+ if doneCh != nil {
+ doneCh <- resultTuple{id: ip, result: 0}
+ }
+}
+
+func writeTableKey(ip, port, networkName, tableName, key string) {
+ createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
+ httpGetFatalError(ip, port, createPath+key)
+}
+
+func deleteTableKey(ip, port, networkName, tableName, key string) {
+ deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
+ httpGetFatalError(ip, port, deletePath+key)
+}
+
+func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
+ body, err := httpGet(ip, port, "/clusterpeers")
+
+ if err != nil {
+ logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
+ doneCh <- resultTuple{id: ip, result: -1}
+ return
+ }
+ peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
+ peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
+
+ doneCh <- resultTuple{id: ip, result: peersNum}
+}
+
+func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
+ body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
+
+ if err != nil {
+ logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
+ doneCh <- resultTuple{id: ip, result: -1}
+ return
+ }
+ peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
+ peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
+
+ doneCh <- resultTuple{id: ip, result: peersNum}
+}
+
+func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+ body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
+
+ if err != nil {
+ logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
+ doneCh <- resultTuple{id: ip, result: -1}
+ return
+ }
+ elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
+ entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+ doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+ httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
+ if doneCh != nil {
+ doneCh <- resultTuple{id: ip, result: 0}
+ }
+}
+
+func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+ body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
+
+ if err != nil {
+ logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
+ doneCh <- resultTuple{id: ip, result: -1}
+ return
+ }
+ elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
+ entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+ doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+ for x := 0; ; x++ {
+ select {
+ case <-ctx.Done():
+ doneCh <- resultTuple{id: ip, result: x}
+ return
+ default:
+ k := key + strconv.Itoa(x)
+ // write key
+ writeTableKey(ip, port, networkName, tableName, k)
+ // give time to send out key writes
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+}
+
+func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+ for x := 0; ; x++ {
+ select {
+ case <-ctx.Done():
+ doneCh <- resultTuple{id: ip, result: x}
+ return
+ default:
+ k := key + strconv.Itoa(x)
+ // write key
+ writeTableKey(ip, port, networkName, tableName, k)
+ // give time to send out key writes
+ time.Sleep(100 * time.Millisecond)
+ // delete key
+ deleteTableKey(ip, port, networkName, tableName, k)
+ }
+ }
+}
+
+func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+ for x := 0; ; x++ {
+ select {
+ case <-ctx.Done():
+ doneCh <- resultTuple{id: ip, result: x}
+ return
+ default:
+ k := key + strconv.Itoa(x)
+ // write key
+ writeTableKey(ip, port, networkName, tableName, k)
+ time.Sleep(100 * time.Millisecond)
+ // delete key
+ deleteTableKey(ip, port, networkName, tableName, k)
+ // give some time
+ time.Sleep(100 * time.Millisecond)
+ // leave network
+ leaveNetwork(ip, port, networkName, nil)
+ // join network
+ joinNetwork(ip, port, networkName, nil)
+ }
+ }
+}
+
+func ready(ip, port string, doneCh chan resultTuple) {
+ for {
+ body, err := httpGet(ip, port, "/ready")
+ if err != nil || !strings.Contains(string(body), "OK") {
+ time.Sleep(500 * time.Millisecond)
+ continue
+ }
+ // success
+ break
+ }
+ // notify the completion
+ doneCh <- resultTuple{id: ip, result: 0}
+}
+
+func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
+ startTime := time.Now().UnixNano()
+ var successTime int64
+
+ // Loop until the context expires (the callers use 2 minutes) to guarantee that the result is stable
+ for {
+ select {
+ case <-ctx.Done():
+ // Validate test success: a non-zero successTime means every node held the expected number of entries
+ if successTime != 0 {
+ logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+ return
+ }
+ log.Fatal("Test failed, the node tables did not converge to the expected number of entries")
+ default:
+ logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
+ doneCh := make(chan resultTuple, len(ips))
+ for _, ip := range ips {
+ go fn(ip, servicePort, networkName, tableName, doneCh)
+ }
+
+ nodesWithCorrectEntriesNum := 0
+ for i := len(ips); i > 0; i-- {
+ tableEntries := <-doneCh
+ logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
+ if tableEntries.result == expectedEntries {
+ nodesWithCorrectEntriesNum++
+ }
+ }
+ close(doneCh)
+ if nodesWithCorrectEntriesNum == len(ips) {
+ if successTime == 0 {
+ successTime = time.Now().UnixNano()
+ logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+ }
+ } else {
+ successTime = 0
+ }
+ time.Sleep(10 * time.Second)
+ }
+ }
+}
+
+func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
+ var totalKeys int
+ resultTable := make(map[string]int)
+ for i := 0; i < parallelWriters; i++ {
+ logrus.Infof("Waiting for %d workers", parallelWriters-i)
+ workerReturn := <-doneCh
+ totalKeys += workerReturn.result
+ if mustWrite && workerReturn.result == 0 {
+ log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result)
+ }
+ if !mustWrite && workerReturn.result != 0 {
+ log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result)
+ }
+ if mustWrite {
+ resultTable[workerReturn.id] = workerReturn.result
+ logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
+ }
+ }
+ resultTable[totalWrittenKeys] = totalKeys
+ return resultTable
+}
+
+// ready
+func doReady(ips []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ // check all the nodes
+ for _, ip := range ips {
+ go ready(ip, servicePort, doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ <-doneCh
+ }
+ close(doneCh)
+}
+
+// join
+func doJoin(ips []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ // check all the nodes
+ for i, ip := range ips {
+ members := append([]string(nil), ips[:i]...)
+ members = append(members, ips[i+1:]...)
+ go joinCluster(ip, servicePort, members, doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ <-doneCh
+ }
+ close(doneCh)
+}
+
+// cluster-peers expectedNumberPeers
+func doClusterPeers(ips []string, args []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ expectedPeers, _ := strconv.Atoi(args[0])
+ // check all the nodes
+ for _, ip := range ips {
+ go clusterPeersNumber(ip, servicePort, doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ node := <-doneCh
+ if node.result != expectedPeers {
+ log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+ }
+ }
+ close(doneCh)
+}
+
+// join-network networkName
+func doJoinNetwork(ips []string, args []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ // check all the nodes
+ for _, ip := range ips {
+ go joinNetwork(ip, servicePort, args[0], doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ <-doneCh
+ }
+ close(doneCh)
+}
+
+// leave-network networkName
+func doLeaveNetwork(ips []string, args []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ // check all the nodes
+ for _, ip := range ips {
+ go leaveNetwork(ip, servicePort, args[0], doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ <-doneCh
+ }
+ close(doneCh)
+}
+
+// network-peers networkName expectedNumberPeers maxRetry
+func doNetworkPeers(ips []string, args []string) {
+ doneCh := make(chan resultTuple, len(ips))
+ networkName := args[0]
+ expectedPeers, _ := strconv.Atoi(args[1])
+ maxRetry, _ := strconv.Atoi(args[2])
+ for retry := 0; retry < maxRetry; retry++ {
+ // check all the nodes
+ for _, ip := range ips {
+ go networkPeersNumber(ip, servicePort, networkName, doneCh)
+ }
+ // wait for the readiness of all nodes
+ for i := len(ips); i > 0; i-- {
+ node := <-doneCh
+ if node.result != expectedPeers {
+ if retry == maxRetry-1 {
+ log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+ } else {
+ logrus.Warnf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+ }
+ time.Sleep(1 * time.Second)
+ }
+ }
+ }
+ close(doneCh)
+}
+
+// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteUniqueKeys(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+
+ doneCh := make(chan resultTuple, parallelWriters)
+ // Enable watch of tables from clients
+ for i := 0; i < parallelWriters; i++ {
+ go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+ }
+ waitWriters(parallelWriters, false, doneCh)
+
+ // Start parallel writers that will create and delete unique keys
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+ cancel()
+ ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
+ checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
+ cancel()
+}
+
+// write-unique-keys networkName tableName numParallelWriters writeTimeSec
+func doWriteUniqueKeys(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+
+ doneCh := make(chan resultTuple, parallelWriters)
+ // Enable watch of tables from clients
+ for i := 0; i < parallelWriters; i++ {
+ go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+ }
+ waitWriters(parallelWriters, false, doneCh)
+
+ // Start parallel writers that will create unique keys
+ defer close(doneCh)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+ cancel()
+}
+
+// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteLeaveJoin(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+
+ // Start parallel writers that will create and delete unique keys, then leave and rejoin the network
+ doneCh := make(chan resultTuple, parallelWriters)
+ defer close(doneCh)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"])
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+ cancel()
+}
+
+// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+
+ // Start parallel writers that will create and delete unique keys
+ doneCh := make(chan resultTuple, parallelWriters)
+ defer close(doneCh)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+ // The writers will leave the network
+ for i := 0; i < parallelWriters; i++ {
+ logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+ go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+ }
+ waitWriters(parallelWriters, false, doneCh)
+
+ // Give some time
+ time.Sleep(100 * time.Millisecond)
+
+ // The writers will join the network
+ for i := 0; i < parallelWriters; i++ {
+ logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
+ go joinNetwork(ips[i], servicePort, networkName, doneCh)
+ }
+ waitWriters(parallelWriters, false, doneCh)
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+ cancel()
+}
+
+// write-wait-leave networkName tableName numParallelWriters writeTimeSec
+func doWriteWaitLeave(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+
+ // Start parallel writers that will create unique keys
+ doneCh := make(chan resultTuple, parallelWriters)
+ defer close(doneCh)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+ // The writers will leave the network
+ for i := 0; i < parallelWriters; i++ {
+ logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+ go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+ }
+ waitWriters(parallelWriters, false, doneCh)
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+ cancel()
+}
+
+// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
+func doWriteWaitLeaveJoin(ips []string, args []string) {
+ networkName := args[0]
+ tableName := args[1]
+ parallelWriters, _ := strconv.Atoi(args[2])
+ writeTimeSec, _ := strconv.Atoi(args[3])
+ parallelLeaver, _ := strconv.Atoi(args[4])
+
+ // Start parallel writers that will create unique keys
+ doneCh := make(chan resultTuple, parallelWriters)
+ defer close(doneCh)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+ for i := 0; i < parallelWriters; i++ {
+ key := "key-" + strconv.Itoa(i) + "-"
+ logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+ go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+ }
+
+ // Sync with all the writers
+ keyMap := waitWriters(parallelWriters, true, doneCh)
+ cancel()
+ logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+ keysExpected := keyMap[totalWrittenKeys]
+ // The Leavers will leave the network
+ for i := 0; i < parallelLeaver; i++ {
+ logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+ go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+ // Once a node leaves, all the keys it previously wrote are deleted, so remove them from the expected count
+ keysExpected -= keyMap[ips[i]]
+ }
+ waitWriters(parallelLeaver, false, doneCh)
+
+ // Give some time
+ time.Sleep(100 * time.Millisecond)
+
+ // The leavers will rejoin the network
+ for i := 0; i < parallelLeaver; i++ {
+ logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
+ go joinNetwork(ips[i], servicePort, networkName, doneCh)
+ }
+ waitWriters(parallelLeaver, false, doneCh)
+
+ // check table entries for 2 minutes
+ ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+ checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
+ cancel()
+}
+
+var cmdArgCheck = map[string]int{
+ "debug": 0,
+ "fail": 0,
+ "ready": 2,
+ "join": 2,
+ "leave": 2,
+ "join-network": 3,
+ "leave-network": 3,
+ "cluster-peers": 3,
+ "write-delete-unique-keys": 4,
+}
+
+// Client runs one test subcommand against all the tasks of the given service.
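+// The expected invocation is:
+//
+//	client <command> <serviceName> <servicePort> [commandArgs...]
+//
+// for example (a sketch reusing the service/network names from the README):
+//
+//	client ready testdb 8000
+//	client cluster-peers testdb 8000 3
+//	client network-peers testdb 8000 test 3 10
+//	client write-delete-unique-keys testdb 8000 test tableBla 3 10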
+func Client(args []string) {
+ logrus.Infof("[CLIENT] Starting with arguments %v", args)
+ command := args[0]
+
+ if len(args) < cmdArgChec[command] {
+ log.Fatalf("Command %s requires %d arguments, aborting...", command, cmdArgChec[command])
+ }
+
+ switch command {
+ case "debug":
+ time.Sleep(1 * time.Hour)
+ os.Exit(0)
+ case "fail":
+ log.Fatalf("Test error condition with message: error error error")
+ }
+
+ serviceName := args[1]
+ ips, _ := net.LookupHost("tasks." + serviceName)
+ logrus.Infof("got the ips %v", ips)
+ if len(ips) == 0 {
+ log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
+ }
+ servicePort = args[2]
+ commandArgs := args[3:]
+ logrus.Infof("Executing %s with args:%v", command, commandArgs)
+ switch command {
+ case "ready":
+ doReady(ips)
+ case "join":
+ doJoin(ips)
+ case "leave":
+
+ case "cluster-peers":
+ // cluster-peers expectedNumberPeers
+ doClusterPeers(ips, commandArgs)
+
+ case "join-network":
+ // join-network networkName
+ doJoinNetwork(ips, commandArgs)
+ case "leave-network":
+ // leave-network networkName
+ doLeaveNetwork(ips, commandArgs)
+ case "network-peers":
+ // network-peers networkName expectedNumberPeers maxRetry
+ doNetworkPeers(ips, commandArgs)
+
+ case "write-unique-keys":
+ // write-unique-keys networkName tableName numParallelWriters writeTimeSec
+ doWriteUniqueKeys(ips, commandArgs)
+ case "write-delete-unique-keys":
+ // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
+ doWriteDeleteUniqueKeys(ips, commandArgs)
+ case "write-delete-leave-join":
+ // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
+ doWriteDeleteLeaveJoin(ips, commandArgs)
+ case "write-delete-wait-leave-join":
+ // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
+ doWriteDeleteWaitLeaveJoin(ips, commandArgs)
+ case "write-wait-leave":
+ // write-wait-leave networkName tableName numParallelWriters writeTimeSec
+ doWriteWaitLeave(ips, commandArgs)
+ case "write-wait-leave-join":
+ // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
+ doWriteWaitLeaveJoin(ips, commandArgs)
+ default:
+ log.Fatalf("Command %s not recognized", command)
+ }
+}
diff --git a/libnetwork/cmd/networkdb-test/dbserver/ndbServer.go b/libnetwork/cmd/networkdb-test/dbserver/ndbServer.go
new file mode 100644
index 0000000000..322228270f
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/dbserver/ndbServer.go
@@ -0,0 +1,111 @@
+package dbserver
+
+import (
+ "errors"
+ "fmt"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "strconv"
+
+ "github.com/docker/libnetwork/cmd/networkdb-test/dummyclient"
+ "github.com/docker/libnetwork/diagnostic"
+ "github.com/docker/libnetwork/networkdb"
+ "github.com/sirupsen/logrus"
+)
+
+var nDB *networkdb.NetworkDB
+var server *diagnostic.Server
+var ipAddr string
+
+var testerPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
+ "/myip": ipaddress,
+}
+
+func ipaddress(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "%s\n", ipAddr)
+}
+
+// Server starts the networkdb test server listening on the given port
+func Server(args []string) {
+ logrus.Infof("[SERVER] Starting with arguments %v", args)
+ if len(args) < 1 {
+ log.Fatal("Port number is a mandatory argument, aborting...")
+ }
+ port, _ := strconv.Atoi(args[0])
+ var localNodeName string
+ var ok bool
+ if localNodeName, ok = os.LookupEnv("TASK_ID"); !ok {
+ log.Fatal("TASK_ID environment variable not set, aborting...")
+ }
+ logrus.Infof("[SERVER] Starting node %s on port %d", localNodeName, port)
+
+ ip, err := getIPInterface("eth0")
+ if err != nil {
+ logrus.Errorf("%s There was a problem with the IP %s\n", localNodeName, err)
+ return
+ }
+ ipAddr = ip
+ logrus.Infof("%s uses IP %s\n", localNodeName, ipAddr)
+
+ server = diagnostic.New()
+ server.Init()
+ conf := networkdb.DefaultConfig()
+ conf.Hostname = localNodeName
+ conf.AdvertiseAddr = ipAddr
+ conf.BindAddr = ipAddr
+ nDB, err = networkdb.New(conf)
+ if err != nil {
+ logrus.Infof("%s error in the DB init %s\n", localNodeName, err)
+ return
+ }
+
+ // Register network db handlers
+ server.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
+ server.RegisterHandler(nil, testerPaths2Func)
+ server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
+ server.EnableDiagnostic("", port)
+ // block here
+ select {}
+}
+
+func getIPInterface(name string) (string, error) {
+ ifaces, err := net.Interfaces()
+ if err != nil {
+ return "", err
+ }
+ for _, iface := range ifaces {
+ if iface.Name != name {
+ continue // not the name specified
+ }
+
+ if iface.Flags&net.FlagUp == 0 {
+ return "", errors.New("Interfaces is down")
+ }
+
+ addrs, err := iface.Addrs()
+ if err != nil {
+ return "", err
+ }
+ for _, addr := range addrs {
+ var ip net.IP
+ switch v := addr.(type) {
+ case *net.IPNet:
+ ip = v.IP
+ case *net.IPAddr:
+ ip = v.IP
+ }
+ if ip == nil || ip.IsLoopback() {
+ continue
+ }
+ ip = ip.To4()
+ if ip == nil {
+ continue
+ }
+ return ip.String(), nil
+ }
+ return "", errors.New("Interfaces does not have a valid IPv4")
+ }
+ return "", errors.New("Interface not found")
+}
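Besides the local /myip handler above, the server registers the networkdb diagnostic paths that dbclient drives (/ready, /join, /joinnetwork, /leavenetwork, /createentry, /deleteentry, /clusterpeers, /networkpeers, /gettable) and the dummyclient watch paths. A quick smoke test against a running task, assuming the host-published port from the README:

  curl "localhost:32768/myip"
  curl "localhost:32768/clusterpeers"
  curl "localhost:32768/networkpeers?nid=test"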
diff --git a/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go b/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go
new file mode 100644
index 0000000000..7bec1c8b02
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/dummyclient/dummyClient.go
@@ -0,0 +1,120 @@
+package dummyclient
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+
+ events "github.com/docker/go-events"
+ "github.com/docker/libnetwork/diagnostic"
+ "github.com/docker/libnetwork/networkdb"
+ "github.com/sirupsen/logrus"
+)
+
+// DummyClientPaths2Func exports the HTTP paths registered by the dummy client
+var DummyClientPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
+ "/watchtable": watchTable,
+ "/watchedtableentries": watchTableEntries,
+}
+
+const (
+ missingParameter = "missing parameter"
+)
+
+type tableHandler struct {
+ cancelWatch func()
+ entries map[string]string
+}
+
+var clientWatchTable = map[string]tableHandler{}
+
+func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ diagnostic.DebugHTTPForm(r)
+ if len(r.Form["tname"]) < 1 {
+ rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
+ diagnostic.HTTPReply(w, rsp, &diagnostic.JSONOutput{})
+ return
+ }
+
+ tableName := r.Form["tname"][0]
+ if _, ok := clientWatchTable[tableName]; ok {
+ fmt.Fprintf(w, "OK\n")
+ return
+ }
+
+ nDB, ok := ctx.(*networkdb.NetworkDB)
+ if ok {
+ ch, cancel := nDB.Watch(tableName, "", "")
+ clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)}
+ go handleTableEvents(tableName, ch)
+
+ fmt.Fprintf(w, "OK\n")
+ }
+}
+
+func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ diagnostic.DebugHTTPForm(r)
+ if len(r.Form["tname"]) < 1 {
+ rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
+ diagnostic.HTTPReply(w, rsp, &diagnostic.JSONOutput{})
+ return
+ }
+
+ tableName := r.Form["tname"][0]
+ table, ok := clientWatchTable[tableName]
+ if !ok {
+ fmt.Fprintf(w, "Table %s not watched\n", tableName)
+ return
+ }
+
+ fmt.Fprintf(w, "total elements: %d\n", len(table.entries))
+ i := 0
+ for k, v := range table.entries {
+ fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, v)
+ i++
+ }
+}
+
+func handleTableEvents(tableName string, ch *events.Channel) {
+ var (
+ eid   string
+ value []byte
+ isAdd bool
+ )
+
+ logrus.Infof("Started watching table:%s", tableName)
+ for {
+ select {
+ case <-ch.Done():
+ logrus.Infof("End watching %s", tableName)
+ return
+
+ case evt := <-ch.C:
+ logrus.Infof("Recevied new event on:%s", tableName)
+ switch event := evt.(type) {
+ case networkdb.CreateEvent:
+ eid = event.Key
+ value = event.Value
+ isAdd = true
+ case networkdb.DeleteEvent:
+ eid = event.Key
+ value = event.Value
+ isAdd = false
+ default:
+ log.Fatalf("Unexpected table event = %#v", event)
+ }
+ if isAdd {
+ // logrus.Infof("Add %s %s", tableName, eid)
+ clientWatchTable[tableName].entries[eid] = string(value)
+ } else {
+ // logrus.Infof("Del %s %s", tableName, eid)
+ delete(clientWatchTable[tableName].entries, eid)
+ }
+ }
+ }
+}
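The two watch handlers above can be exercised directly once a table is being watched (a sketch, reusing the README's host-port assumption):

  curl "localhost:32768/watchtable?nid=test&tname=tableBla"
  curl "localhost:32768/watchedtableentries?nid=test&tname=tableBla"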
diff --git a/libnetwork/cmd/networkdb-test/testMain.go b/libnetwork/cmd/networkdb-test/testMain.go
new file mode 100644
index 0000000000..0cd8c29942
--- /dev/null
+++ b/libnetwork/cmd/networkdb-test/testMain.go
@@ -0,0 +1,24 @@
+package main
+
+import (
+ "log"
+ "os"
+
+ "github.com/docker/libnetwork/cmd/networkdb-test/dbclient"
+ "github.com/docker/libnetwork/cmd/networkdb-test/dbserver"
+ "github.com/sirupsen/logrus"
+)
+
+func main() {
+ logrus.Infof("Starting the image with these args: %v", os.Args)
+ if len(os.Args) < 1 {
+ log.Fatal("You need at least 1 argument [client/server]")
+ }
+
+ switch os.Args[1] {
+ case "server":
+ dbserver.Server(os.Args[2:])
+ case "client":
+ dbclient.Client(os.Args[2:])
+ }
+}
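For reference, a standalone invocation sketch (the server requires the TASK_ID environment variable, and the client resolves tasks.<serviceName> via Swarm DNS, so both are normally run in containers as shown in the README):

  env TASK_ID=node1 ./testMain server 8000
  ./testMain client ready testdb 8000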