import { markets as pb } from 'liquibook-lib'
import { ofType } from 'redux-observable'
import { of, map, mergeMap, filter, catchError, switchMap } from 'rxjs'
import { MQTT_CONNECTED, MQTT_RX, MQTT_SUBSCRIBE_ } from './mqtt'
import { decodeProtobuf } from '../utils/utils'
import { SESSION_FETCHED } from './session'
import { Map } from 'immutable'
import { ProfileApiService } from '../services'

// action types
//
export const WALLETS_ADD = 'WALLETS:ADD'
export const WALLETS_ADD_ERROR = 'WALLETS:ADD_ERROR'
export const WALLETS_FETCH = 'WALLETS:FETCH'
export const WALLETS_FETCHED = 'WALLETS:FETCHED'
export const WALLETS_FETCHED_ERROR = 'WALLETS:FETCHED_ERROR'
export const WALLETS_POSITION_CHANGE = "WALLETS:POSITION_CHANGE"

// action creators
//
export const WALLETS_ADD_ = asset =>
  ({ type: WALLETS_ADD, asset })
export const WALLETS_ADD_ERROR_ = (status, message) =>
  ({ type: WALLETS_ADD_ERROR, status, message })
export const WALLETS_FETCH_ = () =>
  ({ type: WALLETS_FETCH })
export const WALLETS_FETCHED_ = pocket =>
  ({ type: WALLETS_FETCHED, pocket })
export const WALLETS_FETCHED_ERROR_ = (status, message) =>
  ({ type: WALLETS_FETCHED_ERROR, status, message })
export const WALLETS_POSITION_CHANGE_ = (asset, position) =>
  ({ type: WALLETS_POSITION_CHANGE, asset, position })

// reducer
//

const zero = x => x ? x : 0
const bake = w => {
  const r = {
    id: w.id,
    asset: w.asset,
    available: zero(w.position.available), 
    open: zero(w.position.open),
    pending: zero(w.position.pending)
  }
  return r
}

export const reduce = (state = Map(), action) => {
  switch (action.type) {
  default:
    return state
  case WALLETS_FETCH:
    return state
  case WALLETS_FETCHED:
    return action.pocket.wallets.reduce((acc, w) => acc.set(w.asset, bake(w)), Map())
  case WALLETS_POSITION_CHANGE: {
      const wallet = state.get(action.asset)
      // Ignore updates for unknown wallets. This happens when another session adds a new wallet
      // or if the wallet is added by the server, in either case before the UI catches up.
      return wallet ? state.set(action.asset, {...wallet, ...pb.Position.toObject(action.position)}) : state
    }
  }
}

// epics
//
export const epics = [

  // Fetch initial wallets.
  //
  action$ => action$.pipe(
    ofType(SESSION_FETCHED),
    filter(action => action.session.isAuthorized),
    map(() => WALLETS_FETCH_())),

  // Add new wallet and refetch wallets.
  //
  (action$, state$) => action$.pipe(
    ofType(WALLETS_ADD),
    switchMap(action =>
      new ProfileApiService(state$.value.session.access)
      .postWallet$(state$.value.session.profile.id, action.asset.code)
      .pipe(
        map(() => WALLETS_FETCH_()),
        catchError(error => of(WALLETS_ADD_ERROR_(error.status, error.message)))
      ))),

  // Fetch initial pocket positions.
  //
  (action$, state$) => action$.pipe(
    ofType(WALLETS_FETCH),
    mergeMap(() =>
    new ProfileApiService(state$.value.session.access)
      .getPocket$(state$.value.session.profile.id)
	    .pipe(
        map(response =>
          WALLETS_FETCHED_(decodeProtobuf(pb.Pocket, response))),
        catchError(error => {
          of(WALLETS_FETCHED_ERROR_(error.status, error.message))})))),

  // subscribe to position changes
  //
  (action$, state$) => action$.pipe(
    ofType(MQTT_CONNECTED),
    map(() => MQTT_SUBSCRIBE_(`position/${state$.value.session.pocketID}/+`))),

  // handle position update
  //
  action$ => action$.pipe(
    ofType(MQTT_RX),
    filter(action =>
	   action.topic[0] === 'position'),
    map(action =>
	    WALLETS_POSITION_CHANGE_(action.topic[2], pb.Position.decode(action.message)))),

]
