Spring Integration
2023-04-29 19:24
스프링(Spring)과 기업 통합 패턴(Enterprise Integration Patterns, EIP)을 사용하여 어플리케이션 간 통합을 구현하는 Spring Integration에 대해 설명합니다. Spring Integration은 스프링 핵심 요소인 DI, AOP, PSA를 사용하여 고도로 추상화된 모델을 제공합니다. 함께 알아봅시다.
개요
스프링 기본 개념 및 설계 원칙을 이해하고 있으면 이해하기 수월합니다.
완전한 통합 애플리케이션을 만드는것은 상당히 지난한 작업을 필요로 한다. 많은 기업들은 완전한 하나의 애플리케이션이 아니라 각기 역할을 가지는 여러 애플리케이션으로 나누어 비즈니스를 해결한다. 이처럼 여러 애플리케이션으로 나누어졌을 때 각각의 애플리케이션은 담당 비즈니스에 특화하여 만들어질수 있는 유연성이 생기게 된다. 고객(사용자 또는 파트너)이 기업의 애플리케이션을 사용할때 기업 제품의 비즈니스 복잡성은 신경쓰지 않는데, 이는 기업의 여러 애플리케이션간의 상호작용을 통해 문제가 해결되기 때문이다. 예를 들어, 고객이 쇼핑몰에서 상품을 주문하는 것은 회원 인증 / 재고 확인 / 결제 인증 / 결제 승인 / 송장 발급 등 여러가지 애플리케이션간 상호작용을 통해 이루어진다.애플리케이션을 관통하는 고객 비즈니스 시나리오에서 애플리케이션 간의 데이터 공유를 지원하려면 애플리케이션 통합이 필요하다. Spring Integration의 목표와 원칙은 다음과 같다.
목표
- EIP 를 구현하기 위한 고수준 추상화 모델 제공
- Spring 기반 애플리케이션 내에서 비동기식 메시지 기반 동작
원칙
- 느슨한 결합과 비 침투성
- 기능성 / 비 기능성 관심사의 분리
- 스프링 핵심 요소인 DI를 이용하여 기능과 비 기능 영역의 분리를 목표로 한다. 따라서 통합을 위한 코드는 비 침투적이며, 비즈니스 코드에 집중할 수 있게 된다.
- 재사용성 / 이식성 제고를 위한 추상화
기업 통합 패턴(Enterprise Integration Patterns, EIP)이란 다양한 기업 통합 문제를 해결하기 위해서 패턴화 된 모델들을 말한다. 기업 통합 패턴을 알고 있다면 보다 자연스러운 모델 사용이 가능하겠지만, 우리는 스프링에 익숙하다고 가정한 만큼 이후 소개할 구성요소를 먼저 훑어 보도록 하자.
주요 컴포넌트
스프링 애플리케이션은 계층형 구조(Layered Architecture)를 따라 계층별 관심사를 분리하고 인터페이스에 따른 느슨한 결합을 지향한다. Spring Integration은 수직적 관점을 포함하면서도 메시지가 앱 내/외로 흘러다녀야 하기 때문에 메시지 기반의 아키텍쳐를 구현하고 있으며 이는 Pipes and Filters 라는 추상화된 모델을 따르고 있다. 아래 그림은 Spring Integration의 필터 모델과 파이프 모델이 계층형 구조에도 어떤 식으로 잘 녹아드는지 보여주고 있다.
Filter : 메세지를 소비하거나 생성하는 컴포넌트 Pipe : 필터간 메세지를 전송하는 컴포넌트
Pipes and Filters 모델은 다음과 같은 구조를 가진다.
메세지
Spring Integration에서 사용하는 단순한 메세지 모델이다.
Header
- ID
- timestamp
- 기타 Values
Payload
- 무엇이든 가능
Message Channel
Spring Integration에서 제공하는 Pipe(필터간 메세지를 전송하는 컴포넌트) 모델이다. 메세지를 송수신 하는 필터 모델간의 느슨한 결합을 도와준다.
스프링에서 제공하는 파이프 모델은 크게 두가지의 형태로 생산자와 소비자를 연결한다.
- Point to Point 인 1:1 방식
- Pub / Sub 인 1:N 방식
MessageChannel Interface
Spring Integration에서 제공하는 기본 채널 인터페이스
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
다른 Filter 모델에 메세지를 보낸다.
PollableChannel Interface
메세지 채널을 확장한다. Pollable 기능이 추가되었고 필요에 따라 메세지를 버퍼링하여 받을 수 있다.
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
채널의 필요에 따라서 메시지를 보내는것 뿐만 아니라 Filter 모델 또는 데이터 원천으로부터 메시지를 받아올 수도 있음을 알 수 있다.
SubscribableChannel
기본 인터페이스를 확장한다. 구독자를 등록 또는 해지 할 수 있다.
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
앞서 파이프 모델은 크게 두가지(1:1 / 1:N)의 형태로 생산자와 소비자를 연결한다고 설명하였다.채널 인터페이스의 구현체들은 다음과 같다.
아래 코드에서 해당 인터페이스를 구현한 클래스가 N개의 소비자를 어떻게 연결하고 메세지를 발송하는지 알아보자.
AbstractSubscribableChannel
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
implements SubscribableChannel, SubscribableChannelManagement {
@Override
public int getSubscriberCount() {
return getRequiredDispatcher().getHandlerCount();
}
//디스패처(발송 대리자) 에 핸들러를 등록합니다.
@Override
public boolean subscribe(MessageHandler handler) {
MessageDispatcher dispatcher = getRequiredDispatcher();
boolean added = dispatcher.addHandler(handler);
adjustCounterIfNecessary(dispatcher, added ? 1 : 0);
return added;
}
//디스패처(발송 대리자)에 등록된 핸들러를 해지합니다.
@Override
public boolean unsubscribe(MessageHandler handle) {
MessageDispatcher dispatcher = getRequiredDispatcher();
boolean removed = dispatcher.removeHandler(handle);
this.adjustCounterIfNecessary(dispatcher, removed ? -1 : 0);
return removed;
}
private void adjustCounterIfNecessary(MessageDispatcher dispatcher, int delta) {
if (delta != 0 && logger.isInfoEnabled()) {
logger.info("Channel '" + getFullChannelName() + "' has " + dispatcher.getHandlerCount()
+ " subscriber(s).");
}
}
//디스패처(발송 대리자)에 메세지를 전달합니다.
@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
return getRequiredDispatcher().dispatch(message);
}
catch (MessageDispatchingException ex) {
String description = ex.getMessage() + " for channel '" + getFullChannelName() + "'.";
throw new MessageDeliveryException(message, description, ex);
}
}
private MessageDispatcher getRequiredDispatcher() {
MessageDispatcher dispatcher = getDispatcher();
Assert.state(dispatcher != null, "'dispatcher' must not be null");
return dispatcher;
}
protected abstract MessageDispatcher getDispatcher();
}
DirectChannel (Unicast)
public class DirectChannel extends AbstractSubscribableChannel {
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
private volatile Integer maxSubscribers;
. . .
}
PublishSubscribeChannel (Brodcast)
public class PublishSubscribeChannel extends AbstractExecutorChannel implements BroadcastCapableChannel {
. . .
@Override
protected BroadcastingDispatcher getDispatcher() {
return (BroadcastingDispatcher) this.dispatcher;
}
}
PublishSubscribeChannel
1개의 생산자와 N 개의 소비자를 연결할때 사용하는 채널이다. 실제 구현 코드에서는 MessageChannel 인터페이스의 send 메소드를 호출하면 N개의 소비자에 메세지를 보낸다. PollableChannel 구현체와 다르게 send를 통해서 생산자가 소비자에게 메세지를 푸쉬하는것을 기억하도록 하자. 아래는 채널이 사용하는 디스패처(발송 대리자) 구현체의 일부분이다.
BrodCastingDispatcher
@Override // NOSONAR complexity
public boolean dispatch(Message<?> message) {
. . . 생략 . . .
for (MessageHandler handler : handlers) {
if (this.applySequence) {
messageToSend = getMessageBuilderFactory()
.fromMessage(message)
.pushSequenceDetails(sequenceId, sequenceNumber++, sequenceSize)
.build();
if (message instanceof MessageDecorator) {
messageToSend = ((MessageDecorator) message).decorateMessage(messageToSend);
}
}
if (this.executor != null) {
//별도의 쓰래드 풀이 존재하는 경우.
Runnable task = createMessageHandlingTask(handler, messageToSend);
this.executor.execute(task);
dispatched++;
}
else {
// 존재하지 않으면 순차적으로 브로드 캐스팅
if (this.invokeHandler(handler, messageToSend)) {
dispatched++;
}
}
}
. . . 생략 . . .
}
Message Endpoints
Spring Integration에서 제공하는 Filter 모델이다. 메세지를 생산하거나 수신한다.Message Endpoints 는 Pipes and Filters 패턴에서 Filter 역할을 담당한다. 스프링에서 정의한 Filter 역할을 수행하는 모델들은 Filter의 범주에서 기능이 조금씩 다를뿐이므로 빠르게 확인해보자.
Message Transformer
메세지를 변환하는 엔드포인트이며, 페이로드를 변환한다.헤더의 값을 Create/Update/Delete하는 역할을 수행한다.
Message Filter
수신한 메세지를 Filtering하여 출력 채널에 전달 여부를 결정한다. (Java Stream의 filter를 떠올리면 된다.)필요에 따라선 예외를 발생시킬 수 있다.
Message Router
다음 메세지를 수신해야 하는 채널을 결정한다.
Spliter
메세지를 나누어 여러 채널로 보낼 수 있다.일반적으로 복합 메세지를 세분화된 메세지 그룹으로 나누는데 사용한다.
Aggregator
세분화된 메세지 그룹을 합친다.병합에 대한 정책을 정해야 하기 때문에 복잡도가 높다.
Service Activator
입력 채널의 메세지를 서비스와 연결하는 역할을 수행한다.입력 메세지는 메세지 핸들러에 의해 처리되고 출력 채널로 전달된다.
명시적으로 출력 채널을 설정하지 않은 경우 메세지에 설정된 반환 주소로 메세지를 전달한다.
Channel Adapter
메세지 채널을 다른 시스템과 연결해주는 어댑터 역할을 한다.
다른 시스템은 파일 시스템이 될 수도, Redis와 같은 DB 가 될 수도 있다.
어댑터는 입력 또는 출력의 역할을 모두 수행할 수 있다.
Spring Integration 구성
- Spring Integration을 사용하는 가장 상위의 컨텍스트에 선언해야 한다. (이는 하위 컨텍스트에 사용하고 있다고 하더라도 Spring Integration 사용이 가능하다.)
- 스프링 프레임워크는 Spring integration 네임스페이스 요소를 찾게 되면 관련 빈들(Task Scheduler/ Channel Creator 등등)을 자동으로 등록한다.
- @EnableIntegration 어노테이션을 사용시 Spring Integration의 인프라 구성요소를 ApplicationContext 에 등록한다.
등록되는 빈에 대해서는 다음 링크를 통해 자세한 내용을 보도록 하자.
Overview of Spring Integration Framework
주의사항
Spring Integration은 구성요소 작성시 POJO의 형태로 작성하는것을 권장한다. 특히 POJO 작성시 Spring 애플리케이션 LifeCycle과 관련된 메소드를 사용하지 말아야 한다. 이는 초기화가 완료되지 않은 경우 메세지 발송 실패와 같은 부작용이 발생 할 수 있다. 다음 예제를 참고하자.
@ServiceActivator
public String myService(String payload) { ... }
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
Practice
실제 프로젝트에 적용한 예제를 기준으로 통합 코드를 이해 해 보자.이벤트 생성과 소비가 분리되있음을 기억하자.
선언부
@Slf4j
@Configuration
public class EventChannelConfig {
//파이프 모델
@Bean
public PublishSubscribeChannel helloMessageChannel() {
return new PublishSubscribeChannel();
}
//필터 모델
//채널에 메세지가 발생하면 리스닝하여 소비한다
@Bean
public EventDrivenConsumer eventDrivenConsumer() {
EventDrivenConsumer consumer = new EventDrivenConsumer(helloMessageChannel(), helloMessageHandler());
return consumer;
}
//메세지 로거
@Bean
public MessageHandler helloMessageHandler() {
return message -> {
HelloApplicationMessage payload = (HelloApplicationMessage) message.getPayload();
log.warn("[HelloMessageHandler] helloMessageHandler {}", payload.getMessage());
};
}
//메세지 생성자 채널에 메세지를 전달한다
public static class HelloMessagePublisher {
public void publish(SubscribableChannel channel) {
HelloApplicationMessage message = new HelloApplicationMessage("hello world");
GenericMessage genericMessage = new GenericMessage<>(message);
channel.send(genericMessage);
}
}
테스트 코드
@Slf4j
@SpringBootTest
public class HelloMessagePublisherTest {
@Qualifier("helloMessageChannel")
@Autowired
private PublishSubscribeChannel helloMessageChannel;
@Qualifier("eventDrivenConsumer")
@Autowired
private EventDrivenConsumer eventDrivenConsumer;
@Qualifier("helloMessageHandler")
@Autowired
private MessageHandler helloMessageHandler;
@Test
public void testHelloMessageLog() throws InterruptedException {
EventChannelConfig.HelloMessagePublisher publisher = new EventChannelConfig.HelloMessagePublisher();
publisher.publish(helloMessageChannel);
Thread.sleep(1000L);
}
}
테스트 결과
맺음말
통합이란 무엇인가라는 간단한 질문에서부터 시작한 Spring Integration의 정리는 사실 프로젝트에 적용되었을때만 하더라도 오버 엔지니어링이라고 생각했었다. 하지만 MSA기반의 아키텍쳐를 지향하는 오늘날 통합은 어찌보면 당연히 해야하는 과제이고 방법의 차이일 뿐이라고 생각한다. Spring Integration은 스프링의 설계 원칙을 계승하여 만들어졌기 때문에 이미 스프링에 익숙한 사람이라면 통합에 대해 EIP 부터 볼 필요 없이 단순히 Spring Integration만 잘 구현하더라도 패턴화 된 통합을 쉽게 이룰 수 있을거라 생각한다.