Domain-Driven Design

Implementing Notification in IDDD_Samples: Publishing Message-Based Notifications

by simplify-len 2020. 8. 8.

NotificationService provides one way to publish DomainEvent instances through a messaging infrastructure.

- The Notification implementation described in the book
- The Notification implementation in the actual project
public class NotificationService {
    
    public void publishNotifications() {
        // Load the record of the most recently published message.
        PublishedMessageTracker publishedMessageTracker =
                this.publishedMessageTracker();

        // Query every notification that has not been published yet.
        List<Notification> notifications =
                this.listUnpublishedNotifications(
                        publishedMessageTracker.mostRecentPublishedMessageId()
                );

        MessageProducer messageProducer = this.messageProducer();

        try {
            for (Notification notification : notifications) {
                this.publish(notification, messageProducer);
            }

            // Record progress only after every notification was sent.
            this.trackMostRecentPublishedMessage(
                    publishedMessageTracker,
                    notifications);
        } finally {
            messageProducer.close();
        }
    }
    ...
}

The publishNotifications() method first retrieves its PublishedMessageTracker.

This is the object that keeps a record of which events have already been published.

...
public class PublishedMessageTracker {
    private long mostRecentPublishedMessageId;
    private long trackerId;
    private String type;
    ...
}

 

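Note that this class belongs to the application, not to the domain model.

trackerId is simply the unique identity of this object (in essence, it is an Entity).

The type attribute holds a String description of the type of topic/channel on which the event was published.

Together, the type attribute and the event identity let us publish the same notification to any number of topics/channels at different points in time.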
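To make the role of the type attribute concrete, here is a minimal in-memory sketch that keeps one tracker per topic/channel type. TrackerSketch, TrackerRegistrySketch, and advanceTo() are names I made up for illustration; they are not from the book or from IDDD_Samples, where the tracker is persisted rather than held in a map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the persisted PublishedMessageTracker.
class TrackerSketch {
    private final String type;                  // topic/channel description
    private long mostRecentPublishedMessageId;  // 0 means nothing published yet

    TrackerSketch(String type) {
        this.type = type;
    }

    String type() {
        return type;
    }

    long mostRecentPublishedMessageId() {
        return mostRecentPublishedMessageId;
    }

    // Progress only moves forward; an older id never overwrites a newer one.
    void advanceTo(long messageId) {
        mostRecentPublishedMessageId =
                Math.max(mostRecentPublishedMessageId, messageId);
    }
}

// Hypothetical registry: each type gets its own independent tracker.
class TrackerRegistrySketch {
    private final Map<String, TrackerSketch> trackersByType = new HashMap<>();

    TrackerSketch trackerFor(String type) {
        // Create the tracker on first use, starting from id 0.
        return trackersByType.computeIfAbsent(type, TrackerSketch::new);
    }
}
```

Because each type has its own position, publishing up to event 5 on one exchange leaves every other exchange free to start from its own last id.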

//    ...
    private PublishedMessageTracker publishedMessageTracker(){
        Query query = this.session().createQuery(
                "from PublishedMessageTracker as _obj_ " + "where _obj_.type = ?"
        );
        query.setParameter(0, EXCHANGE_NAME);

        PublishedMessageTracker publishedMessageTracker =
                (PublishedMessageTracker) query.uniqueResult();
        
        return publishedMessageTracker;
    }
    ...

 

Next, the listUnpublishedNotifications() method is responsible for querying a sorted list of all Notification instances that have not yet been published.

private List<Notification> listUnpublishedNotifications(
        long aMostRecentPublishedMessageId) {
    List<StoredEvent> storedEvents =
        this.eventStore().allStoredEventsSince(aMostRecentPublishedMessageId);

    List<Notification> notifications =
        this.notificationsFrom(storedEvents);

    return notifications;
}

In practice, it queries the EventStore for StoredEvent instances whose eventId is greater than the aMostRecentPublishedMessageId parameter. The results returned from the EventStore are then used to create a new collection of Notification instances.
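The "greater than the last published id" query can be sketched with an in-memory list. StoredEventSketch and InMemoryEventStoreSketch are hypothetical stand-ins; the real EventStore is backed by a database, and only the allStoredEventsSince() name comes from the listing above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified stored event: just an id and a type name.
class StoredEventSketch {
    private final long eventId;
    private final String typeName;

    StoredEventSketch(long eventId, String typeName) {
        this.eventId = eventId;
        this.typeName = typeName;
    }

    long eventId() {
        return eventId;
    }

    String typeName() {
        return typeName;
    }
}

// Hypothetical in-memory event store that assigns increasing ids.
class InMemoryEventStoreSketch {
    private final List<StoredEventSketch> events = new ArrayList<>();
    private long nextEventId = 1;

    StoredEventSketch append(String typeName) {
        StoredEventSketch event = new StoredEventSketch(nextEventId++, typeName);
        events.add(event);
        return event;
    }

    // Returns events with eventId strictly greater than the given id,
    // ordered by eventId so they are published in the order they occurred.
    List<StoredEventSketch> allStoredEventsSince(long aStoredEventId) {
        List<StoredEventSketch> result = new ArrayList<>();
        for (StoredEventSketch event : events) {
            if (event.eventId() > aStoredEventId) {
                result.add(event);
            }
        }
        result.sort(Comparator.comparingLong(StoredEventSketch::eventId));
        return result;
    }
}
```

Passing the tracker's most recent id therefore yields only the events that still need to go out, and passing 0 yields everything.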

for (Notification notification : notifications) {
     this.publish(notification, messageProducer);
}		

This loop dispatches each Notification to the publish() method.
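One thing that follows from the publishNotifications() listing is that the tracker is updated only after the whole loop finishes. The sketch below reproduces that order with in-memory stand-ins so the consequence is easy to see: if sending fails partway, the tracker does not move and the same notifications are sent again on the next run. PublishLoopSketch and its members are hypothetical names, and a Consumer stands in for the MessageProducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical reduction of publishNotifications(): ids stand in for events.
class PublishLoopSketch {
    private final List<Long> storedEventIds = new ArrayList<>(); // ascending
    private long mostRecentPublishedId = 0; // plays the role of the tracker

    void store(long eventId) {
        storedEventIds.add(eventId);
    }

    long mostRecentPublishedId() {
        return mostRecentPublishedId;
    }

    void publishNotifications(Consumer<Long> sender) {
        // 1. Collect everything newer than the tracked id.
        List<Long> unpublished = new ArrayList<>();
        for (long id : storedEventIds) {
            if (id > mostRecentPublishedId) {
                unpublished.add(id);
            }
        }

        // 2. Dispatch each one; an exception here skips step 3 entirely.
        for (long id : unpublished) {
            sender.accept(id);
        }

        // 3. Track progress only after every send succeeded.
        if (!unpublished.isEmpty()) {
            mostRecentPublishedId = unpublished.get(unpublished.size() - 1);
        }
    }
}
```

With this ordering a notification may be delivered more than once after a failure, so under these assumptions subscribers would need to tolerate duplicates, for example by checking the notification id.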

protected void publish(
            Notification notification,
            MessageProducer messageProducer) {

        MessageParameters messageParameters = MessageParameters.durableTextParameters(
                notification.typeName(), Long.toString(notification.notificationId()), notification.occurredOn()
        );

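        // Use a distinct name: redeclaring the parameter `notification` does not compile.
        String serializedNotification =
                NotificationService.objectSerializer().serialize(notification);
        messageProducer.send(serializedNotification, messageParameters);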
}

 

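What does message-based notification publishing look like in the actual project?

First, the relevant code lives in common/port/adapter/notification/RabbitMQNotificationPublisher.java.

It is almost identical to what the book shows. With RabbitMQ as the message queue, it looks like this: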

//   Copyright 2012,2013 Vaughn Vernon
//
//   Licensed under the Apache License, Version 2.0 (the "License");
//   you may not use this file except in compliance with the License.
//   You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
//   Unless required by applicable law or agreed to in writing, software
//   distributed under the License is distributed on an "AS IS" BASIS,
//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//   See the License for the specific language governing permissions and
//   limitations under the License.

package com.saasovation.common.port.adapter.notification;

import java.util.ArrayList;
import java.util.List;

import com.saasovation.common.domain.model.DomainEvent;
import com.saasovation.common.event.EventStore;
import com.saasovation.common.event.StoredEvent;
import com.saasovation.common.notification.Notification;
import com.saasovation.common.notification.NotificationPublisher;
import com.saasovation.common.notification.NotificationSerializer;
import com.saasovation.common.notification.PublishedNotificationTracker;
import com.saasovation.common.notification.PublishedNotificationTrackerStore;
import com.saasovation.common.port.adapter.messaging.rabbitmq.ConnectionSettings;
import com.saasovation.common.port.adapter.messaging.rabbitmq.Exchange;
import com.saasovation.common.port.adapter.messaging.rabbitmq.MessageParameters;
import com.saasovation.common.port.adapter.messaging.rabbitmq.MessageProducer;

public class RabbitMQNotificationPublisher implements NotificationPublisher {

    private EventStore eventStore;
    private String exchangeName;

    private PublishedNotificationTrackerStore publishedNotificationTrackerStore;

    public RabbitMQNotificationPublisher(
            EventStore anEventStore,
            PublishedNotificationTrackerStore aPublishedNotificationTrackerStore,
            Object aMessagingLocator) {

        super();

        this.setEventStore(anEventStore);
        this.setExchangeName((String) aMessagingLocator);
        this.setPublishedNotificationTrackerStore(aPublishedNotificationTrackerStore);
    }

    @Override
    public void publishNotifications() {
        PublishedNotificationTracker publishedNotificationTracker =
                this.publishedNotificationTrackerStore().publishedNotificationTracker();

        List<Notification> notifications =
            this.listUnpublishedNotifications(
                    publishedNotificationTracker.mostRecentPublishedNotificationId());

        MessageProducer messageProducer = this.messageProducer();

        try {
            for (Notification notification : notifications) {
                this.publish(notification, messageProducer);
            }

            this.publishedNotificationTrackerStore()
                .trackMostRecentPublishedNotification(
                    publishedNotificationTracker,
                    notifications);
        } finally {
            messageProducer.close();
        }
    }

    @Override
    public boolean internalOnlyTestConfirmation() {
        throw new UnsupportedOperationException("Not supported by production implementation.");
    }

    private EventStore eventStore() {
        return this.eventStore;
    }

    private void setEventStore(EventStore anEventStore) {
        this.eventStore = anEventStore;
    }

    private String exchangeName() {
        return this.exchangeName;
    }

    private void setExchangeName(String anExchangeName) {
        this.exchangeName = anExchangeName;
    }

    private List<Notification> listUnpublishedNotifications(
            long aMostRecentPublishedMessageId) {
        List<StoredEvent> storedEvents =
            this.eventStore().allStoredEventsSince(aMostRecentPublishedMessageId);

        List<Notification> notifications =
            this.notificationsFrom(storedEvents);

        return notifications;
    }

    private MessageProducer messageProducer() {

        // creates my exchange if non-existing
        Exchange exchange =
            Exchange.fanOutInstance(
                    ConnectionSettings.instance(),
                    this.exchangeName(),
                    true);

        // create a message producer used to forward events
        MessageProducer messageProducer = MessageProducer.instance(exchange);

        return messageProducer;
    }

    private List<Notification> notificationsFrom(List<StoredEvent> aStoredEvents) {
        List<Notification> notifications =
            new ArrayList<Notification>(aStoredEvents.size());

        for (StoredEvent storedEvent : aStoredEvents) {
            DomainEvent domainEvent = storedEvent.toDomainEvent();

            Notification notification =
                new Notification(storedEvent.eventId(), domainEvent);

            notifications.add(notification);
        }

        return notifications;
    }

    private void publish(
            Notification aNotification,
            MessageProducer aMessageProducer) {

        MessageParameters messageParameters =
            MessageParameters.durableTextParameters(
                    aNotification.typeName(),
                    Long.toString(aNotification.notificationId()),
                    aNotification.occurredOn());

        String notification =
            NotificationSerializer
                .instance()
                .serialize(aNotification);

        aMessageProducer.send(notification, messageParameters);
    }

    private PublishedNotificationTrackerStore publishedNotificationTrackerStore() {
        return publishedNotificationTrackerStore;
    }

    private void setPublishedNotificationTrackerStore(PublishedNotificationTrackerStore publishedNotificationTrackerStore) {
        this.publishedNotificationTrackerStore = publishedNotificationTrackerStore;
    }
}

 

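Apologies for pasting the code in wholesale. I am including it because it seemed worth reading through slowly as a review of what the book describes.

One thing worth thinking about:

When should notifications be published as RESTful resources, and when should they be published through a message queue?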

 
