MQTT 의 이해부터 테스트까지 (feat. POS 연동)

손유진
테이블링 기술블로그
15 min readNov 20, 2023

--

안녕하세요. 백엔드팀 손유진입니다.

테이블링은 테이블에서 QR/NFC 를 태그하여 메뉴를 확인하고 주문할 수 있는 테이블오더 서비스를 제공하고 있습니다. 테이블오더로 주문한 내역은 POS 에 자동으로 전달되어 매장은 관리 리소스를 효율적으로 활용할 수 있는 장점을 가지고 있습니다.

여기서 우리는 POS Agent 와 테이블링 내부 서버의 안정적인 통신을 위해 MQTT 적용을 고민하고 있습니다. 이에 따라 이 글에서는 MQTT 에 대해 학습한 내용과 테스트 과정을 공유하고자 합니다.

MQTT 는 낮은 대역폭과 리소스가 제한된 환경에서 안정적으로 통신을 할 수 있는 경량 메시지 프로토콜입니다. 작은 데이터 패킷을 전송하여 효율성이 높고 오버헤드와 전력 소비가 작아 IoT와 M2M 통신에 많이 사용되고 있습니다.

MQTT 의 이해

Publisher/Subscriber 모델

MQTT 는 클라이언트와 브로커로 구성되어 있으며, 클라이언트가 Topic 에 메시지를 발행하면 해당 Topic 을 구독 중인 클라이언트가 메시지를 수신할 수 있습니다.

클라이언트는 여러 Topic 을 구독할 수 있고 모든 클라이언트가 발행과 구독을 할 수 있어 Publisher 와 Subscriber 의 구분이 따로 없습니다.

Publish-Subscribe 방식

Topic은 브로커가 연결된 클라이언트에 대한 메시지를 필터링 하기 위해 사용되며 슬래시(/) 로 구분하는 계층적인 구조를 가집니다.

Topic 예시 (출처: https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/)

QoS 제어

다양한 QoS(Quality Of Service) 레벨을 지원하여 메시지 전달의 보장을 제어할 수 있습니다.

QoS 0

  • Fire and forget.
  • Publisher 가 메시지를 보내고, 이상이 없으면 바로 완료.
  • 더 이상 관여하지 않음.

QoS 1

  • At least once.
  • Publisher 가 메시지를 보내면 Subscriber 가 메시지 확인 응답할 때까지 여러 번 전송.
  • 메시지 중복이 허용되는 상황에서 사용.

QoS 2

  • Exactly once.
  • 가장 안정적인 메시지 전송이 필요할 때 사용.
  • 메시지가 브로커에 정확히 한 번 전달됨.

지속성과 안정성

영구 세션(Persistent Session)을 통해 클라이언트에 전송되었지만 수신이 확인되지 않은 메시지를 다시 전달할 수 있습니다.

영구 세션을 활성화 하기 위해서는 MQTT 버전에 따라 클라이언트 옵션에 차이가 있지만 공통적으로 클린 세션(Clean Session)을 비활성화 하고 메시지의 QoS 레벨을 1이상으로 전송할 때 메시지가 다시 전달됩니다.

MQTT 버전 별 CleanSession 설정 (출처: https://www.emqx.com/en/blog/mqtt5-new-feature-clean-start-and-session-expiry-interval)

MQTT 활용 예시

MQTT 활용 예시 (출처: https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/)

이 이미지는 모바일 장치로 “open door” 메시지를 발행하면 해당 Topic 을 구독 중인 클라이언트가 수신하여 처리한 후 성공 여부를 응답 Topic에 전달해 모바일 앱에 완료를 알리는 과정을 보여줍니다.

패킷의 responseTopic 은 클라이언트가 메시지 발행 후 응답 메시지를 받을 Topic 이고, correlationData 는 메시지에 대한 식별자로 사용됩니다.

이러한 Request/Response 패턴은 MQTT 5.0 버전에서 도입되었으며, 실제 서비스에서 클라이언트가 처리 완료 여부를 사용자에게 알리는 데 유용하게 활용됩니다.

MQTT 통신 테스트

목적

테이블오더와 POS 시스템 연동 과정

현재 테이블오더가 들어오면 WebSocket 으로 POS 에 설치된 Agent 에게 메시지를 전달하여 주문 정보를 연동하고 있습니다.

이 때 소켓 연결이 간헐적으로 끊기면서 메시지를 수신하지 못해 POS 에 주문을 전송하지 못하는 이슈가 있었습니다. 이 문제를 해결하기 위해 MQTT 적용 시 메시지 전달이 신뢰성 있게 보장되는지 중점적으로 테스트해 보았습니다.

테스트 환경 구성

브로커는 Amazon MQ의 ActiveMQ를 활용하고, NestJS 프레임워크를 이용하여 구현해 보았습니다.
(ActiveMQ 는 MQTT v3.1.1 과 v3.1 만 지원합니다.)

  1. 필수 패키지 설치
$ npm i --save @nestjs/microservices
$ npm i --save mqtt

2. Client (Publisher)

  • app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
import { PublisherService } from './service/publisher.service';

@Module({
imports: [ConfigModule.forRoot()],
controllers: [AppController],
providers: [
{
provide: 'MQTT_EXAMPLE',
inject: [ConfigService],
useFactory: (configService: ConfigService) =>
ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
clientId: 'tabling-mqtt-example-app_01',
protocol: configService.get('MQTT_PROTOCOL'),
url: configService.get('MQTT_URL'),
username: configService.get('MQTT_USERNAME'),
password: configService.get('MQTT_PASSWORD'),
clean: false,
subscribeOptions: {
qos: 2,
},
},
}),
},
PublisherService,
],
})
export class AppModule {}
  • publisher.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { QoS } from '@nestjs/microservices/external/mqtt-options.interface';
import { logger } from '../main';

@Injectable()
export class PublisherService {
constructor(
@Inject('MQTT_EXAMPLE') private readonly mqttClient: ClientProxy,
) {}

async exec(topic: string, message: string, qos: QoS = 0) {
const record = new MqttRecordBuilder(message).setQoS(qos).build();

return this.mqttClient
.emit(topic, record)
.toPromise()
.then(() => {
logger.log(
`\n
-- 메시지 발행 성공 --
Topic: ${topic}
Payload: ${message}
QoS: ${qos}
-----------------------`,
);
})
.catch((error) => logger.error(error));
}
}
  • app.controller
import { Body, Controller, Delete, Get, Post } from '@nestjs/common';
import { PublisherService } from './service/publisher.service';
import { logger } from './main';
import { QoS } from "@nestjs/microservices/external/mqtt-options.interface";

@Controller()
export class AppController {
constructor(
private readonly publisherService: PublisherService
) {}

@Post('publish')
async publishMessage(@Body() body: { topic: string; message: string; qos: QoS }) {
const { topic, message, qos } = body;
await this.publisherService.exec(topic, message, qos);
return 'SUCCESS!';
}
}

3. Client (Subscriber)

  • main.ts
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
import { ConfigService } from '@nestjs/config';
import { Logger } from '@nestjs/common';

export const configService = new ConfigService();
export const logger = new Logger('MQTT Example');

/**
* clientId: 고유한 클라이언트ID, 중복된 클라이언트 ID로 연결 시 이전 연결이 종료됩니다.
* clean: 영구 세션(Persistent Session) 활성화 여부
* subscribeOptions.qos: 클라이언트가 수신할 수 있는 최대 QoS 레벨
*/
const mqttOptions: MicroserviceOptions = {
transport: Transport.MQTT,
options: {
clientId: 'tabling-mqtt-example-main_01',
protocol: configService.get('MQTT_PROTOCOL'),
url: configService.get('MQTT_URL'),
username: configService.get('MQTT_USERNAME'),
password: configService.get('MQTT_PASSWORD'),
clean: false,
subscribeOptions: {
qos: 2,
},
},
};

async function bootstrap() {
const app = await NestFactory.create(AppModule);

app.connectMicroservice(mqttOptions);

await app.startAllMicroservices();
await app.listen(3000);
}

bootstrap();
  • app.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { Ctx, MessagePattern, MqttContext, Payload, Transport } from '@nestjs/microservices';
import { logger } from '../../main';

@Controller()
export class AppController {
constructor() {}

@MessagePattern('tabling/lounge/light', Transport.MQTT)
handleMessage(@Payload() data: number[], @Ctx() context: MqttContext) {
logger.log(`\n
-- 메시지 수신 성공 --
Topic: ${context.getTopic()}
Payload: ${data.toString()}
---------------------`);
}
}

결과

테스트 로그

이전에 연결했던 클라이언트 ID 로 재연결 시 연결이 끊겨있는 동안 발행되었던 메시지들을 수신하는 것을 확인할 수 있었습니다.

마지막으로 공부할 때 제가 궁금했던 점과 발표 중 나왔던 질문에 대한 답변을 정리했습니다.

[Q1] WebSocket 통신과 무슨 차이점이 있나요?

WebSocket 은 웹 브라우저와 서버 간의 양방향 통신을 위해 사용하는 프로토콜로 MQTT 와는 사용 목적이 다릅니다.

만약 MQTT 를 브라우저 환경에서 사용한다면 브라우저는 기본적으로 MQTT 를 지원하지 않기 때문에 WebSocket 을 통해 브라우저에 MQTT 메시지를 보내야 합니다. (많은 MQTT 클라이언트 라이브러리에서 WebSocket 을 지원합니다.)

또 우리의 요건 측면에서 큰 차이점은 WebSocket 은 QoS 옵션과 메시지 대기열을 지원하지 않기 때문에 메시지의 전달을 보장하기 어렵다는 점이 있습니다.

[Q2] Message Queue와 무슨 차이점이 있나요?

Message Queue는 메시지 전달을 보장하기 위해서 1:1 통신을 해야하는 반면 MQTT는 Pub/Sub 모델로 여러 구독자가 모두 동일한 메시지를 수신할 수 있습니다.

ActiveMQ 의 경우 Topic 뿐만 아니라 Queue 로도 메시지를 전달할 수 있기 때문에 목적에 맞게 적절한 방식을 고려할 수 있습니다.

[Q3] Kafka 와 무슨 차이점이 있나요?

Kafka 는 대용량의 데이터 스트림 처리를 위해 사용하는 플랫폼으로 실시간 데이터 처리 및 저장이 필요한 대규모 애플리케이션에 적합합니다.

대량의 IoT 데이터를 처리 및 저장하여 보다 정밀한 분석을 하기 위해 MQTT와 Kafka를 결합하여 사용하기도 하며, MQTT 브로커도 스케일링이 가능하고 대용량 메시지 처리와 로드 밸런싱을 지원하기 때문에 상황에 따라 적절한 대응 방법을 선택할 수 있습니다.

마치며

메시지 기반으로 통신하는 방식이 다양하기 때문에 그중 하나인 MQTT 에 대해 알아가시는 데에 도움이 되었길 바라며, 가볍게 통신 테스트할 때는 무료로 제공되는 Public Broker도 활용할 수 있다는 소소한 정보 공유드립니다.

현재는 적용 여부를 검토 중인 단계이지만 적용 후 긍적적인 결과를 기대하며 마무리하겠습니다.

긴 글 읽어주셔서 감사합니다!

테이블링 백엔드팀에서는 기술적인 고민을 함께 나누며, 서비스를 지속적으로 발전시켜 갈 실력 있는 동료분을 모시고 있습니다. 보다 상세한 내용은 채용 공고를 확인 부탁드리며, 많은 관심과 지원 부탁드리겠습니다.

--

--