Skip to content

Commit 73c0289

Browse files
committed
[Fix #889] Dynamic resource loading
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 4989238 commit 73c0289

File tree

26 files changed

+317
-130
lines changed

26 files changed

+317
-130
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
2929
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
3030
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
31+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
3132
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
32-
import io.serverlessworkflow.impl.resources.StaticResource;
3333
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
3434
import io.serverlessworkflow.impl.schema.SchemaValidator;
3535
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
@@ -124,7 +124,7 @@ public void validate(WorkflowModel node) {}
124124
};
125125

126126
@Override
127-
public SchemaValidator getValidator(StaticResource resource) {
127+
public SchemaValidator getValidator(ExternalResourceHandler resource) {
128128
return NoValidation;
129129
}
130130

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow)
7979
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) {
8080
WorkflowDefinition definition =
8181
new WorkflowDefinition(
82-
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
82+
application,
83+
workflow,
84+
application.resourceLoaderFactory().getResourceLoader(application, path));
8385
Schedule schedule = workflow.getSchedule();
8486
if (schedule != null) {
8587
ListenTo to = schedule.getOn();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,12 @@ public static Optional<SchemaValidator> getSchemaValidator(
4848
return Optional.of(validatorFactory.getValidator(schema.getSchemaInline()));
4949
} else if (schema.getSchemaExternal() != null) {
5050
return Optional.of(
51-
validatorFactory.getValidator(
52-
resourceLoader.loadStatic(schema.getSchemaExternal().getResource())));
51+
resourceLoader.load(
52+
schema.getSchemaExternal().getResource(),
53+
validatorFactory::getValidator,
54+
null,
55+
null,
56+
null));
5357
}
5458
}
5559
return Optional.empty();

impl/core/src/main/java/io/serverlessworkflow/impl/resources/StaticResource.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/resources/CachedResource.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
*/
1616
package io.serverlessworkflow.impl.resources;
1717

18-
import java.io.InputStream;
18+
import java.time.Instant;
1919

20-
public interface StaticResource {
21-
InputStream open();
22-
23-
String name();
24-
}
20+
public record CachedResource<T>(Instant lastReload, T content) {}

impl/core/src/main/java/io/serverlessworkflow/impl/resources/ClasspathResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.io.InputStream;
1919

20-
public class ClasspathResource implements StaticResource {
20+
public class ClasspathResource implements ExternalResourceHandler {
2121

2222
private String path;
2323

impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java

Lines changed: 96 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,67 @@
1919
import io.serverlessworkflow.api.types.EndpointUri;
2020
import io.serverlessworkflow.api.types.ExternalResource;
2121
import io.serverlessworkflow.api.types.UriTemplate;
22+
import io.serverlessworkflow.impl.TaskContext;
23+
import io.serverlessworkflow.impl.WorkflowApplication;
2224
import io.serverlessworkflow.impl.WorkflowContext;
23-
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
25+
import io.serverlessworkflow.impl.WorkflowModel;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2428
import java.net.MalformedURLException;
2529
import java.net.URI;
2630
import java.nio.file.Path;
31+
import java.time.Instant;
32+
import java.util.Map;
2733
import java.util.Optional;
34+
import java.util.ServiceLoader;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
import java.util.concurrent.locks.Lock;
37+
import java.util.concurrent.locks.ReentrantLock;
38+
import java.util.function.Function;
2839

2940
public class DefaultResourceLoader implements ResourceLoader {
3041

3142
private final Optional<Path> workflowPath;
43+
private final WorkflowApplication application;
3244

33-
protected DefaultResourceLoader(Path workflowPath) {
34-
this.workflowPath = Optional.ofNullable(workflowPath);
35-
}
45+
private final AtomicReference<URITemplateResolver> templateResolver =
46+
new AtomicReference<URITemplateResolver>();
3647

37-
@Override
38-
public StaticResource loadStatic(ExternalResource resource) {
39-
return processEndpoint(resource.getEndpoint());
40-
}
48+
private Map<ExternalResourceHandler, CachedResource> resourceCache = new LRUCache<>(100);
49+
private Lock cacheLock = new ReentrantLock();
4150

42-
@Override
43-
public DynamicResource loadDynamic(
44-
WorkflowContext workflow, ExternalResource resource, ExpressionFactory factory) {
45-
throw new UnsupportedOperationException("Dynamic loading of resources is not suppported");
51+
protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) {
52+
this.application = application;
53+
this.workflowPath = Optional.ofNullable(workflowPath);
4654
}
4755

48-
private StaticResource buildFromString(String uri) {
49-
return fileResource(uri);
56+
private URITemplateResolver templateResolver() {
57+
URITemplateResolver result = templateResolver.get();
58+
if (result == null) {
59+
result =
60+
ServiceLoader.load(URITemplateResolver.class)
61+
.findFirst()
62+
.orElseThrow(
63+
() ->
64+
new IllegalStateException(
65+
"Need an uri template resolver to resolve uri template"));
66+
templateResolver.set(result);
67+
}
68+
return result;
5069
}
5170

52-
private StaticResource fileResource(String pathStr) {
71+
private ExternalResourceHandler fileResource(String pathStr) {
5372
Path path = Path.of(pathStr);
5473
if (path.isAbsolute()) {
5574
return new FileResource(path);
5675
} else {
5776
return workflowPath
58-
.<StaticResource>map(p -> new FileResource(p.resolve(path)))
77+
.<ExternalResourceHandler>map(p -> new FileResource(p.resolve(path)))
5978
.orElseGet(() -> new ClasspathResource(pathStr));
6079
}
6180
}
6281

63-
private StaticResource buildFromURI(URI uri) {
82+
private ExternalResourceHandler buildFromURI(URI uri) {
6483
String scheme = uri.getScheme();
6584
if (scheme == null || scheme.equalsIgnoreCase("file")) {
6685
return fileResource(uri.getPath());
@@ -75,31 +94,79 @@ private StaticResource buildFromURI(URI uri) {
7594
}
7695
}
7796

78-
private StaticResource processEndpoint(Endpoint endpoint) {
97+
@Override
98+
public <T> T load(
99+
ExternalResource resource,
100+
Function<ExternalResourceHandler, T> function,
101+
WorkflowContext workflowContext,
102+
TaskContext taskContext,
103+
WorkflowModel model) {
104+
ExternalResourceHandler resourceHandler =
105+
buildFromURI(
106+
uriSupplier(resource.getEndpoint())
107+
.apply(
108+
workflowContext,
109+
taskContext,
110+
model == null ? application.modelFactory().fromNull() : model));
111+
try {
112+
CachedResource<T> cachedResource;
113+
cacheLock.lock();
114+
cachedResource = resourceCache.get(resourceHandler);
115+
cacheLock.unlock();
116+
if (cachedResource == null || resourceHandler.shouldReload(cachedResource.lastReload())) {
117+
cachedResource = new CachedResource(Instant.now(), function.apply(resourceHandler));
118+
cacheLock.lock();
119+
resourceCache.put(resourceHandler, cachedResource);
120+
}
121+
return cachedResource.content();
122+
} finally {
123+
cacheLock.unlock();
124+
}
125+
}
126+
127+
@Override
128+
public WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint) {
79129
if (endpoint.getEndpointConfiguration() != null) {
80130
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
81131
if (uri.getLiteralEndpointURI() != null) {
82-
return getURI(uri.getLiteralEndpointURI());
132+
return getURISupplier(uri.getLiteralEndpointURI());
83133
} else if (uri.getExpressionEndpointURI() != null) {
84-
throw new UnsupportedOperationException(
85-
"Expression not supported for loading a static resource");
134+
return new ExpressionURISupplier(
135+
application
136+
.expressionFactory()
137+
.resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI())));
86138
}
87139
} else if (endpoint.getRuntimeExpression() != null) {
88-
throw new UnsupportedOperationException(
89-
"Expression not supported for loading a static resource");
140+
return new ExpressionURISupplier(
141+
application
142+
.expressionFactory()
143+
.resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression())));
90144
} else if (endpoint.getUriTemplate() != null) {
91-
return getURI(endpoint.getUriTemplate());
145+
return getURISupplier(endpoint.getUriTemplate());
92146
}
93147
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
94148
}
95149

96-
private StaticResource getURI(UriTemplate template) {
150+
private WorkflowValueResolver<URI> getURISupplier(UriTemplate template) {
97151
if (template.getLiteralUri() != null) {
98-
return buildFromURI(template.getLiteralUri());
152+
return (w, t, n) -> template.getLiteralUri();
99153
} else if (template.getLiteralUriTemplate() != null) {
100-
return buildFromString(template.getLiteralUriTemplate());
101-
} else {
102-
throw new IllegalStateException("Invalid endpoint definition" + template);
154+
return (w, t, n) ->
155+
templateResolver().resolveTemplates(template.getLiteralUriTemplate(), w, t, n);
156+
}
157+
throw new IllegalArgumentException("Invalid uritemplate definition " + template);
158+
}
159+
160+
private class ExpressionURISupplier implements WorkflowValueResolver<URI> {
161+
private WorkflowValueResolver<String> expr;
162+
163+
public ExpressionURISupplier(WorkflowValueResolver<String> expr) {
164+
this.expr = expr;
165+
}
166+
167+
@Override
168+
public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) {
169+
return URI.create(expr.apply(workflow, task, node));
103170
}
104171
}
105172
}

impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoaderFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.resources;
1717

18+
import io.serverlessworkflow.impl.WorkflowApplication;
1819
import java.nio.file.Path;
1920

2021
public class DefaultResourceLoaderFactory implements ResourceLoaderFactory {
@@ -28,7 +29,7 @@ public static final ResourceLoaderFactory get() {
2829
private DefaultResourceLoaderFactory() {}
2930

3031
@Override
31-
public ResourceLoader getResourceLoader(Path path) {
32-
return new DefaultResourceLoader(path);
32+
public ResourceLoader getResourceLoader(WorkflowApplication application, Path path) {
33+
return new DefaultResourceLoader(application, path);
3334
}
3435
}

impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/TargetSupplier.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/resources/ExternalResourceHandler.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.executors.http;
16+
package io.serverlessworkflow.impl.resources;
1717

18-
import io.serverlessworkflow.impl.TaskContext;
19-
import io.serverlessworkflow.impl.WorkflowContext;
20-
import io.serverlessworkflow.impl.WorkflowModel;
21-
import jakarta.ws.rs.client.WebTarget;
18+
import java.io.InputStream;
19+
import java.time.Instant;
2220

23-
public interface TargetSupplier {
24-
WebTarget apply(WorkflowContext workflow, TaskContext task, WorkflowModel node);
21+
public interface ExternalResourceHandler {
22+
23+
String name();
24+
25+
InputStream open();
26+
27+
default boolean shouldReload(Instant lasUpdate) {
28+
return false;
29+
}
2530
}

impl/core/src/main/java/io/serverlessworkflow/impl/resources/FileResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
2323

24-
class FileResource implements StaticResource {
24+
class FileResource implements ExternalResourceHandler {
2525

2626
private Path path;
2727

impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.io.UncheckedIOException;
2121
import java.net.URL;
2222

23-
public class HttpResource implements StaticResource {
23+
public class HttpResource implements ExternalResourceHandler {
2424

2525
private URL url;
2626

0 commit comments

Comments
 (0)