Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 12 additions & 46 deletions core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ public enum LockLossReason {
LOCK_DELETED, SESSION_EXPIRED
}

public interface LockWatcher {
public interface AccumuloLockWatcher {

void acquiredLock();

void failedToAcquireLock(Exception e);

void lostLock(LockLossReason reason);

/**
* lost the ability to monitor the lock node, and its status is unknown
*/
void unableToMonitorLockNode(Exception e);
}

public interface AccumuloLockWatcher extends LockWatcher {
void acquiredLock();

void failedToAcquireLock(Exception e);
}

private final ServiceLockPath path;
protected final ZooSession zooKeeper;
private final Prefix vmLockPrefix;

private LockWatcher lockWatcher;
private AccumuloLockWatcher lockWatcher;
private String lockNodeName;
private volatile boolean lockWasAcquired;
private volatile boolean watchingParent;
Expand All @@ -112,45 +112,11 @@ public ServiceLock(ZooSession zookeeper, ServiceLockPath path, UUID uuid) {
}
}

private static class LockWatcherWrapper implements AccumuloLockWatcher {

boolean acquiredLock = false;
final LockWatcher lw;

public LockWatcherWrapper(LockWatcher lw2) {
this.lw = lw2;
}

@Override
public void acquiredLock() {
acquiredLock = true;
}

@Override
public void failedToAcquireLock(Exception e) {
LOG.debug("Failed to acquire lock", e);
}

@Override
public void lostLock(LockLossReason reason) {
lw.lostLock(reason);
}

@Override
public void unableToMonitorLockNode(Exception e) {
lw.unableToMonitorLockNode(e);
}

}

public synchronized boolean tryLock(LockWatcher lw, ServiceLockData lockData)
public synchronized boolean tryLock(AccumuloLockWatcher lw, ServiceLockData lockData)
throws KeeperException, InterruptedException {

LockWatcherWrapper lww = new LockWatcherWrapper(lw);

lock(lww, lockData);

if (lww.acquiredLock) {
lock(lw, lockData);
if (isLocked()) {
return true;
}

Expand Down Expand Up @@ -370,7 +336,7 @@ public void process(WatchedEvent event) {
}

private void lostLock(LockLossReason reason) {
LockWatcher localLw = lockWatcher;
AccumuloLockWatcher localLw = lockWatcher;
lockNodeName = null;
lockId = null;
lockWatcher = null;
Expand Down Expand Up @@ -538,7 +504,7 @@ public synchronized void unlock() throws InterruptedException, KeeperException {
throw new IllegalStateException();
}

LockWatcher localLw = lockWatcher;
AccumuloLockWatcher localLw = lockWatcher;
String localLock = lockNodeName;

lockNodeName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.util.Halt;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -138,17 +137,18 @@ public boolean isLockAcquired() {
return acquiredLock;
}

public boolean isFailedToAcquireLock() {
return failedToAcquireLock;
public boolean cannotRetryLocking() {
return acquiredLock && !failedToAcquireLock;
}

}

/**
* Lock Watcher used by non-HA services
*/
public static class ServiceLockWatcher implements LockWatcher {
public static class ServiceLockWatcher implements AccumuloLockWatcher {

private final Logger log = LoggerFactory.getLogger(ServiceLockWatcher.class);
private final Type server;
private final Supplier<Boolean> shutdownComplete;
private final Consumer<Type> lostLockAction;
Expand All @@ -160,6 +160,16 @@ public ServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete,
this.lostLockAction = lostLockAction;
}

@Override
public void acquiredLock() {
LOG.debug("Acquired {} lock", server);
}

@Override
public void failedToAcquireLock(Exception e) {
log.debug("Failed to acquire lock", e);
}

@Override
public void lostLock(final LockLossReason reason) {
if (shutdownComplete.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
Expand Down Expand Up @@ -373,8 +373,9 @@ protected void announceExistence(HostAndPort clientAddress)
getContext().getServerPaths().createCompactorPath(getResourceGroup(), clientAddress);
ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path);
compactorLock = new ServiceLock(getContext().getZooSession(), path, compactorId);
LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> getShutdownComplete().get(),
(type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
AccumuloLockWatcher lw =
new ServiceLockWatcher(Type.COMPACTOR, () -> getShutdownComplete().get(),
(type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));

try {
for (int i = 0; i < 25; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc
break;
}

if (!gcLockWatcher.isFailedToAcquireLock()) {
if (gcLockWatcher.cannotRetryLocking()) {
throw new IllegalStateException("gc lock in unknown state");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1401,7 +1401,7 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
break;
}

if (!managerLockWatcher.isFailedToAcquireLock()) {
if (managerLockWatcher.cannotRetryLocking()) {
throw new IllegalStateException("manager lock in unknown state");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private void getMonitorLock(HostAndPort monitorLocation)
UUID zooLockUUID = UUID.randomUUID();
monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID);
HAServiceLockWatcher monitorLockWatcher =
new HAServiceLockWatcher(Type.MONITOR, () -> isShutdownRequested());
new HAServiceLockWatcher(Type.MONITOR, this::isShutdownRequested);

while (true) {
monitorLock.lock(monitorLockWatcher,
Expand All @@ -774,7 +774,7 @@ private void getMonitorLock(HostAndPort monitorLocation)
break;
}

if (!monitorLockWatcher.isFailedToAcquireLock()) {
if (monitorLockWatcher.cannotRetryLocking()) {
throw new IllegalStateException("monitor lock in unknown state");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
Expand Down Expand Up @@ -320,8 +320,9 @@ private ServiceLock announceExistence() {
ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath);
serverLockUUID = UUID.randomUUID();
scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID);
LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> getShutdownComplete().get(),
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
AccumuloLockWatcher lw =
new ServiceLockWatcher(Type.SCAN_SERVER, () -> getShutdownComplete().get(),
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));

for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
Expand Down Expand Up @@ -490,8 +490,9 @@ private void announceExistence() {
UUID tabletServerUUID = UUID.randomUUID();
tabletServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, tabletServerUUID);

LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> getShutdownComplete().get(),
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));
AccumuloLockWatcher lw =
new ServiceLockWatcher(Type.TABLET_SERVER, () -> getShutdownComplete().get(),
(type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration()));

for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void acquiredLock() {

@Override
public void failedToAcquireLock(Exception e) {
log.warn("Failed to acquire ZooKeeper lock for test, msg: " + e.getMessage());
log.warn("Failed to acquire ZooKeeper lock for test", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
Expand Down Expand Up @@ -144,7 +144,17 @@ public static void main(String[] args) throws Exception {
metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), "zombie.server",
serverPort.address, ResourceGroupId.DEFAULT));

LockWatcher lw = new LockWatcher() {
AccumuloLockWatcher lw = new AccumuloLockWatcher() {

@Override
public void acquiredLock() {

}

@Override
public void failedToAcquireLock(Exception e) {
log.warn("Failed to acquire lock", e);
}

@SuppressFBWarnings(value = "DM_EXIT",
justification = "System.exit() is a bad idea here, but okay for now, since it's a test")
Expand Down