Ratpack, MongoDB i RxJava

Ratpack, MongoDB i RxJava

Rozwinięcie tematu nieblokującego I/O w Javie - integrujemy serwer HTTP Ratpack z bazą MongoDB i RxJavą.

piątek, 8 czerwca 2018

Informatyka

W poprzednim artykule Ratpack: nieblokujący serwer HTTP przedstawiałem bibliotekę Ratpack, implementację serwera HTTP wykorzystującą nieblokujące operacje I/O. Najwyższa pora na kontynuację tematu. Zajmiemy się dzisiaj integracją Ratpacka z innymi narzędziami i spróbujemy stworzyć aplikację, która będzie komunikowała się z prawdziwą bazą danych. Wykorzystamy w tym celu MongoDB, ponieważ jego sterownik posiada także asynchroniczne API, a do połączenia go z serwerem HTTP użyjemy biblioteki RxJava. RxJava pozwala nam na stworzenie asynchronicznego strumienia do przesyłania danych z jednego miejsca do drugiego, na którym mamy pozapinany ciąg filtrów oraz transformacji. Brzmi ciekawie? Zatem do dzieła!

Zasada działania

Aplikacja demonstracyjna będzie służyła do zarządzania artykułami. Możliwe będzie zarówno odczytanie listy artykułów, jak i zapisanie dowolnej liczby nowych artykułów do bazy. Pozwoli nam to zilustrować dwa podstawowe typy operacji wykonywanych przez aplikacje REST-owe: odczyt oraz zapis, które mają odmienny sposób przetwarzania danych. Łańcuchy operacji skonstruujemy przy pomocy RxJava, a ich ogólny schemat przedstawiłem poniżej:

Ilustracja
Łańcuch przetwarzania w demonstracyjnej aplikacji.

Strzałki oznaczają kierunek przepływu danych, zaś prostokąty to wykonywane na nich operacje. Kolorem czerwonym zaznaczyłem miejsca, w których integrujemy się z MongoDB, zaś niebieskim - operacje wykonywane przez Ratpack.

Jeśli chodzi o odczyt, nasz łańcuch jest bardzo prosty. Źródłem danych jest tutaj MongoDB, zaś miejscem docelowym - renderer Ratpacka, który generuje docelowy dokument JSON i umieszcza go w odpowiedzi HTTP. Po drodze wykonujemy transformację dokumentu BSON (Binary JSON) na obiekt reprezentujący artykuł. Ratpack będzie pobierał dane strumieniowo, serializując jeden artykuł po drugim w czasie gdy baza będzie zajęta wyciąganiem ich z bazy.

Operacja zapisu jest dużo bardziej skomplikowana - w odpowiedzi chcielibyśmy wysłać dokument JSON pokazujący wynik wstawiania poszczególnych artykułów do bazy, dlatego Ratpack będzie tu zarówno źródłem danych (JSON z artykułami), jak i ich końcowym odbiorcą. Baza danych będzie tylko jednym z etapów przetwarzania w łańcuchu. Zwróćmy uwagę, że na diagramie zaznaczyłem tutaj dwie kreski. Chodzi o to, że o ile Ratpack potrafi ładnie strumieniować odczytywane dane, to nie posiada żadnego API do strumieniowego parsowania JSON-ów. Od strony technicznej napisanie takiego parsera i użycie go razem z Ratpackiem jest możliwe, ale wymaga napisania dużej ilości niskopoziomowego kodu do składania buforów bajtów i wyszukiwania w nich granic poszczególnych obiektów reprezentujących artykuły. Dlatego aplikacja demonstracyjna będzie napisana nieco prościej. Będziemy najpierw parsować cały dokument JSON, tworząc w pamięci listę artykułów do dodania, a dopiero następnie rozpoczynając jej zapis. Jednak kod odpowiedzialny za interakcję z MongoDB jest od początku przystosowany do użycia w wariancie ze strumieniowaniem.

Bardzo krótki wstęp do RxJava

ReactiveX to uniwersalne API do programowania reaktywnego, posiadające implementacje dla wielu popularnych języków programowania. Dzięki takiemu podejściu każda implementacja posiada identyczne nazewnictwo oraz semantykę, co jest ogromnym ułatwieniem dla programistów, zwłaszcza gdy pracują w kilku technologiach. Co więcej, fundamentalne interfejsy są ustandaryzowane także na poziomie poszczególnych języków (projekt Reactive Streams), co daje nam dwie korzyści. Po pierwsze, w jednej aplikacji możemy mieć dwie różne implementacje API, które będą mogły ze sobą bezkonfliktowo współpracować. Po drugie, biblioteki pragnące mieć integrację z reaktywnym API nie muszą posiadać zależności od pełnej, konkretnej implementacji. W świecie Javy sytuacja jest bardzo klarowna. Istnieje javowa implementacja Reactive Streams, która stała się od Javy 9 częścią biblioteki standardowej (różnica to jedynie nazwy pakietów). Na jej bazie zbudowana jest biblioteka RxJava 2, będąca wzorcową implementacją ReactiveX dla Javy.

Zacznijmy od bardzo prostego przykładu, który dokona sumowania sekwencji liczb i wyświetlenia wyniku:

Flowable.range(1, 1000)
    .filter(i -> i % 2 == 0)
    .map(number -> number + 1)
    .reduce(0, (result, next) -> result + next)
    .subscribe(result -> System.out.println("Result: " + result));

Na pierwszy rzut oka przykład wygląda bardzo podobnie do javowego API strumieni i rzeczywiście, jeśli ograniczamy się tylko do podstawowej funkcjonalności, to obie implementacje robią praktycznie to samo. Ciekawe rzeczy zaczynają się, gdy poczytamy sobie dokładniej, czym to magiczne Flowable jest. Otóż strumień Flowable wspiera tzw. backpressure (polska nazwa zaczerpnięta z już istniejącej terminologii technicznej to przeciwciśnienie, ale chyba żaden informatyk jeszcze nie wpadł na jej używanie - będę zatem pierwszy :)). Wyobraźmy sobie sytuację, gdy nadawca i odbiorca pracują równolegle - może się okazać, że nadawca produkuje dane dużo szybciej niż my jesteśmy w stanie je przetwarzać i musimy zrobić coś z rosnącą górką danych czekających na obsługę. Dzięki Flowable, nadawca jest świadom tego, jak szybko powinien wypychać nowe dane tak, aby odbiorca nadążył. Jeśli nadawca nie jest w stanie się dostosować, mamy do swej dyspozycji wiele strategii radzenia sobie z problemem (np. buforując dane, albo odrzucając część z nich). Wynika też z tego, że RxJava znakomicie radzi sobie z obsługą wielu wątków i tak jest w istocie. Zmodyfikujmy powyższy przykład tak, aby redukcja była wykonywana w innym wątku:

CountDownLatch latch = new CountDownLatch(1);
Flowable.range(1, 1000)
    .filter(i -> i % 2 == 0)
    .observeOn(Schedulers.computation())
    .map(number -> number + 1)
    .reduce(0, (result, next) -> result + next)
    .subscribe(result -> {
        System.out.println("Result: " + result);
        latch.countDown();
    });
System.out.println("Done");
latch.await();

I gotowe - wszystko sprowadziło się do dodania .observeOn() do łańcucha - od tego miejsca wszystkie kolejne transformacje wykonywane są na oddzielnym wątku przeznaczonym do obliczeń.

Asynchroniczne MongoDB

Dokumentowa baza MongoDB posiada specjalny sterownik dla Javy, który implementuje nieblokujące API. Możemy wybrać jedną spośród dwóch zależności:

  • mongodb-driver-async - bardziej niskopoziomowa biblioteka udostępniająca po prostu API asynchroniczne, korzystające z CompletableFuture
  • mongodb-driver-reactivestreams - nakładka na powyższą bibliotekę, która integruje się z RxJavą.

Oznacza to, że możliwe jest pobieranie danych bezpośrednio do strumieni RxJavy, bez konieczności pisania dodatkowego kodu do integracji. Super!

Uwaga na importy - obie biblioteki posiadają klasy o takich samych nazwach, ale różniące się pakietami. Jeśli podczas pisania przykładu u siebie dostaniecie komunikat błędu o tym, że np. w klasie MongoCollection nie ma metody o spodziewanej sygnaturze, będzie to oznaczało, że zaimportowaliście wersję z mongodb-driver-async, a nie z mongodb-driver-reactivestreams.

Przygotowania do implementacji

Rozpocznijmy zatem pisanie demonstracyjnej aplikacji od przygotowania środowiska i stworzenia podstawowych obiektów reprezentujących nasz model danych. Oto skrypt budowania dla Gradle'a:

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath "io.ratpack:ratpack-gradle:1.5.4"
    }
}

plugins {
    id 'io.franzbecker.gradle-lombok' version '1.14'
}

group 'com.zyxist.example'
version '1.0-SNAPSHOT'

apply plugin: 'application'
apply plugin: 'idea'
apply plugin: 'io.ratpack.ratpack-java'

sourceCompatibility = 1.8
mainClassName = 'com.zyxist.example.RxRatpackApp'

repositories {
    mavenCentral()
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.1.14'
    compile 'org.mongodb:mongodb-driver-reactivestreams:1.8.0'
}

W przykładzie korzystać będziemy z RxJavy 2 - to ona implementuje Reactive Streams. W obiegu istnieje także RxJava 1, która nie jest już rozwijana i pozbawiona integracji z Reactive Streams. Wspominam o tym, ponieważ Ratpack posiada API integracyjne do RxJavy, ale w wersji pierwszej. Korzystając z "dwójki", musimy ręcznie mapować obiekty Promise na strumienie.

Będziemy też potrzebować jakiejś reprezentacji artykułu oraz wyniku operacji:

@Data
public class Article {
    @JsonProperty
    private int id;
    @JsonProperty
    private String title;
    @JsonProperty
    private String content;
    @JsonProperty
    private List<String> references = new ArrayList<>();

    public Article addReference(String uri) {
        this.references.add(uri);
        return this;
    }
}
public class Result {
    @JsonProperty
    public final boolean success;
    @JsonProperty
    public final String message;

    public static Result success() {
        return new Result(true, "");
    }

    public static Result failure(String message) {
        return new Result(false, message);
    }

    public Result(boolean success, String message) {
        this.success = success;
        this.message = message;
    }
}

Komunikacja z MongoDB

Następnym krokiem będzie stworzenie klasy DatabaseService, w której zaimplementujemy komunikację z MongoDB. Stwórzmy sobie najpierw ogólną strukturę całej klasy. W jej skład wejdą potrzebne pola, stałe oraz metody do (un)marshallingu artykułów:

public class DatabaseService {
    private static final String ARTICLE_COLLECTION = "articles";

    private static final String ID_PROP = "id";
    private static final String TITLE_PROP = "title";
    private static final String CONTENT_PROP = "content";
    private static final String REFERENCES_PROP = "references";

    private final MongoClient mongoClient;
    private final MongoDatabase mongoDatabase;

    public DatabaseService(String name) {
        this.mongoClient = MongoClients.create();
        this.mongoDatabase = mongoClient.getDatabase(name);
    }

    private Document unmarshallArticle(Article article) {
        Document doc = new Document();
        doc.append(ID_PROP, article.getId());
        doc.append(TITLE_PROP, article.getTitle());
        doc.append(CONTENT_PROP, article.getContent());
        doc.append(REFERENCES_PROP, article.getReferences());

        return doc;
    }

    private Article marshallArticle(Document document) {
        Article article = new Article();
        article.setId(document.getInteger(ID_PROP));
        article.setTitle(document.get(TITLE_PROP).toString());
        article.setContent(document.get(CONTENT_PROP).toString());
        document.get(REFERENCES_PROP, List.class).forEach(it -> article.addReference(it.toString()));
        return article;
    }
}

Odczyt z bazy, jak pamiętamy z diagramu, jest nieskomplikowany i rzeczywiście, API MongoDB udostępnia nam w klasie MongoCollection metodę find(), która zwraca nic innego, jak reaktywnego nadawcę: obiekt implementujący interfejs Publisher. Możemy z niego bardzo szybko zrobić strumień artykułów:

public Flowable<Article> fetchAll(int limit) {
    MongoCollection<Document> articleCollection = mongoDatabase.getCollection(ARTICLE_COLLECTION);
    return Flowable
        .fromPublisher(articleCollection().limit(limit))
        .map(this::marshallArticle);
}

Sprawa z zapisem jest bardziej złożona - tym razem operacja na bazie jest tylko częścią łańcucha, a nasza metoda będzie zarówno przyjmować Flowable, jak i go zwracać:

public Flowable<Result> insertAll(Flowable<Article> articles) {
    MongoCollection<Document> articleCol = mongoDatabase.getCollection(ARTICLE_COLLECTION);
    return articles
        .map(this::unmarshallArticle)
        .flatMap(articleCol::insertOne)
        .map(success -> Result.success())
        .onErrorReturn(ex -> Result.failure(ex.getMessage()));
}

API RxJavy posiada ogromną ilość dostępnych operatorów i rzadko kiedy zachodzi potrzeba implementowania obiektów Subscriber czy Publisher we własnym zakresie. W tym przypadku udało nam się wszystko zrealizować w całości przy pomocy gotowych klocków. Zauważmy, że insertAll() tak naprawdę zwraca oryginalny łańcuch, tyle że udekorowany kolejnymi transformacjami odpowiedzialnymi za przetłumaczenie artykułu na BSON, właściwy zapis do bazy oraz obsługę błędów. Wyposażeni w taką implementację możemy przystąpić do pisania właściwego serwera HTTP.

Składamy wszystko w całość

W tym miejscu będziemy dokonywać tłumaczenia pomiędzy światem obietnic Ratpacka (Promise), a RxJavą. Dla celów czytelności kod obsługi obu wspieranych metod HTTP wydzieliłem do osobnych klas. Zajmijmy się najpierw odczytem:

public class ArticleListHandler {
    private static final int DEFAULT_LIMIT = 1000;
    private final DatabaseService service;

    public ArticleListHandler(DatabaseService service) {
        this.service = Objects.requireNonNull(service);
    }

    public void handleRequest(Context context) {
        int limit = DEFAULT_LIMIT;
        if (context.getPathTokens().containsKey("limit")) {
            limit = context.getPathTokens().asInt("limit");
        }
        context.render(Jackson.chunkedJsonList(context, service.fetchAll(limit)));
    }
}

Całą magię z generowaniem dokumentów JSON robi za nas metoda chunkedJsonList() - na szczęście dla nas używa ona już interfejsów z Reactive Streams, więc możemy bezpośrednio przekazać do niej Publishera utworzonego przez RxJavę 2. Reaktywna natura tej metody objawi się szczególnie wtedy, gdy będziemy wyświetlać BARDZO duże zbiory danych. Zauważymy wtedy, że serwer będzie wysyłał kolejne partie wynikowego JSON-a kawałek po kawałku, co będzie potwierdzeniem, że parser działa strumieniowo i nie potrzebuje mieć całej kolekcji wcześniej zbuforowanej w pamięci. Tak jak wspominałem, niestety nie mamy analogicznej metody do parsowania dokumentu JSON w żądaniu HTTP (co nie znaczy, że nie da się jej napisać). Dlatego na potrzeby zapisu najpierw zbudujemy kolekcję artykułów w pamięci, a później zabierzemy się za jej zapisywanie do bazy.

public class ArticleWriteHandler {
    private final DatabaseService databaseService;

    public ArticleWriteHandler(DatabaseService databaseService) {
        this.databaseService = databaseService;
    }

    public void handleRequest(Context context) {
        context
            .parse(fromJson(listOf(Article.class))).then(articles -> Promise
                .async(upstream -> executeDatabaseFlow(articles, upstream))
                .then(results -> context.render(json(results)))
        );
    }

    private void executeDatabaseFlow(List<Article> articles, Downstream<Object> upstream) {
        databaseService
            .insertAll(Flowable.fromIterable(articles))
            .collect(LinkedList::new, LinkedList::add)
            .subscribe(upstream::success);
    }
}

Dowolną kolekcję możemy bardzo łatwo przekształcić w łańcuch transformacji przy pomocy metody Flowable.fromIterable(). Zauważmy, że na potrzeby odczytu z bazy tworzymy dodatkowy asynchroniczny Promise i wynik uzyskiwany na końcu pracy łańcucha transformacji wrzucamy do ratpackowego obiektu Downstream sygnalizującego, że wynik jest już dostępny do renderingu. Cała ta magia wynika z tego, że właściwe renderowanie do JSON-a musimy zrealizować wewnętrz Promise.then(); jeśli o tym zapomnimy, dostaniemy wyjątek informujący o tym, że albo nic się nie udało wyrenderować, albo że próbujemy dwukrotnie stworzyć odpowiedź na już obsłużone żądanie. To udziwnienie jest nam tak naprawdę niepotrzebne. Możemy zrezygnować z collect(), dodatkowego Promise'a i użyć... chunkedJsonList(), który wszystko zrobi za nas. Pozostawiam to jako ćwiczenie i podpowiem tylko, że końcowy kod jest krótszy od przedstawionego powyżej.

Mając oba handlery, pozostało nam już tylko zrobić docelową aplikację:

public class RxRatpackApp {
    private static final String DB_NAME = "ratpack";

    public static void main(String args[]) {
        try {
            DatabaseService service = new DatabaseService(DB_NAME);
            ArticleListHandler articleListHandler = new ArticleListHandler(service);
            ArticleWriteHandler articleWriteHandler = new ArticleWriteHandler(service);

            RatpackServer.start(s -> s
                .handlers(h ->
                    h.path("articles", ctx -> ctx.byMethod(m -> m
                        .get(articleListHandler::handleRequest)
                        .post(articleWriteHandler::handleRequest)
                    ))
                )
            );
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}

Możemy teraz odpalić przykład i przetestować go w akcji.

Zakończenie

Kiedy musiałem pierwszy raz pożenić Ratpacka z RxJavą i MongoDB, wydawało się to dość trudne, jednak po lepszym poznaniu wszystkich narzędzi okazuje się, że nie jest to wcale tak straszne, na jakie wygląda. Umieszczona pośrodku RxJava to znakomite narzędzie do zarządzania wykonywanymi na danych transformacjami, głównie dzięki dobremu wsparciu dla wielowątkowości oraz implementacji przeciwciśnienia. Zauważmy, że zdecydowana większość żądań HTTP będzie obsługiwana w sposób podobny do zaprezentowanego w przykładzie, bowiem co możemy zrobić, gdy takie żądanie do nas przyjdzie? Albo coś skądś odczytać i wyświetlić, albo coś zmienić i wyświetlić wynik. Dlatego wszelkie inne operacje to już są tak naprawdę wariacje na temat zaprezentowanych tutaj rozwiązań. Mam nadzieję, że to wprowadzenie przyda się Wam w poznawaniu asynchronicznego świata i pozwoli szybko zacząć. Przed nami zostało już tylko jedno zagadnienie: integracja z relacyjnymi bazami danych, gdzie wciąż jesteśmy skazani na blokujące JDBC. Omówimy je w kolejnym artykule.

Tomasz Jędrzejewski

Programista Javy, lider techniczny. W wolnych chwilach podróżuje, realizując od kilku lat projekty długodystansowych wypraw pieszych.

Autor zdjęcia nagłówkowego: Marcel., CC-BY-2.0

zobacz inne wpisy w temacie

Informatyka

poprzedni wpis Async await w Javie następny wpis Struktura testów jednostkowych

Komentarze (3)

av

Igor Nosowski

Bardzo ciekawy wpis. Z ratpackiem jeszcze nie działałem, ale chyba czas najwyższy.

av

Hubert

Cześć, czy mógłbyś wrzucić cały kod aplikacji na githuba? Bo doszedłem do
wywołania metody "articleCollection().limit(limit)" i nie wiem co ona robi i co zwraca. I nie mogę skompilować kodu.

av

Zyx

Sprawdź czy zaimportowałeś właściwą klasę. Tak jak pisałem (i ostrzegałem na początku :)), są dwie biblioteki: mongodb-driver-async oraz bazująca na niej mongodb-driver-reactivestreams. Niestety obie posiadają klasy o tych samych nazwach (np. MongoCollection), ale w innych pakietach i z innymi metodami. Jeśli zaimportujesz klasę z tej pierwszej, a nie z drugiej, to kod Ci się nie skompiluje.

Kodu niestety nie mogę teraz wrzucić na GitHub z przyczyn technicznych, a dlaczego - patrz najnowsze artykuły na blogu :).

Skomentuj

Od 3 do 40 znaków.

Wymagany, anonimizowany po zatwierdzeniu komentarza.

Odpowiedz na pytanie.

Edycja Podgląd

Od 10 do 8000 znaków.

Wszystkie komentarze są moderowane i muszą być zatwierdzone przed publikacją.

Klikając "Wyślij komentarz" wyrażasz zgodę na przetwarzanie podanych w nim danych osobowych do celów moderacji i publikacji komentarza, zgodnie z polityką prywatności: polityka prywatności