본문 바로가기
개발 관련됨/개발 이슈를 해결함

Message Relay 를 PollingPublisher 방식으로 구현하기

by simplify-len 2022. 6. 19.

 

 

트랜잭션 아웃박스 패턴

 

이벤트 소싱을 하기 위해서는 트랜잭션 아웃박스 패턴을 구현해야 합니다. 당연히 트랜잭션 아웃박스 패턴에 대한 이해가 있어야 합니다. 다만, 해당 포스트에서는 자세한 트랜잭션 아웃박스 패턴을 다루지 않을 계획입니다.

 

다시 트랜잭션 아웃박스 패턴를 표현하는 위 그림에서 네모난 부분이 바로 메세지릴레이를 수행하는 곳입니다.

 

데이터베이스에 특정 데이터가 Insert 되면, 변경분에 대해서 MessageRelay 는 특정 데이터를 읽어 Message Broker 에게 전달한다.

 

이 때 MessageRelay 의 구현방식은 2가지가 있습니다.

1. Polling publisher

2. Transaction log tailing

 

1. Polling publisher 는 일정 주기마다 변경분을 조회해서 메세지를 전달하는 방식이고, 2. Transaction log tailing  MySQL binlog, Postgres WAL 등을 활용해 DB 의 Raw 한 영역에서 Offset을 조회하는 방식입니다.

 

그러나, 대부분 Message Relay 를 구현할 경우, Polling publisher을 추천하지 않습니다.

그 이유는 크게 3가지입니다.

1. 순서를 보장할 수 없습니다.

2. 주기적으로 계속 데이터를 가져오기 때문에, 가져올 데이터가 없을 경우에도 리소스가 발생합니다.

3. NoSQL 에서는 해당 방식이 적용되기 힘듭니다.

 

그러나, 이번 이벤트스토어를 구축하면서 Polling publisher 방식을 활용했습니다. Transaction log tailing에 비해 러닝커브가 크지 않고, 개발자가 통제가능한 환경을 손쉽게 만들 수 있기 때문입니다.

 

단점을 알기 때문에, 이 단점을 최대한 해결할 수 있는 방안으로 문제를 해결합니다.

 

어떻게 구현했는가?

MessageRelay OverView

 

구체적인 알고리즘

 

개발을 하기 전에 Sheet 로 어떻게 알고리즘을 구현해야 하는지 그렸던 부분입니다.

 

핵심은 '어디까지 메세지를 전송했는가?' 라는 것을 표현하는 것입니다. Offset(most recent proccess position, begin, size) 을 코드로 표현되어져야 합니다.

 

class SimplePullingEventsTest {

    SimplePullingEvents sut;
    
    List<String> eventsCollections;
    @BeforeEach
    void setUp() {
        eventsCollections = new ArrayList<>();
        sut = new SimplePullingEvents(eventsCollections);
    }

    @Test
    void next() {
        PulledOffset pulledOffset = sut.next();
        PulledOffset expected = PulledOffset.of(0,0,0);
        Integer size = expected.getSize();
        Integer begin = expected.getBegin();
        Integer mostRecentProcessPosition = expected.getPosition();

        assertThatOffsetTriple(pulledOffset, mostRecentProcessPosition, begin, size);
    }

    @Test
    void next2() {
        eventsCollections.add("1");

        PulledOffset pulledOffset = sut.next();

        PulledOffset expected = PulledOffset.of(0,1,1);
        Integer size = expected.getSize();
        Integer begin = expected.getBegin();
        Integer mostRecentProcessPosition = expected.getPosition();

        assertThatOffsetTriple(pulledOffset, mostRecentProcessPosition, begin, size);

    }

    @Test
    void next3() {
        eventsCollections.add("1");

        sut.next();

        eventsCollections.add("2");
        eventsCollections.add("3");

        PulledOffset pulledOffset = sut.next();

        PulledOffset expected = PulledOffset.of(1,2,2);
        Integer size = expected.getSize();
        Integer begin = expected.getBegin();
        Integer mostRecentProcessPosition = expected.getPosition();

        assertThatOffsetTriple(pulledOffset, mostRecentProcessPosition, begin, size);

    }

    @Test
    void next4() {
        eventsCollections.add("1");

        sut.next();

        eventsCollections.add("2");
        eventsCollections.add("3");

        sut.next();

        eventsCollections.add("4");
        eventsCollections.add("5");
        eventsCollections.add("6");
        eventsCollections.add("7");
        eventsCollections.add("8");

        PulledOffset pulledOffset = sut.next();

        PulledOffset expected = PulledOffset.of(3,4,5);
        Integer size = expected.getSize();
        Integer begin = expected.getBegin();
        Integer mostRecentProcessPosition = expected.getPosition();

        assertThatOffsetTriple(pulledOffset, mostRecentProcessPosition, begin, size);

    }

    @Test
    void next5() {
        eventsCollections.add("1");

        sut.next();

        eventsCollections.add("2");
        eventsCollections.add("3");

        sut.next();

        eventsCollections.add("4");
        eventsCollections.add("5");
        eventsCollections.add("6");
        eventsCollections.add("7");
        eventsCollections.add("8");
        sut.next();
        eventsCollections.add("9");
        eventsCollections.add("10");
        eventsCollections.add("11");

        PulledOffset pulledOffset = sut.next();

        PulledOffset expected = PulledOffset.of(8,9,3);
        Integer size = expected.getSize();
        Integer begin = expected.getBegin();
        Integer mostRecentProcessPosition = expected.getPosition();

        assertThatOffsetTriple(pulledOffset, mostRecentProcessPosition, begin, size);

    }

    private void assertThatOffsetTriple(PulledOffset pulledOffset,
                                        Integer mostRecentProcessPosition, Integer begin, Integer size) {
        assertThat(pulledOffset.getSize()).isEqualTo(size);
        assertThat(pulledOffset.getBegin()).isEqualTo(begin);
        assertThat(pulledOffset.getPosition()).isEqualTo(mostRecentProcessPosition);
    }
}

위 테스트 코드를 통과시키며 Sheet 에 있던 내용을 코드로 구현했습니다.

@NoArgsConstructor
public class SimplePullingEvents {

    private final Stack<PulledOffset> prevPulledOffset = new Stack<>();

    private List<String> eventsCollections;

    public SimplePullingEvents(List<String> eventsCollections) {
        this.eventsCollections = eventsCollections;
        prevPulledOffset.push(PulledOffset.of(0, 0, 0));

    }

    public PulledOffset next() {
        if (eventsCollections.isEmpty()) {
            return PulledOffset.of(0, 0, 0);
        }
        PulledOffset prevOffset = prevOffset();
        int prevMostRecentProcessPosition = prevOffset.getPosition() + prevOffset.getSize();
        int begin = prevMostRecentProcessPosition + 1;

        List<String> strings = eventsCollections.subList(begin - 1, eventsCollections.size());
        strings.stream().peek(System.out::println);
        int size = strings.size();

        PulledOffset currentOffset = PulledOffset.of(prevMostRecentProcessPosition, begin, size);
        prevPulledOffset.push(currentOffset);
        return currentOffset;
    }

    public PulledOffset prevOffset() {
        return prevPulledOffset.peek();
    }
}

 

Sheet 를 활용한 알고리즘을 코드로 표현을 하더라도 이것을 Spring Data 로 구현하는 것은 또다른 구현문제입니다.

 

이 부분에 대해서는 @Query  방식으로 문제를 해결했습니다.

@Query("SELECT * FROM events AS e WHERE e.occurred_at > :occurredAt AND e.occurred_at > to_timestamp('2022-06-13T18:50:00:00', 'YYYY-MM-DDTHH:MI:SS') ORDER BY e.occurred_at ASC")
List<EventsJdbcEntity> findAllEventsFromLatestOccurredAt(@Param("occurredAt") LocalDateTime occurredAt);

이렇게 하더라도, @Query 는 miliseconds 까지 고려하지 못하므로, 초당 데이터를 가져와서, 어플리케이션에서 한번 더 정렬을 시도해야 합니다.

 

[참고자료]

https://microservices.io/patterns/data/transaction-log-tailing.html

 

Microservices Pattern: Transaction log tailing

Microservices.io is brought to you by Chris Richardson. Experienced software architect, author of POJOs in Action, the creator of the original CloudFoundry.com, and the author of Microservices patterns. Chris helps clients around the world adopt the micros

microservices.io

 

댓글