Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.StaticResource;
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void validate(WorkflowModel node) {}
};

@Override
public SchemaValidator getValidator(StaticResource resource) {
public SchemaValidator getValidator(ExternalResourceHandler resource) {
return NoValidation;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow)
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) {
WorkflowDefinition definition =
new WorkflowDefinition(
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
application,
workflow,
application.resourceLoaderFactory().getResourceLoader(application, path));
Schedule schedule = workflow.getSchedule();
if (schedule != null) {
ListenTo to = schedule.getOn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ public static Optional<SchemaValidator> getSchemaValidator(
return Optional.of(validatorFactory.getValidator(schema.getSchemaInline()));
} else if (schema.getSchemaExternal() != null) {
return Optional.of(
validatorFactory.getValidator(
resourceLoader.loadStatic(schema.getSchemaExternal().getResource())));
resourceLoader.load(
schema.getSchemaExternal().getResource(),
validatorFactory::getValidator,
null,
null,
null));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
*/
package io.serverlessworkflow.impl.resources;

import java.io.InputStream;
import java.time.Instant;

public interface StaticResource {
InputStream open();

String name();
}
public record CachedResource<T>(Instant lastReload, T content) {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.io.InputStream;

public class ClasspathResource implements StaticResource {
public class ClasspathResource implements ExternalResourceHandler {

private String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,67 @@
import io.serverlessworkflow.api.types.EndpointUri;
import io.serverlessworkflow.api.types.ExternalResource;
import io.serverlessworkflow.api.types.UriTemplate;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class DefaultResourceLoader implements ResourceLoader {

private final Optional<Path> workflowPath;
private final WorkflowApplication application;

protected DefaultResourceLoader(Path workflowPath) {
this.workflowPath = Optional.ofNullable(workflowPath);
}
private final AtomicReference<URITemplateResolver> templateResolver =
new AtomicReference<URITemplateResolver>();

@Override
public StaticResource loadStatic(ExternalResource resource) {
return processEndpoint(resource.getEndpoint());
}
private Map<ExternalResourceHandler, CachedResource> resourceCache = new LRUCache<>(100);
private Lock cacheLock = new ReentrantLock();

@Override
public DynamicResource loadDynamic(
WorkflowContext workflow, ExternalResource resource, ExpressionFactory factory) {
throw new UnsupportedOperationException("Dynamic loading of resources is not suppported");
protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) {
this.application = application;
this.workflowPath = Optional.ofNullable(workflowPath);
}

private StaticResource buildFromString(String uri) {
return fileResource(uri);
private URITemplateResolver templateResolver() {
URITemplateResolver result = templateResolver.get();
if (result == null) {
result =
ServiceLoader.load(URITemplateResolver.class)
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Need an uri template resolver to resolve uri template"));
templateResolver.set(result);
}
return result;
}

private StaticResource fileResource(String pathStr) {
private ExternalResourceHandler fileResource(String pathStr) {
Path path = Path.of(pathStr);
if (path.isAbsolute()) {
return new FileResource(path);
} else {
return workflowPath
.<StaticResource>map(p -> new FileResource(p.resolve(path)))
.<ExternalResourceHandler>map(p -> new FileResource(p.resolve(path)))
.orElseGet(() -> new ClasspathResource(pathStr));
}
}

private StaticResource buildFromURI(URI uri) {
private ExternalResourceHandler buildFromURI(URI uri) {
String scheme = uri.getScheme();
if (scheme == null || scheme.equalsIgnoreCase("file")) {
return fileResource(uri.getPath());
Expand All @@ -75,31 +94,79 @@ private StaticResource buildFromURI(URI uri) {
}
}

private StaticResource processEndpoint(Endpoint endpoint) {
@Override
public <T> T load(
ExternalResource resource,
Function<ExternalResourceHandler, T> function,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel model) {
ExternalResourceHandler resourceHandler =
buildFromURI(
uriSupplier(resource.getEndpoint())
.apply(
workflowContext,
taskContext,
model == null ? application.modelFactory().fromNull() : model));
try {
CachedResource<T> cachedResource;
cacheLock.lock();
cachedResource = resourceCache.get(resourceHandler);
cacheLock.unlock();
if (cachedResource == null || resourceHandler.shouldReload(cachedResource.lastReload())) {
cachedResource = new CachedResource(Instant.now(), function.apply(resourceHandler));
cacheLock.lock();
resourceCache.put(resourceHandler, cachedResource);
}
return cachedResource.content();
} finally {
cacheLock.unlock();
}
}

@Override
public WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint) {
if (endpoint.getEndpointConfiguration() != null) {
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
if (uri.getLiteralEndpointURI() != null) {
return getURI(uri.getLiteralEndpointURI());
return getURISupplier(uri.getLiteralEndpointURI());
} else if (uri.getExpressionEndpointURI() != null) {
throw new UnsupportedOperationException(
"Expression not supported for loading a static resource");
return new ExpressionURISupplier(
application
.expressionFactory()
.resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI())));
}
} else if (endpoint.getRuntimeExpression() != null) {
throw new UnsupportedOperationException(
"Expression not supported for loading a static resource");
return new ExpressionURISupplier(
application
.expressionFactory()
.resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression())));
} else if (endpoint.getUriTemplate() != null) {
return getURI(endpoint.getUriTemplate());
return getURISupplier(endpoint.getUriTemplate());
}
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
}

private StaticResource getURI(UriTemplate template) {
private WorkflowValueResolver<URI> getURISupplier(UriTemplate template) {
if (template.getLiteralUri() != null) {
return buildFromURI(template.getLiteralUri());
return (w, t, n) -> template.getLiteralUri();
} else if (template.getLiteralUriTemplate() != null) {
return buildFromString(template.getLiteralUriTemplate());
} else {
throw new IllegalStateException("Invalid endpoint definition" + template);
return (w, t, n) ->
templateResolver().resolveTemplates(template.getLiteralUriTemplate(), w, t, n);
}
throw new IllegalArgumentException("Invalid uritemplate definition " + template);
}

private class ExpressionURISupplier implements WorkflowValueResolver<URI> {
private WorkflowValueResolver<String> expr;

public ExpressionURISupplier(WorkflowValueResolver<String> expr) {
this.expr = expr;
}

@Override
public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) {
return URI.create(expr.apply(workflow, task, node));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.serverlessworkflow.impl.resources;

import io.serverlessworkflow.impl.WorkflowApplication;
import java.nio.file.Path;

public class DefaultResourceLoaderFactory implements ResourceLoaderFactory {
Expand All @@ -28,7 +29,7 @@ public static final ResourceLoaderFactory get() {
private DefaultResourceLoaderFactory() {}

@Override
public ResourceLoader getResourceLoader(Path path) {
return new DefaultResourceLoader(path);
public ResourceLoader getResourceLoader(WorkflowApplication application, Path path) {
return new DefaultResourceLoader(application, path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.http;
package io.serverlessworkflow.impl.resources;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import jakarta.ws.rs.client.WebTarget;
import java.io.InputStream;
import java.time.Instant;

public interface TargetSupplier {
WebTarget apply(WorkflowContext workflow, TaskContext task, WorkflowModel node);
public interface ExternalResourceHandler {

String name();

InputStream open();

default boolean shouldReload(Instant lasUpdate) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.nio.file.Files;
import java.nio.file.Path;

class FileResource implements StaticResource {
class FileResource implements ExternalResourceHandler {

private Path path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.io.UncheckedIOException;
import java.net.URL;

public class HttpResource implements StaticResource {
public class HttpResource implements ExternalResourceHandler {

private URL url;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.resources;

import java.util.LinkedHashMap;
import java.util.Map;

public class LRUCache<K, V> extends LinkedHashMap<K, V> {

private final int capacity;

public LRUCache(int capacity) {
super(capacity, 0.75f, true);
this.capacity = capacity;
}

@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > capacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
*/
package io.serverlessworkflow.impl.resources;

import io.serverlessworkflow.api.types.Endpoint;
import io.serverlessworkflow.api.types.ExternalResource;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import java.net.URI;
import java.util.function.Function;

public interface ResourceLoader {

StaticResource loadStatic(ExternalResource resource);
WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint);

DynamicResource loadDynamic(
WorkflowContext context, ExternalResource resource, ExpressionFactory factory);
<T> T load(
ExternalResource resource,
Function<ExternalResourceHandler, T> function,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel model);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package io.serverlessworkflow.impl.resources;

import io.serverlessworkflow.impl.WorkflowApplication;
import java.nio.file.Path;

public interface ResourceLoaderFactory {
ResourceLoader getResourceLoader(Path path);
ResourceLoader getResourceLoader(WorkflowApplication application, Path path);
}
Loading