Distributed Coordination with Kubernetes

At Parkster we’re in the process of migrating a large monolithic application into smaller services. We’ve been using Kubernetes for quite a while, and the only application that had not yet been moved there is the (still quite large) remnant of this monolith. Fully splitting the monolith into its smaller parts is a long-term vision, and in the meantime we could still benefit from much of the functionality that Kubernetes provides, such as scheduling, service discovery, high availability, log collection and so on. But doing so requires work. What we’re going to explore in this blog post is the problem of scheduled jobs that are started periodically by the monolith.

Dealing with Scheduled Jobs

The monolith, which previously ran as a single instance on a single node, contains a lot of scheduled jobs that update the state in the database (and nowadays also publish business events). The monolith is built in Java and makes heavy use of Spring, so a job might look like this:

@Scheduled(fixedRate = 60000)
public void doSomethingEveryMinute() {
    // Do something that updates the database state
}

Spring will then make sure that the `doSomethingEveryMinute` method is executed once every minute. The problem is that if we’re now going to host the monolith on Kubernetes, running with multiple instances, this job will be executed once per instance per minute instead of just once per minute. This can be a big problem if the job has side effects such as sending notification emails, updating the database or creating invoices. So how do we get around this? There are of course many alternatives; an obvious choice would be to make use of Kubernetes Jobs and let Kubernetes itself schedule the jobs periodically. The problem is that this functionality is only available in Kubernetes 1.4 and onward, and 1.4 is not yet released. But even if we could use such a feature it might not be feasible from a technical standpoint. Our jobs were highly coupled to the existing code base, and extracting each individual job into its own application would be error-prone and very time-consuming if done all at once. So our initial plan was to extract ALL scheduled jobs into an application that would run as a single instance in the Kubernetes cluster. But due to the nature of the existing code and its high coupling, even this turned out to be quite a struggle. What if there was an easy way that allowed us to keep the jobs in the monolith for now and replace them gradually as we extract functionality from the monolith into stand-alone services? It turned out there was 🙂
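For completeness, here’s roughly what declaring such a job natively in Kubernetes might look like once 1.4 ships. This is only a sketch based on the alpha `batch/v2alpha1` API (where the resource is called `ScheduledJob`), so treat the exact kind, fields and names as illustrative rather than definitive:

apiVersion: batch/v2alpha1
kind: ScheduledJob
metadata:
  name: do-something-every-minute
spec:
  # Standard cron syntax: run once every minute
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: do-something
              image: "..."  # a hypothetical image containing the extracted job
          restartPolicy: OnFailure

But since we couldn’t wait for that, let’s look at the approach we actually took.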

Leader Election in Kubernetes

To work around this we needed some sort of distributed coordination. That is, when the jobs are triggered by Spring we’d like to just return (and NOT run the code associated with the job) if the current node is not the “leader node” responsible for running the scheduled jobs. There are several projects that can help with this kind of thing, such as ZooKeeper and Hazelcast, but it would be overkill for us to set up and maintain a ZooKeeper cluster just for the sake of determining which node should execute the scheduled jobs. We needed something that was easier to manage. What if we could utilize Kubernetes for this? Kubernetes already deals with leader election under the covers (using the Raft consensus algorithm), and it turns out that this functionality is exposed to end users through the `gcr.io/google_containers/leader-elector` Docker image. There’s already a good blog post describing how this works in detail, so I won’t repeat that here; instead I’ll talk about how we leveraged this image to solve our problem.

Solving the Problem

What we did was to add the `gcr.io/google_containers/leader-elector` container to our pod, so that each instance of the monolith also ran an instance of the leader elector. This is almost a canonical example of the usefulness of a Kubernetes pod. Here’s an excerpt of the pod defined in our deployment resource:

spec:
  containers:
    - name: "monolith"
      image: "..."
      # The rest is commented out for brevity
    - name: elector
      image: gcr.io/google_containers/leader-elector:0.4
      imagePullPolicy: IfNotPresent
      args:
        - --election=monolith-jobs
        - --http=localhost:4040
      ports:
        - containerPort: 4040
          protocol: TCP

Here we start the leader elector alongside our monolith. Note that we pass `--election=monolith-jobs` as the first argument. This means that the leader elector knows which “group” this container belongs to: all containers specifying this group will be part of the leader election process, and only one of them will be elected as the leader. The second argument, `--http=localhost:4040`, is also very important. It opens a web server on port 4040 that we can query to get the pod name of the current leader, returned in this format:

{ "name" : "name-of-the-leader-pod" }

This is the trick we’ll use to determine whether or not to run our job. But how? All we have to do is check whether the name of the pod that is about to execute a scheduled job is the same as that of the elected leader; if so, we go ahead and execute the job, otherwise we just return. For example:

@Autowired
ClusterLeaderService clusterLeaderService;

@Scheduled(fixedRate = 60000)
public void doSomethingEveryMinute() {
    if (clusterLeaderService.isThisInstanceLeader()) {
        // Do something that updates the database state
    } else {
        log.info("This node is not the cluster leader so won't execute the job");
    }
}

So let’s see how the `ClusterLeaderService` could be implemented. First we must get hold of the pod name from the application. Kubernetes sets the pod name as the container’s hostname (stored in `/etc/hostname`), and it is also exposed through the `HOSTNAME` environment variable, so this is what we’ll make use of in this example. Another way is to use the Downward API to expose the pod name in an environment variable of your choice. For example:

apiVersion: v1
kind: Pod
metadata:
  name: dapi-test-pod
spec:
  containers:
    - name: test-container
      image: gcr.io/google_containers/busybox
      command: [ "/bin/sh", "-c", "env" ]
      env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name

Here we can see that `metadata.name` (i.e. the name of the pod) will be mapped to the `MY_POD_NAME` environment variable. But now let’s see what an implementation of `ClusterLeaderService` might look like:

import io.restassured.path.json.JsonPath; // com.jayway.restassured.path.json.JsonPath in REST Assured 2.x
import org.springframework.stereotype.Service;

import java.net.MalformedURLException;
import java.net.URL;

@Service
public class ClusterLeaderService {

	public boolean isThisInstanceLeader() {
		try {
			// The pod name is the container's hostname
			String myPodName = System.getenv("HOSTNAME");
			// Ask the leader-elector sidecar which pod is the current leader
			String leaderPodName = JsonPath.from(new URL("http://localhost:4040")).getString("name");
			return myPodName.equals(leaderPodName);
		} catch (MalformedURLException e) {
			throw new IllegalStateException("Invalid leader elector URL", e);
		}
	}
}

In this example we’re using the `JsonPath` class from the REST Assured project to query the elector web service and extract the name of the leader pod from the response. Then we simply compare the name of the local pod with that of the leader, and if they’re equal we know that this instance is the leader! That’s it!
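As a side note, if you expose the pod name via the Downward API instead (as in the earlier pod spec), the only change in the service is which environment variable it reads. Assuming the `MY_POD_NAME` mapping from that example, the relevant line would become:

// Hypothetical variant: read the variable defined via the Downward API instead of HOSTNAME
String myPodName = System.getenv("MY_POD_NAME");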

Conclusion

This has turned out to work well for us. If the leader node fails, another one is automatically elected. Be aware that this takes a while, maybe up to a minute or so. This is of course a trade-off that one must be aware of: if it’s very important to never miss a job execution, this option might not be viable for you. In our case it’s not such a big deal if a job isn’t executed exactly once every minute; it’s fine if a job is delayed for a minute or two in rare situations. I think this approach is easy enough, and it can be very valuable when migrating an existing application containing scheduled jobs that for various reasons are hard to extract.
