Scaling: thinking in hardware and software

I remember learning about hardware when I was a Computer Science undergraduate. It was mostly taught separately from software, with the two subjects residing in different modules on the curriculum. To students without much industry experience, it can be easy to think that hardware and software are extremely different subject areas; the former populated with stuffy electronic engineers and silicon, the latter being where the real computer science happens: data structures, algorithms and code.

At Brandwatch, we find that the best backend developers are the ones who think in hardware as much as software. We have very data-centric technology on the backend: we crawl, consume and process 70M+ matches a day from the Internet that match our users' search queries, and we index and store them all in real time. Continuing to scale this system is a challenge, and we have to think about the best ways to utilise our software and hardware so that the platform can keep scaling whilst maintaining speed and reliability.

Increasingly, we find that hardware planning is a bigger part of creating new features than the software itself. Making good choices up front can not only make your feature extremely reliable and fast, but it can also save a large amount of money on your IT budget.

Data processing

A lot of our architecture processes data. We analyse matches from the web, classify them in a multitude of ways, and store them. More recently we've been analysing the last 24 hours of data with more scrutiny to try to predict whether something interesting is happening: for example, whether a hashtag is beginning to trend, or whether a particular story is being shared. This problem is made more complicated by the fact that we're not doing global data analysis like Twitter and Facebook do; instead we're inspecting tens of thousands of individual data streams, since every query in the system is different.

The number of customers' queries, and therefore data streams, is increasing very rapidly. We currently have 120,000+ live queries, compared to about 70,000 this time last year. The year before that, it was 30,000. You can probably see the issue with scale here.

Writing software to do simple analysis of one data stream is straightforward. Let's do this now with an example of counting the top countries that matches are coming from. Perhaps you could have an application that receives web pages, tweets or Facebook updates, and then for each one you store the country it has been posted from.

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

public class CountryCounter {

    // Guava's Multiset keeps a count alongside each distinct element.
    private final Multiset<String> countries = HashMultiset.create();

    public void logCountry(String country) {
        countries.add(country);
    }

    public int occurrencesOf(String country) {
        return countries.count(country);
    }
}

Simple! We can then write whatever algorithm we like to find the top country. We could even use a SortedMultiset for this, although enforcing the order of the collection could be expensive with a high rate of inserts.

Even though the code is simple, there are a number of considerations when working with extremely large amounts of data. Ideally we don't want our application to stop running, since the stream of data is never going to stop. How will we keep the Multiset under control? In this case, aside from a potential future catastrophic political event, the number of countries in the world will never grow explosively. But what if we were tracking the top hashtags, or the top authors? These facets of data often have a very long tail. What impact does that have on RAM over time? Can we store the top items in RAM, but palm off the tail entries to slower but cheaper storage? How would we cope if we had multiple instances contributing to this global counter? These are all exercises for the reader.
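
As a rough sketch of the ordering point: rather than keeping a SortedMultiset ordered on every insert, you could take a snapshot ordered by count only when you need it, using Guava's Multisets.copyHighestCountFirst. This is an illustrative sketch rather than our production code, and it assumes the caller has access to the Multiset.

import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multisets;

public class TopCountries {

    // Prints the most frequently seen countries. The snapshot is immutable and
    // ordered by descending count, so the live Multiset stays cheap to insert into.
    public static void printTopCountries(Multiset<String> countries, int limit) {
        ImmutableMultiset<String> byCount = Multisets.copyHighestCountFirst(countries);
        int printed = 0;
        for (Multiset.Entry<String> entry : byCount.entrySet()) {
            System.out.println(entry.getElement() + ": " + entry.getCount());
            if (++printed == limit) {
                break;
            }
        }
    }
}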

Let's get back to our specific example, which is processing data for tens of thousands of streams. In this case, we'd want pretty much the same code, but we'd want a Multiset for each search query in our system. They each have a unique ID, so that would be a sensible key to use:

import java.util.Map;

import com.google.common.collect.Maps;

public class QueryCountryCounters {

    // One CountryCounter per query, keyed by the query's unique ID.
    private final Map<Long, CountryCounter> queryCountryCounters = Maps.newHashMap();

    public void logCountry(long queryId, Match match) {
        CountryCounter countryCounter = queryCountryCounters.get(queryId);
        countryCounter.logCountry(match.getCountry());
    }
}

That'll do - we can assume there will always be a CountryCounter initialised for each query. But what if we want to count 30 other metrics for each query in the system? 30 Multisets for each query is beginning to look more difficult to fit in a sensibly-sized JVM heap, especially as garbage collection becomes slower when there's more to collect. One solution would be to buy the most powerful machine on the market to run this application, but that's expensive. Imagine the look on your CFO's face. It also won't scale very well if the number of queries grows at an exponential rate.

A better solution is to calculate the total RAM we require and divide it by a sensible JVM heap size. That gives us the number of instances of this application we need to run, with each instance processing a distinct subset of the queries. This also means you could run the instances on cheaper commodity hardware if you wanted to. However, we've now created a different problem: if we were running tens or even hundreds of instances, how will they know which queries they should be working on?
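
Here's a back-of-the-envelope sketch of that calculation. The per-query footprint and heap size are made-up numbers for illustration, not our real figures:

public class CapacityPlan {

    public static void main(String[] args) {
        long queries = 120_000L;
        long bytesPerQuery = 2L * 1024 * 1024;                // assumed footprint of one query's counters
        long heapBytesPerInstance = 16L * 1024 * 1024 * 1024; // assumed heap per instance

        long totalBytes = queries * bytesPerQuery;
        // Round up: any remainder still needs an instance to host it.
        long instances = (totalBytes + heapBytesPerInstance - 1) / heapBytesPerInstance;
        System.out.println("Instances needed: " + instances);  // prints 15
    }
}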

Coordinating tasks with Apache Zookeeper and Curator

Leader election is a method of deciding which process is the designated assignee of a task in a distributed system. You can harness the power of leader election by using Apache Zookeeper together with Apache Curator, a client library that takes a lot of the boilerplate out of working with Zookeeper. We'll assume from now that you've got a Zookeeper quorum running, but if you need some assistance then do look at the documentation on the Zookeeper website.

Zookeeper looks a bit like a file system, and you interact with data at specific paths, e.g. /brandwatch/feature_foo/data_bar. It's up to you to decide how you name and store data. Each item in the "filesystem" is a node, most commonly called a znode to make it clear that we're talking about Zookeeper. Znodes can also be ephemeral; that is, when the client that created them loses its connection, they are deleted too.
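
For example, here's a minimal sketch of creating an ephemeral znode with Curator. The /workers path and workerId are just illustrative, and we assume a started CuratorFramework called client, like the one we create below:

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

public class EphemeralExample {

    // If this process crashes or its Zookeeper session expires, the znode is
    // removed automatically, which is what makes ephemeral nodes handy for
    // spotting dead workers.
    public static void registerWorker(CuratorFramework client, String workerId) throws Exception {
        client.create()
              .withMode(CreateMode.EPHEMERAL)
              .forPath("/workers/" + workerId);
    }
}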

First, we'll write some code to tell Zookeeper that we have a query that needs processing by our instances. We might want to write a new application that manages the creation and deletion of queries, since it only needs to happen in one place. Creating an instance of Curator in your code is simple:

CuratorFramework client = CuratorFrameworkFactory
        .builder()
        .connectString("localhost:19121")
        .namespace("brandwatch/data_counters") // Curator namespaces are given without a leading slash
        .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // a retry policy is required
        .build();
client.start();

Then here's a method to create a new node at a specified path:

public void createZNode(String queryId) throws Exception {
    try {
        client.create().forPath("/queries/" + queryId);
    } catch (NodeExistsException e) {
        log.warn("Node {} was already created.", queryId);
    }
}

If a query gets removed from the system, then the code looks pretty similar:

public void removeZNode(String queryId) throws Exception {
    try {
        client.delete().forPath("/queries/" + queryId);
    } catch (NoNodeException e) {
        log.warn("Node {} was already deleted.", queryId);
    }
}

Electing a leader

Next, in the code for our counters, which will be running in multiple instances, we will listen for changes at the path. Firstly, let's use the LeaderLatchListener interface to define what happens when we win or lose leadership for a query. We'll assume that the createDataStructure() method initialises the Multisets we defined earlier and inserts the HashMap entry for that query. We'll also assume that removeDataStructure() removes it, allowing it to be garbage collected.

public class WorkerLeaderLatchListener implements LeaderLatchListener {

    private final long queryId;

    public WorkerLeaderLatchListener(long queryId) {
        this.queryId = queryId;
    }

    @Override
    public void isLeader() {
        // We've won leadership for this query, so start tracking it in this instance.
        createDataStructure(queryId);
    }

    @Override
    public void notLeader() {
        // We've lost leadership, so stop tracking the query and let its memory be reclaimed.
        removeDataStructure(queryId);
    }
}

Next we'll write a class that implements PathChildrenCacheListener to watch the parent node whose child nodes are being added and removed by our other application; it will receive callbacks via the childEvent method whenever this happens. Depending on the event, it will start or close a LeaderLatch for the query, with the WorkerLeaderLatchListener we've just written attached.

public class WorkerManager implements PathChildrenCacheListener {

    private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);

    private final CuratorFramework client;
    private final PathChildrenCache pathChildrenCache;
    private final Map<Long, LeaderLatch> leaderLatches = newHashMap();

    public WorkerManager(CuratorFramework client, PathChildrenCache pathChildrenCache) {
        this.client = client;
        this.pathChildrenCache = pathChildrenCache;
    }

    @PostConstruct
    public void initialise() throws Exception {
        List<ChildData> currentData = newArrayList(initialisePathChildrenCache());
        for (ChildData childData : currentData) {
            long queryId = parseQueryIdFromPath(childData.getPath());
            startLeaderElection(queryId);
        }
    }

    private List<ChildData> initialisePathChildrenCache() throws Exception {
        pathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE);
        pathChildrenCache.getListenable().addListener(this);
        return pathChildrenCache.getCurrentData();
    }

    @Override
    public void childEvent(CuratorFramework client,
                           PathChildrenCacheEvent event) {
        switch (event.getType()) {
        case CHILD_ADDED: {
            long queryId = parseQueryIdFromPath(event.getData().getPath());
            startLeaderElection(queryId);
            break;
        }
        case CHILD_REMOVED: {
            long queryId = parseQueryIdFromPath(event.getData().getPath());
            removeFromLeaderElection(queryId);
            break;
        }
        default:
            // There are other cases to deal with, such as CONNECTION_RECONNECTED.
            break;
        }
    }

    private boolean haveLeaderLatchForQuery(long queryId) {
        return leaderLatches.containsKey(queryId);
    }

    private long parseQueryIdFromPath(String path) {
        // Child paths look like /queries/<queryId>; the ID is everything after the last slash.
        return Long.parseLong(path.substring(path.lastIndexOf('/') + 1));
    }

    private void startLeaderElection(long queryId) {
        if (haveLeaderLatchForQuery(queryId)) {
            return; // We're already taking part in the election for this query.
        }
        String leaderPath = "/queries/" + queryId;
        createLeaderLatch(queryId, leaderPath);
    }

    LeaderLatch createLeaderLatch(long queryId, String leaderPath) {
        LeaderLatch leaderLatch = new LeaderLatch(client, leaderPath);
        leaderLatch.addListener(new WorkerLeaderLatchListener(queryId));
        try {
            leaderLatch.start();
        } catch (Exception e) {
            log.error("Error when starting leadership election", e);
        }
        leaderLatches.put(queryId, leaderLatch);
        return leaderLatch;
    }

    private void removeFromLeaderElection(long queryId) {
        LeaderLatch leaderLatch = leaderLatches.remove(queryId);
        if (leaderLatch == null) {
            return;
        }
        try {
            // NOTIFY_LEADER ensures notLeader() fires if we currently hold leadership,
            // so the query's data structure gets cleaned up.
            leaderLatch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
        } catch (IOException e) {
            log.error("Error when closing leader latch for query {}", queryId, e);
        }
    }

    void shutdown() throws IOException {
        for (LeaderLatch leaderLatch : leaderLatches.values()) {
            leaderLatch.close();
        }
        pathChildrenCache.close();
    }

}
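
To tie it together, here's a minimal sketch of how a worker instance might wire this up. The connect string and paths mirror the earlier examples, but the exact wiring is illustrative rather than lifted from our codebase:

CuratorFramework client = CuratorFrameworkFactory
        .builder()
        .connectString("localhost:19121")
        .namespace("brandwatch/data_counters")
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .build();
client.start();

// Cache the children of /queries so we receive CHILD_ADDED and CHILD_REMOVED callbacks.
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/queries", true);
WorkerManager workerManager = new WorkerManager(client, pathChildrenCache);
workerManager.initialise();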

And that's about it! The instances will now distribute work amongst themselves. If one instance dies, the queries that it was processing will now fail over to another instance. But what about the historical data that was being held in RAM in the other instance? Well, that's a problem for another time, I guess.

We're increasingly using distributed election of tasks in our infrastructure to allow us to horizontally scale applications. For example, using the above code, you could spin up 5 more instances and they would automatically join in and participate in the leader election process. Magic! If you're interested in these sorts of problems, or if you know of better ways of doing this, then we're always on the lookout for talented engineers to join us.