Skip to content

Commit 1f1d41f

Browse files
Miles-Garnsey, emerkle826
authored and committed
Reaper endpoints: Async Repair Endpoint (#358)
* Allow `repair()` RPC method to take more arguments (as required by Reaper). Get the old NodeOpsResources class calling the new RPC method signature. --------- Co-authored-by: Erik Merkle <erik.merkle@datastax.com> * Cancel repairs endpoint (#368) * Add cancelAllRepairs endpoint and requisite NodeOps.
1 parent 53d3ce9 commit 1f1d41f

File tree

13 files changed

+741
-90
lines changed

13 files changed

+741
-90
lines changed

management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java

Lines changed: 105 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -746,96 +746,123 @@ public String repair(
746746
@RpcParam(name = "keyspaceName") String keyspace,
747747
@RpcParam(name = "tables") List<String> tables,
748748
@RpcParam(name = "full") Boolean full,
749-
@RpcParam(name = "notifications") boolean notifications)
749+
@RpcParam(name = "notifications") boolean notifications,
750+
@RpcParam(name = "repairParallelism") String repairParallelism,
751+
@RpcParam(name = "datacenters") List<String> datacenters,
752+
@RpcParam(name = "associatedTokens") String ringRangeString,
753+
@RpcParam(name = "repairThreadCount") Integer repairThreadCount)
750754
throws IOException {
751755
// At least one keyspace is required
752-
if (keyspace != null) {
753-
// create the repair spec
754-
Map<String, String> repairSpec = new HashMap<>();
755-
756-
// add any specified tables to the repair spec
757-
if (tables != null && !tables.isEmpty()) {
758-
// set the tables/column families
759-
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
756+
assert (keyspace != null);
757+
Map<String, String> repairSpec = new HashMap<>();
758+
// add tables/column families
759+
if (tables != null && !tables.isEmpty()) {
760+
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
761+
}
762+
// set incremental repair
763+
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
764+
// Parallelism should be set if it's requested OR if incremental repair is requested.
765+
if (!full) {
766+
// Incremental repair requested, make sure parallelism is correct
767+
if (repairParallelism != null
768+
&& !RepairParallelism.PARALLEL.getName().equals(repairParallelism)) {
769+
throw new IOException(
770+
"Invalid repair combination. Incremental repair if Parallelism is not set");
760771
}
761-
762-
// handle incremental vs full
763-
boolean isIncremental = Boolean.FALSE.equals(full);
764-
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
765-
if (isIncremental) {
766-
// incremental repairs will fail if parallelism is not set
767-
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
772+
// Incremental repair and parallelism should be set
773+
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
774+
}
775+
if (repairThreadCount != null) {
776+
// if specified, the value should be at least 1
777+
if (repairThreadCount.compareTo(Integer.valueOf(0)) <= 0) {
778+
throw new IOException(
779+
"Invalid repari thread count: "
780+
+ repairThreadCount
781+
+ ". Value should be greater than 0");
768782
}
783+
repairSpec.put(RepairOption.JOB_THREADS_KEY, repairThreadCount.toString());
784+
}
785+
repairSpec.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
769786

770-
// Since Cassandra provides us with a async, we don't need to use our executor interface for
771-
// this.
772-
final int repairJobId =
773-
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
787+
if (ringRangeString != null && !ringRangeString.isEmpty()) {
788+
repairSpec.put(RepairOption.RANGES_KEY, ringRangeString);
789+
}
790+
// add datacenters to the repair spec
791+
if (datacenters != null && !datacenters.isEmpty()) {
792+
repairSpec.put(RepairOption.DATACENTERS_KEY, String.join(",", datacenters));
793+
}
774794

775-
if (!notifications) {
776-
return Integer.valueOf(repairJobId).toString();
777-
}
795+
// Since Cassandra provides us with an async repair call, we don't need to use our executor interface for
796+
// this.
797+
final int repairJobId =
798+
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
778799

779-
String jobId = String.format("repair-%d", repairJobId);
780-
final Job job = service.createJob("repair", jobId);
800+
if (!notifications) {
801+
return Integer.valueOf(repairJobId).toString();
802+
}
781803

782-
if (repairJobId == 0) {
783-
// Job is done and won't continue
784-
job.setStatusChange(ProgressEventType.COMPLETE, "");
785-
job.setStatus(Job.JobStatus.COMPLETED);
786-
job.setFinishedTime(System.currentTimeMillis());
787-
service.updateJob(job);
788-
return job.getJobId();
789-
}
790-
791-
ShimLoader.instance
792-
.get()
793-
.getStorageService()
794-
.addNotificationListener(
795-
(notification, handback) -> {
796-
if (notification.getType().equals("progress")) {
797-
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
798-
ProgressEventType progress = ProgressEventType.values()[data.get("type")];
799-
800-
switch (progress) {
801-
case START:
802-
job.setStatusChange(progress, notification.getMessage());
803-
job.setStartTime(System.currentTimeMillis());
804-
break;
805-
case NOTIFICATION:
806-
case PROGRESS:
807-
break;
808-
case ERROR:
809-
case ABORT:
810-
job.setError(new RuntimeException(notification.getMessage()));
811-
job.setStatus(Job.JobStatus.ERROR);
812-
job.setFinishedTime(System.currentTimeMillis());
813-
break;
814-
case SUCCESS:
815-
job.setStatusChange(progress, notification.getMessage());
816-
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
817-
break;
818-
case COMPLETE:
819-
job.setStatusChange(progress, notification.getMessage());
820-
job.setStatus(Job.JobStatus.COMPLETED);
821-
job.setFinishedTime(System.currentTimeMillis());
822-
break;
823-
}
824-
service.updateJob(job);
825-
}
826-
},
827-
(NotificationFilter)
828-
notification -> {
829-
final int repairNo =
830-
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
831-
return repairNo == repairJobId;
832-
},
833-
null);
804+
String jobId = String.format("repair-%d", repairJobId);
805+
final Job job = service.createJob("repair", jobId);
834806

807+
if (repairJobId == 0) {
808+
// Job is done and won't continue
809+
job.setStatusChange(ProgressEventType.COMPLETE, "");
810+
job.setStatus(Job.JobStatus.COMPLETED);
811+
job.setFinishedTime(System.currentTimeMillis());
812+
service.updateJob(job);
835813
return job.getJobId();
836814
}
837815

838-
throw new RuntimeException("At least one keyspace must be defined");
816+
ShimLoader.instance
817+
.get()
818+
.getStorageService()
819+
.addNotificationListener(
820+
(notification, handback) -> {
821+
if (notification.getType().equals("progress")) {
822+
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
823+
ProgressEventType progress = ProgressEventType.values()[data.get("type")];
824+
825+
switch (progress) {
826+
case START:
827+
job.setStatusChange(progress, notification.getMessage());
828+
job.setStartTime(System.currentTimeMillis());
829+
break;
830+
case NOTIFICATION:
831+
case PROGRESS:
832+
break;
833+
case ERROR:
834+
case ABORT:
835+
job.setError(new RuntimeException(notification.getMessage()));
836+
job.setStatus(Job.JobStatus.ERROR);
837+
job.setFinishedTime(System.currentTimeMillis());
838+
break;
839+
case SUCCESS:
840+
job.setStatusChange(progress, notification.getMessage());
841+
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
842+
break;
843+
case COMPLETE:
844+
job.setStatusChange(progress, notification.getMessage());
845+
job.setStatus(Job.JobStatus.COMPLETED);
846+
job.setFinishedTime(System.currentTimeMillis());
847+
break;
848+
}
849+
service.updateJob(job);
850+
}
851+
},
852+
(NotificationFilter)
853+
notification -> {
854+
final int repairNo =
855+
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
856+
return repairNo == repairJobId;
857+
},
858+
null);
859+
860+
return job.getJobId();
861+
}
862+
863+
@Rpc(name = "stopAllRepairs")
864+
public void stopAllRepairs() {
865+
ShimLoader.instance.get().getStorageService().forceTerminateAllRepairSessions();
839866
}
840867

841868
@Rpc(name = "move")

management-api-server/doc/openapi.json

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,6 +1740,75 @@
17401740
},
17411741
"summary" : "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version). This operation is asynchronous and returns immediately."
17421742
}
1743+
},
1744+
"/api/v2/repairs" : {
1745+
"delete" : {
1746+
"operationId" : "deleteRepairsV2",
1747+
"responses" : {
1748+
"202" : {
1749+
"content" : {
1750+
"application/json" : {
1751+
"example" : "Accepted",
1752+
"schema" : {
1753+
"$ref" : "#/components/schemas/RepairRequestResponse"
1754+
}
1755+
}
1756+
},
1757+
"description" : "Cancel repairs Successfully requested"
1758+
}
1759+
},
1760+
"summary" : "Cancel all repairs"
1761+
},
1762+
"put" : {
1763+
"operationId" : "putRepairV2",
1764+
"requestBody" : {
1765+
"content" : {
1766+
"application/json" : {
1767+
"schema" : {
1768+
"$ref" : "#/components/schemas/RepairRequest"
1769+
}
1770+
}
1771+
}
1772+
},
1773+
"responses" : {
1774+
"202" : {
1775+
"content" : {
1776+
"application/json" : {
1777+
"example" : "Accepted",
1778+
"schema" : {
1779+
"$ref" : "#/components/schemas/RepairRequestResponse"
1780+
}
1781+
}
1782+
},
1783+
"description" : "Repair Successfully requested"
1784+
},
1785+
"400" : {
1786+
"content" : {
1787+
"text/plain" : {
1788+
"example" : "keyspace must be specified",
1789+
"schema" : {
1790+
"type" : "string",
1791+
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
1792+
}
1793+
}
1794+
},
1795+
"description" : "Repair request missing Keyspace name"
1796+
},
1797+
"500" : {
1798+
"content" : {
1799+
"text/plain" : {
1800+
"example" : "internal error, we did not receive the expected repair ID from Cassandra.",
1801+
"schema" : {
1802+
"type" : "string",
1803+
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
1804+
}
1805+
}
1806+
},
1807+
"description" : "internal error, we did not receive the expected repair ID from Cassandra."
1808+
}
1809+
},
1810+
"summary" : "Initiate a new repair"
1811+
}
17431812
}
17441813
},
17451814
"components" : {
@@ -2014,20 +2083,52 @@
20142083
"RepairRequest" : {
20152084
"type" : "object",
20162085
"properties" : {
2017-
"full" : {
2086+
"associated_tokens" : {
2087+
"type" : "array",
2088+
"items" : {
2089+
"$ref" : "#/components/schemas/RingRange"
2090+
}
2091+
},
2092+
"datacenters" : {
2093+
"type" : "array",
2094+
"items" : {
2095+
"type" : "string"
2096+
}
2097+
},
2098+
"full_repair" : {
20182099
"type" : "boolean"
20192100
},
2020-
"keyspace_name" : {
2101+
"keyspace" : {
20212102
"type" : "string"
20222103
},
2104+
"notifications" : {
2105+
"type" : "boolean"
2106+
},
2107+
"repair_parallelism" : {
2108+
"type" : "string",
2109+
"enum" : [ "sequential", "parallel", "dc_parallel" ]
2110+
},
2111+
"repair_thread_count" : {
2112+
"type" : "integer",
2113+
"format" : "int32"
2114+
},
20232115
"tables" : {
20242116
"type" : "array",
20252117
"items" : {
20262118
"type" : "string"
20272119
}
20282120
}
20292121
},
2030-
"required" : [ "keyspace_name" ]
2122+
"required" : [ "keyspace" ]
2123+
},
2124+
"RepairRequestResponse" : {
2125+
"type" : "object",
2126+
"properties" : {
2127+
"repair_id" : {
2128+
"type" : "string"
2129+
}
2130+
},
2131+
"required" : [ "repair_id" ]
20312132
},
20322133
"ReplicationSetting" : {
20332134
"type" : "object",
@@ -2042,6 +2143,18 @@
20422143
},
20432144
"required" : [ "dc_name", "replication_factor" ]
20442145
},
2146+
"RingRange" : {
2147+
"type" : "object",
2148+
"properties" : {
2149+
"end" : {
2150+
"type" : "integer"
2151+
},
2152+
"start" : {
2153+
"type" : "integer"
2154+
}
2155+
},
2156+
"required" : [ "end", "start" ]
2157+
},
20452158
"ScrubRequest" : {
20462159
"type" : "object",
20472160
"properties" : {

management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,11 +503,17 @@ public Response repair(RepairRequest repairRequest) {
503503
}
504504
app.cqlService.executePreparedStatement(
505505
app.dbUnixSocketFile,
506-
"CALL NodeOps.repair(?, ?, ?, ?)",
506+
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
507507
repairRequest.keyspaceName,
508508
repairRequest.tables,
509509
repairRequest.full,
510-
false);
510+
false,
511+
// The default repair does not allow for specifying things like parallelism,
512+
// threadCounts, source DCs or ranges etc.
513+
null,
514+
null,
515+
null,
516+
null);
511517

512518
return Response.ok("OK").build();
513519
});

management-api-server/src/main/java/com/datastax/mgmtapi/resources/common/BaseResources.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ protected Response handle(Callable<Response> action) {
6565
.entity("Internal connection to Cassandra closed")
6666
.build();
6767
} catch (Throwable t) {
68+
t.printStackTrace();
6869
return Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR)
6970
.entity(t.getLocalizedMessage())
7071
.build();

management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,15 @@ public Response repair(RepairRequest repairRequest) {
154154
ResponseTools.getSingleRowStringResponse(
155155
app.dbUnixSocketFile,
156156
app.cqlService,
157-
"CALL NodeOps.repair(?, ?, ?, ?)",
157+
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
158158
repairRequest.keyspaceName,
159159
repairRequest.tables,
160160
repairRequest.full,
161-
true))
161+
true,
162+
null,
163+
null,
164+
null,
165+
null))
162166
.build();
163167
});
164168
}

0 commit comments

Comments
 (0)