mmm: use flock instead of file presence for locking the database.
This commit is contained in:
+35
-13
@@ -35,6 +35,7 @@ type MultiMmapManager struct {
|
|||||||
mmapfEnd uint64
|
mmapfEnd uint64
|
||||||
|
|
||||||
writeMutex sync.Mutex
|
writeMutex sync.Mutex
|
||||||
|
lockfile *os.File
|
||||||
|
|
||||||
lmdbEnv *lmdb.Env
|
lmdbEnv *lmdb.Env
|
||||||
stuff lmdb.DBI
|
stuff lmdb.DBI
|
||||||
@@ -55,28 +56,38 @@ const (
|
|||||||
maxuint32 = 4294967295
|
maxuint32 = 4294967295
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *MultiMmapManager) Init() error {
|
func (b *MultiMmapManager) Init() (err error) {
|
||||||
if b.Logger == nil {
|
if b.Logger == nil {
|
||||||
nopLogger := zerolog.Nop()
|
nopLogger := zerolog.Nop()
|
||||||
b.Logger = &nopLogger
|
b.Logger = &nopLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
b.releaseLock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// create directory if it doesn't exist
|
// create directory if it doesn't exist
|
||||||
dbpath := filepath.Join(b.Dir, "mmmm")
|
dbpath := filepath.Join(b.Dir, "mmmm")
|
||||||
if err := os.MkdirAll(dbpath, 0755); err != nil {
|
if err := os.MkdirAll(dbpath, 0755); err != nil {
|
||||||
return fmt.Errorf("failed to create directory %s: %w", dbpath, err)
|
return fmt.Errorf("failed to create directory %s: %w", dbpath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !b.ReadOnly {
|
// lock database directory to prevent multiple instances
|
||||||
// create lockfile to prevent multiple instances
|
lockfilePath := filepath.Join(b.Dir, "mmmm.lock")
|
||||||
lockfilePath := filepath.Join(b.Dir, "mmmm.lock")
|
lockfile, err := os.OpenFile(lockfilePath, os.O_CREATE|os.O_RDWR, 0644)
|
||||||
if _, err := os.OpenFile(lockfilePath, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0644); err != nil {
|
if err != nil {
|
||||||
if os.IsExist(err) {
|
return fmt.Errorf("failed to open lockfile %s: %w", lockfilePath, err)
|
||||||
return fmt.Errorf("database at %s is already in use by another instance", b.Dir)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("failed to create lockfile %s: %w", lockfilePath, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if err := syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
|
||||||
|
lockfile.Close()
|
||||||
|
if errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EAGAIN) {
|
||||||
|
return fmt.Errorf("database at %s is already in use by another instance", b.Dir)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to lock database at %s: %w", b.Dir, err)
|
||||||
|
}
|
||||||
|
b.lockfile = lockfile
|
||||||
|
|
||||||
// open a huge mmapped file
|
// open a huge mmapped file
|
||||||
b.mmapfPath = filepath.Join(b.Dir, "events")
|
b.mmapfPath = filepath.Join(b.Dir, "events")
|
||||||
@@ -365,6 +376,19 @@ func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error
|
|||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *MultiMmapManager) releaseLock() {
|
||||||
|
if b.lockfile == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = syscall.Flock(int(b.lockfile.Fd()), syscall.LOCK_UN)
|
||||||
|
_ = b.lockfile.Close()
|
||||||
|
b.lockfile = nil
|
||||||
|
|
||||||
|
lockfilePath := filepath.Join(b.Dir, "mmmm.lock")
|
||||||
|
_ = os.Remove(lockfilePath)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *MultiMmapManager) Close() {
|
func (b *MultiMmapManager) Close() {
|
||||||
b.lmdbEnv.Close()
|
b.lmdbEnv.Close()
|
||||||
for _, il := range b.layers {
|
for _, il := range b.layers {
|
||||||
@@ -373,7 +397,5 @@ func (b *MultiMmapManager) Close() {
|
|||||||
|
|
||||||
syscall.Munmap(b.mmapf)
|
syscall.Munmap(b.mmapf)
|
||||||
|
|
||||||
// remove lockfile
|
b.releaseLock()
|
||||||
lockfilePath := filepath.Join(b.Dir, "mmmm.lock")
|
|
||||||
os.Remove(lockfilePath)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user