From 6f0b6acddbcb7249756b1b12e093582d7a70b36a Mon Sep 17 00:00:00 2001
From: Jon Staab
Date: Fri, 21 Mar 2025 13:21:04 -0700
Subject: [PATCH] Add negentropy/diff

---
Review notes (commentary only: git-am ignores everything between the "---"
line and the first diff header):
  * Every fix below edits an added line in place, so hunk sizes and the
    diffstat are unchanged. The "index" blob ids are the ones originally
    generated and no longer match the edited content; regenerate the patch
    with git format-patch after applying.
  * diff.ts: Diff now extends TypedEmitter<DiffEvents> (DiffEvents was declared
    but unused); close() marks itself closed and unsubscribes before it sends
    or emits, so a Close listener calling close() cannot re-enter; NEG-OPEN and
    NEG-MSG are no longer sent once the diff has been closed.
  * message.ts: RelayEosePayload is [string]; EOSE carries no event.
  * negentropy.ts: varints use arithmetic instead of 32-bit bit operators;
    sha256 is chosen by WebCrypto availability instead of sniffing `window`;
    Accumulator.add honours byteOffset; Accumulator.negate writes little-endian;
    loadInputBuffer inspects the prototype of its argument, not of the global.

 packages/net2/src/diff.ts       |  86 +++++
 packages/net2/src/message.ts    |  39 ++-
 packages/net2/src/negentropy.ts | 590 ++++++++++++++++++++++++++++++++
 packages/net2/src/subscribe.ts  |  10 +-
 4 files changed, 712 insertions(+), 13 deletions(-)
 create mode 100644 packages/net2/src/diff.ts
 create mode 100644 packages/net2/src/negentropy.ts

diff --git a/packages/net2/src/diff.ts b/packages/net2/src/diff.ts
new file mode 100644
index 0000000..57d47d0
--- /dev/null
+++ b/packages/net2/src/diff.ts
@@ -0,0 +1,86 @@
+import {EventEmitter} from "events"
+import {on, randomId} from "@welshman/lib"
+import {SignedEvent, Filter} from "@welshman/util"
+import {RelayMessage, isRelayNegErrMessage, isRelayNegMsgMessage} from "./message.js"
+import {AbstractAdapter, AdapterEventType} from "./adapter.js"
+import {Negentropy, NegentropyStorageVector} from "./negentropy.js"
+import {TypedEmitter} from "./util.js"
+
+export enum DiffEventType {
+  Message = "diff:event:message",
+  Error = "diff:event:error",
+  Close = "diff:event:close",
+}
+
+export type DiffEvents = {
+  [DiffEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void
+  [DiffEventType.Error]: (error: string, url: string) => void
+  [DiffEventType.Close]: () => void
+}
+
+export class Diff extends (EventEmitter as new () => TypedEmitter<DiffEvents>) {
+  _id = `NEG-${randomId().slice(0, 8)}`
+  _unsubscriber: () => void
+  _closed = false
+
+  constructor(
+    readonly adapter: AbstractAdapter,
+    readonly events: SignedEvent[],
+    readonly filter: Filter,
+  ) {
+    super()
+
+    const storage = new NegentropyStorageVector()
+    const neg = new Negentropy(storage, 50_000)
+
+    for (const event of events) {
+      storage.insert(event.created_at, event.id)
+    }
+
+    storage.seal()
+
+    this._unsubscriber = on(
+      adapter,
+      AdapterEventType.Receive,
+      async (message: RelayMessage, url: string) => {
+        if (isRelayNegMsgMessage(message)) {
+          const [_, negid, msg] = message
+
+          if (negid === this._id) {
+            const [newMsg, have, need] = await neg.reconcile(msg)
+
+            this.emit(DiffEventType.Message, {have, need}, url)
+
+            if (newMsg && !this._closed) { // close() may have run while reconcile() was pending
+              adapter.send(["NEG-MSG", this._id, newMsg])
+            } else {
+              this.close()
+            }
+          }
+        }
+
+        if (isRelayNegErrMessage(message)) {
+          const [_, negid, msg] = message
+
+          if (negid === this._id) {
+            this.emit(DiffEventType.Error, msg, url)
+          }
+        }
+      },
+    )
+
+    neg.initiate().then((msg: string) => {
+      if (!this._closed) adapter.send(["NEG-OPEN", this._id, filter, msg]) // never open after NEG-CLOSE
+    })
+  }
+
+  close() {
+    if (this._closed) return
+
+    this._closed = true // set first: a Close listener that calls close() must not re-enter
+    this._unsubscriber()
+
+    this.adapter.send(["NEG-CLOSE", this._id])
+    this.emit(DiffEventType.Close)
+  }
+}
diff --git a/packages/net2/src/message.ts b/packages/net2/src/message.ts
index 4b1d7e5..ff33dc8 100644
--- a/packages/net2/src/message.ts
+++ b/packages/net2/src/message.ts
@@ -4,37 +4,53 @@ import type {SignedEvent} from "@welshman/util"
 
 export enum RelayMessageType {
   Auth = "AUTH",
-  Event = "EVENT",
   Eose = "EOSE",
+  Event = "EVENT",
+  NegErr = "NEG-ERR",
+  NegMsg = "NEG-MSG",
   Ok = "OK",
 }
 
 export type RelayAuthPayload = [string]
 
+export type RelayEosePayload = [string] // NIP-01: EOSE carries only the subscription id
+
 export type RelayEventPayload = [string, SignedEvent]
 
-export type RelayEosePayload = [string, SignedEvent]
+export type RelayNegErrPayload = [string, string]
+
+export type RelayNegMsgPayload = [string, string]
 
 export type RelayOkPayload = [string, boolean, string]
 
-export type RelayAuthMessage = ["AUTH", ...RelayAuthPayload]
+export type RelayAuthMessage = [RelayMessageType.Auth, ...RelayAuthPayload]
 
-export type RelayEventMessage = ["EVENT", ...RelayEventPayload]
+export type RelayEoseMessage = [RelayMessageType.Eose, ...RelayEosePayload]
 
-export type RelayEoseMessage = ["EOSE", ...RelayEosePayload]
+export type RelayEventMessage = [RelayMessageType.Event, ...RelayEventPayload]
 
-export type RelayOkMessage = ["OK", ...RelayOkPayload]
+export type RelayNegErrMessage = [RelayMessageType.NegErr, ...RelayNegErrPayload]
+
+export type RelayNegMsgMessage = [RelayMessageType.NegMsg, ...RelayNegMsgPayload]
+
+export type RelayOkMessage = [RelayMessageType.Ok, ...RelayOkPayload]
 
 export type RelayMessage = any[]
 
 export const isRelayAuthMessage = (m: RelayMessage): m is RelayAuthMessage =>
   m[0] === RelayMessageType.Auth
 
+export const isRelayEoseMessage = (m: RelayMessage): m is RelayEoseMessage =>
+  m[0] === RelayMessageType.Eose
+
 export const isRelayEventMessage = (m: RelayMessage): m is RelayEventMessage =>
   m[0] === RelayMessageType.Event
 
-export const isRelayEoseMessage = (m: RelayMessage): m is RelayEoseMessage =>
-  m[0] === RelayMessageType.Eose
+export const isRelayNegErrMessage = (m: RelayMessage): m is RelayNegErrMessage =>
+  m[0] === RelayMessageType.NegErr
+
+export const isRelayNegMsgMessage = (m: RelayMessage): m is RelayNegMsgMessage =>
+  m[0] === RelayMessageType.NegMsg
 
 export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage =>
   m[0] === RelayMessageType.Ok
@@ -42,3 +58,10 @@ export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage =>
 // client -> relay
 
 export type ClientMessage = any[]
+
+export enum ClientMessageType {
+  Auth = "AUTH",
+  Event = "EVENT",
+  NegClose = "NEG-CLOSE",
+  Req = "REQ",
+}
diff --git a/packages/net2/src/negentropy.ts b/packages/net2/src/negentropy.ts
new file mode 100644
index 0000000..73a91b4
--- /dev/null
+++ b/packages/net2/src/negentropy.ts
@@ -0,0 +1,590 @@
+// (C) 2023 Doug Hoyte. MIT license
+// @ts-nocheck
+
+const PROTOCOL_VERSION = 0x61 // Version 1
+const ID_SIZE = 32
+const FINGERPRINT_SIZE = 16
+
+const Mode = {
+  Skip: 0,
+  Fingerprint: 1,
+  IdList: 2,
+}
+
+class WrappedBuffer {
+  constructor(buffer) {
+    this._raw = new Uint8Array(buffer || 512)
+    this.length = buffer ? buffer.length : 0
+  }
+
+  unwrap() {
+    return this._raw.subarray(0, this.length)
+  }
+
+  get capacity() {
+    return this._raw.byteLength
+  }
+
+  extend(buf) {
+    if (buf._raw) buf = buf.unwrap()
+    if (typeof(buf.length) !== 'number') throw Error("bad length")
+    const targetSize = buf.length + this.length
+    if (this.capacity < targetSize) {
+      const oldRaw = this._raw
+      const newCapacity = Math.max(this.capacity * 2, targetSize)
+      this._raw = new Uint8Array(newCapacity)
+      this._raw.set(oldRaw)
+    }
+
+    this._raw.set(buf, this.length)
+    this.length += buf.length
+  }
+
+  shift() {
+    const first = this._raw[0]
+    this._raw = this._raw.subarray(1)
+    this.length--
+    return first
+  }
+
+  shiftN(n = 1) {
+    const firstSubarray = this._raw.subarray(0, n)
+    this._raw = this._raw.subarray(n)
+    this.length -= n
+    return firstSubarray
+  }
+}
+
+function decodeVarInt(buf) {
+  let res = 0
+
+  while (1) {
+    if (buf.length === 0) throw Error("parse ends prematurely")
+    const byte = buf.shift()
+    res = res * 128 + (byte & 127) // arithmetic, not <<: shifts wrap to signed 32 bits and corrupt values >= 2^31
+    if ((byte & 128) === 0) break
+  }
+
+  return res
+}
+
+function encodeVarInt(n) {
+  if (n === 0) return new WrappedBuffer([0])
+
+  const o = []
+
+  while (n > 0) {
+    o.push(n % 128) // arithmetic instead of & and >>>= so values above 2^32 are not truncated
+    n = Math.floor(n / 128)
+  }
+
+  o.reverse()
+
+  for (let i = 0; i < o.length - 1; i++) o[i] |= 128
+
+  return new WrappedBuffer(o)
+}
+
+function getByte(buf) {
+  return getBytes(buf, 1)[0]
+}
+
+function getBytes(buf, n) {
+  if (buf.length < n) throw Error("parse ends prematurely")
+  return buf.shiftN(n)
+}
+
+
+class Accumulator {
+  constructor() {
+    this.setToZero()
+
+    if (typeof globalThis.crypto?.subtle?.digest === 'function') { // WebCrypto: browsers, workers, recent node.js
+      this.sha256 = async (slice) => new Uint8Array(await globalThis.crypto.subtle.digest("SHA-256", slice))
+    } else { // legacy node.js (CommonJS only: `require` does not exist in ES modules)
+      const crypto = require('crypto')
+      this.sha256 = async (slice) => new Uint8Array(crypto.createHash('sha256').update(slice).digest())
+    }
+  }
+
+  setToZero() {
+    this.buf = new Uint8Array(ID_SIZE)
+  }
+
+  add(otherBuf) {
+    let currCarry = 0, nextCarry = 0
+    const p = new DataView(this.buf.buffer)
+    const po = new DataView(otherBuf.buffer, otherBuf.byteOffset, otherBuf.byteLength) // honour subarray views
+
+    for (let i = 0; i < 8; i++) {
+      const offset = i * 4
+      const orig = p.getUint32(offset, true)
+      const otherV = po.getUint32(offset, true)
+
+      let next = orig
+
+      next += currCarry
+      next += otherV
+      if (next > 0xFFFFFFFF) nextCarry = 1
+
+      p.setUint32(offset, next & 0xFFFFFFFF, true)
+      currCarry = nextCarry
+      nextCarry = 0
+    }
+  }
+
+  negate() {
+    const p = new DataView(this.buf.buffer)
+
+    for (let i = 0; i < 8; i++) {
+      const offset = i * 4
+      p.setUint32(offset, ~p.getUint32(offset, true), true) // write little-endian, matching the read
+    }
+
+    const one = new Uint8Array(ID_SIZE)
+    one[0] = 1
+    this.add(one)
+  }
+
+  async getFingerprint(n) {
+    const input = new WrappedBuffer()
+    input.extend(this.buf)
+    input.extend(encodeVarInt(n))
+
+    const hash = await this.sha256(input.unwrap())
+
+    return hash.subarray(0, FINGERPRINT_SIZE)
+  }
+}
+
+
+class NegentropyStorageVector {
+  constructor() {
+    this.items = []
+    this.sealed = false
+  }
+
+  insert(timestamp, id) {
+    if (this.sealed) throw Error("already sealed")
+    id = loadInputBuffer(id)
+    if (id.byteLength !== ID_SIZE) throw Error("bad id size for added item")
+    this.items.push({timestamp, id})
+  }
+
+  seal() {
+    if (this.sealed) throw Error("already sealed")
+    this.sealed = true
+
+    this.items.sort(itemCompare)
+
+    for (let i = 1; i < this.items.length; i++) {
+      if (itemCompare(this.items[i - 1], this.items[i]) === 0) throw Error("duplicate item inserted")
+    }
+  }
+
+  unseal() {
+    this.sealed = false
+  }
+
+  size() {
+    this._checkSealed()
+    return this.items.length
+  }
+
+  getItem(i) {
+    this._checkSealed()
+    if (i >= this.items.length) throw Error("out of range")
+    return this.items[i]
+  }
+
+  iterate(begin, end, cb) {
+    this._checkSealed()
+    this._checkBounds(begin, end)
+
+    for (let i = begin; i < end; ++i) {
+      if (!cb(this.items[i], i)) break
+    }
+  }
+
+  findLowerBound(begin, end, bound) {
+    this._checkSealed()
+    this._checkBounds(begin, end)
+
+    return this._binarySearch(this.items, begin, end, (a) => itemCompare(a, bound) < 0)
+  }
+
+  async fingerprint(begin, end) {
+    const out = new Accumulator()
+    out.setToZero()
+
+    this.iterate(begin, end, (item, i) => {
+      out.add(item.id)
+      return true
+    })
+
+    return await out.getFingerprint(end - begin)
+  }
+
+  _checkSealed() {
+    if (!this.sealed) throw Error("not sealed")
+  }
+
+  _checkBounds(begin, end) {
+    if (begin > end || end > this.items.length) throw Error("bad range")
+  }
+
+  _binarySearch(arr, first, last, cmp) {
+    let count = last - first
+
+    while (count > 0) {
+      let it = first
+      const step = Math.floor(count / 2)
+      it += step
+
+      if (cmp(arr[it])) {
+        first = ++it
+        count -= step + 1
+      } else {
+        count = step
+      }
+    }
+
+    return first
+  }
+}
+
+
+class Negentropy {
+  constructor(storage, frameSizeLimit = 0) {
+    if (frameSizeLimit !== 0 && frameSizeLimit < 4096) throw Error("frameSizeLimit too small")
+
+    this.storage = storage
+    this.frameSizeLimit = frameSizeLimit
+
+    this.lastTimestampIn = 0
+    this.lastTimestampOut = 0
+  }
+
+  _bound(timestamp, id) {
+    return {timestamp, id: id ? id : new Uint8Array(0)}
+  }
+
+  async initiate() {
+    if (this.isInitiator) throw Error("already initiated")
+    this.isInitiator = true
+
+    const output = new WrappedBuffer()
+    output.extend([PROTOCOL_VERSION])
+
+    await this.splitRange(0, this.storage.size(), this._bound(Number.MAX_VALUE), output)
+
+    return this._renderOutput(output)
+  }
+
+  setInitiator() {
+    this.isInitiator = true
+  }
+
+  async reconcile(query) {
+    const haveIds = [], needIds = []
+    query = new WrappedBuffer(loadInputBuffer(query))
+
+    this.lastTimestampIn = this.lastTimestampOut = 0 // reset for each message
+
+    const fullOutput = new WrappedBuffer()
+    fullOutput.extend([PROTOCOL_VERSION])
+
+    const protocolVersion = getByte(query)
+    if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw Error("invalid negentropy protocol version byte")
+    if (protocolVersion !== PROTOCOL_VERSION) {
+      if (this.isInitiator) throw Error("unsupported negentropy protocol version requested: " + (protocolVersion - 0x60))
+      else return [this._renderOutput(fullOutput), haveIds, needIds]
+    }
+
+    const storageSize = this.storage.size()
+    let prevBound = this._bound(0)
+    let prevIndex = 0
+    let skip = false
+
+    while (query.length !== 0) {
+      let o = new WrappedBuffer()
+
+      const doSkip = () => {
+        if (skip) {
+          skip = false
+          o.extend(this.encodeBound(prevBound))
+          o.extend(encodeVarInt(Mode.Skip))
+        }
+      }
+
+      const currBound = this.decodeBound(query)
+      const mode = decodeVarInt(query)
+
+      const lower = prevIndex
+      let upper = this.storage.findLowerBound(prevIndex, storageSize, currBound)
+
+      if (mode === Mode.Skip) {
+        skip = true
+      } else if (mode === Mode.Fingerprint) {
+        const theirFingerprint = getBytes(query, FINGERPRINT_SIZE)
+        const ourFingerprint = await this.storage.fingerprint(lower, upper)
+
+        if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) {
+          doSkip()
+          await this.splitRange(lower, upper, currBound, o)
+        } else {
+          skip = true
+        }
+      } else if (mode === Mode.IdList) {
+        const numIds = decodeVarInt(query)
+
+        const theirElems = {} // stringified Uint8Array -> original Uint8Array (or hex)
+        for (let i = 0; i < numIds; i++) {
+          const e = getBytes(query, ID_SIZE)
+          if (this.isInitiator) theirElems[e] = e
+        }
+
+        if (this.isInitiator) {
+          skip = true
+
+          this.storage.iterate(lower, upper, (item) => {
+            const k = item.id
+
+            if (!theirElems[k]) {
+              // ID exists on our side, but not their side
+              if (this.isInitiator) haveIds.push(this.wantUint8ArrayOutput ? k : uint8ArrayToHex(k))
+            } else {
+              // ID exists on both sides
+              delete theirElems[k]
+            }
+
+            return true
+          })
+
+          for (const v of Object.values(theirElems)) {
+            // ID exists on their side, but not our side
+            needIds.push(this.wantUint8ArrayOutput ? v : uint8ArrayToHex(v))
+          }
+        } else {
+          doSkip()
+
+          const responseIds = new WrappedBuffer()
+          let numResponseIds = 0
+          let endBound = currBound
+
+          this.storage.iterate(lower, upper, (item, index) => {
+            if (this.exceededFrameSizeLimit(fullOutput.length + responseIds.length)) {
+              endBound = item
+              upper = index // shrink upper so that remaining range gets correct fingerprint
+              return false
+            }
+
+            responseIds.extend(item.id)
+            numResponseIds++
+            return true
+          })
+
+          o.extend(this.encodeBound(endBound))
+          o.extend(encodeVarInt(Mode.IdList))
+          o.extend(encodeVarInt(numResponseIds))
+          o.extend(responseIds)
+
+          fullOutput.extend(o)
+          o = new WrappedBuffer()
+        }
+      } else {
+        throw Error("unexpected mode")
+      }
+
+      if (this.exceededFrameSizeLimit(fullOutput.length + o.length)) {
+        // frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range
+        const remainingFingerprint = await this.storage.fingerprint(upper, storageSize)
+
+        fullOutput.extend(this.encodeBound(this._bound(Number.MAX_VALUE)))
+        fullOutput.extend(encodeVarInt(Mode.Fingerprint))
+        fullOutput.extend(remainingFingerprint)
+        break
+      } else {
+        fullOutput.extend(o)
+      }
+
+      prevIndex = upper
+      prevBound = currBound
+    }
+
+    return [fullOutput.length === 1 && this.isInitiator ? null : this._renderOutput(fullOutput), haveIds, needIds]
+  }
+
+  async splitRange(lower, upper, upperBound, o) {
+    const numElems = upper - lower
+    const buckets = 16
+
+    if (numElems < buckets * 2) {
+      o.extend(this.encodeBound(upperBound))
+      o.extend(encodeVarInt(Mode.IdList))
+
+      o.extend(encodeVarInt(numElems))
+      this.storage.iterate(lower, upper, (item) => {
+        o.extend(item.id)
+        return true
+      })
+    } else {
+      const itemsPerBucket = Math.floor(numElems / buckets)
+      const bucketsWithExtra = numElems % buckets
+      let curr = lower
+
+      for (let i = 0; i < buckets; i++) {
+        const bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0)
+        const ourFingerprint = await this.storage.fingerprint(curr, curr + bucketSize)
+        curr += bucketSize
+
+        let nextBound
+
+        if (curr === upper) {
+          nextBound = upperBound
+        } else {
+          let prevItem, currItem
+
+          this.storage.iterate(curr - 1, curr + 1, (item, index) => {
+            if (index === curr - 1) prevItem = item
+            else currItem = item
+            return true
+          })
+
+          nextBound = this.getMinimalBound(prevItem, currItem)
+        }
+
+        o.extend(this.encodeBound(nextBound))
+        o.extend(encodeVarInt(Mode.Fingerprint))
+        o.extend(ourFingerprint)
+      }
+    }
+  }
+
+  _renderOutput(o) {
+    o = o.unwrap()
+    if (!this.wantUint8ArrayOutput) o = uint8ArrayToHex(o)
+    return o
+  }
+
+  exceededFrameSizeLimit(n) {
+    return this.frameSizeLimit && n > this.frameSizeLimit - 200
+  }
+
+  // Decoding
+
+  decodeTimestampIn(encoded) {
+    let timestamp = decodeVarInt(encoded)
+    timestamp = timestamp === 0 ? Number.MAX_VALUE : timestamp - 1
+    if (this.lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) {
+      this.lastTimestampIn = Number.MAX_VALUE
+      return Number.MAX_VALUE
+    }
+    timestamp += this.lastTimestampIn
+    this.lastTimestampIn = timestamp
+    return timestamp
+  }
+
+  decodeBound(encoded) {
+    const timestamp = this.decodeTimestampIn(encoded)
+    const len = decodeVarInt(encoded)
+    if (len > ID_SIZE) throw Error("bound key too long")
+    const id = getBytes(encoded, len)
+    return {timestamp, id}
+  }
+
+  // Encoding
+
+  encodeTimestampOut(timestamp) {
+    if (timestamp === Number.MAX_VALUE) {
+      this.lastTimestampOut = Number.MAX_VALUE
+      return encodeVarInt(0)
+    }
+
+    const temp = timestamp
+    timestamp -= this.lastTimestampOut
+    this.lastTimestampOut = temp
+    return encodeVarInt(timestamp + 1)
+  }
+
+  encodeBound(key) {
+    const output = new WrappedBuffer()
+
+    output.extend(this.encodeTimestampOut(key.timestamp))
+    output.extend(encodeVarInt(key.id.length))
+    output.extend(key.id)
+
+    return output
+  }
+
+  getMinimalBound(prev, curr) {
+    if (curr.timestamp !== prev.timestamp) {
+      return this._bound(curr.timestamp)
+    } else {
+      let sharedPrefixBytes = 0
+      const currKey = curr.id
+      const prevKey = prev.id
+
+      for (let i = 0; i < ID_SIZE; i++) {
+        if (currKey[i] !== prevKey[i]) break
+        sharedPrefixBytes++
+      }
+
+      return this._bound(curr.timestamp, curr.id.subarray(0, sharedPrefixBytes + 1))
+    }
+  }
+}
+
+function loadInputBuffer(inp) {
+  if (typeof(inp) === 'string') inp = hexToUint8Array(inp)
+  else if (inp == null || Object.getPrototypeOf(inp) !== Uint8Array.prototype) inp = new Uint8Array(inp) // node Buffer? (check inp's prototype, not the global's)
+  return inp
+}
+
+function hexToUint8Array(h) {
+  if (h.startsWith('0x')) h = h.substr(2)
+  if (h.length % 2 === 1) throw Error("odd length of hex string")
+  const arr = new Uint8Array(h.length / 2)
+  for (let i = 0; i < arr.length; i++) arr[i] = parseInt(h.substr(i * 2, 2), 16)
+  return arr
+}
+
+const uint8ArrayToHexLookupTable = new Array(256)
+{
+  const hexAlphabet = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
+  for (let i = 0; i < 256; i++) {
+    uint8ArrayToHexLookupTable[i] = hexAlphabet[(i >>> 4) & 0xF] + hexAlphabet[i & 0xF]
+  }
+}
+
+function uint8ArrayToHex(arr) {
+  let out = ''
+  for (let i = 0, edx = arr.length; i < edx; i++) {
+    out += uint8ArrayToHexLookupTable[arr[i]]
+  }
+  return out
+}
+
+
+function compareUint8Array(a, b) {
+  for (let i = 0; i < a.byteLength; i++) {
+    if (a[i] < b[i]) return -1
+    if (a[i] > b[i]) return 1
+  }
+
+  if (a.byteLength > b.byteLength) return 1
+  if (a.byteLength < b.byteLength) return -1
+
+  return 0
+}
+
+function itemCompare(a, b) {
+  if (a.timestamp === b.timestamp) {
+    return compareUint8Array(a.id, b.id)
+  }
+
+  return a.timestamp - b.timestamp
+}
+
+
+export {Negentropy, NegentropyStorageVector,}
diff --git a/packages/net2/src/subscribe.ts b/packages/net2/src/subscribe.ts
index ec4b72a..f2019da 100644
--- a/packages/net2/src/subscribe.ts
+++ b/packages/net2/src/subscribe.ts
@@ -52,10 +52,10 @@ export class Subscribe extends (EventEmitter as new () => TypedEmitter