#include "stream.h" #include #include #include #include #include #include #include #include "buffer.h" /// Creates and pushes new Stream into the Lua stack. /// \param L Lua state to push to /// \param fd File descriptor used by the Stream /// \return TODO int stream_push_new(lua_State * L, const int fd) { struct stream * s = lua_newuserdata(L, sizeof(struct stream)); memset(s, 0, sizeof(struct stream)); s->fd = fd; if (1 == luaL_newmetatable(L, "stream")) { lua_pushliteral(L, "__gc"); lua_pushcfunction(L, stream_gc); lua_rawset(L, -3); lua_pushliteral(L, "__index"); lua_createtable(L, 0, 1); { lua_pushliteral(L, "read"); lua_pushcfunction(L, stream_read); lua_rawset(L, -3); lua_pushliteral(L, "skip"); lua_pushnil(L); // TODO: Implement skipping in input buffer. lua_rawset(L, -3); lua_pushliteral(L, "write"); lua_pushcfunction(L, stream_write); lua_rawset(L, -3); lua_pushliteral(L, "flush"); lua_pushcfunction(L, stream_flush); lua_rawset(L, -3); lua_pushliteral(L, "discard"); lua_pushcfunction(L, stream_discard); lua_rawset(L, -3); } lua_rawset(L, -3); } lua_setmetatable(L, -2); return LUA_OK; } /// Metamethod to handle garbage collection of Stream userdata. /// \param L Lua state in which Stream resides /// \return Number of the results pushed to the stack; always zero in this case int stream_gc(lua_State * L) { int n = lua_gettop(L); if (1 != n) { lua_pushliteral(L, "Invalid number of arguments received"); return lua_error(L); } struct stream * s = lua_touserdata(L, -1); if (NULL == s) { lua_pushliteral(L, "Missing stream argument"); return lua_error(L); } if (NULL != s->in.data) { free(s->in.data); } if (NULL != s->out.data) { free(s->out.data); } lua_pop(L, 1); return 0; } /// Starts reading operation from a stream. /// \param L Lua state in which Stream resides /// \return Number of the results pushed to the stack int stream_read(lua_State * L) { int n = lua_gettop(L); if (2 > n) { lua_pushliteral(L, "At least two arguments expected"); return lua_error(L); } struct stream * s = lua_touserdata(L, 1); if (NULL == s) { // TODO: Improve error handling and raising in steam_* lua_CFunctions lua_pushliteral(L, "Missing stream argument"); return lua_error(L); } if (NULL == s->in.data) { void * buffer = malloc(1024); if (NULL == buffer) { lua_pushliteral(L, "Could not allocate buffer for stream"); return lua_error(L); } s->in.data = buffer; s->in.allocated = 1024; s->in.length = 0; s->in.offset = 0; s->in.next = 0; } return stream_readk(L, LUA_OK, 2); // Intentionally do not remove arguments from the stack. } /// Continuation function and core implementation of the reading operation from a stream. /// \param L Lua state running reading operation /// \param status Unused /// \param ctx Address of the Stream that is being read /// \param Number of the results pushed to the stack int stream_readk(lua_State * L, int status, lua_KContext ctx) { struct stream * s = lua_touserdata(L, 1); (void) status; for (; ctx <= lua_gettop(L); ++ctx) { size_t pattern_length; const char * pattern = lua_tolstring(L, ctx, &pattern_length); int offset; int remaining_bytes; do { remaining_bytes = buffer_prepare_at_least(s->fd, &s->in, (int) pattern_length); if (-1 == remaining_bytes) { if (EWOULDBLOCK == errno || EAGAIN == errno) return lua_yieldk(L, 0, ctx, stream_readk); else { lua_pushstring(L, strerror(errno)); return lua_error(L); } } offset = buffer_until(&s->in, pattern, (int) pattern_length); } while (-1 == offset && 0 < remaining_bytes); if (-1 != offset) { lua_pushlstring(L, &s->in.data[s->in.offset], offset - s->in.offset); s->in.offset = offset + pattern_length; s->in.next = offset + pattern_length; } else { lua_pushnil(L); } lua_remove(L, ctx); lua_insert(L, ctx); } lua_remove(L, 1); return lua_gettop(L); } /// Writes to output buffer. /// \param L Lua state in which Stream resides /// \return Number of the results pushed to the stack int stream_write(lua_State * L) { const int n = lua_gettop(L); if (2 > n) { lua_pushliteral(L, "At least two arguments expected"); return lua_error(L); } struct stream * s = lua_touserdata(L, 1); if (NULL == s) { lua_pushliteral(L, "Missing stream argument"); return lua_error(L); } if (NULL == s->out.data) { void * buffer = malloc(1024); if (NULL == buffer) { lua_pushliteral(L, "Could not allocate buffer for stream"); return lua_error(L); } s->out.data = buffer; s->out.allocated = 1024; s->out.length = 0; s->out.offset = 0; s->out.next = 0; } for (int i = 2; n >= i; ++i) { size_t data_length; const char * data = lua_tolstring(L, i, &data_length); int free_space = s->out.allocated - s->out.length; while (free_space < (int) data_length) { const int res = buffer_grow(&s->out); if (-1 == res) { lua_pushstring(L, strerror(errno)); return lua_error(L); } free_space = s->out.allocated - s->out.length; } if (free_space >= (int) data_length) { memcpy(&s->out.data[s->out.length], data, data_length); s->out.length += (int) data_length; } } lua_pop(L, n); return 0; } /// Flushes the contents of output buffer writing it to the socket. /// \param L Lua state in which Stream resides /// \return Number of the results pushed to the stack int stream_flush(lua_State * L) { const int n = lua_gettop(L); if (1 != n) { lua_pushliteral(L, "Expected one argument"); return lua_error(L); } struct stream * s = lua_touserdata(L, 1); if (NULL == s) { lua_pushliteral(L, "Missing stream argument"); return lua_error(L); } return stream_flushk(L, LUA_OK, (lua_KContext) s); } /// Continuation of the flush operation. /// \param L Lua state in which Stream resides /// \param status Unused /// \param ctx Address of the stream context in memory /// \return Number of the results pushed to the stack int stream_flushk(lua_State * L, const int status, lua_KContext ctx) { struct stream * s = (struct stream *) ctx; (void) status; const int bytes_written = write(s->fd, s->out.data, s->out.length); if (-1 == bytes_written) { if (EAGAIN == errno || EWOULDBLOCK == errno) { return lua_yieldk(L, 0, ctx, stream_flushk); } else { lua_pushstring(L, strerror(errno)); return lua_error(L); } } else { s->out.length = s->out.length - bytes_written; if (0 < s->out.length) { memmove(s->out.data, &s->out.data[bytes_written], s->out.length); } } lua_pop(L, 1); return 0; } /// Discards the contents of output buffer without writing it anywhere. /// \param L Lua state in which Stream resides /// \return Number of the results pushed to the stack int stream_discard(lua_State * L) { const int n = lua_gettop(L); if (1 != n) { lua_pushliteral(L, "Expected one argument"); return lua_error(L); } struct stream * s = lua_touserdata(L, 1); if (NULL == s) { lua_pushliteral(L, "Missing stream argument"); return lua_error(L); } s->out.length = 0; lua_pop(L, 1); return 0; }