import { markets } from 'liquibook-lib'
import { ofType } from 'redux-observable'
import { of, map, filter, ignoreElements, switchMap, catchError } from 'rxjs'
import { MQTT_CONNECTED, MQTT_RX, MQTT_SEND_, MQTT_SUBSCRIBE_ } from './mqtt'
import { decodeProtobuf } from '../utils/utils'
import { v4 as uuidv4 } from 'uuid';
import { SESSION_FETCHED } from './session'
import { Map } from 'immutable'
import { OrdersApiService } from '../services'

// action types generated by client
//
export const ORDERS_NEW = 'ORDERS:NEW'
export const ORDERS_CANCEL = 'ORDERS:CANCEL'
export const ORDERS_MODIFY = 'ORDERS:MODIFY'
export const ORDERS_FETCHED = 'ORDERS:FETCHED'
export const ORDERS_FETCHED_BY_FILTER = 'ORDERS:FETCHED_BY_FILTER'
export const ORDERS_BY_FILTER = 'ORDERS:BY_FILTER'
export const ORDERS_FETCH_ERROR = 'ORDERS:FETCH_ERROR'
export const ORDERS_NEW_FETCH_ERROR = 'ORDERS:NEW_FETCH_ERROR'

// action types received from system
//
export const ORDERS_RX_NEW = 'ORDERS:RX_NEW'
export const ORDERS_RX_CANCEL = 'ORDERS:RX_CANCEL'

// order status events
//
export const ORDERS_RX_STATUS = 'ORDERS:RX_STATUS'
export const ORDERS_RX_FILL = 'ORDERS:RX_FILL'

// cancel status events
//
export const ORDERS_RX_CANCEL_STATUS = 'ORDERS:RX_CANCEL_STATUS'


// order status codes
//
export const OS_NEW          = markets.OrderStatusCode.OS_NEW
export const OS_PENDING      = markets.OrderStatusCode.OS_PENDING
export const OS_REJECTED     = markets.OrderStatusCode.OS_REJECTED
export const OS_OPEN         = markets.OrderStatusCode.OS_OPEN
export const OS_CANCELED     = markets.OrderStatusCode.OS_CANCELED
export const OS_PARTIAL_FILL = markets.OrderStatusCode.OS_PARTIAL_FILL
export const OS_FILLED       = markets.OrderStatusCode.OS_FILLED

// action constructors
//
export const ORDERS_NEW_ = (userID, profileID, pocketID, symbol, side, orderType, quantity, price, stopPrice) => ({
  type: ORDERS_NEW,
  userID,
  profileID,
  pocketID,
  clOrdID: uuidv4(),
  symbol,
  side,
  orderType,
  quantity,
  price: price || 0,
  stopPrice: stopPrice || 0
})

export const ORDERS_FETCHED_ = orders =>
  ({ type: ORDERS_FETCHED, orders })
export const ORDERS_FETCHED_BY_FILTER_ = orders =>
  ({ type: ORDERS_FETCHED_BY_FILTER, orders})
export const ORDERS_BY_FILTER_ = queryParams =>
  ({ type: ORDERS_BY_FILTER, queryParams })

export const ORDERS_FETCH_ERROR_ = (status, message) =>
  ({ type: ORDERS_FETCH_ERROR, status, message })
export const ORDERS_NEW_FETCH_ERROR_ = (status, message) =>
  ({ type: ORDERS_NEW_FETCH_ERROR, status, message })
export const ORDERS_CANCEL_ = order =>
  ({ type: ORDERS_CANCEL, order, requestID: uuidv4() })
export const ORDERS_MODIFY_ = (clOrdID, quantity) =>
  ({ type: ORDERS_MODIFY, clOrdID, quantity })

export const ORDERS_RX_NEW_ = (symbol, newOrder) =>
  ({ type: ORDERS_RX_NEW, symbol, newOrder })
export const ORDERS_RX_CANCEL_ = cancel =>
  ({ type: ORDERS_RX_CANCEL, cancel })
export const ORDERS_RX_STATUS_ = status =>
  ({ type: ORDERS_RX_STATUS, status })

export const ORDERS_RX_FILL_ = fill =>
  ({ type: ORDERS_RX_FILL, fill })

export const ORDERS_RX_CANCEL_STATUS_ = status =>
  ({ type: ORDERS_RX_CANCEL_STATUS, status })


// Lookup clOrdID by orderID.
//
let CLIENT_ID = {}

// state
//
// all: immutable {[clOrdID]: mutable {immutble order, [immutable cancelStatus]}
//
//   'all' is an immutable dynamic hash of clOrdID to mutable object of immutable order
//   and cancelStatus.  'all' immutability is to determine hash container content by clOrdID,
//   not data state.  Order and CancelStatus state changes will be reflected in state.change
//   as they occur.
//
// change: immutable {immutable order, [immutable cancelStatus]}
//
const INITIAL_STATE = {
  all: Map(),
  change: null
}

// reducer
//
export const orders = (state = INITIAL_STATE, action) => {
  switch (action.type) {
  default:
    return state

  case ORDERS_FETCHED: {
    //
    // TODO: Do we need to worry about overwriting an order
    //       submitted before the load response arrives?
    //
    //       If so then this will overwrite it and lose information.
    //
    return {
      ...state,
      change: null,
      all: action.orders.reduce(
        (acc, order) => {
          CLIENT_ID[order.orderID] = order.clOrdID
          return acc.set(order.clOrdID, { order })
        },
        Map()
      )
    }
  }
  case ORDERS_NEW: {
    const { userID, profileID, pocketID, clOrdID, symbol, side, orderType, price, quantity, stopPrice } = action
    const order = new markets.Order({
      userID,
      profileID,
      pocketID,
      clOrdID,
      side,
      orderType,
      price,
      quantity,
      stopPrice,
      market: symbol,
      status: markets.OrderStatusCode.OS_NEW,
      filled: 0,
      filledCost: 0,
      timestamp: {seconds: (new Date().getTime()/1000), nanos: (new Date().getTime()%1000)}
    })
    const change = { order }
    return {
      ...state,
      change,
      all: state.all.set(action.clOrdID, change)
    }
  }
  case ORDERS_RX_NEW: {
    CLIENT_ID[action.newOrder.orderID] = action.newOrder.clOrdID
    const existing = state.all.get(action.newOrder.clOrdID)
    if (existing) {
      existing.order = new markets.Order(existing.order)
      existing.order.orderID = action.newOrder.orderID
      existing.order.timestamp = action.newOrder.timestamp
      // optimization, avoid copy if change is already detectable
      const change = state.change === existing ? {...existing} : existing

      return {
        ...state,
        change
      }
    }
    else {
      const change = {
        order: new markets.Order({
          ...action.newOrder,
          market: action.symbol,
          status: markets.OrderStatusCode.OS_NEW,
          filled: 0,
          filledCost: 0
        })
      }
      return {
        ...state,
        change,
        all: state.all.set(action.newOrder.clOrdID, change)
      }
    }
  }
  case ORDERS_RX_STATUS: {
    const existing = state.all.get(action.status.clOrdID)
    if (existing) {
      existing.order = new markets.Order(existing.order)
      if (!is_sequential_status(existing.order.status, action.status.event.code)) {
        if (is_old_status(action.status.event.code, existing.order.status)) {
          //console.warn('IGNORED OLD STATUS ' + action.status.clOrdID + ' ' + action.status.event.code + ' current=' + order.status)
          // prevent earlier status from overwriting later status
          return state
        }
        else if (is_invalid_transition(existing.order.status, action.status.event.code)) {
          //console.error('INVALID STATUS CHANGE ' + action.status.clOrdID + ' ' + order.status + '->' + action.status.event.code)
          // in this case we use the last one received.
          // Maybe it was because of a corrected status although no such status correction
          // features existed at the time this code was written.
        }
        else {
          // Assume status received earlier than expected.
          //console.warn('EARLY STATUS ' + action.status.clOrdID + ' ' + order.status + '->' + action.status.event.code)
        }
      }
      existing.order.status = action.status.event.code
      if (action.status.filled) {
        existing.order.filled = action.status.filled
      }
      if (action.status.filledCost) {
        existing.order.filledCost = action.status.filledCost
      }
      // optimization, avoid copy if change is already detectable
      const change = state.change === existing ? {...existing} : existing
      return {
        ...state,
        change
      }
    }
    else {
      //
      // This should ideally never happen.  It means that the order was in flight
      // when this page loaded and was missed by both the ORDERS_FETCHED and ORDERS_RX_NEW.
      //
      console.warn('in-flight order missed by ORDERS_RX_NEW')
      return state
    }
  }
  case ORDERS_CANCEL: {
    const existing = state.all.get(action.order.clOrdID)
    existing.cancelStatus = new markets.CancelStatus({
      requestID: action.requestID,
      orderID: action.order.orderID, // may be (null or 0) = unknown
      clientKey: markets.ClientKey({
        pocketID: action.order.pocketID,
        clOrdID: action.order.clOrdID
      }),
      code: markets.CancelStatusCode.CS_NEW
    })
    // optimization, avoid copy if change is already detectable
    const change = state.change === existing ? {...existing} : existing
    return {
      ...state,
      change
    }
  }
  case ORDERS_RX_CANCEL_STATUS: {
    let clOrdID = CLIENT_ID[action.status.orderID]
    if (!clOrdID) {
      return state
    }
    const existing = state.all.get(clOrdID)
    existing.cancelStatus = action.status
    // optimization, avoid copy if change is already detectable
    const change = state.change === existing ? {...existing} : existing
    return {
      ...state,
      change
    }
  }}
}

export const epics = [

  // fetch orders
  //
  (action$, state$) => action$.pipe(
    ofType(SESSION_FETCHED),
    filter(action => action.session.isAuthorized),
    switchMap(() =>
      new OrdersApiService(state$.value.session.access).getAll$().pipe(
        map(response =>
          ORDERS_FETCHED_(decodeProtobuf(markets.OrderList, response).orders)),
        catchError(error =>
          of(ORDERS_FETCH_ERROR_(error.status, error.message)))))),

  // fetch orders by filter
  //
  (actions$, state$) => actions$.pipe(
    ofType(ORDERS_BY_FILTER),
    switchMap((action$) =>
      new OrdersApiService(state$.value.session.access)
      .getFiltered$(action$.queryParams).pipe(
        map(response =>
          ORDERS_FETCHED_(decodeProtobuf(markets.OrderList, response).orders)),
        catchError(error =>
          of(ORDERS_FETCH_ERROR_(error.status, error.message)))))),

  // new order
  //
  action$ => action$.pipe(
    ofType(ORDERS_NEW),
    map(action =>
      // topic order/<market_symbol>/<user_id>/<profile_id>/<pocket_id>
      MQTT_SEND_(`order/${action.symbol}/${action.userID}/${action.profileID}/${action.pocketID}`,
		    markets.NewOrder.encode(new markets.NewOrder(action)).finish())
      ),
    catchError(error =>
      of(ORDERS_NEW_FETCH_ERROR_(error.status, error.message)))),

  // cancel order
  //
  (action$, state$) => action$.pipe(
    ofType(ORDERS_CANCEL),
    map(action => {
      let cancel = new markets.CancelOrder()
      cancel.requestID = action.requestID
      if (action.order.orderID && action.order.orderID !== 0) {
      	cancel.orderID = action.order.orderID
      }
      else {
      	cancel.clientKey = markets.ClientKey({
          clOrdID: action.order.clOrdID,
          pocketID: action.order.pocketID
        })
      }
      return MQTT_SEND_(`cancel/${action.order.market}/${state$.value.session.userID}`,
        markets.CancelOrder.encode(cancel).finish())})),

  // subscribe to order actions
  //
  (action$, state$) => action$.pipe(
    ofType(MQTT_CONNECTED),
    map(() =>
      // topic action/<market_symbol>/<user_id>/<profile_id>/<pocket_id>
      MQTT_SUBSCRIBE_(`action/+/+/+/${state$.value.session.pocketID}`))),

  // subscribe to order statuses
  //
  (action$, state$) => action$.pipe(
    ofType(MQTT_CONNECTED),
    map(() =>
    	// topic order_status/<market_symbol>/<user_id>/<profile_id>/<pocket_id>
	    MQTT_SUBSCRIBE_(`order_status/+/+/+/${state$.value.session.pocketID}`))),

  // subscribe to cancel statuses
  //
  (action$, state$) => action$.pipe(
    ofType(MQTT_CONNECTED),
    map(() =>
    	// Subscribe to cancel status for current user.
    	//
    	// topic cancel_status/<market_symbol>/<user_id>/<profile_id>/<pocket_id>
    	MQTT_SUBSCRIBE_(`cancel_status/+/${state$.value.session.userID}/+/+`))),

  // receive order action
  //
  action$ => action$.pipe(
    ofType(MQTT_RX),
    filter(action =>
      action.topic[0] === 'action'),
    map(action => {
      let rxAction = markets.Action.decode(action.message)
      switch(rxAction.msg) {
      default:
        return ignoreElements()
      case "newOrder":
        return ORDERS_RX_NEW_(action.topic[1], rxAction.newOrder)
      case "cancelOrder":
        return ORDERS_RX_CANCEL_(rxAction.cancelOrder)
      }
    })),

  // receive order status
  //
  action$ => action$.pipe(
    ofType(MQTT_RX),
    filter(action =>
	   action.topic[0] === 'order_status'),
    map(action =>
      ORDERS_RX_STATUS_(markets.OrderStatus.decode(action.message)))),

  // receive order status
  //
  action$ => action$.pipe(
    ofType(MQTT_RX),
    filter(action =>
      action.topic[0] === 'cancel_status'),
    map(action =>
      ORDERS_RX_CANCEL_STATUS_(markets.CancelStatus.decode(action.message)))),

  // detect fills
  //
  action$ => action$.pipe(
    ofType(ORDERS_RX_STATUS),
    filter(action =>
      action.status.event.code === markets.OrderStatusCode.OS_PARTIAL_FILL ||
      action.status.event.code === markets.OrderStatusCode.OS_FILLED),
    map(action =>
      ORDERS_RX_FILL_(action.status)))
]

// Valid Order Status Finite State Machine Transitions
//
// Although these transitions represent the actual sequence of events
// in the distributed order management system, it is possible that some
// statuses may be received earlier than statuses that will be received
// from events that had already occured.  This may either be due to
// latency optimizations or partial order management system failures.
//
const TRANSITIONS = {
    [OS_NEW]:          [OS_REJECTED, OS_PENDING],
    [OS_PENDING]:      [OS_REJECTED, OS_OPEN],
    [OS_REJECTED]:     [],
    [OS_OPEN]:         [OS_CANCELED, OS_PARTIAL_FILL, OS_FILLED],
    [OS_CANCELED]:     [],
    [OS_PARTIAL_FILL]: [OS_PARTIAL_FILL, OS_FILLED],
    [OS_FILLED]:       []
}

function is_sequential_status(lastKnown, status) {
  return -1 !== TRANSITIONS[lastKnown].indexOf(status)
}

const PRIORS = {
    [OS_NEW]:          [],
    [OS_PENDING]:      [OS_NEW],
    [OS_REJECTED]:     [OS_PENDING, OS_NEW],
    [OS_OPEN]:         [OS_PENDING, OS_NEW],
    [OS_CANCELED]:     [OS_OPEN, OS_PENDING, OS_NEW],
    [OS_PARTIAL_FILL]: [OS_OPEN, OS_PENDING, OS_NEW],
    [OS_FILLED]:       [OS_PARTIAL_FILL, OS_OPEN, OS_PENDING, OS_NEW]
}

function is_old_status(status, lastKnown) {
  return -1 !== PRIORS[lastKnown].indexOf(status)
}

// Invalid Future Transitions
//
// Given a starting status, ignoring early statuses,
// what transitions are invalid regardless of any intermediate statuses.
//
const INVALID_TRANSITIONS = {
  [OS_NEW]: [],
  [OS_PENDING]: [],
  [OS_REJECTED]: [OS_CANCELED, OS_FILLED],
  [OS_OPEN]: [OS_REJECTED],
  [OS_CANCELED]: [OS_REJECTED, OS_FILLED],
  [OS_PARTIAL_FILL]: [OS_REJECTED],
  [OS_FILLED]: [OS_REJECTED, OS_CANCELED]
}

function is_invalid_transition(lastKnown, status) {
  return -1 !== INVALID_TRANSITIONS[lastKnown].indexOf(status)
}
