Skip to content

Commit

Permalink
Use client-go for applying MachineDeployments and for waiting machine…
Browse files Browse the repository at this point in the history
…-controller to become Running (#204)

* Use client-go for creating MachineDeployments

* Use client-go for waiting for machine-controller

* Sync vendor

* Rename Interface -> Client

* Increase delay and timeout

* Plural client argument names
  • Loading branch information
xmudrii authored and kubermatic-bot committed Feb 15, 2019
1 parent a49b5d0 commit 7ddba2f
Show file tree
Hide file tree
Showing 20 changed files with 1,388 additions and 124 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 0 additions & 10 deletions pkg/installer/installation/prerequisites.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/kubermatic/kubeone/pkg/config"
"github.com/kubermatic/kubeone/pkg/installer/util"
"github.com/kubermatic/kubeone/pkg/ssh"
"github.com/kubermatic/kubeone/pkg/templates/machinecontroller"
)

const dockerVersion = "18.09.2"
Expand All @@ -23,15 +22,6 @@ func installPrerequisites(ctx *util.Context) error {

func generateConfigurationFiles(ctx *util.Context) error {
ctx.Configuration.AddFile("cfg/cloud-config", ctx.Cluster.Provider.CloudConfig)

if len(ctx.Cluster.Workers) > 0 {
machines, deployErr := machinecontroller.MachineDeployments(ctx.Cluster)
if deployErr != nil {
return fmt.Errorf("failed to create worker machine configuration: %v", deployErr)
}
ctx.Configuration.AddFile("workers.yaml", machines)
}

return nil
}

Expand Down
44 changes: 11 additions & 33 deletions pkg/installer/installation/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"time"

"github.com/kubermatic/kubeone/pkg/config"
"github.com/kubermatic/kubeone/pkg/installer/util"
"github.com/kubermatic/kubeone/pkg/ssh"
"github.com/kubermatic/kubeone/pkg/templates/machinecontroller"
)

Expand All @@ -16,37 +14,17 @@ func createWorkerMachines(ctx *util.Context) error {
return nil
}

return ctx.RunTaskOnLeader(func(ctx *util.Context, _ *config.HostConfig, conn ssh.Connection) error {
ctx.Logger.Infoln("Waiting for machine-controller to come up…")

cmd := fmt.Sprintf(
`kubectl -n "%s" get pods -l '%s=%s' -o jsonpath='{.items[0].status.phase}'`,
machinecontroller.WebhookNamespace,
machinecontroller.WebhookAppLabelKey,
machinecontroller.WebhookAppLabelValue,
)
if !ctx.Runner.WaitForCondition(cmd, 1*time.Minute, util.IsRunning) {
return errors.New("machine-controller-webhook did not come up")
}

cmd = fmt.Sprintf(
`kubectl -n "%s" get pods -l '%s=%s' -o jsonpath='{.items[0].status.phase}'`,
machinecontroller.MachineControllerNamespace,
machinecontroller.MachineControllerAppLabelKey,
machinecontroller.MachineControllerAppLabelValue,
)
if !ctx.Runner.WaitForCondition(cmd, 1*time.Minute, util.IsRunning) {
return errors.New("machine-controller did not come up")
}

// it can still take a bit before the MC is actually ready
time.Sleep(10 * time.Second)
ctx.Logger.Infoln("Waiting for machine-controller to come up…")
if err := machinecontroller.WaitForWebhook(ctx.Clientset.CoreV1()); err != nil {
return fmt.Errorf("machine-controller-webhook did not come up: %v", err)
}
if err := machinecontroller.WaitForMachineController(ctx.Clientset.CoreV1()); err != nil {
return errors.New("machine-controller did not come up")
}

ctx.Logger.Infoln("Creating worker machines…")
_, _, err := ctx.Runner.Run(`kubectl apply -f ./{{ .WORK_DIR }}/workers.yaml`, util.TemplateVariables{
"WORK_DIR": ctx.WorkDir,
})
// it can still take a bit before the MC is actually ready
time.Sleep(10 * time.Second)

return err
})
ctx.Logger.Infoln("Creating worker machines…")
return machinecontroller.DeployMachineDeployments(ctx)
}
19 changes: 11 additions & 8 deletions pkg/templates/ark/ark.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func Deploy(ctx *util.Context) error {
if ctx.APIExtensionClientset == nil {
return errors.New("kubernetes apiextension clientset not initialized")
}
if ctx.RESTConfig == nil {
return errors.New("kubernetes rest config not initialized")
}

// Kubernetes clientsets
coreClient := ctx.Clientset.CoreV1()
Expand Down Expand Up @@ -118,10 +121,10 @@ func Deploy(ctx *util.Context) error {
return nil
}

func ensureBackupStorageLocation(backupLocationInterface arkclientset.BackupStorageLocationInterface, required *arkv1.BackupStorageLocation) error {
existing, err := backupLocationInterface.Get(required.Name, metav1.GetOptions{})
func ensureBackupStorageLocation(backupStorageLocationsClient arkclientset.BackupStorageLocationInterface, required *arkv1.BackupStorageLocation) error {
existing, err := backupStorageLocationsClient.Get(required.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err = backupLocationInterface.Create(required)
_, err = backupStorageLocationsClient.Create(required)
return err
}
if err != nil {
Expand All @@ -135,14 +138,14 @@ func ensureBackupStorageLocation(backupLocationInterface arkclientset.BackupStor
return nil
}

_, err = backupLocationInterface.Update(existing)
_, err = backupStorageLocationsClient.Update(existing)
return err
}

func ensureVolumeSnapshotLocation(snapshotLocationInterface arkclientset.VolumeSnapshotLocationInterface, required *arkv1.VolumeSnapshotLocation) error {
existing, err := snapshotLocationInterface.Get(required.Name, metav1.GetOptions{})
func ensureVolumeSnapshotLocation(volumeSnapshotLocationsClient arkclientset.VolumeSnapshotLocationInterface, required *arkv1.VolumeSnapshotLocation) error {
existing, err := volumeSnapshotLocationsClient.Get(required.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err = snapshotLocationInterface.Create(required)
_, err = volumeSnapshotLocationsClient.Create(required)
return err
}
if err != nil {
Expand All @@ -156,6 +159,6 @@ func ensureVolumeSnapshotLocation(snapshotLocationInterface arkclientset.VolumeS
return nil
}

_, err = snapshotLocationInterface.Update(existing)
_, err = volumeSnapshotLocationsClient.Update(existing)
return err
}
20 changes: 20 additions & 0 deletions pkg/templates/machinecontroller/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package machinecontroller

import (
"errors"
"fmt"
"net"
"time"

"github.com/kubermatic/kubeone/pkg/config"
"github.com/kubermatic/kubeone/pkg/installer/util"
Expand All @@ -14,6 +16,8 @@ import (
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
corev1types "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)

Expand Down Expand Up @@ -134,6 +138,22 @@ func Deploy(ctx *util.Context) error {
return nil
}

// WaitForMachineController waits for machine-controller-webhook to become running
func WaitForMachineController(corev1Client corev1types.CoreV1Interface) error {
return wait.Poll(5*time.Second, 3*time.Minute, func() (bool, error) {
machineControllerPods, err := corev1Client.Pods(WebhookNamespace).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", MachineControllerAppLabelKey, MachineControllerAppLabelValue),
})
if err != nil {
return false, err
}
if machineControllerPods.Items[0].Status.Phase == corev1.PodRunning {
return true, nil
}
return false, nil
})
}

func machineControllerServiceAccount() *corev1.ServiceAccount {
return &corev1.ServiceAccount{
TypeMeta: metav1.TypeMeta{
Expand Down
58 changes: 50 additions & 8 deletions pkg/templates/machinecontroller/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (
"fmt"

"github.com/kubermatic/kubeone/pkg/config"
"github.com/kubermatic/kubeone/pkg/installer/util"
"github.com/kubermatic/kubeone/pkg/templates"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
clustercommon "sigs.k8s.io/cluster-api/pkg/apis/cluster/common"
clusterv1alpha1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
clusterclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
clustertypes "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
)

type providerSpec struct {
Expand All @@ -24,20 +29,36 @@ type providerSpec struct {
OperatingSystemSpec interface{} `json:"operatingSystemSpec"`
}

// MachineDeployments returns YAML manifests for MachineDeployments
func MachineDeployments(cluster *config.Cluster) (string, error) {
deployments := make([]interface{}, 0)
// DeployMachineDeployments deploys MachineDeployments that create appropriate machines
func DeployMachineDeployments(ctx *util.Context) error {
if ctx.Clientset == nil {
return errors.New("kubernetes clientset not initialized")
}
if ctx.RESTConfig == nil {
return errors.New("kubernetes rest config not initialized")
}

// Create Cluster-API clientset
clusterapiClientset, err := clusterclientset.NewForConfig(ctx.RESTConfig)
if err != nil {
return err
}
clusterapiClient := clusterapiClientset.ClusterV1alpha1()

for _, workerset := range cluster.Workers {
deployment, err := createMachineDeployment(cluster, workerset)
// Apply MachineDeployments
for _, workerset := range ctx.Cluster.Workers {
deployment, err := createMachineDeployment(ctx.Cluster, workerset)
if err != nil {
return "", err
return err
}

deployments = append(deployments, deployment)
err = ensureMachineDeployment(clusterapiClient.MachineDeployments(deployment.Namespace), deployment)
if err != nil {
return err
}
}

return templates.KubernetesToYAML(deployments)
return nil
}

func createMachineDeployment(cluster *config.Cluster, workerset config.WorkerConfig) (*clusterv1alpha1.MachineDeployment, error) {
Expand Down Expand Up @@ -113,6 +134,27 @@ func createMachineDeployment(cluster *config.Cluster, workerset config.WorkerCon
}, nil
}

func ensureMachineDeployment(machineDeploymentsClient clustertypes.MachineDeploymentInterface, required *clusterv1alpha1.MachineDeployment) error {
existing, err := machineDeploymentsClient.Get(required.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err = machineDeploymentsClient.Create(required)
return err
}
if err != nil {
return err
}

modified := false
templates.MergeStringMap(&modified, &existing.ObjectMeta.Annotations, required.ObjectMeta.Annotations)
templates.MergeStringMap(&modified, &existing.ObjectMeta.Labels, required.ObjectMeta.Labels)
if equality.Semantic.DeepEqual(required.Spec, existing.Spec) && !modified {
return nil
}

_, err = machineDeploymentsClient.Update(existing)
return err
}

func machineSpec(cluster *config.Cluster, workerset config.WorkerConfig, provider config.ProviderName) (map[string]interface{}, error) {
var err error

Expand Down
Loading

0 comments on commit 7ddba2f

Please sign in to comment.