reconnect

This commit is contained in:
Yasuhiro Matsumoto
2023-12-02 19:14:54 +09:00
committed by fiatjaf_
parent a9972245f3
commit cb8b40bd00
+34 -11
View File
@@ -79,20 +79,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
ticker := time.NewTicker(seenAlreadyDropTick) ticker := time.NewTicker(seenAlreadyDropTick)
eose := false eose := false
updateSince := func() {
// After reconnection, update the since in the filter so that
// old events are not retrieved.
now := Now()
for i := range filters {
filters[i].Since = &now
}
}
pending := xsync.NewCounter() pending := xsync.NewCounter()
pending.Add(int64(len(urls))) pending.Add(int64(len(urls)))
for _, url := range urls { for _, url := range urls {
go func(nm string) { go func(nm string) {
relay, err := pool.EnsureRelay(nm)
if err != nil {
return
}
sub, _ := relay.Subscribe(ctx, filters)
if sub == nil {
return
}
defer func() { defer func() {
pending.Dec() pending.Dec()
if pending.Value() == 0 { if pending.Value() == 0 {
@@ -101,11 +100,33 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
cancel() cancel()
}() }()
for {
select {
case <-ctx.Done():
return
default:
}
relay, err := pool.EnsureRelay(nm)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
}
sub, err := relay.Subscribe(ctx, filters)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
}
loop:
for { for {
select { select {
case evt, more := <-sub.Events: case evt, more := <-sub.Events:
if !more { if !more {
return break loop
} }
if unique { if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
@@ -139,6 +160,8 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
return return
} }
} }
updateSince()
}
}(NormalizeURL(url)) }(NormalizeURL(url))
} }