defmodule IBClient do
  @moduledoc """
  GenServer that owns a TCP connection to an IB API server on `127.0.0.1:4001`.

  The handshake is driven by a `:tick` message that is re-sent every second
  until the client reaches the `:connected` state. Each tick advances one step:
  connect, send the API version string, read the version response, send
  START_API, read its response, and finally switch the socket to active mode.

  Once connected, inbound bytes are accumulated in the `buffer` state key and
  split into frames made of a 4-byte big-endian length followed by that many
  payload bytes. The NUL-separated fields of every complete frame are sent to
  the process registered as `Ibex.Inbound.Handler` as `{:dispatch, fields}`.
  """

  require Logger
  use GenServer

  @ip {127, 0, 0, 1}
  @port 4001

  # `:connecting` is not produced by this module; it is kept so existing
  # references to the type stay valid.
  @type connection_state() ::
          :disconnected
          | :connecting
          | :send_api_version
          | :waiting_on_version_resp
          | :sending_start_api
          | :waiting_on_start_api_resp
          | :transition_to_non_blocking
          | :connected

  @initial_state %{
    socket: nil,
    conn_state: :disconnected,
    server_api: nil,
    server_connect_time: nil,
    next_id: 0,
    inbound_handler: nil,
    buffer: <<>>
  }

  ## Client API

  @doc """
  Asynchronously writes `message` to the socket owned by the client process `pid`.
  """
  @spec send_message(GenServer.server(), iodata()) :: :ok
  def send_message(pid, message) do
    GenServer.cast(pid, {:message, message})
  end

  @doc """
  Starts an unlinked, unnamed client process.
  """
  @spec start() :: GenServer.on_start()
  def start do
    Logger.debug("Starting IBClient...")
    GenServer.start(__MODULE__, @initial_state)
  end

  @doc """
  Starts the client linked to the caller and registered under the module name.

  The argument is ignored; it is accepted so the module can be listed in a
  supervisor's child specification.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_opt) do
    Logger.info("Starting IBClient as a supervised process...")
    GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
  end

  ## Server callbacks

  @impl true
  def init(state) do
    # Kick off the connection state machine without blocking init/1.
    send(self(), :tick)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    state =
      case state[:conn_state] do
        :disconnected -> connect_to_server(state)
        :send_api_version -> send_version(state)
        :waiting_on_version_resp -> wait_for_api_version_response(state)
        :sending_start_api -> start_api(state)
        :waiting_on_start_api_resp -> wait_for_start_api_resp(state)
        :transition_to_non_blocking -> transition_to_nonblocking(state)
        :connected -> state
      end

    # Keep ticking until the handshake has completed.
    if state[:conn_state] != :connected do
      Logger.info("Sending tick...")
      Process.send_after(self(), :tick, 1000)
    end

    {:noreply, state}
  end

  def handle_info(:start_api, state) do
    Logger.debug("Starting API...")
    {:noreply, state}
  end

  def handle_info(:req_account_summary, state) do
    # NOTE(review): the 4-byte prefix declares 8 payload bytes but 10 bytes
    # follow it; confirm the intended message layout before relying on this.
    msg = <<0, 0, 0, 8, "62", 0, 2, 0, 2, 0, 0, 0, 0>>
    :ok = :gen_tcp.send(state[:socket], msg)
    {:noreply, state}
  end

  def handle_info(:req_ids, state) do
    msg = Ibex.Messages.Outbound.ReqIds.t() |> IBMessage.to_ib_api()
    :ok = :gen_tcp.send(state[:socket], msg)
    {:noreply, state}
  end

  # TCP is a byte stream: one chunk may carry several frames or only part of
  # one, so the chunk is buffered and only complete frames are dispatched.
  def handle_info({:tcp, _socket, data}, state) do
    {:noreply, dispatch_frames(state, data)}
  end

  def handle_info({:tcp_closed, _socket}, state), do: {:stop, :normal, state}

  # Active-mode sockets report errors as `{:tcp_error, socket, reason}`.
  def handle_info({:tcp_error, _socket, reason}, state) do
    Logger.error("TCP error: #{inspect(reason)}")
    {:stop, :normal, state}
  end

  # Kept for backward compatibility with the 2-tuple shape matched previously.
  def handle_info({:tcp_error, _}, state), do: {:stop, :normal, state}

  def handle_info({:req_stock_contract_details, ticker}, state) do
    {req_id, state} = next_id(state)

    msg =
      Ibex.Messages.Outbound.ReqContractDetails.t(to_string(req_id), ticker, "STK")
      |> IBMessage.to_ib_api()

    :ok = :gen_tcp.send(state[:socket], msg)
    {:noreply, state}
  end

  def handle_info({:symbol_search, symbol}, state) do
    {req_id, state} = next_id(state)

    msg =
      Ibex.Messages.Outbound.ReqMatchingSymbols.t(to_string(req_id), symbol)
      |> IBMessage.to_ib_api()

    :ok = :gen_tcp.send(state[:socket], msg)
    {:noreply, state}
  end

  # Catch-all clauses for handle_info: unknown messages are logged and dropped
  # instead of crashing the server with a FunctionClauseError.
  def handle_info({other, _}, state) do
    Logger.debug("Received #{inspect(other)}")
    {:noreply, state}
  end

  def handle_info(other, state) do
    Logger.debug("Received #{inspect(other)}")
    {:noreply, state}
  end

  @impl true
  def handle_cast({:message, message}, %{socket: socket} = state) do
    :ok = :gen_tcp.send(socket, message)
    {:noreply, state}
  end

  @doc """
  Handles a `{:disconnect, _}` request by logging a user-requested disconnect.

  NOTE(review): the `{:stop, :normal, state}` tuple returned by `disconnect/2`
  is discarded here, so this always returns `{:noreply, state}`; confirm that
  this is intended.
  """
  @spec handle({:disconnect, any}, any) :: {:noreply, any}
  def handle({:disconnect, _}, state) do
    disconnect(state, :user_requested)
    {:noreply, state}
  end

  @doc """
  Logs `reason` and returns `{:stop, :normal, state}`.
  """
  @spec disconnect(any, String.Chars.t()) :: {:stop, :normal, any}
  def disconnect(state, reason) do
    Logger.info("Disconnected: #{reason}")
    {:stop, :normal, state}
  end

  ## Handshake steps (each returns the next state)

  # Opens the socket in passive mode so the handshake can use blocking reads.
  defp connect_to_server(state) do
    case :gen_tcp.connect(@ip, @port, [:binary, active: false]) do
      {:ok, socket} ->
        %{state | conn_state: :send_api_version, socket: socket}

      {:error, reason} ->
        Logger.error("Failed to connect to server: #{inspect(reason)}")
        state
    end
  end

  # Sends "API\0" followed by a 4-byte length and the 9-byte version range.
  defp send_version(state) do
    Logger.info("Sending API version to server...")

    case :gen_tcp.send(state[:socket], <<"API", 0, 0, 0, 0, 9, "v100..177">>) do
      :ok ->
        %{state | conn_state: :waiting_on_version_resp}

      {:error, reason} ->
        Logger.error("Failed to send API version string to server: #{inspect(reason)}")
        state
    end
  end

  # Sends the START_API frame: an 8-byte payload of NUL-terminated ASCII fields.
  defp start_api(state) do
    Logger.info("Sending START_API message...")

    case :gen_tcp.send(state[:socket], <<0, 0, 0, 8, 55, 49, 0, 50, 0, 50, 0, 0>>) do
      :ok ->
        %{state | conn_state: :waiting_on_start_api_resp}

      {:error, reason} ->
        Logger.error("Failed to send StartApi message: #{inspect(reason)}")
        state
    end
  end

  # Blocks until the server answers the version string, then records the server
  # API version and connect time when the response has exactly three fields.
  defp wait_for_api_version_response(state) do
    Logger.info("Waiting for API version response...")
    {_length, _code, data} = wait_for_data(state)

    state =
      case data do
        [server_api, connect_time, _] ->
          Logger.info(
            "Extracting Server API version from fields. Version: #{server_api}. Time: #{connect_time}"
          )

          %{state | server_api: server_api, server_connect_time: connect_time}

        _ ->
          Logger.info("Resposne from API Version Request was longer: #{inspect(data)}")
          state
      end

    %{state | conn_state: :sending_start_api}
  end

  # Blocks until the START_API response arrives. The read is expected to hold
  # two complete frames: the first is skipped and the second carries the next
  # valid order id as its second field. Raises MatchError otherwise.
  defp wait_for_start_api_resp(state) do
    Logger.info("Waiting for START_API response...")
    {:ok, data} = :gen_tcp.recv(state[:socket], 0)

    <<first_len::size(32), _first::binary-size(first_len), second_len::size(32),
      second::binary-size(second_len), rest::binary>> = data

    next_valid_id = second |> String.split(<<0>>) |> Enum.at(1)
    Logger.debug("Next valid order ID is #{next_valid_id}")

    # Bytes after the second frame belong to later messages; keep them so they
    # are dispatched once the socket is switched to active mode.
    state = Map.put(state, :buffer, rest)

    %{
      state
      | next_id: String.to_integer(next_valid_id),
        conn_state: :transition_to_non_blocking
    }
  end

  # Performs one blocking read and hands the bytes to the inbound parser.
  defp wait_for_data(state) do
    Logger.info("Waiting synchronously for data...")
    {:ok, data} = :gen_tcp.recv(state[:socket], 0)
    Ibex.Messages.Inbound.parse(data)
  end

  # Switches the socket to active mode. On failure the state is left unchanged
  # so the next tick retries, like the other handshake steps.
  defp transition_to_nonblocking(state) do
    Logger.info("Transitioning to non-blocking mode...")

    case :inet.setopts(state[:socket], active: true) do
      :ok ->
        # Flush any complete frames left over from the blocking handshake reads.
        state = dispatch_frames(state, <<>>)
        %{state | conn_state: :connected}

      {:error, reason} ->
        Logger.error("Failed to switch socket to active mode: #{inspect(reason)}")
        state
    end
  end

  ## Helpers

  # Appends `data` to the buffered bytes, dispatches the fields of every
  # complete frame to Ibex.Inbound.Handler and stores the unconsumed tail.
  defp dispatch_frames(state, data) do
    {frames, leftover} = split_frames(Map.get(state, :buffer, <<>>) <> data, [])

    Enum.each(frames, fn payload ->
      Logger.debug("Data is: #{inspect(payload)}. \nLength is: #{inspect(byte_size(payload))}")
      send(Ibex.Inbound.Handler, {:dispatch, String.split(payload, <<0>>)})
    end)

    Map.put(state, :buffer, leftover)
  end

  # Splits a binary into complete length-prefixed payloads plus the remaining
  # bytes that do not yet form a full frame.
  defp split_frames(<<len::size(32), payload::binary-size(len), rest::binary>>, acc),
    do: split_frames(rest, [payload | acc])

  defp split_frames(partial, acc), do: {Enum.reverse(acc), partial}

  # Returns the current request id and the state with the counter advanced.
  defp next_id(state) do
    next_id = state[:next_id]
    state = %{state | next_id: next_id + 1}
    {next_id, state}
  end
end