Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions src/carnot/exec/grpc_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ class GRPCRouter final : public carnotpb::ResultSinkService::Service {
*/
struct SourceNodeTracker {
SourceNodeTracker() = default;
GRPCSourceNode* source_node GUARDED_BY(node_lock) = nullptr;
GRPCSourceNode* source_node ABSL_GUARDED_BY(node_lock) = nullptr;
// connection_initiated_by_sink and connection_closed_by_sink are true when the
// grpc sink (aka the client) initiates the query result stream or closes a query result stream,
// respectively.
bool connection_initiated_by_sink GUARDED_BY(node_lock) = false;
bool connection_closed_by_sink GUARDED_BY(node_lock) = false;
bool connection_initiated_by_sink ABSL_GUARDED_BY(node_lock) = false;
bool connection_closed_by_sink ABSL_GUARDED_BY(node_lock) = false;
std::vector<std::unique_ptr<::px::carnotpb::TransferResultChunkRequest>> response_backlog
GUARDED_BY(node_lock);
ABSL_GUARDED_BY(node_lock);
absl::base_internal::SpinLock node_lock;
};

Expand All @@ -126,17 +126,18 @@ class GRPCRouter final : public carnotpb::ResultSinkService::Service {
*/
struct QueryTracker {
QueryTracker() : create_time(std::chrono::steady_clock::now()) {}
absl::node_hash_map<int64_t, SourceNodeTracker> source_node_trackers GUARDED_BY(query_lock);
const std::chrono::steady_clock::time_point create_time GUARDED_BY(query_lock);
std::function<void()> restart_execution_func_ GUARDED_BY(query_lock);
absl::node_hash_map<int64_t, SourceNodeTracker> source_node_trackers
ABSL_GUARDED_BY(query_lock);
const std::chrono::steady_clock::time_point create_time ABSL_GUARDED_BY(query_lock);
std::function<void()> restart_execution_func_ ABSL_GUARDED_BY(query_lock);
// The set of agents we've seen for the query.
absl::flat_hash_set<sole::uuid> seen_agents GUARDED_BY(query_lock);
absl::flat_hash_set<::grpc::ServerContext*> active_agent_contexts GUARDED_BY(query_lock);
absl::flat_hash_set<sole::uuid> seen_agents ABSL_GUARDED_BY(query_lock);
absl::flat_hash_set<::grpc::ServerContext*> active_agent_contexts ABSL_GUARDED_BY(query_lock);
// The execution stats for agents that are clients to this service.
std::vector<queryresultspb::AgentExecutionStats> agent_exec_stats GUARDED_BY(query_lock);
std::vector<queryresultspb::AgentExecutionStats> agent_exec_stats ABSL_GUARDED_BY(query_lock);

// Errors that occur during execution from parent_agents.
std::vector<statuspb::Status> upstream_exec_errors GUARDED_BY(query_lock);
std::vector<statuspb::Status> upstream_exec_errors ABSL_GUARDED_BY(query_lock);
absl::base_internal::SpinLock query_lock;

void ResetRestartExecutionFunc() ABSL_EXCLUSIVE_LOCKS_REQUIRED(query_lock) {
Expand Down Expand Up @@ -178,7 +179,7 @@ class GRPCRouter final : public carnotpb::ResultSinkService::Service {
SourceNodeTracker* GetSourceNodeTracker(QueryTracker* query_tracker, int64_t source_id);

absl::node_hash_map<sole::uuid, std::shared_ptr<QueryTracker>> id_to_query_tracker_map_
GUARDED_BY(id_to_query_tracker_map_lock_);
ABSL_GUARDED_BY(id_to_query_tracker_map_lock_);
mutable absl::base_internal::SpinLock id_to_query_tracker_map_lock_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/carnot/udf/borrow_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class BorrowPool {
}

private:
std::vector<StoredPtrType> pool_ GUARDED_BY(pool_lock_);
std::vector<StoredPtrType> pool_ ABSL_GUARDED_BY(pool_lock_);
absl::base_internal::SpinLock pool_lock_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/experimental/standalone_pem/sink_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class StandaloneResultSinkServer final : public carnotpb::ResultSinkService::Ser
}

absl::flat_hash_map<sole::uuid, ::grpc::ServerWriter<::px::api::vizierpb::ExecuteScriptResponse>*>
consumer_map_ GUARDED_BY(id_to_query_consumer_map_lock_);
consumer_map_ ABSL_GUARDED_BY(id_to_query_consumer_map_lock_);
mutable absl::base_internal::SpinLock id_to_query_consumer_map_lock_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/vizier/services/agent/shared/manager/chan_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ChanCache {
};

// The cache of channels (grpc conns) made to other agents.
absl::flat_hash_map<std::string, Channel> chan_cache_ GUARDED_BY(chan_cache_lock_);
absl::flat_hash_map<std::string, Channel> chan_cache_ ABSL_GUARDED_BY(chan_cache_lock_);
absl::base_internal::SpinLock chan_cache_lock_;
// Connections that are alive for shorter than warm_up_period_ won't be cleared.
std::chrono::nanoseconds warm_up_period_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class RelationInfoManager {
private:
mutable std::atomic<bool> has_updates_ = false;
mutable absl::base_internal::SpinLock relation_info_map_lock_;
absl::btree_map<std::string, RelationInfo> relation_info_map_ GUARDED_BY(relation_info_map_lock_);
absl::btree_map<std::string, RelationInfo> relation_info_map_
ABSL_GUARDED_BY(relation_info_map_lock_);
};

} // namespace agent
Expand Down