Skip to content

MQTT 3.1.1

MQTT は IoT デバイスで広く使われるパブリッシュ/サブスクライブ型のメッセージングプロトコルです。ワイヤフォーマットはコンパクトで、すべてのコントロールパケットが 1 バイトの固定ヘッダ、継続ビット可変長整数(Remaining Length)、型固有のペイロードという構成です。パケット型は先頭バイトの上位 4 ビットにエンコードされており、独立したフィールドではありません。この特徴が wirespec の式ベースカプセルタグの格好のデモになります。

プロトコルの背景

MQTT 3.1.1(OASIS 標準)は 14 種類のコントロールパケット型を定義しています。クライアントにとって重要なのは CONNECT/CONNACK(セッション確立)、PUBLISH/PUBACK(メッセージ配信)、SUBSCRIBE/SUBACK(トピック購読)、PINGREQ/PINGRESP(キープアライブ)、DISCONNECT です。パケット型ごとに固有のペイロード構造があり、一部は先頭ヘッダバイトのフラグビットに応じた条件付きフィールドを持ちます。

Remaining Length フィールドは、各バイトの MSB を継続フラグ、下位 7 ビットをデータとする可変長エンコーディングです。最大 4 バイトで 268,435,455 までの長さを表現できます。

wirespec 定義

wire
@endian big
module mqtt

# 継続ビット VarInt -- MQTT Remaining Length (MQTT 3.1.1 §2.2.3)
type MqttLength = varint {
    continuation_bit: msb,    # MSB = 1 は後続バイトがあることを意味する
    value_bits: 7,             # バイトあたり 7 ビットのペイロード
    max_bytes: 4,              # 4 バイトを超えると → WIRESPEC_ERR_OVERFLOW
    byte_order: little,        # 最下位グループが先
}

# UTF-8 文字列: u16be 長さプレフィックス + データバイト
packet MqttString {
    length: u16,
    data: bytes[length],
}

# バイナリブロブ: u16be 長さプレフィックス + データ(UTF-8 検証なし)
# Will Message (§3.1.3.3) と Password (§3.1.3.5) 用
packet MqttBytes {
    length: u16,
    data: bytes[length],
}

# MQTT コントロールパケット -- 固定ヘッダ + Remaining Length + ボディ
capsule MqttPacket {
    type_and_flags: u8,
    remaining_length: MqttLength,
    payload: match (type_and_flags >> 4) within remaining_length {
        1 => Connect {
            protocol_name:  MqttString,
            protocol_level: u8,
            connect_flags:  u8,
            keep_alive:     u16,
            client_id:      MqttString,
            will_topic:     if connect_flags & 0x04 { MqttString },
            will_message:   if connect_flags & 0x04 { MqttBytes },
            username:       if connect_flags & 0x80 { MqttString },
            password:       if connect_flags & 0x40 { MqttBytes },
        },
        2 => ConnAck {
            ack_flags:   u8,
            return_code: u8,
        },
        3 => Publish {
            topic: MqttString,
            let qos: u8 = (type_and_flags & 0x06) >> 1,
            packet_id: if qos > 0 { u16 },
            payload: bytes[remaining],
        },
        4  => PubAck     { packet_id: u16 },
        8  => Subscribe  {
            require type_and_flags & 0x0F == 0x02,
            packet_id:     u16,
            subscriptions: bytes[remaining],
        },
        9  => SubAck     { packet_id: u16, return_codes: bytes[remaining] },
        12 => PingReq    {},
        13 => PingResp   {},
        14 => Disconnect {},
        _  => Unknown    { data: bytes[remaining] },
    },
}

使われている機能

機能該当箇所
継続ビット VarInttype MqttLength = varint { ... }
ネストしたパケット型フィールド型としての MqttStringMqttBytes
式ベースのカプセルタグmatch (type_and_flags >> 4) within ...
within バイト長スコーピングwithin remaining_length
条件フィールド(ifwill_topic: if connect_flags & 0x04 { ... }
let 導出フィールドlet qos: u8 = (type_and_flags & 0x06) >> 1
依存条件packet_id: if qos > 0 { u16 }
実行時バリデーションrequire type_and_flags & 0x0F == 0x02
bytes[remaining]payload: bytes[remaining]
空ブランチPingReq {}, PingResp {}, Disconnect {}

各機能の解説

継続ビット VarInt

wire
type MqttLength = varint {
    continuation_bit: msb,
    value_bits: 7,
    max_bytes: 4,
    byte_order: little,
}

varint { } ブロックは継続ビット方式の可変長整数を定義します。QUIC のプレフィックスマッチ VarInt とは別の方式です。パラメータでエンコーディングの詳細を制御します。

パラメータ意味
continuation_bitmsb各バイトのビット 7 が「後続バイトあり」を示す
value_bits7バイトあたり 7 ビットのデータ
max_bytes44 バイトで終端しなければ WIRESPEC_ERR_OVERFLOW
byte_orderlittle最下位 7 ビットグループが先

例えば、値 128 は 0x80 0x01(2 バイト)、値 16,383 は 0xFF 0x7F にエンコードされます。268,435,455(= 2^28 - 1)までが 4 バイトに収まります。

C では、継続バイトがなくなるまでループでバイトを読み取り、7 ビットグループを累積するヘルパーでデコードします。結果は uint32_t に格納されます。

式ベースのカプセルタグ

wire
capsule MqttPacket {
    type_and_flags: u8,
    remaining_length: MqttLength,
    payload: match (type_and_flags >> 4) within remaining_length {
        ...
    },
}

カプセルタグは単純なフィールド名ではなく、括弧付き式 (type_and_flags >> 4) です。wirespec はこの式を within より前に宣言されたヘッダフィールドのみで評価し、結果でディスパッチします。MQTT の「タイプニブル」(type_and_flags の上位 4 ビット)を、追加フィールドなしにディスパッチのキーとして使えます。

ヘッダフィールドに対する任意の算術・ビット演算式がカプセルタグとして有効です。

within スコーピング

wire
payload: match (type_and_flags >> 4) within remaining_length {

within remaining_length はペイロードのパースを正確に remaining_length バイトに制限します。そのバイト数でサブカーソルを作成し、各ブランチはサブカーソル内でパースされます。カーソル長より少ないバイトしか消費しなかった場合は WIRESPEC_ERR_TRAILING_DATA、カーソル境界を超えて読もうとした場合は WIRESPEC_ERR_SHORT_BUFFER を返します。

これは MQTT のフレーミングそのものです。受信側は remaining_length バイトをバッファに読み込み、その中から型固有のペイロードをパースします。

条件フィールド(if

wire
will_topic:   if connect_flags & 0x04 { MqttString },
will_message: if connect_flags & 0x04 { MqttBytes },
username:     if connect_flags & 0x80 { MqttString },
password:     if connect_flags & 0x40 { MqttBytes },

if COND { T } フィールドは IR 上 Option[T] になります。C では 2 つのフィールドに展開されます。

c
bool has_will_topic;
mqtt_mqtt_string_t will_topic;  /* has_will_topic が true のときのみ有効 */

パース時に条件を評価し、false ならフィールドをスキップして has_will_topicfalse にします。シリアライズ時は has_will_topictrue なら書き込み、false なら省略します。

条件には、同一スコープ内で上方に宣言されたフィールドを参照できます。if でゲートされたフィールドの参照には ??(コアレスク)演算子を使います。

let 導出フィールド

wire
3 => Publish {
    topic: MqttString,
    let qos: u8 = (type_and_flags & 0x06) >> 1,
    packet_id: if qos > 0 { u16 },
    payload: bytes[remaining],
},

let フィールドはワイヤ上にバイトを持たず、生成される構造体に格納され、後続フィールドやバリデーションから参照できます。ここでは qos を外側の type_and_flags ヘッダバイトから導出し(先行ヘッダフィールドなのでブランチ内からアクセス可能)、直ちに packet_id のガードに使っています。

QoS レベルは固定ヘッダビットに既にエンコードされているため、ペイロードで再エンコードする必要はなく、冗長性を排除できます。

require による実行時バリデーション

wire
8 => Subscribe {
    require type_and_flags & 0x0F == 0x02,
    packet_id: u16,
    subscriptions: bytes[remaining],
},

MQTT 3.1.1 は SUBSCRIBE パケットの type_and_flags 下位 4 ビットが 0x02 であることを要求しています。require はこれをパース時に強制し、条件が偽なら WIRESPEC_ERR_CONSTRAINT を返します。チェックはフィールドリスト内の記述位置で生成されるため、上方のフィールドを参照できます。

コンパイル

bash
wirespec compile examples/mqtt/mqtt.wspec -o build/

build/mqtt.hbuild/mqtt.c が生成されます。

生成される C API

c
wirespec_result_t mqtt_mqtt_packet_parse(
    const uint8_t *buf, size_t len,
    mqtt_mqtt_packet_t *out, size_t *consumed);

wirespec_result_t mqtt_mqtt_packet_serialize(
    const mqtt_mqtt_packet_t *in,
    uint8_t *buf, size_t cap, size_t *written);

使用例

c
#include "mqtt.h"

void handle_mqtt(const uint8_t *buf, size_t len) {
    mqtt_mqtt_packet_t pkt;
    size_t consumed;

    wirespec_result_t r = mqtt_mqtt_packet_parse(buf, len, &pkt, &consumed);
    if (r != WIRESPEC_OK) return;

    switch (pkt.kind) {
    case MQTT_MQTT_PACKET_CONNECT:
        /* pkt.connect.client_id.data は wirespec_bytes_t */
        on_connect(pkt.connect.client_id.data.ptr,
                   pkt.connect.client_id.data.len);
        if (pkt.connect.has_will_topic) {
            set_will(pkt.connect.will_topic.data.ptr,
                     pkt.connect.will_topic.data.len);
        }
        break;
    case MQTT_MQTT_PACKET_PUBLISH:
        /* pkt.publish.qos は let で導出されたフィールド */
        on_publish(pkt.publish.topic.data.ptr,
                   pkt.publish.topic.data.len,
                   pkt.publish.payload.ptr,
                   pkt.publish.payload.len,
                   pkt.publish.qos);
        break;
    case MQTT_MQTT_PACKET_PINGREQ:
        send_pingresp();
        break;
    default:
        break;
    }
}

次のステップ

  • BLE ATT -- リトルエンディアン、型エイリアス、列挙型フィールド
  • TLS 1.3 -- 列挙型タグ、u24 プリミティブ、バイト境界付き fill 配列
  • クラシックプロトコル -- UDP、TCP、Ethernet、IPv4