diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ff8d4ebe..4c29334f 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -401,14 +401,16 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { err := l.lock.Lock() + lostCh := make(chan struct{}) if err == nil { // We hold the lock, we can set our value - // FIXME: The value is left behind - // (problematic for leader election) _, err = l.client.Set(l.key, l.value, -1) + if err == nil { + go l.monitorLock(stopChan, lostCh) + } } - return make(chan struct{}), err + return lostCh, err } // Unlock the "key". Calling unlock while @@ -427,3 +429,31 @@ func (s *Zookeeper) normalize(key string) string { key = store.Normalize(key) return strings.TrimSuffix(key, "/") } + +func (l *zookeeperLock) monitorLock(stopCh <-chan struct{}, lostCh chan struct{}) { + defer close(lostCh) + + for { + _, _, eventCh, err := l.client.GetW(l.key) + if err != nil { + // We failed to set watch, relinquish the lock + return + } + select { + case e := <-eventCh: + if e.Type == zk.EventNotWatching || + (e.Type == zk.EventSession && e.State == zk.StateExpired) { + // Either the session has been closed and our watch has been + // invalidated or the session has expired. + return + } else if e.Type == zk.EventNodeDataChanged { + // Somemone else has written to the lock node and believes + // that they have the lock. + return + } + case <-stopCh: + // The caller has requested that we relinquish our lock + return + } + } +}