Examples
Practical snippets for the current API.
Shared Setup
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from rfc9420 import GroupConfig, DefaultCryptoProvider, MemoryStorageProvider
from rfc9420.extensions.extensions import build_capabilities_data
from rfc9420.messages.data_structures import Credential, Signature
from rfc9420.messages.key_packages import KeyPackage, LeafNode, LeafNodeSource
crypto = DefaultCryptoProvider()
storage = MemoryStorageProvider()
config = GroupConfig(crypto_provider=crypto, storage_provider=storage)
def make_key_package(identity: bytes):
kem_sk = X25519PrivateKey.generate()
kem_pk = kem_sk.public_key()
init_sk = X25519PrivateKey.generate()
init_pk = init_sk.public_key()
sig_sk = Ed25519PrivateKey.generate()
sig_pk = sig_sk.public_key()
sig_pk_raw = sig_pk.public_bytes_raw()
caps = build_capabilities_data(
ciphersuite_ids=[crypto.active_ciphersuite.suite_id],
supported_exts=[],
include_grease=False,
)
cred = Credential(identity=identity, public_key=sig_pk_raw)
leaf = LeafNode(
encryption_key=kem_pk.public_bytes_raw(),
signature_key=sig_pk_raw,
credential=cred,
capabilities=caps,
leaf_node_source=LeafNodeSource.KEY_PACKAGE,
)
kp_tbs = KeyPackage(leaf_node=leaf, init_key=init_pk.public_bytes_raw())
kp_sig = Signature(sig_sk.sign(kp_tbs.tbs_serialize()))
kp = KeyPackage(leaf_node=leaf, init_key=init_pk.public_bytes_raw(), signature=kp_sig)
return kp, init_sk.private_bytes_raw(), sig_sk.private_bytes_raw()
Create + Join + Message
from rfc9420.api.session import MLSGroupSession
kp_alice, init_alice, sig_alice = make_key_package(b"alice")
alice = MLSGroupSession.create_with_config(config, b"group-1", kp_alice)
kp_bob, init_bob, sig_bob = make_key_package(b"bob")
alice.add_member(kp_bob, sig_alice)
commit_bytes, welcomes = alice.commit(sig_alice)
bob = MLSGroupSession.join_from_welcome_with_config(config, welcomes[0], init_bob)
ct = alice.protect_application(b"hello")
sender, pt = bob.unprotect_application(ct)
print(sender, pt)
Process Commit From Another Member
from rfc9420 import get_commit_sender_leaf_index
sender = get_commit_sender_leaf_index(commit_bytes)
some_other_member_session.apply_commit(commit_bytes, sender)
Rotate Own Keys (Update Proposal)
new_kp, _new_init, new_sig_sk = make_key_package(b"alice")
new_leaf = new_kp.leaf_node
new_leaf = LeafNode(
encryption_key=new_leaf.encryption_key,
signature_key=new_leaf.signature_key,
credential=new_leaf.credential,
capabilities=new_leaf.capabilities,
leaf_node_source=LeafNodeSource.UPDATE,
)
hs = alice.update_self(new_leaf, sig_alice)
alice.process_proposal(hs, alice.own_leaf_index)
commit_bytes, _ = alice.commit(sig_alice)
sig_alice = new_sig_sk
Remove Member
hs = alice.remove_member(removed_index=1, signing_key=sig_alice)
alice.process_proposal(hs, alice.own_leaf_index)
commit_bytes, _ = alice.commit(sig_alice)
Persist + Restore Session
blob = alice.serialize()
alice_restored = MLSGroupSession.deserialize_with_config(config, blob)
SQLite Storage Provider
from rfc9420.backends.storage import SQLiteStorageProvider
from rfc9420 import GroupConfig, DefaultCryptoProvider
cfg = GroupConfig(
crypto_provider=DefaultCryptoProvider(),
storage_provider=SQLiteStorageProvider("mls_state.sqlite"),
)
Error Handling Pattern
from rfc9420 import InvalidCommitError, InvalidSignatureError, RFC9420Error
try:
session.apply_commit(commit_bytes, sender_leaf_index=sender)
except (InvalidCommitError, InvalidSignatureError) as e:
print(f"commit rejected: {e}")
except RFC9420Error as e:
print(f"mls failure: {e}")