import { Subject, interval, filter, map, takeUntil, mergeMap, delay, buffer, retryWhen, tap } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { ofType } from 'redux-observable'
import { markets as pb } from 'liquibook-lib'
import { List } from 'immutable'

export const MD_SUBSCRIBE = 'MD:SUBSCRIBE'
export const MD_UNSUBSCRIBE = 'MD:UNSUBSCRIBE'
export const MD_UPDATE = 'MD:UPDATE'

export const MD_SUBSCRIBE_ = (aggregation, topic) =>
  ({ type: MD_SUBSCRIBE, aggregation, topic })
export const MD_UNSUBSCRIBE_ = (aggregation, topic) =>
  ({ type: MD_UNSUBSCRIBE, aggregation, topic })
export const MD_UPDATE_ = updates =>
  ({ type: MD_UPDATE, updates })

const INITIAL_STATE = {
  trades: {},
  dailies: {},
}

const continuumReduce = {
  trades: (continuum, trades) => {
    if (continuum.trade) {
      // insert new ticks keeping max 500 ticks
      // TODO: compare timestamp and eliminate duplicates
      return trades.unshift(continuum.trade).slice(0, 500)
    }
    else if (continuum.segment) {
      // TODO: range update/insert
      return List(continuum.segment.trades.trades)
    }
    else {
      // should never get here
      console.warn(`ignoring unrecognized continuum for topic ${continuum.topic}`)
      return trades
    }
  },
  dailies: (continuum, dailies) => {
    // TODO: pb defs, server support, and this reducer
    return dailies
  }
}

const update = (continuum, state) => {
  let [aggregation, market] = continuum.topic.split('/')
  let markets = state[aggregation]
  return {
    ...state,
    [aggregation]: {
      ...markets,
      [market]: continuumReduce[aggregation](continuum, markets[market])
    }
  }
}

export const reduce = (state=INITIAL_STATE, action) => {
  switch (action.type) {
  default:
    return state
  case MD_UPDATE:
    return action.updates.reduce(
      (acc, continuum) => update(continuum, acc),
      state
    )
  }
}

let md$ = null
const webSocketOpen$ = new Subject()
const webSocketClosing$ = new Subject()
const webSocketClose$ = new Subject()
const PLEX = {}

const plexOf = (session, topic) => {
  if (!md$) {
    const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
    const url = `${protocol}://${window.location.host}/mds`
    console.log(`subscribing to mds via ${url}`)
    md$ = webSocket({
      url,
      binaryType: 'arraybuffer',
      serializer: request => request,
      deserializer: message => pb.Continuum.decode(new Uint8Array(message.data)),
      openObserver: webSocketOpen$,
      closingObserver: webSocketClosing$,
      closeObserver: webSocketClose$,
    })
  }
  let plex = PLEX[topic]
  if (plex === undefined) {
    plex = md$.multiplex(() => `+/${topic}`,
                         () => `-/${topic}`,
                         m => m.topic === topic)
    PLEX[topic] = plex
  }
  return plex
}

export const epics = [

  // market data sub/unsub lifetime
  //
  (action$, state$) => action$.pipe(
    ofType(MD_SUBSCRIBE),
    mergeMap(sub =>
      plexOf(state$.value.session, `${sub.aggregation}/${sub.topic}`).pipe(
        buffer(interval(250)), // throttle for rendering
        filter(notEmpty => notEmpty.length > 0),
        map(updates => MD_UPDATE_(updates)),
        takeUntil(action$.pipe(
          ofType(MD_UNSUBSCRIBE),
          filter(unsub => unsub.topic === sub.topic),
          delay(250))), // to prevent rapid md$ disconnect/reconnect
        retryWhen(error$ =>
          error$.pipe(
            tap(err => console.error('md', err)),
            delay(5000)))))),

  // eslint-disable-next-line no-unused-vars
  (action$, state$) => webSocketOpen$.pipe(
  map(() => ({
      type: 'MDS_SOCKET_OPEN_EVENT'
    }))),

  // eslint-disable-next-line no-unused-vars
  (action$, state$) => webSocketClosing$.pipe(
    map(() => ({
      type: 'MDS_SOCKET_CLOSING_EVENT'
    }))),

  // eslint-disable-next-line no-unused-vars
  (action$, state$) => webSocketClose$.pipe(
    map((event) => ({
      type: 'MDS_SOCKET_CLOSE_EVENT',
      wasClean: event.wasClean,
      code: event.code,
      reason: event.reason
    }))),
]
