도메인 주도 개발 스터디

Chapter 8 애그리거트 트랜잭션 관리

막이86 2023. 11. 8. 17:42
728x90
create table locks (
	'type' varchar(255),
	id varchar(255),
	lockid varchar(255),
	expiration_time datetime,
	primary key ('type', id)
) character set utf8;

create unique index locks_idx ON locks (lockid);

// Order 타입의 1번 식별자를 갖는 애그리거트에 대한 잠금 쿼리
insert into locks values ('Order', '1', '생성한 lockid', '2016-03-28 09:10:00');

도메인 주도 개발 시작하기 8장을 요약한 내용입니다.

8.1 애그리거트와 트랜잭션

  • 주문 애그리거트에 대해 운영자는 배송 상태로 변경할 때 고객은 배송지 주소를 변경하면 어떻게 될까?
    • 운영자와 고객이 동시에 주문 애그리거트를 수정하는 과정
  • 운영자 스레드와 고객 스래드는 개념적으로 동일한 애그리거트지만 물리적으로 서로 다른 애그리거트 객채를 사용
    • 운영자 스레드가 주문 애그리거트 객체를 변경하더라도 고객 스레드가 사용하는 주문 애그리거트 객체에는 영향을 주지 않는다.
    • 고객 입장에서는 주문 애그리거트 객체는 배송 상태 전이므로 배송지 정보를 변경할 수 있다.
  • 두 스레드에서 각각 트랜잭션을 커밋할 때 수정된 내용을 DB에 반영
    • 배송 상태 변경 및 배송지 정보도 변경
    • 애그리거트의 일관성이 깨지는 상황이 발생
  • 일관성이 깨지는 문제가 발생하지 않도록 하려면 두가지중 하나를 해야함
    • 운영자가 배송지 정보를 조회하고 상태를 변경하는 동안, 고객이 애그리거트를 수정하지 못하게 막는다.
    • 운영자가 배송지 정보를 조회한 이후에 고객이 정보를 변경하면, 운영자가 애그리거트를 다시 조회한 뒤 수정하도록 한다.
  • DBMS가 지원하는 트랜잭션과 함께 애그리거트를 위한 추가적인 트랜잭션 처리 기법이 필요
  • 애그리거트에 대해 사용할 수 있는 대표적인 트랜잭션 처리 방식에는 선점 잠금과 비선점 잠금의 두가지 방식이 있다.

8.2 선점 잠금

  • 먼저 애그리거트를 구한 스레드가 애그리거트 사용이 끝날 때까지 다른 스레드가 해당 애그리거트를 수정하지 못하게 막는 방식
    • 스레드1이 선점 잠금 방식으로 애그리거트를 구한뒤 스레드2가 같은 애그리거트를 구하고 있다.
    • 스레드2는 스레드1이 애그리거트에 대한 잠금을 해제할 때까지 블로킹 된다.
    • 스레드1이 애그리거트를 수정하고 트랜잭션을 커밋하면 잠금을 해제한다.
    • 대시하고 있던 스레드2가 애그리거트에 접근하게 된다.
    • 스레드1이 커밋한 뒤에 스래드2가 애그리거틀르 구하게 되므로 스레드2는 스레드1이 수정한 내용을 보게 된다.
  • 한 스레드가 애그리거트를 구하고 수정하는 동안 다른 스레드가 수정할 수 없으므로 도시에 애그리거트를 수정할 때 발생하는 데이터 충돌 문제를 해소할 수 있다.
  • 선점 잠금은 보통 DBMS가 제공하는 행단위 잠금을 사용해서 구현
  • DBMS가 for update와 같은 쿼리를 사용해서 특정 레코드에 한 커넥션만 접근할 수 있는 잠금장치를 제공
  • JPA EntityManager는 LockModeType을 인자로 받는 find() 메서드를 제공
    • LockModeType.PESSIMISTIC_WRITE를 값으로 전달하면 해당 엔티티와 매핑된 테이블을 이용해서 선점 잠금 방식을 적용할 수 있다.
    Order order = entityManger.find(Order.class, orderNo, LockModeType.PESSIMISTIC_WRITE);
    
  • JPA 프로바이터와 DBMS에 따라 잠금 모드 구현이 다르다.
    • 하이버네이트의 경우 PESSIMISTIC_WRITE를 잠금모드로 상용하면 for update 쿼리를 이용해서 선점 잠금을 구현
    • 스프링 데이터 JPA는 @Lock 애너테이션을 사용해서 잠금 모드를 지정
    public interface MemberRepository extends Repository<Member, MemberId> {
    	@Lock(LockModeType.PESSIMISTIC_WRITE)
    	@Query("select m from Member m where m.id = :id")
    	Optional<Member> findByIdForUpdate(@Param("id") MemberId memberId);
    }
    

8.2.1 선점 잠금과 교착 상태

  • 선점 잠금 기능을 사용할 때는 잠금 순서에 따른 교착 상태가 발생하지 않도록 주의
    1. 스레드1: A애그리거트에 대한 선점 잠금 구함
    2. 스레드2: B애그리거트에 대한 선점 잠금 구함
    3. 스레드1: B애그리거트에 대한 선점 잠금 시도
    4. 스레드2: A애그리거트에 대한 선점 잠금 시도
  • 스레드1은 영원히 B애그리거트에 대한 선점 잠금을 구할 수 없다.
    • 스레드2가 B애그리거트에 대한 잠금을 이미 선점하고 있기 때문
  • 두 스레드는 먼저 선점한 잠금을 구할 수 없어 더 이상 다음 단계를 진행하지 못하게 된다.
    • 스레드1, 스레드2는 교착 상태에 빠진다
  • 선점 잠금에 따른 교착 상태는 상대적으로 사용자 수가 많을 때 발생할 가능성이 높다
    • 사용자 수가 많아지면 교착 상태에 빠지는 스레드는 더 빠르게 증가
  • 잠금을 구할 때 최대 대기 시간을 지정해야 한다.
    • JPA에서 선점 잠금을 시도할 때 최대 대기 시간을 지정
    • JPA 힌트는 잠금을 구하는 대기 시간을 밀리초 단위로 지정
      • 지정한 시간 이내에 잠금을 구하지 못하면 익셉션을 발생
    Map<String, Object> hits = new HashMap<>();
    hints.put("javax.persistence.lock.timeout", 2000);
    Order order = entityManger.find(Order.class, orderNo, LockModeType.PESSIMISTIC_WRITE, hints);
    
  • JPA의 힌트를 사용할 때는 사용 중인 DBMS가 관련 기능을 지원하는지 확인 해야함
    • 지원하지 않을 경우 힌트가 적용되지 않을 수 있음
  • 스프링 데이터 JPA는 @QueryHints 애너테이션을 사용해서 쿼리 힌트를 지정할 수 있다.
  • public interface MemberRepository extends Repository<Member, MemberId> { @Lock(LockModeType.PESSIMISTIC_WRITE) @QueryHints({ @QueryHint(name = "javax.persistence.lock.timeout", value = "2000") }) @Query("select m from Member m where m.id = :id") Optional<Member> findByIdForUpdate(@Param("id") MemberId memberId); }

<aside> 👉 DBMS에 따라 교착 상태에 빠진 커넥션을 처리하는 방식이 다르다. 쿼리별로 대기 시간을 저장할 수 있는 DBMS가 있고 커넥션 단위로만 대기 시간을 지정할 수 있는 DBMS도 있다. 선점 잠근을 사용하려면 DBMS에 대해 JPA가 어떤식으로 대기 시간을 처리하는지 반드시 확인해야 한다.

</aside>

8.3 비선점 잠금

  • 선점 잠금으로 모든 트랜잭션 충돌 문제가 해결되는 것은 아니다.
    1. 운영자는 배송을 위해 주문 정보를 조회
    2. 고객이 배송지 변경을 위해 변경 폼을 요청
    3. 고객이 새로운 배송지를 입력하고 폼을 전송하여 배송지를 변경
    4. 운영자가 1번에서 조회한 주문 정보를 기준으로 배송지를 정하고 배송 상태 변경을 요청
    • 배송 상태 변경전에 배송지를 한번더 확인하지 않으면 운영자는 다른 배송지로 물건을 발송하게 된다.
    <aside> ❓ 이 문제는 잘못 구현한거 아닌가?? 상태를 변경하기 위해서 다시 조회하면 이런 문제는 발생하지 않지 않나???
  • </aside>
  • 위에 문제는 선점 잠금 방식으로는 해결할 수 없다. 비선점 잠금 방식으로 해결 가능
    • 비선점 잠금 방식은 동시에 접근하는 것을 막는 대신 변경한 데이터를 실제 DBMS에 반영하는 시점에 변경 가능 여부를 확인 하는 방식
  • 비선점 잠금을 구현하려면 애그리거트에 버전으로 사용할 숫자 타입 프로퍼티를 추가해야 한다.
    • 애그리거트를 수정할 때마다 버전으로 사용할 프로퍼티 값이 1씩 증가
    • 수정할 애그리거트와 매핑되는 테이블의 버전 값이 현재 애그리거트의 버전과 동일한 경우에만 수정
  • JPA는 버전을 이용한 비선점 잠금 기능을 지원
    • @Version 애너테이션을 붙이고 매핑되는 테이블에 버전을 저장할 칼럼을 추가하면 된다.
    • 엔티티가 변경되어 Update 쿼리를 실행행 할때 version에 명시한 필드를 이용해서 비선점 잠금 쿼리를 실행
    @Entity
    @Table(name = "purchase_order")
    @Access(AccessType.FIELD)
    public class Order {
    	@EmbeddedId
    	private OrderNo number;
    	
    	@Version
    	private long version;
    }
    
  • 응용 서비스는 버전에 대해 알 필요가 없다.
    • 리포지터리에서 필요한 애그리거트를 구하고 알맞은 기능만 실행하면 된다.
    • JPA는 트랜잭션 종료 시점에 비선점 잠금을 위한 쿼리를 실행
    public class ChangeShippingService {
    	@Transactional
    	public void changeShipping(ChangeShippingRequest changeReq) {
    		Order order = orderRepository.findById(new OrderNo(changeReq.getNumber()));
    		checkNoOrder(order);
    		order.changeShippingInfo(changeReq.getShippingInfo());
    	}
    }
    
  • 스프링의 @Transactional을 이용해서 트랜잭션 범위를 정했으므로 changeShipping() 메서드가 리턴될 때 트랜잭션이 종료되고, 트랜잭션 충돌이 발생하면 OptimisticLockingFailureException이 발생
  • @Controller public class OrderController { private ChangeShippingService changeShippingService; @PostMapping("/changeShipping") public String changeShipping(ChangeShippingRequest changeReq) { try { changeShippingService.changeShipping(changeReq); return "changeShippingSuccess"; } catch (OptimisticLockingFailureException ex) { // 누군가 먼저 같은 주문 애그리거트를 수정했으므로 트랜잭션이 충돌 return "changeShippinTxConflict"; } } }
  • 비선점 잠금을 사용하여 문제 상황을 해결 할 수 있다.
    • 애그리거트 조회시 버전 값도 같이 읽어 온다.
    • 애그리거트가 수정되면서 버전이 변경됨
    • 조회한 버전 값과 DB의 버전값이 다르면 오류 발생
  • 비선점 잠금 방식을 여러 트랜잭션으로 확장하려면 애그리거트 정보를 뷰로 보여줄 때 버전 정보도 함께 사용자 화면에 전달 해야한다.
    • HTML 폼을 생성하는 경우 버전 값을 갖는 hidden 타입 <input> 태그를 생성해서 폼 전송시 버전 값이 서버에 함께 전달되도록 한다.
    <!-- 애그리거트 정보를 보여줄 때 뷰 코드는 버전 값을 함께 전송 -->
    <form th:action="@{/startShipping}" method="post">
    	<input type="hidden" name="version" th:value="${orderDto.version}">
    	<input type="hidden" name="orderNumber" th:value="${orderDto.orderNumber}">
    	<input type="summit" value="배송 상태로 변경하기">
    </form>
    
  • 응용 서비스에 전달할 요청 데이터는 사용자가 전송한 버전 값을 포함
  • public class StartShippingRequest { private String orderNumber; private long version; }
  • 응용 서비스는 전달 받은 버전 값을 이용해서 애그리거트 버전과 일치하는지 확인하고 기능을 수행
  • public class StartShippingService { @PreAuthorize("hasRole('ADMIN')") @Transactional public void startShipping(StartShippingRequest req) { Order order = orderRepository.findById(new OrderNo(req.getOrderNumber())); checkOrder(order); // 애그리거트의 버전과 요청 버전이 맞는지 확인 if (!order.matchVersion(req.getVersion())) { throw new VersionConflicException(); } order.startShipping(); } }
  • 표현 계층은 버전 충돌 익셉션이 발생하면 버전 충돌을 사용자에게 알려 사용자가 알맞은 후촉 처리를 할수 있도록 한다.
    • 스프링 프레임워크가 발생시키는 OptimisticLockingFailureException과 응용 서비스 코드에서 발생시키는 VersionConflictException를 처리
    @Controller 
    public class OrderAdminController {
    	private StartShippingService startShippingService;
    
    	@PostMapping("/startShipping")
    	public String startShipping(StartShippingRequest startReq) {
    		try {
    		  startShippingService.startShipping(startReq);
    			return "shippingStarted";
        } catch (OptimisticLockingFailureException | VersionConflicException ex) {
    			return "startShippingTxConflict
    		}
    	}
    }
    

8.3.1 강제 버전 증가

  • 애그리거으에 애그리거트 루트외에 다른 엔티티가 존재할때 루트가 아닌 다른 엔티티의 값만 변경될 경우 JPA는 루트 엔티티의 버전값을 증가시키지 않는다.
    • 연관된 엔티티의 값이 변경된다고 해도 루트 엔티티 자체의 값은 바뀌는 것이 없으므로 루트 엔티티의 버전 값은 갱신하지 않는다.
  • 애그리거트 관전에서는 문제가 됨
    • 애그리거트 구성 요소중 일부 값이 바뀌면 논리적으로 애그리거트는 바뀐 것
  • JPA는 이런 문제를 처리할 수 있도록 EntityManager#find() 메서드로 엔티티를 구할 때 강제로 버전 값을 증가시키는 잠금 모드를 지원
    • LockModeType.OPTIMISTIC_FORCE_INCREMENTS를 사용하면 해당 엔티티의 상태가 변경되었는지에 상관없이 트랜잭션 종료 시점에 버전 값을 증가 처리
    @Repository
    public class JpaOrderRepository implements OrderRepository {
    	@PersistenceContext
    	private EntityManger entityManger;
    
    	@Override
    	public Order findByIdOptimisticLockMode(OrderNo id) {
    		return entityManger.find(Order.class, id, LockModeType.OPTIMISTIC_FORCE_INCREMENT);
    	}
    }
    
  • 스프링 데이터 JPA를 사용하면 @Lock 애너테이션을 이용해서 지정하면 된다.

<aside> ❓ 스프링 데이터 JPA를 사용하면 강제로 버전을 해주고 있다는건가???

</aside>

8.4 오프라인 선점 잠금

  • 컨플루언스는 문서를 편집할 때 누군가 편집을 하는 중이면 다른 사용자가 편집중이라는 안내 문구를 보여준다.
    • 여러 사용자가 동시에 한 문서를 수정할 때 발생하는 충동을 사전에 방지 할 수 있다.
  • 엄격하게 데이터 충돌을 막고 싶다면 누군가 수정 화면을 보고 있을 때 수정 화면 자체를 실행하지 못하게 하도록 한다.
    • 한 트랜잭션 범위에서만 적용되는 선점 잠금 방식이나 나중에 버전 충돌을 확인하는 비선점 잠금 방식으로는 이를 구현할 수 없다.
    • 오프라인 선점 잠금 방식으로 해결 할 수 있다.
  • 오프라인 선점 잠금 방식은 여러 트랜잭션에 걸쳐 동시에 변경을 막는다.
    • 첫 번째 트랙잭션을 시작할 때 오프라인 잠금을 선점하고, 마지막 트랜잭션에서 잠금을 해제 한다.
    • 잠금을 해제하기 전까지 다른 사용자는 잠금을 구할 수 없다.
  • 오프라인 선점 방식은 잠금 유효 시간을 가져야 한다.
    • 사용자가 잠금을 해제하지 않고 프로그램을 종료하는 경우 영원히 잠금이 된다.
    • 유효 시간이 지나면 자동으로 잠금을 해제해서 다른 사용자가 잠금을 일정 시간 후에 다시 구할 수 있도록
  • 사용자가 잠금 유효 시간이 지난 후 수정 요청을 하면 수정요청이 실패하게 된다.
    • 일정 주기로 유효시간을 증가시키는 방식을 사용
    • 1분 단위로 Ajax 호출을 해서 잠금 유효시간을 증가시키는 방법 등

8.4.1 오프라인 선점 잠금을 위한 LockManager 인터페이스와 관련 클래스

  • 오프라인 선점 잠금은 크게 잠금 선점 시도, 잠금 확인, 잠금 해제, 잠금 유효시간 연장 네 가지 기능이 필요
  • public interface LockManager { LockId tryLock(String type, String id) throws LockException; void checkLock(LockId lockId) throws LockException; void releaseLock(LockId lockId) throws LockExcepiton; void extendLockExpiration(LockId lockId, long inc) throws LockException; }
  • tryLock: 잠금 선점 시도
    • 잠금을 식별할 때 사용할 LockId를 리턴
    • 잠금이 유효한지 검사허거나 잠금 유효시간을 늘릴때 LockId를 사용
    public class LockId {
    	private String value;
    
    	public LockId(String value) {
    		this.value = value;
    	}
    
    	public String getValue() {
    		return value;
    	}
    }
    
  • 컨트롤러가 오프라인 선점 잠금 기능을 이용해 데이터 수정폼에 동시에 접근하는 것을 제어하는 코드
    • 잠금을 선점하는 데 실패하면 LockException이 발생
    public DataNadLockId getDataWithLock(Long id) {
    	// 1. 오프라인 선점 잠금 시도
    	LockId lockId = lockManager.tryLock("data", id);
    	// 2. 기능 실행 
    	Data data = someDao.select(id);
    	return new DataAndLockId(data, lockId);	
    }
    
    @RequestMapping("/some/edit/{id}")
    public String editForm(@ParthVariable("id") Long id, ModelMap model) {
    	DataAndLockId dl = dataService.getDataWithLock(id);
    	model.addAttribute("data", dl.getData());
    	// 3. 잠금 해제에 사용할 LockId를 모델에 추가 
    	model.addAttribute("lockId", dl.getLockId());
    	return "editForm"
    }
    
    <form th:action="@{/some/edit/{id}{id=${data.id}}}" method="post">
    	<input type="hidden" name="lid" th:value="${lockId.value}">
    </form>
    
  • 잠금을 해제하는 코드
  • public void edit(EditRequest editReq, LockId lockId) { // 1. 잠금 선점 확인 lockManager.checkLock(lockId); // 2. 기능 실행 ... // 3. 잠금 해제 lockManager.releaseLock(lockId); } @RequestMapping(value = "/some/eidt/{id}", method = RequestMethod.POST) public String edit(@PathVariable("id") Long id, @ModelAttribute("editReq") EditRequest editReq, @RequestParam("id") String lockIdValue) { editReq.setId(id); someEditService.edit(editReq, new LockId(lockIdValue)); model.addAttribute("data", data); return "editSuccess"; }

8.4.2 DB를 이용한 LockManager 구현

  • 잠금 정보를 저장할 테이블과 인텍스를 생성
create table locks (
	'type' varchar(255),
	id varchar(255),
	lockid varchar(255),
	expiration_time datetime,
	primary key ('type', id)
) character set utf8;

create unique index locks_idx ON locks (lockid);

// Order 타입의 1번 식별자를 갖는 애그리거트에 대한 잠금 쿼리
insert into locks values ('Order', '1', '생성한 lockid', '2016-03-28 09:10:00');
  • locks 테이블의 데이터를 담은 LockData 클래스
public class LockData {
	private String type;
	private String id;
	private String lockId;
	private long expirationTime;
	... getter, setter

	// 유효시간 확인
	public boolean isExpired() {
		return expirationTime < System.currentTimeMillis();
	}
}
  • LockManager를 구현한 코드
@Component
public class SpringLockManager implements LockManager {
	private int lockTimeout = 5 * 60 * 1000;
	private JdbcTemplate jdbcTemplate;

	// locks 테이블에서 조회한 데이터를 LockData로 매핑하기 위한 RowMapper
	private RomMapper<LockData> lockDataRomMapper = (rs, romNum) -> new LockData(rs.getString(1), rs.getString(2), rs.getString(3), rs.getTimestamp(4).getTime());

	// type과 id에 대한 잠금을 시도 
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	@Override
	public LockId tryLock(String type, String id) throws LockException {
		// type과 id에 잠금이 존재하는지 검사
		checkAlreadyLocked(type, id);
		LockId lockId = new LockId(UUID.randomUUID().toString());
		locking(type, id, lockId);
		return lockId;
	}

	// 잠금이 있는지 확인
	private void checkAlreadyLocked(String type, String id) {
		List<LockData> locks = jdbcTemplate.query("select * from locks where type = ? and id = ?", lockDataRowMapper, type, id);
		Optional<LockData> lockData = handleExpiration(locks);
		if (lockData.isPresent()) {
			throw new AlreadyLockedException();
		}
	}

	// 잠금 유효시간이 지나면 해당 데이터를 삭제, 값이 없는 Optional을 리턴
	// 유효시간이 지자니 않으면 해당 LockData를 리턴
	private Optional<LockData> handleExpiration(List<LockData> locks) {
		if (locks.isEmpty()) {
			return Optinal.empty();
		}
		LockData lockData = locks.get(0);
		if (lockData.isExpired()) {
			jdbcTemplate.update("delete from locks where type = ? and id = ?", lockData.getType(), lockData.getId());
		} else {
			return Optinal.of(lockData);
		}
	}

	// 잠금을 위한 locks 테이블에 데이터를 삽입
	// 동일한 키나 lockId를 가진 데이터가 이미 존재해서 DuplicateKeyException 발생하면 LockingFailException 발생
	private void locking(String type, String id, LockId lockId) {
		try {
			int updatedCount = jdbcTemplate.update("insert into locks values (?, ?, ?, ?)", type, id, lockId.getValue(), new Timestamp(getExpirationTime()));
			if (updateCount == 0) {
				throw new LockingFailException();
			}
		} catch (DuplicateKeyException e) {
			throw new LockingFailException(e);
		}
	}

	// 현재 시간 기준으로 lockTimeout 이후 시간을 유효 시간으로 생성
	private long getExpirationTime() {
		return System.currentTimeMillis() + lockTimeout;
	}

	// 잠금이 유효한지 검사, 유효하지 않으면 익셉션 발생
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	@Override
	public void checkLock(LockId lockId) throws LockException {
		Optional<LockData> lockData = getLockData(lockId);
		if (!lockData.isPresent()) {
			throw new NoLockException();
		}
	}

	// lockid에 해당하는 LockData를 구한다. 
	// 유효시간이 지난 lockData를 처리
	private Optinal<LockData> getLockData(LockId lockId) {
		List<LockData> locks = jdbcTemplate.query("select * from locks where lockid = ?", lockDataRowMapper, lockId.getValue());
		return handleExpiration(locks);
	}


	// lockId에 해당하는 잠금 유효 시간을 inc 만큼 늘린다. 
	@Transactional(propation = Propagation.REQUIRES_NEW)
	@Override
	public void extendLockExpiration(LockId lockId, long inc) {
		Optinal<LockData> lockDataOpt = getLockData(lockId);
		LockData lockData = lockDataOpt.orElesThrow(() -> new NoLockException());
		jdbcTemplate.update("update locks set expiration_time = ? where type = ? and id = ?", new Timestamp(lockData.getTimestamp() + inc), lockData.getType(), lockData.getId()))
		
	}

	// lockId에 해당하는 잠금 데이터를 삭제
	@Transactional(propation = Propagation.REQUIRES_NEW)
	@Override
	public void releaseLock(LockId lockId) throws LockException {
		jdbcTempplate.update("delete from locks where lockid = ?", lockId.getValue());
	}

	@Autowired
	public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
	}
}
728x90