Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions query/src/main/java/tech/ydb/query/QuerySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.operation.Operation;
import tech.ydb.core.operation.OperationTray;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.proto.scripting.ScriptingProtos;
import tech.ydb.query.result.OperationResult;
import tech.ydb.query.settings.BeginTransactionSettings;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.query.settings.ExecuteScriptSettings;
import tech.ydb.query.settings.FetchScriptSettings;
import tech.ydb.table.query.Params;

/**
Expand Down Expand Up @@ -68,6 +76,53 @@ public interface QuerySession extends AutoCloseable {
*/
QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings);

/**
* Executes a YQL script via the scripting service and returns its result as a completed future.
*
* <p>This method sends a YQL script for execution and collects the full result set in a single response.
* It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns
* an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.</p>
*
* @param query the YQL script text to execute
* @param params input parameters for the script
* @param settings execution settings such as statistics collection or tracing
* @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult}
*/
CompletableFuture<Result<ScriptingProtos.ExecuteYqlResult>> executeScriptYql(String query,
Params params,
ExecuteScriptSettings settings);


/**
* Submits a YQL script for asynchronous execution and returns a handle to the operation.
* Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb
*
* <p>This method executes the given script asynchronously and immediately returns
* a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched
* via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.</p>
*
* @param query the YQL script text to execute
* @param params input parameters to pass to the script
* @param settings script execution options such as TTL, statistics mode, or resource pool
* @return a future resolving to an {@link Operation} representing the submitted script execution
*/
CompletableFuture<Operation<Status>> executeScript(String query, Params params, ExecuteScriptSettings settings);

/**
* Fetches partial or complete results from a previously executed YQL script.
*
* <p>This method retrieves result sets produced by an asynchronous script execution.
* It supports incremental fetching using tokens, row limits, and result set index selection.</p>
*
* @param query optional query text for context (not used by the server but may help debugging)
* @param params parameters used during script execution (typically empty)
* @param settings settings that define which operation to fetch results from, including fetch token, row limit, and index
* @return a future resolving to a {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse}
*/
CompletableFuture<Result<YdbQuery.FetchScriptResultsResponse>> fetchScriptResults(String query,
Params params,
FetchScriptSettings settings);

@Override
void close();

Expand Down Expand Up @@ -106,4 +161,46 @@ default QueryStream createQuery(String query, TxMode tx) {
default CompletableFuture<Result<QueryTransaction>> beginTransaction(TxMode txMode) {
return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build());
}

/**
* Executes a YQL script via the scripting service and returns its result as a completed future.
*
* <p>This method sends a YQL script for execution and collects the full result set in a single response.
* It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns
* an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.</p>
*
* @param query the YQL script text to execute
* @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult}
*/
default CompletableFuture<Result<ScriptingProtos.ExecuteYqlResult>> executeScriptYql(String query) {
return executeScriptYql(query, Params.empty(), ExecuteScriptSettings.newBuilder().build());
}

/**
* Submits a YQL script for asynchronous execution and returns a handle to the operation.
* Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb
*
* <p>This method executes the given script asynchronously and immediately returns
* a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched
* via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.</p>
*
* @param query the YQL script text to execute
* @return a future resolving to an {@link Operation} representing the submitted script execution
*/
default CompletableFuture<Operation<Status>> executeScript(String query) {
return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build());
}

/**
* Waits for a previously submitted script operation to complete.
*
* <p>This method polls or fetches the state of the running operation via {@link OperationTray#fetchOperation}
* until the operation completes successfully or fails. It is typically used after calling
* {@link #executeScript(String, Params, ExecuteScriptSettings)}.</p>
*
* @param scriptFuture a {@link CompletableFuture} returned by {@link #executeScript(String, Params, ExecuteScriptSettings)}
* @return a future resolving to the final {@link Status} of the script execution
*/
CompletableFuture<Status> waitForScript(CompletableFuture<Operation<Status>> scriptFuture);

}
57 changes: 54 additions & 3 deletions query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,25 @@
import io.grpc.Context;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.operation.Operation;
import tech.ydb.core.operation.OperationBinder;
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.proto.OperationProtos;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.proto.query.v1.QueryServiceGrpc;
import tech.ydb.proto.scripting.ScriptingProtos;
import tech.ydb.proto.scripting.v1.ScriptingServiceGrpc;

/**
* Low-level RPC client for YDB Query and Scripting services.
* <p>
* Provides direct gRPC bindings for session management, query execution,
* transaction control, and script execution APIs.
* <p>
* Used internally by higher-level query session and client abstractions.
*
* @author Aleksandr Gorshenin
*/
Expand Down Expand Up @@ -106,11 +116,52 @@ public GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> executeQuery(
return transport.readStreamCall(QueryServiceGrpc.getExecuteQueryMethod(), settings, request);
}

public CompletableFuture<Result<OperationProtos.Operation>> executeScript(
/**
* Executes a YQL script via the scripting service.
*
* @param request the {@link ScriptingProtos.ExecuteYqlRequest} containing the script definition
* @param settings gRPC request settings
* @return a future resolving to an {@link Operation} with {@link ScriptingProtos.ExecuteYqlResult}
*/
public CompletableFuture<Operation<Result<ScriptingProtos.ExecuteYqlResult>>> executeScriptYql(
ScriptingProtos.ExecuteYqlRequest request, GrpcRequestSettings settings) {

return transport.unaryCall(ScriptingServiceGrpc.getExecuteYqlMethod(), settings, request)
.thenApply(OperationBinder.bindAsync(
transport,
ScriptingProtos.ExecuteYqlResponse::getOperation,
ScriptingProtos.ExecuteYqlResult.class)
);
}

/**
* Executes a YQL script using the Query service API.
*
*
* @param request the {@link YdbQuery.ExecuteScriptRequest} containing the script
* @param settings gRPC request settings
* @return a future resolving to an {@link Operation} representing the script execution
*/
public CompletableFuture<Operation<Status>> executeScript(
YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) {
return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request);

return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request)
.thenApply(
OperationBinder.bindAsync(transport,
op -> op
));
}

/**
* Fetches the results of a previously executed script.
*
* <p>This method retrieves the next portion of script execution results,
* supporting pagination and partial fetch using tokens.</p>
*
* @param request the {@link YdbQuery.FetchScriptResultsRequest} specifying the fetch parameters
* @param settings gRPC request settings
* @return a future resolving to {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse}
*/
public CompletableFuture<Result<YdbQuery.FetchScriptResultsResponse>> fetchScriptResults(
YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) {
return transport
Expand Down
95 changes: 95 additions & 0 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.Strings;
import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,14 +23,19 @@
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.operation.Operation;
import tech.ydb.core.operation.OperationTray;
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.URITools;
import tech.ydb.core.utils.UpdatableOptional;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.proto.scripting.ScriptingProtos;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.result.OperationResult;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.query.result.QueryStats;
import tech.ydb.query.settings.AttachSessionSettings;
Expand All @@ -37,6 +44,8 @@
import tech.ydb.query.settings.CreateSessionSettings;
import tech.ydb.query.settings.DeleteSessionSettings;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.query.settings.ExecuteScriptSettings;
import tech.ydb.query.settings.FetchScriptSettings;
import tech.ydb.query.settings.QueryExecMode;
import tech.ydb.query.settings.QueryStatsMode;
import tech.ydb.query.settings.RollbackTransactionSettings;
Expand Down Expand Up @@ -182,6 +191,19 @@ private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) {
}
}

private static YdbTable.QueryStatsCollection.Mode mapStatsCollectionMode(QueryStatsMode mode) {
switch (mode) {
case NONE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_NONE;
case BASIC: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_BASIC;
case FULL: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_FULL;
case PROFILE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_PROFILE;

case UNSPECIFIED:
default:
return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_UNSPECIFIED;
}
}

private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
switch (mode) {
case NONE: return YdbQuery.StatsMode.STATS_MODE_NONE;
Expand Down Expand Up @@ -244,6 +266,79 @@ void handleTxMeta(String txID) {
};
}

@Override
public CompletableFuture<Result<ScriptingProtos.ExecuteYqlResult>> executeScriptYql(
String query,
Params params,
ExecuteScriptSettings settings) {
ScriptingProtos.ExecuteYqlRequest.Builder requestBuilder = ScriptingProtos.ExecuteYqlRequest.newBuilder()
.setScript(query)
.setCollectStats(mapStatsCollectionMode(settings.getStatsMode()));

requestBuilder.putAllParameters(params.toPb());

GrpcRequestSettings.Builder options = makeOptions(settings);

return rpc.executeScriptYql(requestBuilder.build(), options.build()).thenApply(OperationResult::new);
}

@Override
public CompletableFuture<Operation<Status>> executeScript(String query,
Params params,
ExecuteScriptSettings settings) {
YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder()
.setExecMode(mapExecMode(settings.getExecMode()))
.setStatsMode(mapStatsMode(settings.getStatsMode()))
.setScriptContent(YdbQuery.QueryContent.newBuilder()
.setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1)
.setText(query)
.build());

java.time.Duration ttl = settings.getTtl();
if (ttl != null) {
request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano()));
}

String resourcePool = settings.getResourcePool();
if (resourcePool != null && !resourcePool.isEmpty()) {
request.setPoolId(resourcePool);
}

request.putAllParameters(params.toPb());

GrpcRequestSettings.Builder options = makeOptions(settings);

return rpc.executeScript(request.build(), options.build());
}

@Override
public CompletableFuture<Status> waitForScript(CompletableFuture<Operation<Status>> scriptFuture) {
return scriptFuture.thenCompose(operation -> OperationTray.fetchOperation(operation, 1));
}

@Override
public CompletableFuture<Result<YdbQuery.FetchScriptResultsResponse>>
fetchScriptResults(String query, Params params, FetchScriptSettings settings) {
YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder();

if (!Strings.isNullOrEmpty(settings.getFetchToken())) {
requestBuilder.setFetchToken(settings.getFetchToken());
}

if (settings.getRowsLimit() > 0) {
requestBuilder.setRowsLimit(settings.getRowsLimit());
}

requestBuilder.setOperationId(settings.getOperationId());

if (settings.getSetResultSetIndex() >= 0) {
requestBuilder.setResultSetIndex(settings.getSetResultSetIndex());
}

GrpcRequestSettings.Builder options = makeOptions(settings);
return rpc.fetchScriptResults(requestBuilder.build(), options.build());
}

public CompletableFuture<Result<YdbQuery.DeleteSessionResponse>> delete(DeleteSessionSettings settings) {
YdbQuery.DeleteSessionRequest request = YdbQuery.DeleteSessionRequest.newBuilder()
.setSessionId(sessionId)
Expand Down
Loading