Microservices and gRPC: Use Atomix as service discovery

gRPC is a modern open source high performance RPC framework initiated by Google and supported by many languages and platforms (C++, Java, Go, Node, Ruby, Python and C# across Linux, Windows, and Mac). It is used by many projects (etcd/CoreOS, containerd/Docker, cockroachdb/Cockroach Labs…) and has reached a significant milestone with its 1.0 release.

Used in a distributed environments where a large number of microservices are running, gRPC supports rich cloud oriented features like:
– load balancing/discovery
– tracing
– health checking
– authentication

For now, gRPC supports only DNS as the default name-system and can simply be used as:

ManagedChannel channel = ManagedChannelBuilder
.forTarget("dns:///dns.server.example.com:8080")
.nameResolverFactory(new DnsNameResolverProvider())
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext(true)
.build();

The goal of this article is to show how to wire gRPC with Atomix. Atomix is a framework for coordinating distributed systems built on the Raft consensus algorithm. Atomix provides some blocks that solve many common distributed systems problems including group membership, leader election, distributed concurrency control, partitioning, and replication. In internal, Atomix uses an other product: Copycat, which is a fault-tolerant state machine replication framework.

NOTE
If you want to have a look on what you can do with Atomix, here is a list of examples.

In order to use Atomix, we will build a little infrastructure with:
– a service discovery
– a gRPC server
– and a gRPC client

We could include the service discovery in the gRPC server but the idea here is to dissociate the roles, and eventually reuse the service discovery for other services.

We begin by adding the dependencies:

<properties>
    <atomix.version>1.0.0-rc9</atomix.version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.atomix</groupId>
        <artifactId>atomix-all</artifactId>
        <version>${atomix.version}</version>
    </dependency>
</dependencies>

First of all, the service discovery:

import io.atomix.AtomixReplica;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.server.storage.Storage;

import java.net.InetAddress;
import java.util.UUID;

public class AtomixServer {

    public static void main(String[] args) throws Exception {
        Address address = new Address("184.3.145.57", 12345);
        AtomixReplica replica = AtomixReplica.builder(address)
                .withTransport(new NettyTransport())
                .withStorage(Storage.builder()
                        .withDirectory(System.getProperty("user.dir") + "/logs/" + UUID.randomUUID().toString())
                        .build())
                .build();
        replica.bootstrap();
    }
}

Very simple, all the logic is managed by Atomix.

For the client and the server, we will reuse the HelloWorld example from gRPC based on this proto file:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

On the server side, we need to modify the HelloWorldServer in order to register the instance on Atomix:


import io.atomix.Atomix; import io.atomix.AtomixClient; import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.netty.NettyTransport; import io.atomix.group.DistributedGroup; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; public class HelloWorldServer { private static final Logger logger = LoggerFactory.getLogger(HelloWorldServer.class); /* The port on which the server should run */ private final int port; /* The list of address of the Atomix servers */ private final List<Address> cluster; private Server server; public HelloWorldServer(int port, List<Address> cluster) { this.port = port; this.cluster = cluster; } public static void main(String[] args) throws Exception { List<Address> cluster = new ArrayList<>(); cluster.add(new Address(InetAddress.getLocalHost().getHostName(), 12345)); // Starting a first server HelloWorldServer server1 = new HelloWorldServer(50051, cluster); server1.start(); // Starting a second server HelloWorldServer server2 = new HelloWorldServer(50052, cluster); server2.start(); server1.blockUntilShutdown(); server2.blockUntilShutdown(); } private void start() throws Exception { InetSocketAddress publishAddress = new InetSocketAddress("184.3.145.57", port); server = ServerBuilder .forPort(port) .addService(new GreeterImpl()) .build() .start(); logger.info("Server started, listening on " + port); // Register AtomixClient client = AtomixClient.builder().withTransport(new NettyTransport()).build(); Atomix atomix = client.connect(cluster).get(); DistributedGroup group = atomix.getGroup("service-helloworld").get(); // Add the address in metadata group.join(Collections.singletonMap("address", publishAddress)).get(); // SDH Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { HelloWorldServer.this.stop(); } }); } private void stop() { if (server != null) { server.shutdown(); } } private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } private class GreeterImpl extends GreeterGrpc.GreeterImplBase { @Override public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) { System.err.println("Go a request, hello!!!"); HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } } }

Now on the client side, we need to create two custom implementations for io.grpc.NameResolver.Factory and io.grpc.NameResolver.

import com.google.common.base.Splitter;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.atomix.catalyst.transport.Address;
import javax.annotation.Nullable;
import java.util.stream.Collectors;

public class AtomixNameResolverFactory extends NameResolver.Factory {

    @Nullable
    @Override
    public NameResolver newNameResolver(URI uri, Attributes attributes) {
        String authority = uri.getAuthority();
        List<Address> cluster = Splitter.on(",").withKeyValueSeparator(':').split(authority)
                .entrySet().stream()
                .map(entry -> new Address(entry.getKey(), entry.getValue() != null ? Integer.valueOf(entry.getValue()) : 12345))
                .collect(Collectors.toList());
        return new AtomixNameResolver(authority, cluster, uri.getPath().substring(1));
    }

    @Override
    public String getDefaultScheme() {
        return "atomix";
    }
}
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.group.DistributedGroup;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public static class AtomixNameResolver extends NameResolver {

    private final static Logger LOGGER = LoggerFactory.getLogger(AtomixNameResolver.class);
    private final String authority;

    private final List<Address> cluster;
    private final String service;
    @GuardedBy("this")
    private boolean shutdown;
    @GuardedBy("this")
    private Listener listener;
    @GuardedBy("this")
    private AtomixClient client;

    public AtomixNameResolver(String authority, List<Address> cluster, String service) {
        this.authority = authority;
        this.cluster = cluster;
        this.service = service;
    }

    @Override
    public String getServiceAuthority() {
        return this.authority;
    }

    @Override
    public void start(Listener listener) {
        Preconditions.checkState(this.listener == null, "already started");
        this.listener = Preconditions.checkNotNull(listener, "listener");

        client = AtomixClient.builder().withTransport(new NettyTransport()).build();

        DistributedGroup group;
        try {
            Atomix atomix = client.connect(cluster).get();
            group = atomix.getGroup(service).get();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            listener.onError(Status.UNAVAILABLE.withCause(e));
            return;
        }
        group.onJoin(m -> refreshServers(listener, group));
        group.onLeave(m -> refreshServers(listener, group));

        refreshServers(listener, group);
    }

    private void refreshServers(Listener listener, DistributedGroup group) {
        List<ResolvedServerInfo> servers = null;
        try {
            servers = group.members().stream()
                    .map(member -> member.<Map<String, InetSocketAddress>>metadata().get().get("address"))
                    .map(address -> new ResolvedServerInfo(address, Attributes.EMPTY))
                    .collect(Collectors.toList());
            LOGGER.warn("Servers: {}", servers);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            listener.onError(Status.UNAVAILABLE.withCause(e));
        }
        listener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY);
    }

    public final synchronized void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;
            try {
                client.close().get();
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }
}

Now all we have to do:

ManagedChannel channel = ManagedChannelBuilder
.forTarget("atomix://host1:1111,host2:1111/example-service")
.nameResolverFactory(new AtomixNameResolverFactory())
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext(true)
.build();

Sources:
http://atomix.io/atomix/docs/configuration/
http://www.grpc.io/blog/principles
https://github.com/grpc/grpc/blob/master/doc/naming.md

Credits:
“03102014” by JF DIGONNET is licensed under CC BY 2.0

Related Posts

Leave a comment