Ratpack, JPA i RxJava

Ratpack, JPA i RxJava

Kontynuacja serii poświęconej nieblokującemu I/O w Javie - tym razem pokażę, jak RxJava i serwer HTTP Ratpack mogą współpracować z blokującym API takim, jak JPA.

niedziela, 13 stycznia 2019

Informatyka

Od ostatniej informatycznej publikacji na temat nieblokującego I/O w Javie minęło ponad pół roku, bowiem zrobiłem sobie w międzyczasie kilkumiesięczne wakacje (polecam!). Najwyższy czas jednak powrócić do pisania. W ostatnim artykule omawiałem integrację nieblokującego serwera HTTP Ratpack z bazą MongoDBoraz bibliotekę RxJava. Dziś zajmiemy się bardzo podobnym tematem, a mianowicie zmodyfikujemy tamten przykład tak, aby mógł współpracować z dużo starszym API dostępowym do relacyjnych baz danych znanym jako JDBC - oczywiście przykrytym warstwą ORM i standardem JPA. Oba interfejsy są w całości blokujące, tj. praktycznie każde wywołanie metody wykonującej operację I/O powoduje przerwanie wykonania bieżącego wątku. Na wersję nieblokującą musimy jeszcze trochę poczekać - Oracle pracuje nad następcą JDBC zwanym ADBA: Asynchronous Database Access API, ale jest ono wciąż na etapie projektowania. Dopóki nie wejdzie ono do którejś z przyszłych wersji Javy, musimy radzić sobie inaczej.

Przypomnienie

Przypomnijmy sobie naszą demonstracyjną aplikację z poprzedniego artykułu. Służyła ona do zarządzania artykułami w bazie. Możliwe było zarówno odczytywanie istniejących artykułów, jak i dodawanie nowych, a obie operacje wywoływaliśmy poprzez API REST-owe. Ich schemat wraz z ilustracją łańcucha przetwarzania skonstruowanego w RxJava wyglądał następująco:

Ilustracja
Łańcuch przetwarzania w demonstracyjnej aplikacji.

Integracja MongoDB z reaktywnym kodem była trywialna, ponieważ odpowiednie wsparcie posiadał sterownik tej bazy. Dodając do tego bibliotekę Ratpack uzyskiwaliśmy serwis, w którym całe I/O było robione w sposób nieblokujący i na niewielkiej liczbie wątków mogło obsługiwać spory ruch. W tradycyjnym, blokującym podejściu mielibyśmy pewną liczbę wątków, po jednym dla każdego połączenia HTTP, a przełączaniem się pomiędzy zadaniami zajmowałby się procesor do spółki z systemem operacyjnym. Choć taki model programowania jest bardzo prosty, płacimy za to słabą skalowalnością. W Javie pojedynczy wątek na starcie zabiera (domyślnie) 2 MB pamięci. Nie jest to dużo w skali kilku, kilkudziesięciu połączeń, ale przy kilku tysiącach zaczyna być odczuwalne. Ponadto na współczesnych procesorach sama operacja przełączania się jest bardzo kosztowna; dużo kosztowniejsza niż w przypadku sprzętu sprzed 20 lat. Kiedyś wykonywałem zadanie, które doskonale to obrazuje - chodziło o wykonanie prostych testów wydajnościowych serwisu, na którym współpracowało między sobą kilkadziesiąt wątków (dlaczego tak, a nie inaczej - to inna historia). Serwis uruchamiany był jednak na maszynie wirtualnej, której przydzielony był tylko jeden rdzeń procesora, więc o żadnej równoległości nie mogło być mowy. Gdy puściłem testowy ruch symulujący to, co mogło się dziać w najbardziej intensywnych momentach, popatrzyłem na to, co robił CPU. A ten 50% czasu spędzał na przełączaniu się między wątkami...

W polu istnieje ogromna liczba interfejsów I/O, które z założenia są blokujące. W szczególności jedynym ustandaryzowanym sposobem komunikacji z relacyjnymi bazami danych w Javie jest interfejs JDBC oraz standard JPA. Nowego API nie uświadczymy wcześniej niż za kilka lat. Autorską alternatywę próbuje budować również Pivotal i dostępna jest już działająca wersja, lecz tutaj z kolei jesteśmy ograniczeni małą liczbą wspieranych systemów bazodanowych. Kiedy zależy nam na dobrej integracji z technologiami wokół, jesteśmy skazani na JDBC. Schemat integracji reaktywnego/asynchronicznego kodu z takimi technologiami jest zawsze taki sam:

  1. tworzymy oddzielną pulę wątków I/O dedykowaną do wykonywania blokującego kodu,
  2. delegujemy tam wszystkie blokujące operacje z wątków, które nie mają się blokować,
  3. gdy wątki I/O przygotują wyniki, odbieramy je i wracamy do pracy na wątkach nieblokujących.

Brzmi to skomplikowanie, jednak z ogromną pomocą przychodzi nam tutaj sama RxJava, która udostępnia nam ku temu odpowiednie narzędzia. Jeśli spojrzymy ponownie na nasz schemat, nasze zadanie sprowadza się do przepisania bloczków Find i Insert oraz powiedzenia, który fragment łańcucha ma się wykonywać na naszej specjalnej puli wątków.

Przygotowanie naszej aplikacji

Aby dodać obsługę JDBC do naszej aplikacji demonstracyjnej, nie musimy w ogóle zmieniać kodu obsługującego API REST-owe. W oryginalnej wersji nasze handlery odwoływały się bezpośrednio do obiektu klasy DatabaseService, która kryła kod do komunikacji z MongoDB. Możemy ukryć ją za interfejsem Storage o następującej budowie tak, by móc ją łatwo podmienić na coś innego:

public interface Storage {
    Flowable<Article> fetchAll(int limit);
    Flowable<Result> insertAll(Flowable<Article> articles);
}

Przypomnijmy, że Flowable reprezentuje łańcuch operacji z biblioteki RxJava, który przetwarza strumień obiektów o dowolnej (potencjalnie nieskończonej) długości i wspiera przeciwciśnienie. Dzięki przeciwciśnieniu konsument (serwer HTTP) może kontrolować prędkość, z jaką producent (baza danych) produkuje wyniki. Tak naprawdę z perspektywy serwera nie obchodzi nas w ogóle, skąd biorą się artykuły i dokąd one trafiają - mamy po prostu łańcuchy operacji, z którymi się integrujemy i aby dodać obsługę relacyjnej bazy danych, musimy jedynie przygotować drugi koniec. Jego tworzenie rozpoczniemy od dodania odpowiednich encji JPA.

ArticleEntity:

@Entity
@Data
@NoArgsConstructor
public class ArticleEntity {
    @Id
    @Column
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "article_generator")
    @SequenceGenerator(name = "article_generator", sequenceName = "article_seq", allocationSize=50)
    private Integer id;

    @Column
    private String title;

    @Column
    private String content;

    @OneToMany(mappedBy = "article", cascade = CascadeType.ALL)
    private List<ReferenceEntity> references = new ArrayList<>();

    public static ArticleEntity fromDTO(Article article) {
        ArticleEntity entity = new ArticleEntity();
        entity.setTitle(article.getTitle());
        entity.setContent(article.getContent());
        for (String ref: article.getReferences()) {
            ReferenceEntity referenceEntity = new ReferenceEntity();
            referenceEntity.setReference(ref);
            referenceEntity.setArticle(entity);
            entity.getReferences().add(referenceEntity);
        }
        return entity;
    }

    public Article toDTO() {
        Article article = new Article();
        article.setId(this.getId());
        article.setTitle(this.getTitle());
        article.setContent(this.getContent());
        this.getReferences().forEach(refEntity -> article.addReference(refEntity.getReference()));
        return article;
    }
}

ReferenceEntity:

@Entity
@Data
@NoArgsConstructor
public class ReferenceEntity {
    @Id
    @Column
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "reference_generator")
    @SequenceGenerator(name = "reference_generator", sequenceName = "reference_seq", allocationSize=50)
    private Integer id;

    @Column
    private String reference;

    @ManyToOne
    private ArticleEntity article;
}

Ani JDBC, ani JPA nie wiedzą o istnieniu RxJavy, zatem musimy samodzielnie napisać implementację interfejsu Publisher, która wykona zapytanie SELECT:

public class SelectQueryPublisher implements Publisher<ArticleEntity> {
    private final Session session;
    private final String hqlQuery;
    private final int limit;

    public SelectQueryPublisher(Session session, String hqlQuery, int limit) {
        this.session = Objects.requireNonNull(session);
        this.hqlQuery = hqlQuery;
        this.limit = limit;
    }

    @Override
    public void subscribe(Subscriber<? super ArticleEntity> subscriber) {
        try {
            Query<ArticleEntity> query = session.createQuery(hqlQuery, ArticleEntity.class);
            query.setFetchSize(limit);
            ScrollableResults scroll = query.scroll();
            while (scroll.next()) {
                subscriber.onNext((ArticleEntity) scroll.get(0));
            }
            subscriber.onComplete();
        } catch (Exception exception) {
            subscriber.onError(exception);
        } finally {
            session.close();
        }
    }
}

Wydaje się proste, bowiem co można zepsuć w implementacji interfejsu z jedną metodą, zwłaszcza że po użyciu zobaczymy, że wszystko nawet działa? Niestety - kontrakt specyfikacji Reactive Streams dla implementacji interfejsu Publisher, który będzie gwarantował poprawne działanie we wszystkich sytuacjach (także dostępu z wielu wątków), jest bardzo skomplikowany. W bibliotece znajdziemy nawet specjalne narzędzia do testowania zgodności z kontraktem i powyższa implementacja oblałaby prawie wszystkie testy. Dlatego nie możemy jej użyć. Znacznie łatwiej jest napisać logikę obsługującą zapytanie SELECT w implementacji interfejsu Iterable, a następnie użyć metody Flowable.fromIterable() do przekonwertowania jej na reaktywny łańcuch. Skasujmy więc powyższy kod i zastąpmy go nową implementacją:

public class SelectQueryIterable implements Iterable<ArticleEntity> {
    private final Session session;
    private final String hqlQuery;
    private final int limit;

    public SelectQueryIterable(Session session, String hqlQuery, int limit) {
        this.session = Objects.requireNonNull(session);
        this.hqlQuery = hqlQuery;
        this.limit = limit;
    }

    @Override
    public Iterator<ArticleEntity> iterator() {
        return new Iterator<ArticleEntity>() {
            private ScrollableResults scroll;
            private ArticleEntity toReturn;

            @Override
            public boolean hasNext() {
                return (null != (toReturn = tryGetNext(getScroll())));
            }

            @Override
            public ArticleEntity next() {
                return tryGetNext(getScroll());
            }

            private ScrollableResults getScroll() {
                if (null == scroll) {
                    Query<ArticleEntity> query = session.createQuery(hqlQuery, ArticleEntity.class);
                    query.setFetchSize(limit);
                    scroll = query.scroll();
                }
                return scroll;
            }

            private ArticleEntity tryGetNext(ScrollableResults currentScroll) {
                if (null != toReturn) {
                    try {
                        return toReturn;
                    } finally {
                        toReturn = null;
                    }
                }
                if (currentScroll.next()) {
                    return (ArticleEntity) currentScroll.get(0);
                }
                currentScroll.close();
                session.close();
                return null;
            }
        };
    }
}

Niestety ze względu na specyfikę kursorów Hibernate'a, ten kod także nie jest zbyt czytelny, niemniej swoje zadanie spełni. Idea polega na tym, że zapytanie HQL wykonamy leniwie przy pierwszym odwołaniu do hasNext() bądź next(). Ponadto musimy obsłużyć sytuację, gdy ktoś zawoła next() bez wcześniejszego wywołania hasNext() - zajmuje się tym metoda tryGetNext(). Na dłuższą metę warto tę logikę opakować w ogólną implementację i jedynie dostarczać do niej w formie strategii gotowe zapytanie do wykonania.

Do obsługi wstawiania także napiszemy trochę autorskiego kodu. Nasza przykładowa aplikacja będzie wspierała zapisywanie artykułów w paczkach po 100, przy czym każda paczka zostanie dodana w osobnej transakcji. Tutaj dla odmiany RxJava bardzo nam pomoże. Klasa BatchInsert nie będzie implementować żadnego specjalnego interfejsu - po prostu umieścimy w niej logikę wstawiania do bazy tak, by nie mieszała się nam z kodem do budowy łańcucha operacji.

public class BatchInsert {
    private final SessionFactory sessionFactory;

    public BatchInsert(SessionFactory sessionFactory) {
        this.sessionFactory = Objects.requireNonNull(sessionFactory);
    }

    public List<Result> batchInsert(List<ArticleEntity> articles) {
        try (Session session = sessionFactory.openSession()) {
            Transaction transaction = session.beginTransaction();
            try {
                articles.forEach(session::merge);
                session.flush();
                transaction.commit();
                return createBatchResult(articles.size(), Result::success);
            } catch (RuntimeException exception) {
                transaction.rollback();
                return createBatchResult(articles.size(), () -> Result.failure(exception.getMessage()));
            }
        }
    }

    private List<Result> createBatchResult(int size, Supplier<Result> resultSupplier) {
        List<Result> results = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            results.add(resultSupplier.get());
        }
        return results;
    }
}

Nasze API REST-owe zakłada, że wstawiając 100 artykułów, na wyjściu dostaniemy 100 obiektów Result opisujących wynik wstawiania każdego artykułu. W MongoDB ma to duży sens, ponieważ każdy artykuł wstawiany jest niezależnie i możemy sobie wyobrazić sytuację, w której dodanie 98 artykułów się powiodło, a publikacja dwóch została z jakiegoś powodu wstrzymana. Transakcja działa trochę inaczej - tutaj albo dodajemy wszystko, albo nie dodajemy nic. W kodzie jest to odwzorowane tak, że jeśli robimy commit(), to generujemy N sukcesów, a jeśli dostaliśmy wyjątek, to N niepowodzeń.

Najwyższa pora poskładać wszystko razem i zaimplementować zaprezentowany na początku interfejs Storage. Zacznijmy od ogólnego kodu serwisu RelationalStorage i inicjalizacji Hibernate'a:

public class RelationalStorage implements Storage {
    private static final int BATCH_INSERT_SIZE = 100;

    private final SessionFactory sessionFactory;
    private final Scheduler blockingIOScheduler;

    public RelationalStorage(Scheduler scheduler) {
        this.blockingIOScheduler = Objects.requireNonNull(scheduler);
        final StandardServiceRegistry registry = new StandardServiceRegistryBuilder()
            .configure()
            .build();
        try {
            this.sessionFactory = new MetadataSources(registry).buildMetadata().buildSessionFactory();
        } catch (RuntimeException exception) {
            StandardServiceRegistryBuilder.destroy(registry);
            throw exception;
        }
    }
}

Tym, co nas w powyższym kodzie musi zainteresować, jest tajemniczy argument konstruktora, czyli obiekt klasy Scheduler. Klasa ta pochodzi z ekosystemu RxJava i jest niczym innym, jak pulą wątków przystosowaną do pracy w środowisku reaktywnym. W naszym przypadku będzie to pula wątków do wykonywania blokujących operacji I/O. Tak wyposażeni możemy przystąpić do zaimplementowania obu metod wymaganych przez nasz interfejs Storage. Zacznijmy od czytania danych:

@Override
public Flowable<Article> fetchAll(int limit) {
    Session session = sessionFactory.openSession();
    return Flowable
        .fromIterable(new SelectQueryIterable(session, "FROM ArticleEntity", limit))
        .map(ArticleEntity::toDTO)
        .subscribeOn(blockingIOScheduler);
}

Ogromną siłą RxJavy (i całej specyfikacji Reactive Streams) jest łatwość modelowania współbieżności. To sprawia, że obsługa blokującego I/O robi się banalna - wystarczy bowiem użyć operatora subscribeOn() do skierowania wykonania całego łańcucha do puli wątków I/O. Wspomniany operator może być wywołany w dowolnym miejscu, również przed .map(). Tak zbudowany łańcuch zwracamy do kodu obsługującego API REST-owe. Ratpack samodzielnie zadba o przesłanie wyników do wątku obsługi połączeń, odpowiednio konfigurując swoją część łańcucha. Spójrzmy zatem na obsługę dodawania. Podobnie, jak wariancie dla MongoDB, zarówno producentem, jak i końcowym konsumentem będzie tutaj Ratpack, my zaś wypełnimy tylko środkową część łańcucha, która wstawi artykuły do bazy i "zamieni" je na wyniki:

@Override
public Flowable<Result> insertAll(Flowable<Article> articles) {
    BatchInsert inserter = new BatchInsert(sessionFactory);
    return articles
        .map(ArticleEntity::fromDTO)
        .buffer(BATCH_INSERT_SIZE)
        .observeOn(blockingIOScheduler)
        .map(inserter::batchInsert)
        .flatMapIterable(results -> results)
        .onErrorReturn(ex -> Result.failure(ex.getMessage()));
}

Kluczowym operatorem jest tutaj .observeOn(), który również służy do sterowania tym, na jakim wątku będziemy wykonywać obliczenia. Jednak w przeciwieństwie do .subscribeOn(), tutaj mówimy, że operacje od tego miejsca mają wykonywać się na puli wątków I/O. Pozostałe operatory wykonują transformację artykułów, grupowanie ich w paczki (operator .buffer()) oraz konwersję łańcucha paczek wyników na łańcuch pojedynczych wyników (.flatMapIterable()). Podobnie jak w poprzednim przykładzie, także i tutaj Ratpack samodzielnie zadba o to, by na końcu obsługa powróciła do wątku obsługi połączeń HTTP.

Na koniec przypatrzmy się jeszcze, w jaki sposób utworzyć reaktywną pulę wątków:

Scheduler reactiveScheduler = Schedulers.from(Executors.newFixedThreadPool(BLOCKING_IO_THREAD_NUM));

Podsumowanie

Najważniejszymi informacjami, które powinniśmy zapamiętać z tego artykułu, są:

  1. aby obsłużyć blokujące I/O w reaktywnym, asynchronicznym kodzie, musimy je wydelegować do wykonania na oddzielnej puli wątków, gdzie dopuszczamy blokowanie,
  2. unikajmy pisania własnej implementacji interfejsu Publisher, gdyż kontrakt gwarantujący jej poprawne działanie w każdych warunkach nie jest trywialny do spełnienia - zamiast tego użyjmy np. Iterable oraz metody Flowable.fromIterable(),
  3. mamy do dyspozycji dwa świetne operatory do sterowania tym, na jakim wątku wykonuje się łańcuch operacji lub jego część: .subscribeOn() oraz .observeOn(),
  4. implementacja funkcjonalności batch insert w reaktywnym środowisku jest trywialna dzięki operatorowi .buffer().

Zakończenie

To koniec naszej przygody z Ratpackiem, ale bynajmniej nie z programowaniem reaktywnym ani z asynchronicznym I/O. W planie mam napisanie jeszcze dwóch artykułów. Pierwszy z nich będzie miał raczej charakter popularno-naukowy i pokaże historię rozwoju asynchroniczności oraz kilka interesujących szczegółów implementacyjnych na poziomie systemu operacyjnego i JVM. Drugi natomiast będzie reimplementacją stworzonego tutaj przykładu w Spring Framework + WebFlux. Pełen kod źródłowy stworzonej tutaj aplikacji, który obsługuje zarówno MongoDB, jak i Hibernate'a, znaleźć można na GitHubie: github.com/zyxist/ratpack-rxjava-mongodb-jdbc.

Tomasz Jędrzejewski

Programista Javy, lider techniczny. W wolnych chwilach podróżuje, realizując od kilku lat projekty długodystansowych wypraw pieszych.

Autor zdjęcia nagłówkowego: Jared Tarbell, CC-BY-2.0

zobacz inne wpisy w temacie

Informatyka

poprzedni wpis Struktura testów jednostkowych następny wpis zone84 na start

Komentarze (0)

Skomentuj

Od 3 do 40 znaków.

Wymagany, anonimizowany po zatwierdzeniu komentarza.

Odpowiedz na pytanie.

Edycja Podgląd

Od 10 do 8000 znaków.

Wszystkie komentarze są moderowane i muszą być zatwierdzone przed publikacją.

Klikając "Wyślij komentarz" wyrażasz zgodę na przetwarzanie podanych w nim danych osobowych do celów moderacji i publikacji komentarza, zgodnie z polityką prywatności: polityka prywatności