Chapter 15b: Focused Example - TransferFunds
This focused example demonstrates Aspects for cross-cutting concerns in a financial domain with audit requirements.
Requirements
Business Goal: Transfer funds between accounts with full audit trail.
Flow:
- Validate transfer request
- Verify source account has sufficient balance
- Execute the transfer (debit source, credit destination)
- Record audit log
Regulatory Requirements:
- Every operation must be logged with timestamp, actor, and outcome
- Failed operations must also be logged (not just successes)
- Audit logging must not fail the transfer (best-effort)
- All transfers above the authorization threshold require additional authorization
Technical Requirements:
- Idempotency - same request ID must not execute twice
- Timeout - external calls must time out after 5 seconds
- Retry - transient failures should retry up to 3 times
Domain Model
public record AccountId(String value) { /* factory omitted */ }
public record TransferId(String value) { /* factory omitted */ }
public record Money(BigDecimal value) {
// money(String) factory omitted, as for AccountId and TransferId
public boolean isGreaterThan(Money other) {
return this.value.compareTo(other.value) > 0;
}
public Money subtract(Money other) {
return new Money(this.value.subtract(other.value));
}
public Money add(Money other) {
return new Money(this.value.add(other.value));
}
}
public record TransferRequest(
String requestId,
String sourceAccountId,
String destinationAccountId,
String amount,
String initiatedBy
) {}
public record TransferResult(
TransferId transferId,
Money amount,
Instant completedAt
) {}
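The `Money.money(...)` factory used later during validation is omitted above. As a rough illustration of what such a parsing step might check, here is a minimal JDK-only sketch. The class name `MoneyParsing`, the use of `Optional` in place of the library's `Result`, and the rules themselves (strictly positive, at most two decimal places) are assumptions made for this example, not part of the chapter's code.

```java
import java.math.BigDecimal;
import java.util.Optional;

// Hypothetical stand-in for the omitted Money.money(...) factory.
// Optional.empty() plays the role of a failed Result in this sketch.
final class MoneyParsing {
    private MoneyParsing() {}

    static Optional<BigDecimal> parseAmount(String raw) {
        if (raw == null || raw.isBlank()) {
            return Optional.empty();
        }
        try {
            var value = new BigDecimal(raw.trim());
            // Assumed rules: strictly positive, at most two significant decimal places.
            boolean valid = value.signum() > 0
                && value.stripTrailingZeros().scale() <= 2;
            return valid ? Optional.of(value) : Optional.empty();
        } catch (NumberFormatException e) {
            // Not a decimal number at all.
            return Optional.empty();
        }
    }
}
```

In the real factory, each rejected case would presumably map to a distinct `Cause` so that `Result.all` can report every validation failure at once.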
Error Types
public sealed interface TransferError extends Cause {
enum General implements TransferError {
INSUFFICIENT_FUNDS("Insufficient funds in source account"),
SAME_ACCOUNT("Cannot transfer to same account"),
DUPLICATE_REQUEST("Transfer request already processed"),
ACCOUNT_NOT_FOUND("Account not found"),
TRANSFER_FAILED("Transfer execution failed");
private final String message;
General(String message) {
this.message = message;
}
@Override
public String message() {
return message;
}
}
record AuthorizationRequired(Money amount, Money threshold) implements TransferError {
@Override
public String message() {
return String.format("Transfer of %s requires authorization (threshold: %s)",
amount.value(), threshold.value());
}
}
}
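The `AuthorizationRequired` error is declared above, but no step in this chapter produces it. A threshold check could look roughly like the following JDK-only sketch. The class name `AuthorizationCheck`, the `authorized` flag, and the use of `Optional<String>` for the error are hypothetical. In the chapter's style this would more likely be another step interface returning `Promise<ValidTransfer>`.

```java
import java.math.BigDecimal;
import java.util.Optional;

// Hypothetical threshold check for the "additional authorization" requirement.
final class AuthorizationCheck {
    private AuthorizationCheck() {}

    // Returns an error message when the amount is strictly above the threshold
    // and the request has not been authorized; empty means the transfer may proceed.
    static Optional<String> check(BigDecimal amount, BigDecimal threshold, boolean authorized) {
        if (amount.compareTo(threshold) > 0 && !authorized) {
            return Optional.of(String.format(
                "Transfer of %s requires authorization (threshold: %s)", amount, threshold));
        }
        return Optional.empty();
    }
}
```

An amount exactly equal to the threshold passes, matching the "above threshold" wording of the requirement.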
Validated Request
public record ValidTransfer(
String requestId,
AccountId sourceAccount,
AccountId destinationAccount,
Money amount,
String initiatedBy
) {
public static Result<ValidTransfer> validTransfer(TransferRequest raw) {
return Result.all(
validateRequestId(raw.requestId()),
AccountId.accountId(raw.sourceAccountId()),
AccountId.accountId(raw.destinationAccountId()),
Money.money(raw.amount()),
validateInitiator(raw.initiatedBy())
).flatMap(ValidTransfer::validateNotSameAccount);
}
private static Result<ValidTransfer> validateNotSameAccount(
String requestId,
AccountId source,
AccountId destination,
Money amount,
String initiatedBy
) {
if (source.value().equals(destination.value())) {
return TransferError.General.SAME_ACCOUNT.result();
}
return Result.success(new ValidTransfer(requestId, source, destination, amount, initiatedBy));
}
private static Result<String> validateRequestId(String requestId) {
return Verify.ensure(requestId, Verify.Is::notBlank)
.mapError(_ -> Causes.cause("Request ID is required"));
}
private static Result<String> validateInitiator(String initiatedBy) {
return Verify.ensure(initiatedBy, Verify.Is::notBlank)
.mapError(_ -> Causes.cause("Initiator is required"));
}
}
Use Case with Aspects
public interface TransferFunds {
Promise<TransferResult> execute(TransferRequest request);
// Step interfaces
interface CheckIdempotency {
Promise<ValidTransfer> apply(ValidTransfer transfer);
}
interface CheckBalance {
Promise<ValidTransfer> apply(ValidTransfer transfer);
}
interface ExecuteTransfer {
Promise<TransferResult> apply(ValidTransfer transfer);
}
interface AuditLog {
Promise<Unit> apply(AuditEntry entry);
}
// Factory with aspect composition
static TransferFunds transferFunds(
CheckIdempotency checkIdempotency,
CheckBalance checkBalance,
ExecuteTransfer executeTransfer,
AuditLog auditLog,
TimeSpan timeout,
RetryPolicy retryPolicy
) {
// Wrap executeTransfer with aspects
var decoratedExecute = withRetry(retryPolicy,
withTimeout(timeout,
withAudit(auditLog, executeTransfer)
)
);
return request -> ValidTransfer.validTransfer(request)
.async()
.flatMap(checkIdempotency::apply)
.flatMap(checkBalance::apply)
.flatMap(decoratedExecute);
}
// ASPECT: Timeout
private static Fn1<ValidTransfer, Promise<TransferResult>> withTimeout(
TimeSpan timeout,
Fn1<ValidTransfer, Promise<TransferResult>> step
) {
return transfer -> step.apply(transfer).timeout(timeout);
}
// ASPECT: Retry
private static Fn1<ValidTransfer, Promise<TransferResult>> withRetry(
RetryPolicy policy,
Fn1<ValidTransfer, Promise<TransferResult>> step
) {
return transfer -> retryWithPolicy(policy, () -> step.apply(transfer));
}
private static Promise<TransferResult> retryWithPolicy(
RetryPolicy policy,
Supplier<Promise<TransferResult>> operation
) {
return operation.get()
.recover(cause -> {
if (policy.shouldRetry(cause) && policy.attemptsRemaining() > 0) {
return retryWithPolicy(policy.decrementAttempts(), operation);
}
return cause.promise();
});
}
// ASPECT: Audit (wraps execution with before/after logging)
private static Fn1<ValidTransfer, Promise<TransferResult>> withAudit(
AuditLog auditLog,
Fn1<ValidTransfer, Promise<TransferResult>> step
) {
return transfer -> {
var startTime = Instant.now();
return step.apply(transfer)
.onResult(result -> logAuditEntry(auditLog, transfer, result, startTime));
};
}
private static void logAuditEntry(
AuditLog auditLog,
ValidTransfer transfer,
Result<TransferResult> result,
Instant startTime
) {
var entry = new AuditEntry(
transfer.requestId(),
"TRANSFER",
transfer.initiatedBy(),
startTime,
Instant.now(),
result.isSuccess() ? "SUCCESS" : "FAILURE",
result.fold(Cause::message, r -> "Transfer completed: " + r.transferId().value())
);
// Best-effort: log but don't fail if audit fails
auditLog.apply(entry)
.onFailure(e -> { /* log to fallback */ });
}
}
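The best-effort behavior of the audit aspect can also be shown without the chapter's `Promise` type. The sketch below uses `CompletableFuture` as a stand-in. `BestEffortAudit` and the two-argument audit callback are names invented for this example. The point it illustrates is that the audit call observes every outcome, but its own failure never changes what the caller sees.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

// JDK-only stand-in for the audit aspect; not the chapter's Promise-based code.
final class BestEffortAudit {
    private BestEffortAudit() {}

    // Wraps a step so that an audit record is emitted for success and failure alike.
    // Any failure of the audit call itself is swallowed.
    static <I, O> Function<I, CompletableFuture<O>> withAudit(
        BiFunction<I, String, CompletableFuture<Void>> audit,
        Function<I, CompletableFuture<O>> step
    ) {
        return input -> step.apply(input).whenComplete((value, error) -> {
            var outcome = error == null ? "SUCCESS" : "FAILURE";
            try {
                audit.apply(input, outcome)
                     .exceptionally(auditError -> null); // fallback logging would go here
            } catch (RuntimeException auditError) {
                // The audit callback threw synchronously: still best-effort, so ignore.
            }
        });
    }
}
```

Because `whenComplete` passes the original outcome through unchanged, a failing audit sink cannot turn a successful transfer into a failed one.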
Supporting Types
AuditEntry
public record AuditEntry(
String requestId,
String operation,
String initiatedBy,
Instant startedAt,
Instant completedAt,
String outcome,
String details
) {}
RetryPolicy
public record RetryPolicy(
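int maxAttempts, // retries allowed after the initial attempt (0 = never retry)
int attemptsRemaining, // retries still available; decremented on each retry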
Set<Class<? extends Cause>> retryableErrors
) {
public static RetryPolicy retryPolicy(int maxAttempts, Set<Class<? extends Cause>> retryableErrors) {
return new RetryPolicy(maxAttempts, maxAttempts, retryableErrors);
}
public boolean shouldRetry(Cause cause) {
return retryableErrors.stream()
.anyMatch(errorClass -> errorClass.isInstance(cause));
}
public RetryPolicy decrementAttempts() {
return new RetryPolicy(maxAttempts, attemptsRemaining - 1, retryableErrors);
}
}
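To see how a decrementing retry budget interacts with an asynchronous operation, the following JDK-only sketch mirrors the shape of `retryWithPolicy` using `CompletableFuture`. `RetrySketch` and its parameter names are hypothetical, and a `Predicate<Throwable>` stands in for `RetryPolicy.shouldRetry`. It assumes the operation reports failure through the returned future rather than by throwing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// JDK-only stand-in for the retry aspect; not the chapter's Promise-based code.
final class RetrySketch {
    private RetrySketch() {}

    // Runs the operation once, then retries up to retriesLeft more times
    // while the failure is classified as retryable.
    static <T> CompletableFuture<T> retry(
        int retriesLeft,
        Predicate<Throwable> retryable,
        Supplier<CompletableFuture<T>> operation
    ) {
        return operation.get()
            .<CompletableFuture<T>>handle((value, error) -> {
                if (error == null) {
                    return CompletableFuture.completedFuture(value);
                }
                var cause = unwrap(error);
                if (retriesLeft > 0 && retryable.test(cause)) {
                    return retry(retriesLeft - 1, retryable, operation);
                }
                return CompletableFuture.failedFuture(cause);
            })
            .thenCompose(next -> next); // flatten the nested future
    }

    // Dependent stages wrap failures in CompletionException; classify the real cause.
    private static Throwable unwrap(Throwable error) {
        return error instanceof CompletionException && error.getCause() != null
            ? error.getCause()
            : error;
    }
}
```

With a budget of 3, an operation that fails twice and then succeeds is invoked three times in total, which matches the behavior the retry test in this chapter expects.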
Step Implementations
CheckIdempotency
public class IdempotencyChecker implements TransferFunds.CheckIdempotency {
private final IdempotencyStore store;
public IdempotencyChecker(IdempotencyStore store) {
this.store = store;
}
@Override
public Promise<ValidTransfer> apply(ValidTransfer transfer) {
return Promise.lift(
_ -> TransferError.General.TRANSFER_FAILED,
() -> store.exists(transfer.requestId())
).flatMap(exists -> exists
? TransferError.General.DUPLICATE_REQUEST.promise()
: Promise.success(transfer));
}
}
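The `IdempotencyStore` type is not shown in this chapter. One thing worth noting is that the checker only reads the store, while the request ID is recorded later by the executor, so two concurrent requests with the same ID could both pass the check. The following in-memory sketch, with the hypothetical name `InMemoryIdempotencyStore` and a hypothetical `claim` method, shows one way to make the check and the record a single atomic operation. A production store would need to be durable and shared across instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory store, for illustration and tests only.
final class InMemoryIdempotencyStore {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Mirrors the exists(...) call used by IdempotencyChecker.
    boolean exists(String requestId) {
        return seen.contains(requestId);
    }

    // Mirrors the record(...) call used by TransferExecutor.
    void record(String requestId) {
        seen.add(requestId);
    }

    // Atomic alternative: returns true only for the first caller to claim the ID,
    // so check-and-record cannot interleave between two threads.
    boolean claim(String requestId) {
        return seen.add(requestId);
    }
}
```

If `claim` is used at the start of the flow, a transfer that later fails would also need to release its claim, otherwise a legitimate retry of the same request would be rejected as a duplicate.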
CheckBalance
public class BalanceChecker implements TransferFunds.CheckBalance {
private final AccountRepository accounts;
public BalanceChecker(AccountRepository accounts) {
this.accounts = accounts;
}
@Override
public Promise<ValidTransfer> apply(ValidTransfer transfer) {
return accounts.getBalance(transfer.sourceAccount())
// Sufficient means balance >= amount, so reject only when the amount exceeds the balance
.flatMap(balance -> transfer.amount().isGreaterThan(balance)
? TransferError.General.INSUFFICIENT_FUNDS.promise()
: Promise.success(transfer));
}
}
ExecuteTransfer
public class TransferExecutor implements TransferFunds.ExecuteTransfer {
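private final AccountRepository accounts;
private final IdempotencyStore idempotencyStore;
public TransferExecutor(AccountRepository accounts, IdempotencyStore idempotencyStore) {
this.accounts = accounts;
this.idempotencyStore = idempotencyStore;
}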
@Override
public Promise<TransferResult> apply(ValidTransfer transfer) {
return Promise.lift(
_ -> TransferError.General.TRANSFER_FAILED,
() -> executeInTransaction(transfer)
);
}
private TransferResult executeInTransaction(ValidTransfer transfer) {
// Transaction boundary: debit, credit, and the idempotency record must commit or roll back together
accounts.debit(transfer.sourceAccount(), transfer.amount());
accounts.credit(transfer.destinationAccount(), transfer.amount());
idempotencyStore.record(transfer.requestId());
return new TransferResult(
new TransferId(UUID.randomUUID().toString()),
transfer.amount(),
Instant.now()
);
}
}
Key Points: Aspects Pattern
What Aspects Are
Aspects are higher-order functions that wrap step execution with cross-cutting concerns. They form a composition chain:
Request -> [Retry -> [Timeout -> [Audit -> Execute]]]
Aspect Composition Order
The order matters. The outermost wrapper runs first and sees the final outcome of everything nested inside it:
var decorated = withRetry(policy, // 1. Outer: retry wraps everything
withTimeout(timeout, // 2. Middle: timeout per attempt
withAudit(auditLog, execute) // 3. Inner: audit each attempt
)
);
This means:
- Retry wraps timeout (each retry attempt has its own timeout)
- Timeout wraps audit (audit happens within timeout)
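- Audit wraps execute (each execution attempt is audited; failures in validation, idempotency, or balance checks occur earlier and never reach this aspect)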
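The nesting described above can be made visible with a small tracing experiment. The sketch below is JDK-only and uses plain synchronous functions. `CompositionOrder` and `traced` are names made up for this illustration, and the wrappers only record when control enters and leaves them rather than implementing real retry, timeout, or audit behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustration only: wrappers that record the order in which they run.
final class CompositionOrder {
    private CompositionOrder() {}

    // A tracing wrapper: notes when control enters and leaves the wrapped step.
    static Function<String, String> traced(
        String name, List<String> trace, Function<String, String> step
    ) {
        return input -> {
            trace.add(name + ":enter");
            var output = step.apply(input);
            trace.add(name + ":exit");
            return output;
        };
    }

    // Builds the same nesting as the chapter's factory and returns the trace.
    static List<String> run() {
        var trace = new ArrayList<String>();
        Function<String, String> execute = input -> {
            trace.add("execute");
            return "done";
        };
        var decorated = traced("retry", trace,
            traced("timeout", trace,
                traced("audit", trace, execute)));
        decorated.apply("transfer");
        return trace;
    }
}
```

Calling `CompositionOrder.run()` yields `[retry:enter, timeout:enter, audit:enter, execute, audit:exit, timeout:exit, retry:exit]`: control passes through the wrappers from the outside in, and results travel back from the inside out.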
Standard Composition Order
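For most use cases, from outermost to innermost:
- Metrics/Logging (outermost) - Observe everything
- Timeout - Limit total time, including all retries
- Circuit Breaker - Fail fast if service unhealthy
- Retry - Retry transient failures
- Rate Limit - Control request rate
- Business Logic (innermost) - Actual operation
TransferFunds deviates from this order: retry wraps timeout, so the 5-second limit applies to each attempt rather than to the operation as a whole.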
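When the list of aspects grows, nesting calls by hand becomes hard to read. One option is a small helper that takes the aspects in outermost-to-innermost order and folds them around the core step. The following is a sketch under that assumption. `AspectChain`, `compose`, and `marker` are hypothetical names, and the marker aspect only records the order of entry.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical helper: apply a list of aspects in a declared order.
final class AspectChain {
    private AspectChain() {}

    // Applies aspects so that the first list element ends up outermost.
    // Wrapping starts from the last element, which sits closest to the core.
    static <T> T compose(List<UnaryOperator<T>> aspects, T core) {
        T result = core;
        for (int i = aspects.size() - 1; i >= 0; i--) {
            result = aspects.get(i).apply(result);
        }
        return result;
    }

    // Marker aspect that records the order in which wrappers are entered.
    static UnaryOperator<Function<String, String>> marker(String name, List<String> trace) {
        return step -> input -> {
            trace.add(name);
            return step.apply(input);
        };
    }
}
```

Passing markers named `metrics`, `timeout`, and `retry` in that order produces a chain that enters metrics first and reaches the core step last, so the list reads the same way as the order above.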
Aspect Independence
Each aspect is independent and testable:
@Test
void withTimeout_fails_whenTimeoutExceeds() {
// Assumes withTimeout is reachable from the test; it is private in TransferFunds as written
// Explicit type is required: `var` cannot infer the type of a lambda
Fn1<ValidTransfer, Promise<TransferResult>> slowStep =
transfer -> Promise.<TransferResult>promise(); // Never resolves
var decorated = withTimeout(TimeSpan.timeSpan(50).millis(), slowStep);
decorated.apply(validTransfer())
.await()
.onSuccess(Assertions::fail); // Should time out
}
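The same timeout idea can be tried with nothing but the JDK. The sketch below uses `CompletableFuture.orTimeout` (available since Java 9) as a stand-in for the chapter's `Promise.timeout`. `TimeoutSketch` is a hypothetical name. Note that this only fails the returned future and does not cancel the underlying work.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// JDK-only stand-in for the timeout aspect; not the chapter's Promise-based code.
final class TimeoutSketch {
    private TimeoutSketch() {}

    // Fails the returned future with a TimeoutException if the step has not
    // completed within the limit.
    static <I, O> Function<I, CompletableFuture<O>> withTimeout(
        Duration limit,
        Function<I, CompletableFuture<O>> step
    ) {
        return input -> step.apply(input)
            .copy() // orTimeout completes the future it is called on, so work on a copy
            .orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

A step that returns a future which is never completed will cause the decorated call to fail after the limit, while a step that completes in time passes its value through unchanged.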
Testing
class TransferFundsTest {
@Nested
class Aspects {
@Test
void execute_retriesOnTransientFailure() {
var attempts = new AtomicInteger(0);
TransferFunds.ExecuteTransfer failingThenSucceeding = transfer -> {
if (attempts.incrementAndGet() < 3) {
return new TransientError().promise();
}
return Promise.success(new TransferResult(
new TransferId("tx-123"),
transfer.amount(),
Instant.now()
));
};
var policy = RetryPolicy.retryPolicy(3, Set.of(TransientError.class));
var useCase = TransferFunds.transferFunds(
t -> Promise.success(t),
t -> Promise.success(t),
failingThenSucceeding,
e -> Promise.success(Unit.INSTANCE),
TimeSpan.timeSpan(5).seconds(),
policy
);
useCase.execute(validRequest())
.await()
.onFailure(Assertions::fail)
.onSuccess(result -> assertEquals(3, attempts.get()));
}
@Test
void execute_auditsSuccessfulTransfer() {
var auditedEntries = new ArrayList<AuditEntry>();
TransferFunds.AuditLog auditLog = entry -> {
auditedEntries.add(entry);
return Promise.success(Unit.INSTANCE);
};
var useCase = TransferFunds.transferFunds(
t -> Promise.success(t),
t -> Promise.success(t),
t -> Promise.success(new TransferResult(new TransferId("tx-123"), t.amount(), Instant.now())),
auditLog,
TimeSpan.timeSpan(5).seconds(),
RetryPolicy.retryPolicy(0, Set.of())
);
useCase.execute(validRequest())
.await()
.onFailure(Assertions::fail);
assertEquals(1, auditedEntries.size());
assertEquals("SUCCESS", auditedEntries.get(0).outcome());
}
@Test
void execute_auditsFailedTransfer() {
var auditedEntries = new ArrayList<AuditEntry>();
TransferFunds.AuditLog auditLog = entry -> {
auditedEntries.add(entry);
return Promise.success(Unit.INSTANCE);
};
// The audit aspect wraps only the execute step, so the failure must originate there
var useCase = TransferFunds.transferFunds(
t -> Promise.success(t),
t -> Promise.success(t),
t -> TransferError.General.TRANSFER_FAILED.promise(),
auditLog,
TimeSpan.timeSpan(5).seconds(),
RetryPolicy.retryPolicy(0, Set.of())
);
useCase.execute(validRequest())
.await()
.onSuccess(Assertions::fail);
// Audit still captured even though transfer failed
assertEquals(1, auditedEntries.size());
assertEquals("FAILURE", auditedEntries.get(0).outcome());
}
@Test
void execute_succeedsEvenIfAuditFails() {
TransferFunds.AuditLog failingAudit = entry ->
Causes.cause("Audit service unavailable").promise();
var useCase = TransferFunds.transferFunds(
t -> Promise.success(t),
t -> Promise.success(t),
t -> Promise.success(new TransferResult(new TransferId("tx-123"), t.amount(), Instant.now())),
failingAudit,
TimeSpan.timeSpan(5).seconds(),
RetryPolicy.retryPolicy(0, Set.of())
);
// Transfer should succeed even if audit fails
useCase.execute(validRequest())
.await()
.onFailure(Assertions::fail);
}
}
private TransferRequest validRequest() {
return new TransferRequest(
"req-123",
"acc-source",
"acc-dest",
"100.00",
"user@example.com"
);
}
static class TransientError implements Cause {
@Override
public String message() {
return "Transient error";
}
}
}
Exercises
- Add circuit breaker: Implement a circuit breaker aspect that fails fast after N consecutive failures.
- Add rate limiting: Implement a rate limit aspect that rejects requests exceeding N per second.
- Add metrics: Create a metrics aspect that records duration and success/failure counts.
- Conditional aspects: Modify the factory to only apply audit logging for transfers above a threshold amount.
Summary
TransferFunds demonstrates:
- Aspects as higher-order functions - Wrap step execution with cross-cutting concerns
- Composition order matters - Outside-in application (retry wraps timeout wraps audit)
- Independent and testable - Each aspect tested in isolation
- Best-effort operations - Audit failure doesn't fail the transfer
- Idempotency pattern - Prevent duplicate processing
- Retry with policy - Configurable retry behavior based on error type