Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {
dependencies {
implementation(libs.edc.control.plane.spi)
implementation(libs.edc.http.spi)
implementation(libs.edc.participant.context.single.spi)
implementation(libs.edc.transaction.datasource.spi)

implementation(libs.edc.sql.lib)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.http.spi.EdcHttpClient;
import org.eclipse.edc.participantcontext.single.spi.SingleParticipantContextSupplier;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Requires;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventRouter;
Expand Down Expand Up @@ -113,9 +113,6 @@ public class LoggingHouseClientExtension implements ServiceExtension {
"ids", "https://w3id.org/idsa/core/",
"idsc", "https://w3id.org/idsa/code/");

@Setting(key = "edc.participant.id")
private String participantId;

@Inject
private Hostname hostname;
@Inject
Expand All @@ -128,20 +125,20 @@ public class LoggingHouseClientExtension implements ServiceExtension {
private IdentityService identityService;
@Inject
private RemoteMessageDispatcherRegistry dispatcherRegistry;

@Inject(required = false)
private DataSourceRegistry dataSourceRegistry;
@Inject(required = false)
private TransactionContext transactionContext;
@Inject(required = false)
private QueryExecutor queryExecutor;

@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessStore transferProcessStore;
@Inject
private AssetIndex assetIndex;
@Inject
private SingleParticipantContextSupplier singleParticipantContextSupplier;

private boolean enabled;
private URL loggingHouseLogUrl;
Expand All @@ -167,13 +164,16 @@ public void initialize(ServiceExtensionContext context) {

migrationManager = initFlyway(context);

var participantId = singleParticipantContextSupplier.get().orElseThrow(f -> new EdcException("cannot obtain participant context: " + f.getFailureDetail()))
.getParticipantContextId();

registerSerializerClearingHouseMessages(context);

var store = initializeLoggingHouseMessageStore(context, typeManager);
registerEventSubscriber(context, store);
registerEventSubscriber(context, store, participantId);

registerDispatcher(context);
workersManager = initializeWorkersManager(context, store);
workersManager = initializeWorkersManager(context, store, participantId);
}

@Override
Expand All @@ -197,6 +197,10 @@ public void start() {
// Sending a hello message to LoggingHouse
monitor.info("Sending Hello Message to LoggingHouse.");
var currentTime = System.currentTimeMillis();

var participantId = singleParticipantContextSupplier.get().orElseThrow(f -> new EdcException("cannot obtain participant context: " + f.getFailureDetail()))
.getParticipantContextId();

ConnectorAvailableEvent connectorAvailableEvent = new ConnectorAvailableEvent(
UUID.randomUUID().toString(),
participantId,
Expand Down Expand Up @@ -274,7 +278,7 @@ private LoggingHouseMessageStore initializeLoggingHouseMessageStore(ServiceExten
}
}

private void registerEventSubscriber(ServiceExtensionContext context, LoggingHouseMessageStore loggingHouseMessageStore) {
private void registerEventSubscriber(ServiceExtensionContext context, LoggingHouseMessageStore loggingHouseMessageStore, String participantId) {
monitor.debug("Registering event subscriber for LoggingHouseClientExtension");

var eventSubscriber = new LoggingHouseEventSubscriber(
Expand Down Expand Up @@ -323,7 +327,7 @@ private void registerCommonTypes(TypeManager typeManager) {
monitor.debug("Registered serializers for LoggingHouseClientExtension");
}

private LoggingHouseWorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) {
private LoggingHouseWorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store, String participantId) {
var initialDelaySeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_DELAY_SETTING, LOGGINGHOUSE_EXTENSION_WORKERS_DELAY_DEFAULT);
var periodSeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD_SETTING, LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD_DEFAULT);
var executor = new WorkersExecutor(Duration.ofSeconds(periodSeconds), Duration.ofSeconds(initialDelaySeconds), monitor);
Expand All @@ -345,7 +349,7 @@ private void registerDispatcher(ServiceExtensionContext context) {
var logMessageSender = new LogMessageSender(monitor);
var createProcessMessageSender = new CreateProcessMessageSender();

var idsMultipartSender = new IdsMultipartSender(monitor, httpClient, identityService, objectMapper);
var idsMultipartSender = new IdsMultipartSender(monitor, httpClient, identityService, objectMapper, singleParticipantContextSupplier);
var dispatcher = new IdsMultipartClearingRemoteMessageDispatcher(idsMultipartSender);
dispatcher.register(logMessageSender);
dispatcher.register(createProcessMessageSender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import org.eclipse.edc.http.spi.EdcHttpClient;
import org.eclipse.edc.participantcontext.spi.service.ParticipantContextSupplier;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.iam.TokenParameters;
Expand Down Expand Up @@ -58,14 +59,16 @@ public class IdsMultipartSender {
private final EdcHttpClient httpClient;
private final IdentityService identityService;
private final ObjectMapper objectMapper;
private final ParticipantContextSupplier participantContextSupplier;

public IdsMultipartSender(Monitor monitor, EdcHttpClient httpClient,
IdentityService identityService,
ObjectMapper objectMapper) {
ObjectMapper objectMapper, ParticipantContextSupplier participantContextSupplier) {
this.monitor = monitor;
this.httpClient = httpClient;
this.identityService = identityService;
this.objectMapper = objectMapper;
this.participantContextSupplier = participantContextSupplier;
}

public <M extends RemoteMessage, R> CompletableFuture<StatusResult<R>> send(M request, MultipartSenderDelegate<M, R> senderDelegate) {
Expand Down Expand Up @@ -178,7 +181,13 @@ protected Result<DynamicAttributeToken> obtainDynamicAttributeToken(String recip
.claims(SCOPE_CLAIM, IdsConstants.TOKEN_SCOPE)
.claims(AUDIENCE_CLAIM, recipientAddress)
.build();
return identityService.obtainClientCredentials("ignored", tokenParameters)

var participantContextServiceResult = participantContextSupplier.get();
if (participantContextServiceResult.failed()) {
return Result.failure("Cannot obtain participant context");
}
var participantContextId = participantContextServiceResult.getContent().getParticipantContextId();
return identityService.obtainClientCredentials(participantContextId, tokenParameters)
.map(credentials -> new DynamicAttributeTokenBuilder()
._tokenFormat_(TokenFormat.JWT)
._tokenValue_(credentials.getToken())
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ edc-dataplane-base-bom = { module = "org.eclipse.edc:dataplane-base-bom", versio
edc-dataplane-feature-sql-bom = { module = "org.eclipse.edc:dataplane-feature-sql-bom", version.ref = "edc" }
edc-control-plane-spi = { module = "org.eclipse.edc:control-plane-spi", version.ref = "edc" }
edc-http-spi = { module = "org.eclipse.edc:http-spi", version.ref = "edc" }
edc-participant-context-single-spi = { module = "org.eclipse.edc:participant-context-single-spi", version.ref = "edc" }
edc-transaction-datasource-spi = { module = "org.eclipse.edc:transaction-datasource-spi", version.ref = "edc" }
edc-sql-lib = { module = "org.eclipse.edc:sql-lib", version.ref = "edc" }
edc-vault-hashicorp = { module = "org.eclipse.edc:vault-hashicorp", version.ref = "edc" }
Expand Down