A simple TCP client/server using Reactor

Reactor is a foundation for asynchronous applications on the JVM. It provides abstractions for Java, Groovy and other JVM languages to make building event- and data-driven applications easier. It’s also really fast: on modest hardware, it’s possible to process around 15,000,000 events per second with the fastest non-blocking Dispatcher. Other dispatchers give the developer a range of choices, from thread-pool style, long-running task execution to non-blocking, high-volume task dispatching.
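To make the difference between dispatching styles a bit more concrete, here is a rough analogy using only java.util.concurrent. It is not Reactor's Dispatcher API: the class and method names are made up for illustration, and a plain ExecutorService stands in for a dispatcher. A single worker thread handles events strictly in submission order, while a thread pool can run handlers in parallel and so may complete them in any order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an ExecutorService stands in for a dispatcher.
public class DispatchStylesSketch {

  // Submits one handler call per event and returns the events
  // in the order in which the handlers actually ran.
  public static List<Integer> dispatch(ExecutorService executor, int eventCount) {
    final List<Integer> handled =
        Collections.synchronizedList(new ArrayList<Integer>());
    for (int i = 0; i < eventCount; i++) {
      final int event = i;
      executor.execute(new Runnable() {
        public void run() {
          handled.add(event);
        }
      });
    }
    executor.shutdown();
    try {
      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("handlers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return handled;
  }

  public static void main(String[] args) {
    // One worker thread: events are handled in submission order.
    System.out.println(dispatch(Executors.newSingleThreadExecutor(), 5));
    // Four worker threads: same events, but the order may vary between runs.
    System.out.println(dispatch(Executors.newFixedThreadPool(4), 5));
  }
}
```

Which style fits depends on the handlers: short, non-blocking handlers suit a single fast event loop, while long-running or blocking work is better spread over a pool.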

Since the first milestone of the project, an easy-to-use TCP client and server have been included. Built on Netty, a Reactor-powered syslog server can ingest something like 1 million messages per second on server-grade hardware. Reactor’s TCP support includes a simple Codec facility, which is easily extensible beyond the default set of codecs provided in core. It is designed to be lightweight by using Reactor’s Buffer class, which provides extremely efficient views over data as well as a slew of helper methods for working with standard Java NIO ByteBuffers, but without the pain of dealing with the ByteBuffer directly. Reactor’s TCP support comes with JSON right out of the box.
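To give a feel for what "views over data" means at the NIO level, here is a minimal sketch that uses plain java.nio.ByteBuffer rather than Reactor's Buffer class, whose API is not shown in this post. The class name, the method names and the length-prefixed frame layout are all assumptions made for illustration. The point is that slice() exposes the payload bytes without copying them, which is the kind of bookkeeping a helper class can hide.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch using only standard NIO, not Reactor's Buffer.
public class ByteBufferViewSketch {

  // Encodes a string as a 4-byte length prefix followed by its UTF-8 bytes.
  public static ByteBuffer encode(String text) {
    byte[] payload = text.getBytes(StandardCharsets.UTF_8);
    ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
    frame.putInt(payload.length).put(payload);
    frame.flip(); // switch from writing to reading
    return frame;
  }

  // Returns a view over the payload that shares the frame's bytes
  // (no copy), and advances the frame past the payload.
  public static ByteBuffer payloadView(ByteBuffer frame) {
    int length = frame.getInt();
    ByteBuffer view = frame.slice();
    view.limit(length);
    frame.position(frame.position() + length);
    return view;
  }

  // Decodes one frame back into a string.
  public static String decode(ByteBuffer frame) {
    return StandardCharsets.UTF_8.decode(payloadView(frame)).toString();
  }

  public static void main(String[] args) {
    ByteBuffer frame = encode("A simple message");
    System.out.println(decode(frame)); // prints "A simple message"
  }
}
```

Even this small example needs manual flip(), limit() and position() calls, which is the "pain" a wrapper over ByteBuffer is meant to remove.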

So, ok, let’s have a look!

import java.util.Date;

import org.junit.Test;

import reactor.core.Environment;
import reactor.function.Consumer;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.encoding.json.JsonCodec;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.netty.NettyTcpServer;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tcp.spec.TcpServerSpec;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Main {

  public static class SimpleMessage {

    private String content;

    private Date date;

    // Jackson uses this constructor to build the message from JSON
    @JsonCreator
    public SimpleMessage(@JsonProperty("content") String content,
        @JsonProperty("date") Date date) {
      this.content = content;
      this.date = date;
    }

    public Date getDate() {
      return date;
    }

    public String getContent() {
      return content;
    }

    @Override
    public String toString() {
      return "SimpleMessage [content=" + content + ", date=" + date + "]";
    }

  }

  @Test
  public void request() throws InterruptedException {

    // Create Environment in which Reactors operate
    Environment env = new Environment();

    // Server: decodes incoming JSON into SimpleMessage and prints each one
    TcpServer<SimpleMessage, String> server = new TcpServerSpec<SimpleMessage, String>(
        NettyTcpServer.class)
        .env(env)
        .dispatcher(Environment.RING_BUFFER)
        .listen("localhost", 15151)
        .codec(new JsonCodec<SimpleMessage, String>(SimpleMessage.class))
        .consume(new Consumer<TcpConnection<SimpleMessage, String>>() {

          public void accept(
              TcpConnection<SimpleMessage, String> connection) {
            connection.in().consume(new Consumer<SimpleMessage>() {
              public void accept(SimpleMessage data) {
                System.err.println("Received data -> " + data);
              }
            });
          }
        }).get().start();

    // Client: encodes outgoing SimpleMessage objects as JSON
    TcpClient<String, SimpleMessage> client = new TcpClientSpec<String, SimpleMessage>(
        NettyTcpClient.class)
        .env(env)
        .dispatcher(Environment.RING_BUFFER)
        .connect("localhost", 15151)
        .codec(new JsonCodec<String, SimpleMessage>(String.class))
        .get();

    final SimpleMessage message = new SimpleMessage("A simple message",
        new Date());

    // Open the connection and block until it is established
    TcpConnection<String, SimpleMessage> tcpConnection = client.open()
        .await();

    // Send the message; the callback runs once the write has completed
    tcpConnection.send(message, new Consumer<Boolean>() {

      public void accept(Boolean data) {
        System.err.println("'" + message + "' sent");
      }
    });

    // Give the server time to receive and print the message
    Thread.sleep(1000);
    tcpConnection.close();

    System.err.println("All data has been sent");

    client.close().await();
    server.shutdown().await();
  }
}
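The test relies on Thread.sleep(1000) to give the server time to receive the message, which is either too long or, on a slow machine, too short. A common alternative is a CountDownLatch that the receiving callback counts down. The sketch below shows the pattern with plain java.util.concurrent only: the executor thread stands in for the server-side consumer, and the class and method names are hypothetical rather than part of Reactor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: wait for a callback with a latch instead of sleeping.
public class LatchInsteadOfSleep {

  // Hands the message to another thread and waits until that thread
  // has handled it, or fails once the timeout elapses.
  public static String sendAndAwait(final String message, long timeoutMillis) {
    final CountDownLatch received = new CountDownLatch(1);
    final AtomicReference<String> seen = new AtomicReference<String>();
    ExecutorService receiver = Executors.newSingleThreadExecutor();
    try {
      receiver.execute(new Runnable() {
        public void run() {
          seen.set(message);    // stands in for the server-side accept(...)
          received.countDown(); // signal the waiting thread
        }
      });
      if (!received.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("message not received in time");
      }
      return seen.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      receiver.shutdown();
    }
  }

  public static void main(String[] args) {
    // prints "Received: A simple message"
    System.out.println("Received: " + sendAndAwait("A simple message", 1000));
  }
}
```

Applied to the test, the inner accept(SimpleMessage) would call countDown() and the test would await the latch before closing the connection, so it finishes as soon as the message arrives.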

Don’t forget the Maven dependencies:

  <dependency>
   <groupId>org.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>1.0.0.M1</version>
  </dependency>
  <dependency>
   <groupId>org.projectreactor</groupId>
   <artifactId>reactor-tcp</artifactId>
   <version>1.0.0.M1</version>
  </dependency>

More info:
http://springsource.org/2013/05/13/reactor-a-foundation-for-asynchronous-applications-on-the-jvm/
