MQTT 3.1.1
MQTT は IoT デバイスで広く使われるパブリッシュ/サブスクライブ型のメッセージングプロトコルです。ワイヤフォーマットはコンパクトで、すべてのコントロールパケットが 1 バイトの固定ヘッダ、継続ビット可変長整数(Remaining Length)、型固有のペイロードという構成です。パケット型は先頭バイトの上位 4 ビットにエンコードされており、独立したフィールドではありません。この特徴が wirespec の式ベースカプセルタグの格好のデモになります。
プロトコルの背景
MQTT 3.1.1(OASIS 標準)は 14 種類のコントロールパケット型を定義しています。クライアントにとって重要なのは CONNECT/CONNACK(セッション確立)、PUBLISH/PUBACK(メッセージ配信)、SUBSCRIBE/SUBACK(トピック購読)、PINGREQ/PINGRESP(キープアライブ)、DISCONNECT です。パケット型ごとに固有のペイロード構造があり、一部は先頭ヘッダバイトのフラグビットに応じた条件付きフィールドを持ちます。
Remaining Length フィールドは、各バイトの MSB を継続フラグ、下位 7 ビットをデータとする可変長エンコーディングです。最大 4 バイトで 268,435,455 までの長さを表現できます。
wirespec 定義
@endian big
module mqtt
# 継続ビット VarInt -- MQTT Remaining Length (MQTT 3.1.1 §2.2.3)
type MqttLength = varint {
continuation_bit: msb, # MSB = 1 は後続バイトがあることを意味する
value_bits: 7, # バイトあたり 7 ビットのペイロード
max_bytes: 4, # 4 バイトを超えると → WIRESPEC_ERR_OVERFLOW
byte_order: little, # 最下位グループが先
}
# UTF-8 文字列: u16be 長さプレフィックス + データバイト
packet MqttString {
length: u16,
data: bytes[length],
}
# バイナリブロブ: u16be 長さプレフィックス + データ(UTF-8 検証なし)
# Will Message (§3.1.3.3) と Password (§3.1.3.5) 用
packet MqttBytes {
length: u16,
data: bytes[length],
}
# MQTT コントロールパケット -- 固定ヘッダ + Remaining Length + ボディ
capsule MqttPacket {
type_and_flags: u8,
remaining_length: MqttLength,
payload: match (type_and_flags >> 4) within remaining_length {
1 => Connect {
protocol_name: MqttString,
protocol_level: u8,
connect_flags: u8,
keep_alive: u16,
client_id: MqttString,
will_topic: if connect_flags & 0x04 { MqttString },
will_message: if connect_flags & 0x04 { MqttBytes },
username: if connect_flags & 0x80 { MqttString },
password: if connect_flags & 0x40 { MqttBytes },
},
2 => ConnAck {
ack_flags: u8,
return_code: u8,
},
3 => Publish {
topic: MqttString,
let qos: u8 = (type_and_flags & 0x06) >> 1,
packet_id: if qos > 0 { u16 },
payload: bytes[remaining],
},
4 => PubAck { packet_id: u16 },
8 => Subscribe {
require type_and_flags & 0x0F == 0x02,
packet_id: u16,
subscriptions: bytes[remaining],
},
9 => SubAck { packet_id: u16, return_codes: bytes[remaining] },
12 => PingReq {},
13 => PingResp {},
14 => Disconnect {},
_ => Unknown { data: bytes[remaining] },
},
}使われている機能
| 機能 | 該当箇所 |
|---|---|
| 継続ビット VarInt | type MqttLength = varint { ... } |
| ネストしたパケット型 | フィールド型としての MqttString、MqttBytes |
| 式ベースのカプセルタグ | match (type_and_flags >> 4) within ... |
within バイト長スコーピング | within remaining_length |
条件フィールド(if) | will_topic: if connect_flags & 0x04 { ... } |
let 導出フィールド | let qos: u8 = (type_and_flags & 0x06) >> 1 |
| 依存条件 | packet_id: if qos > 0 { u16 } |
| 実行時バリデーション | require type_and_flags & 0x0F == 0x02 |
bytes[remaining] | payload: bytes[remaining] |
| 空ブランチ | PingReq {}, PingResp {}, Disconnect {} |
各機能の解説
継続ビット VarInt
type MqttLength = varint {
continuation_bit: msb,
value_bits: 7,
max_bytes: 4,
byte_order: little,
}varint { } ブロックは継続ビット方式の可変長整数を定義します。QUIC のプレフィックスマッチ VarInt とは別の方式です。パラメータでエンコーディングの詳細を制御します。
| パラメータ | 値 | 意味 |
|---|---|---|
continuation_bit | msb | 各バイトのビット 7 が「後続バイトあり」を示す |
value_bits | 7 | バイトあたり 7 ビットのデータ |
max_bytes | 4 | 4 バイトで終端しなければ WIRESPEC_ERR_OVERFLOW |
byte_order | little | 最下位 7 ビットグループが先 |
例えば、値 128 は 0x80 0x01(2 バイト)、値 16,383 は 0xFF 0x7F にエンコードされます。268,435,455(= 2^28 - 1)までが 4 バイトに収まります。
C では、継続バイトがなくなるまでループでバイトを読み取り、7 ビットグループを累積するヘルパーでデコードします。結果は uint32_t に格納されます。
式ベースのカプセルタグ
capsule MqttPacket {
type_and_flags: u8,
remaining_length: MqttLength,
payload: match (type_and_flags >> 4) within remaining_length {
...
},
}カプセルタグは単純なフィールド名ではなく、括弧付き式 (type_and_flags >> 4) です。wirespec はこの式を within より前に宣言されたヘッダフィールドのみで評価し、結果でディスパッチします。MQTT の「タイプニブル」(type_and_flags の上位 4 ビット)を、追加フィールドなしにディスパッチのキーとして使えます。
ヘッダフィールドに対する任意の算術・ビット演算式がカプセルタグとして有効です。
within スコーピング
payload: match (type_and_flags >> 4) within remaining_length {within remaining_length はペイロードのパースを正確に remaining_length バイトに制限します。そのバイト数でサブカーソルを作成し、各ブランチはサブカーソル内でパースされます。カーソル長より少ないバイトしか消費しなかった場合は WIRESPEC_ERR_TRAILING_DATA、カーソル境界を超えて読もうとした場合は WIRESPEC_ERR_SHORT_BUFFER を返します。
これは MQTT のフレーミングそのものです。受信側は remaining_length バイトをバッファに読み込み、その中から型固有のペイロードをパースします。
条件フィールド(if)
will_topic: if connect_flags & 0x04 { MqttString },
will_message: if connect_flags & 0x04 { MqttBytes },
username: if connect_flags & 0x80 { MqttString },
password: if connect_flags & 0x40 { MqttBytes },if COND { T } フィールドは IR 上 Option[T] になります。C では 2 つのフィールドに展開されます。
bool has_will_topic;
mqtt_mqtt_string_t will_topic; /* has_will_topic が true のときのみ有効 */パース時に条件を評価し、false ならフィールドをスキップして has_will_topic を false にします。シリアライズ時は has_will_topic が true なら書き込み、false なら省略します。
条件には、同一スコープ内で上方に宣言されたフィールドを参照できます。if でゲートされたフィールドの参照には ??(コアレスク)演算子を使います。
let 導出フィールド
3 => Publish {
topic: MqttString,
let qos: u8 = (type_and_flags & 0x06) >> 1,
packet_id: if qos > 0 { u16 },
payload: bytes[remaining],
},let フィールドはワイヤ上にバイトを持たず、生成される構造体に格納され、後続フィールドやバリデーションから参照できます。ここでは qos を外側の type_and_flags ヘッダバイトから導出し(先行ヘッダフィールドなのでブランチ内からアクセス可能)、直ちに packet_id のガードに使っています。
QoS レベルは固定ヘッダビットに既にエンコードされているため、ペイロードで再エンコードする必要はなく、冗長性を排除できます。
require による実行時バリデーション
8 => Subscribe {
require type_and_flags & 0x0F == 0x02,
packet_id: u16,
subscriptions: bytes[remaining],
},MQTT 3.1.1 は SUBSCRIBE パケットの type_and_flags 下位 4 ビットが 0x02 であることを要求しています。require はこれをパース時に強制し、条件が偽なら WIRESPEC_ERR_CONSTRAINT を返します。チェックはフィールドリスト内の記述位置で生成されるため、上方のフィールドを参照できます。
コンパイル
wirespec compile examples/mqtt/mqtt.wspec -o build/build/mqtt.h と build/mqtt.c が生成されます。
生成される C API
wirespec_result_t mqtt_mqtt_packet_parse(
const uint8_t *buf, size_t len,
mqtt_mqtt_packet_t *out, size_t *consumed);
wirespec_result_t mqtt_mqtt_packet_serialize(
const mqtt_mqtt_packet_t *in,
uint8_t *buf, size_t cap, size_t *written);使用例
#include "mqtt.h"
void handle_mqtt(const uint8_t *buf, size_t len) {
mqtt_mqtt_packet_t pkt;
size_t consumed;
wirespec_result_t r = mqtt_mqtt_packet_parse(buf, len, &pkt, &consumed);
if (r != WIRESPEC_OK) return;
switch (pkt.kind) {
case MQTT_MQTT_PACKET_CONNECT:
/* pkt.connect.client_id.data は wirespec_bytes_t */
on_connect(pkt.connect.client_id.data.ptr,
pkt.connect.client_id.data.len);
if (pkt.connect.has_will_topic) {
set_will(pkt.connect.will_topic.data.ptr,
pkt.connect.will_topic.data.len);
}
break;
case MQTT_MQTT_PACKET_PUBLISH:
/* pkt.publish.qos は let で導出されたフィールド */
on_publish(pkt.publish.topic.data.ptr,
pkt.publish.topic.data.len,
pkt.publish.payload.ptr,
pkt.publish.payload.len,
pkt.publish.qos);
break;
case MQTT_MQTT_PACKET_PINGREQ:
send_pingresp();
break;
default:
break;
}
}次のステップ
- BLE ATT -- リトルエンディアン、型エイリアス、列挙型フィールド
- TLS 1.3 -- 列挙型タグ、
u24プリミティブ、バイト境界付き fill 配列 - クラシックプロトコル -- UDP、TCP、Ethernet、IPv4