replace queue with worker
This commit is contained in:
@@ -12,7 +12,7 @@ Some general-purpose utilities used elsewhere in paravel.
|
|||||||
- `Emitter` extends EventEmitter to support `emitter.on('*', ...)`.
|
- `Emitter` extends EventEmitter to support `emitter.on('*', ...)`.
|
||||||
- `Fluent` is a wrapper around arrays with chained methods that modify and copy the underlying array.
|
- `Fluent` is a wrapper around arrays with chained methods that modify and copy the underlying array.
|
||||||
- `LRUCache` is an implementation of an LRU cache.
|
- `LRUCache` is an implementation of an LRU cache.
|
||||||
- `Queue` is an implementation of an asynchronous queue.
|
- `Worker` is an implementation of an asynchronous queue.
|
||||||
- `Tools` is a collection of general-purpose utility functions.
|
- `Tools` is a collection of general-purpose utility functions.
|
||||||
|
|
||||||
## @coracle.social/util
|
## @coracle.social/util
|
||||||
|
|||||||
@@ -1,57 +0,0 @@
|
|||||||
export class Queue {
|
|
||||||
timeout?: number
|
|
||||||
messages: any[] = []
|
|
||||||
|
|
||||||
clear() {
|
|
||||||
this.messages = []
|
|
||||||
}
|
|
||||||
|
|
||||||
push(message: any) {
|
|
||||||
this.messages.push(message)
|
|
||||||
this.enqueueWork()
|
|
||||||
}
|
|
||||||
|
|
||||||
handle(message: any) {
|
|
||||||
throw new Error("Not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
shouldSend(message: any) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
doWork() {
|
|
||||||
for (let i = 0; i < 10; i++) {
|
|
||||||
if (this.messages.length === 0) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pop the messages one at a time so handle can modify the queue
|
|
||||||
const [message] = this.messages.splice(0, 1)
|
|
||||||
|
|
||||||
if (this.shouldSend(message)) {
|
|
||||||
this.handle(message)
|
|
||||||
} else {
|
|
||||||
this.messages.push(message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.timeout = undefined
|
|
||||||
this.enqueueWork()
|
|
||||||
}
|
|
||||||
|
|
||||||
enqueueWork() {
|
|
||||||
if (this.timeout) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.messages.length === 0) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
this.timeout = setTimeout(() => this.doWork(), 100)
|
|
||||||
}
|
|
||||||
|
|
||||||
stop() {
|
|
||||||
clearTimeout(this.timeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,71 @@
|
|||||||
|
const ANY = Symbol("worker/ANY")
|
||||||
|
|
||||||
|
export type WorkerOpts<T> = {
|
||||||
|
getKey?: (x: T) => any
|
||||||
|
shouldDefer?: (x: T) => boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Worker<T> {
|
||||||
|
buffer: T[] = []
|
||||||
|
handlers: Map<any, Array<(x: T) => void>> = new Map()
|
||||||
|
timeout: number | undefined
|
||||||
|
|
||||||
|
constructor(readonly opts: WorkerOpts<T> = {}) {}
|
||||||
|
|
||||||
|
#doWork = async () => {
|
||||||
|
for (let i = 0; i < 50; i++) {
|
||||||
|
if (this.buffer.length === 0) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop the buffer one at a time so handle can modify the queue
|
||||||
|
const [message] = this.buffer.splice(0, 1)
|
||||||
|
|
||||||
|
if (this.opts.shouldDefer?.(message)) {
|
||||||
|
this.buffer.push(message)
|
||||||
|
} else {
|
||||||
|
for (const handler of this.handlers.get(ANY) || []) {
|
||||||
|
await handler(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.opts.getKey) {
|
||||||
|
const k = this.opts.getKey(message)
|
||||||
|
|
||||||
|
for (const handler of this.handlers.get(k) || []) {
|
||||||
|
await handler(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.timeout = undefined
|
||||||
|
this.#enqueueWork()
|
||||||
|
}
|
||||||
|
|
||||||
|
#enqueueWork = () => {
|
||||||
|
if (!this.timeout && this.buffer.length > 0) {
|
||||||
|
this.timeout = setTimeout(this.#doWork, 50)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
push = (message: T) => {
|
||||||
|
this.buffer.push(message)
|
||||||
|
this.#enqueueWork()
|
||||||
|
}
|
||||||
|
|
||||||
|
addHandler = (k: any, handler: (message: T) => void) => {
|
||||||
|
this.handlers.set(k, (this.handlers.get(k) || []).concat(handler))
|
||||||
|
}
|
||||||
|
|
||||||
|
addGlobalHandler = (handler: (message: T) => void) => {
|
||||||
|
this.addHandler(ANY, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
clear() {
|
||||||
|
this.buffer = []
|
||||||
|
}
|
||||||
|
|
||||||
|
stop() {
|
||||||
|
clearTimeout(this.timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,6 +3,6 @@ export * from './Deferred'
|
|||||||
export * from './Emitter'
|
export * from './Emitter'
|
||||||
export * from './Fluent'
|
export * from './Fluent'
|
||||||
export * from './LRUCache'
|
export * from './LRUCache'
|
||||||
export * from './Queue'
|
export * from './Worker'
|
||||||
export * from './Tools'
|
export * from './Tools'
|
||||||
export {default as normalizeUrl} from './normalize-url'
|
export {default as normalizeUrl} from './normalize-url'
|
||||||
|
|||||||
@@ -1,66 +1,13 @@
|
|||||||
import {Emitter, Queue} from '@coracle.social/lib'
|
import {Emitter, Worker} from '@coracle.social/lib'
|
||||||
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
|
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
|
||||||
import {Socket, isMessage, asMessage} from './Socket'
|
import {Socket, isMessage, asMessage} from './Socket'
|
||||||
import type {SocketMessage} from './Socket'
|
import type {SocketMessage} from './Socket'
|
||||||
|
|
||||||
class SendQueue extends Queue {
|
|
||||||
constructor(readonly cxn: Connection) {
|
|
||||||
super()
|
|
||||||
}
|
|
||||||
|
|
||||||
shouldSend(message: SocketMessage) {
|
|
||||||
if (!this.cxn.socket.isReady()) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
const [verb, ...extra] = asMessage(message)
|
|
||||||
|
|
||||||
if (['AUTH', 'CLOSE'].includes(verb)) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow relay requests through
|
|
||||||
if (verb === 'EVENT' && extra[0].kind === 28934) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only defer for auth if we're not multiplexing
|
|
||||||
if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.cxn.meta.authStatus)) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if (verb === 'REQ') {
|
|
||||||
return this.cxn.meta.pendingRequests.size < 8
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
handle(message: SocketMessage) {
|
|
||||||
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
|
|
||||||
if (message[0] === 'CLOSE') {
|
|
||||||
this.messages = this.messages.filter(m => !(m[0] === 'REQ' && m[1] === message[1]))
|
|
||||||
}
|
|
||||||
|
|
||||||
this.cxn.onSend(message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class ReceiveQueue extends Queue {
|
|
||||||
constructor(readonly cxn: Connection) {
|
|
||||||
super()
|
|
||||||
}
|
|
||||||
|
|
||||||
handle(message: SocketMessage) {
|
|
||||||
this.cxn.onReceive(message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class Connection extends Emitter {
|
export class Connection extends Emitter {
|
||||||
url: string
|
url: string
|
||||||
socket: Socket
|
socket: Socket
|
||||||
sendQueue: SendQueue
|
sender: Worker<SocketMessage>
|
||||||
receiveQueue: ReceiveQueue
|
receiver: Worker<SocketMessage>
|
||||||
meta: ConnectionMeta
|
meta: ConnectionMeta
|
||||||
|
|
||||||
constructor(url: string) {
|
constructor(url: string) {
|
||||||
@@ -68,13 +15,64 @@ export class Connection extends Emitter {
|
|||||||
|
|
||||||
this.url = url
|
this.url = url
|
||||||
this.socket = new Socket(url, this)
|
this.socket = new Socket(url, this)
|
||||||
this.sendQueue = new SendQueue(this)
|
this.sender = this.createSender()
|
||||||
this.receiveQueue = new ReceiveQueue(this)
|
this.receiver = this.createReceiver()
|
||||||
this.meta = new ConnectionMeta(this)
|
this.meta = new ConnectionMeta(this)
|
||||||
this.setMaxListeners(100)
|
this.setMaxListeners(100)
|
||||||
}
|
}
|
||||||
|
|
||||||
send = (m: SocketMessage) => this.sendQueue.push(m)
|
createSender = () => {
|
||||||
|
const worker = new Worker<SocketMessage>({
|
||||||
|
shouldDefer: (message: SocketMessage) => {
|
||||||
|
if (!this.socket.isReady()) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
const [verb, ...extra] = asMessage(message)
|
||||||
|
|
||||||
|
if (['AUTH', 'CLOSE'].includes(verb)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow relay requests through
|
||||||
|
if (verb === 'EVENT' && extra[0].kind === 28934) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only defer for auth if we're not multiplexing
|
||||||
|
if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.meta.authStatus)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (verb === 'REQ') {
|
||||||
|
return this.meta.pendingRequests.size >= 8
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
worker.addGlobalHandler((message: SocketMessage) => {
|
||||||
|
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
|
||||||
|
if (message[0] === 'CLOSE') {
|
||||||
|
worker.buffer = worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === message[1]))
|
||||||
|
}
|
||||||
|
|
||||||
|
this.onSend(message)
|
||||||
|
})
|
||||||
|
|
||||||
|
return worker
|
||||||
|
}
|
||||||
|
|
||||||
|
createReceiver = () => {
|
||||||
|
const worker = new Worker<SocketMessage>()
|
||||||
|
|
||||||
|
worker.addGlobalHandler(this.onReceive)
|
||||||
|
|
||||||
|
return worker
|
||||||
|
}
|
||||||
|
|
||||||
|
send = (m: SocketMessage) => this.sender.push(m)
|
||||||
|
|
||||||
onOpen = () => this.emit('open', this)
|
onOpen = () => this.emit('open', this)
|
||||||
|
|
||||||
@@ -82,7 +80,7 @@ export class Connection extends Emitter {
|
|||||||
|
|
||||||
onError = () => this.emit('fault', this)
|
onError = () => this.emit('fault', this)
|
||||||
|
|
||||||
onMessage = (m: SocketMessage) => this.receiveQueue.push(m)
|
onMessage = (m: SocketMessage) => this.receiver.push(m)
|
||||||
|
|
||||||
onSend = (message: SocketMessage) => {
|
onSend = (message: SocketMessage) => {
|
||||||
this.emit('send', this, message)
|
this.emit('send', this, message)
|
||||||
@@ -105,15 +103,15 @@ export class Connection extends Emitter {
|
|||||||
|
|
||||||
disconnect() {
|
disconnect() {
|
||||||
this.socket.disconnect()
|
this.socket.disconnect()
|
||||||
this.sendQueue.clear()
|
this.sender.clear()
|
||||||
this.receiveQueue.clear()
|
this.receiver.clear()
|
||||||
this.meta.clearPending()
|
this.meta.clearPending()
|
||||||
}
|
}
|
||||||
|
|
||||||
destroy() {
|
destroy() {
|
||||||
this.disconnect()
|
this.disconnect()
|
||||||
this.removeAllListeners()
|
this.removeAllListeners()
|
||||||
this.sendQueue.stop()
|
this.sender.stop()
|
||||||
this.receiveQueue.stop()
|
this.receiver.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,13 @@ export const isShareableRelayUrl = (url: string) =>
|
|||||||
!url.slice(6).match(/\/npub/)
|
!url.slice(6).match(/\/npub/)
|
||||||
)
|
)
|
||||||
|
|
||||||
export const normalizeRelayUrl = (url: string) => {
|
type NormalizeRelayUrlOpts = {
|
||||||
|
allowInsecure?: boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
export const normalizeRelayUrl = (url: string, {allowInsecure = false}: NormalizeRelayUrlOpts = {}) => {
|
||||||
|
const prefix = allowInsecure ? url.match(/^wss?:\/\//)?.[0] || "wss://" : "wss://"
|
||||||
|
|
||||||
// Use our library to normalize
|
// Use our library to normalize
|
||||||
url = normalizeUrl(url, {stripHash: true, stripAuthentication: false})
|
url = normalizeUrl(url, {stripHash: true, stripAuthentication: false})
|
||||||
|
|
||||||
@@ -29,5 +35,5 @@ export const normalizeRelayUrl = (url: string) => {
|
|||||||
url += "/"
|
url += "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
return "wss://" + url
|
return prefix + url
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ export class Tags extends (Fluent<Tag> as OmitStatics<typeof Fluent<Tag>, 'from'
|
|||||||
|
|
||||||
entries = () => this.mapTo(t => t.entry())
|
entries = () => this.mapTo(t => t.entry())
|
||||||
|
|
||||||
relays = () => this.flatMap((t: Tag) => t.valueOf().filter(isShareableRelayUrl).map(normalizeRelayUrl)).uniq()
|
relays = () => this.flatMap((t: Tag) => t.valueOf().filter(isShareableRelayUrl).map(url => normalizeRelayUrl(url))).uniq()
|
||||||
|
|
||||||
topics = () => this.whereKey("t").values().map((t: string) => t.replace(/^#/, ""))
|
topics = () => this.whereKey("t").values().map((t: string) => t.replace(/^#/, ""))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user